Run an agent

Threads, messages, and runs work together to run your agent. A thread holds the conversation context, with messages capturing your input and the agent's responses as it calls tools. A run starts the agent on a thread.

Create a thread

Endpoint: POST /v1/threads (Create thread)

from seekrai import SeekrFlow

client = SeekrFlow()

thread = client.agents.threads.create()
print(f"Thread created: {thread.id}")

Create a message

Endpoint: POST /v1/threads/{thread_id}/messages (Create message)

message = client.agents.threads.create_message(
    thread_id=thread.id,
    role="user",
    content="Explain the concept of derivatives for my calculus class"
)
print(f"Message created! ID: {message.id}, Content: {message.content}")

Start a run

Once your thread has a message, start a run against it. SeekrFlow supports synchronous and streaming runs.

Run synchronously

In synchronous mode, the agent processes the request and returns the complete response when generation is finished.

Endpoint: POST /v1/threads/{thread_id}/runs (Run agent)

import time

agent_id = "your_agent_id"
thread_id = "your_thread_id"

run = client.agents.runs.run(
    agent_id=agent_id,
    thread_id=thread_id,
    stream=False
)
run_id = run.run_id
print(f"✓ Run started → {run_id}")

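# Poll until the run completes. The deadline keeps a run that never reaches
# "completed" from hanging the script; adjust the 300 seconds as needed.
deadline = time.monotonic() + 300
while True:
    run = client.agents.runs.retrieve(run_id, thread_id=thread_id)
    if run.status == "completed":
        break
    if time.monotonic() > deadline:
        raise TimeoutError(f"Run {run_id} did not complete (last status: {run.status})")
    time.sleep(1)

# This reads the first list item as the agent's final message (newest first).
final_message = client.agents.threads.list_messages(thread_id)[0]
print(final_message.content)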
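
The polling loop above can be factored into a small reusable helper. This is a minimal sketch, not part of the SDK: `wait_for_status` and its parameters are hypothetical names, and the only status assumed to end the wait is `completed`, since that is the only one this page confirms. The helper takes any zero-argument callable, so it can be exercised without a live run.

```python
import time
from typing import Callable, Sequence


def wait_for_status(
    fetch_status: Callable[[], str],
    done_statuses: Sequence[str] = ("completed",),
    timeout_s: float = 60.0,
    interval_s: float = 1.0,
) -> str:
    """Poll fetch_status() until it returns one of done_statuses.

    Raises TimeoutError if none of them is seen within timeout_s seconds.
    """
    done = tuple(done_statuses)  # tuple membership compares with ==
    deadline = time.monotonic() + timeout_s
    while True:
        status = fetch_status()
        if status in done:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still {status!r} after {timeout_s} seconds")
        time.sleep(interval_s)
```

With the client from the examples above, a call might look like `wait_for_status(lambda: client.agents.runs.retrieve(run_id, thread_id=thread_id).status)`.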

Run with streaming

In streaming mode, the agent's response is sent back incrementally as it's generated, allowing your application to display results in real time.

Endpoint: POST /v1/threads/{thread_id}/runs/stream (Run agent with streaming)

agent_id = "your_agent_id"
thread_id = "your_thread_id"

run = client.agents.runs.run(
    agent_id=agent_id,
    thread_id=thread_id,
    stream=True
)
run_id = run.run_id
print(f"✓ Run started → {run_id}")

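# Attach to the run and consume the stream. Each streamed item is printed
# as-is because its exact shape is not documented here.
for event in client.agents.runs.attach(run_id, thread_id):
    print(event)

# After the stream ends, look up the assistant reply. `message` is the user
# message from "Create a message"; this lookup assumes the list is newest
# first, so the reply sits one position before the user message.
messages = client.agents.threads.list_messages(thread_id)
for i, msg in enumerate(messages):
    if msg.id == message.id and i > 0:
        assistant_msg = messages[i - 1]
        print("\n=== Agent Response ==================================================")
        print(assistant_msg.content)
        print("====================================================================")
        break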
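
The reply lookup at the end of the example above can be isolated into a helper. The following is a sketch under the same assumption the example makes, namely that the message list is ordered newest first. `find_reply` is a hypothetical name, and the `SimpleNamespace` objects are stand-ins for SDK messages that only carry the attributes used here.

```python
from types import SimpleNamespace
from typing import Any, Optional, Sequence


def find_reply(messages: Sequence[Any], user_message_id: str) -> Optional[Any]:
    """Return the message listed immediately before the given user message.

    Assumes newest-first ordering, so the entry at index i - 1 was created
    right after the entry at index i. Returns None if there is no such entry.
    """
    for i, msg in enumerate(messages):
        if msg.id == user_message_id:
            return messages[i - 1] if i > 0 else None
    return None


# Stand-in data: the assistant reply (newer) precedes the user message (older).
demo = [
    SimpleNamespace(id="m2", role="assistant", content="A derivative measures a rate of change."),
    SimpleNamespace(id="m1", role="user", content="Explain derivatives"),
]
reply = find_reply(demo, "m1")
print(reply.content if reply else "no reply yet")
```

Returning `None` instead of raising lets the caller decide whether a missing reply means the run is still in progress.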

Retrieve messages

Retrieve a message

Endpoint: GET /v1/threads/{thread_id}/messages/{message_id} (Get message)

message = client.agents.threads.retrieve_message(
    thread_id=thread.id,
    message_id=message.id
)
print("Message retrieved!")
print(f"Thread ID: {message.thread_id}, Message ID: {message.id}")

Retrieve all messages from a thread

Endpoint: GET /v1/threads/{thread_id}/messages (List messages)

thread_id = "your_thread_id"

messages = client.agents.threads.list_messages(thread_id=thread_id)
print(f"Messages in thread {thread_id}:")
# Use `msg` as the loop variable so the `message` created earlier is not overwritten.
for msg in messages:
    print(f"Message ID: {msg.id}")
    print(f"Role: {msg.role}")
    print(f"Content: {msg.content}")
    print("-" * 10)
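
To read a thread as a conversation, the listed messages can be sorted by their `created_at` timestamp rather than relying on the order the API returns. This is a minimal sketch: `to_transcript` is a hypothetical helper, the stand-in objects carry only `role`, `content`, and `created_at`, and content is formatted with plain string conversion because the content type is not spelled out on this page.

```python
from datetime import datetime, timezone
from types import SimpleNamespace
from typing import Any, Iterable, List


def to_transcript(messages: Iterable[Any]) -> List[str]:
    """Return 'role: content' lines sorted oldest first by created_at."""
    ordered = sorted(messages, key=lambda m: m.created_at)
    return [f"{m.role}: {m.content}" for m in ordered]


# Stand-in data listed newest first, as a thread listing might return it.
demo = [
    SimpleNamespace(
        role="assistant",
        content="Hi!",
        created_at=datetime(2025, 1, 1, 12, 0, 5, tzinfo=timezone.utc),
    ),
    SimpleNamespace(
        role="user",
        content="Hello",
        created_at=datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
    ),
]
for line in to_transcript(demo):
    print(line)
```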

Manage threads

List threads

Endpoint: GET /v1/threads (List threads)

threads = client.agents.threads.list(limit=20, order="desc")
print("Available threads:")

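# Use `t` as the loop variable so the `thread` created earlier is not overwritten.
for t in threads:
    print(f"ID: {t.id}, Status: {t.status}")
    # agent_id is not listed in the Thread model later on this page, so read it defensively.
    thread_agent_id = getattr(t, "agent_id", None)
    if thread_agent_id is not None:
        print(f"Agent ID: {thread_agent_id}")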

Retrieve a thread

Endpoint: GET /v1/threads/{thread_id} (Get thread)

thread = client.agents.threads.retrieve(thread_id=thread.id)
print(f"Thread retrieved: ID={thread.id}, Status={thread.status}")

Delete a thread

Endpoint: DELETE /v1/threads/{thread_id} (Delete thread)

deleted_status = client.agents.threads.delete(thread_id=thread.id)
print(f"Thread {thread.id} deleted: {deleted_status}")

Manage messages

Update a message

Endpoint: PATCH /v1/threads/{thread_id}/messages/{message_id} (Update message)

thread_id = "your_thread_id"
message_id = "your_message_id"

updated_message = client.agents.threads.update_message(
    thread_id=thread_id,
    message_id=message_id,
    content="What can you help me with?"
)
print(f"Message updated in thread {thread_id}!")
print(f"Message ID: {updated_message.id}")
print(f"New Content: {updated_message.content}")

Delete a message

Endpoint: DELETE /v1/threads/{thread_id}/messages/{message_id} (Delete message)

thread_id = "your_thread_id"
message_id = "your_message_id"

deleted_status = client.agents.threads.delete_message(
    thread_id=thread_id,
    message_id=message_id
)
print(f"Message {message_id} deleted from thread {thread_id}: {deleted_status}")

Track file search sources

When an agent uses file search, each retrieved chunk's source tracking fields are appended to the assistant message content:

| Field | Description |
| --- | --- |
| chunk_id | Unique chunk ID; use this to call the chunk endpoint for full Markdown provenance |
| page | Source document page number (null for native Markdown or JSON) |
| lines | Line range in the ingested Markdown (e.g. 42-67) |
| section | Heading hierarchy path from document root to the chunk |

Use chunk_id with GET /v1/flow/vectordb/{database_id}/chunk/{chunk_id} to retrieve the file_id and download the original source document.
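
As a rough illustration of working with these fields, the sketch below builds the chunk endpoint path from a `database_id` and `chunk_id` and parses a `lines` value such as 42-67 into integers. Both helper names are hypothetical. How the fields are laid out inside the message content is not specified here, so extracting them is left out, and treating a single number as a one-line range is an assumption.

```python
from typing import Optional, Tuple
from urllib.parse import quote


def chunk_path(database_id: str, chunk_id: str) -> str:
    """Build the request path for the chunk endpoint, URL-escaping both IDs."""
    return (
        f"/v1/flow/vectordb/{quote(database_id, safe='')}"
        f"/chunk/{quote(chunk_id, safe='')}"
    )


def parse_lines(lines: Optional[str]) -> Optional[Tuple[int, int]]:
    """Parse a 'start-end' range such as '42-67' into (start, end).

    Returns None for a missing value. A single number is treated as a
    one-line range (an assumption, not documented behavior).
    """
    if not lines:
        return None
    start, _, end = lines.partition("-")
    return int(start), int(end or start)


print(chunk_path("db_123", "chunk_456"))
print(parse_lines("42-67"))
```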

Understand data structures

Thread

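from datetime import datetime
from enum import Enum
from typing import Any, Optional

from pydantic import BaseModel

class ThreadStatus(str, Enum):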
    AVAILABLE = "available"
    LOCKED = "locked"

class Thread(BaseModel):
    id: str
    object: str
    created_at: datetime
    status: ThreadStatus
    active_run_id: Optional[str]
    meta_data: dict[str, Any]
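
One way these fields might be used is to check a thread's status before starting a new run. This is a sketch only: it assumes that a `locked` thread is one with a run in progress, which this page does not state, and `can_start_run` is a hypothetical helper. The enum is redeclared with the standard library so the snippet runs on its own.

```python
from enum import Enum
from types import SimpleNamespace
from typing import Any


class ThreadStatus(str, Enum):
    AVAILABLE = "available"
    LOCKED = "locked"


def can_start_run(thread: Any) -> bool:
    """Return True when the thread reports the 'available' status.

    Works whether status is the enum member or the plain string, because
    ThreadStatus mixes in str and compares equal to its value.
    """
    return thread.status == ThreadStatus.AVAILABLE


# Stand-in threads carrying only the fields used here.
idle = SimpleNamespace(status="available", active_run_id=None)
busy = SimpleNamespace(status=ThreadStatus.LOCKED, active_run_id="run_1")
print(can_start_run(idle), can_start_run(busy))
```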

Message

class ThreadMessage(BaseModel):
    id: str
    object: str = 'thread.message'
    created_at: datetime
    thread_id: str
    role: str  # e.g., 'user', 'assistant'
    content: ThreadMessageContentType  # type alias not shown in this snippet
    agent_id: Optional[str]
    run_id: Optional[str]
    meta_data: dict[str, Any]
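
The `role` and `run_id` fields make it possible to pick out the messages tied to one run. The sketch below assumes that messages produced during a run carry that run's ID, which the model suggests but this page does not confirm. `messages_for_run` is a hypothetical helper, and the stand-in objects carry only the fields used.

```python
from types import SimpleNamespace
from typing import Any, Iterable, List, Optional


def messages_for_run(
    messages: Iterable[Any], run_id: str, role: Optional[str] = None
) -> List[Any]:
    """Return messages whose run_id matches, optionally restricted to one role."""
    return [
        m for m in messages
        if m.run_id == run_id and (role is None or m.role == role)
    ]


demo = [
    SimpleNamespace(id="m1", role="user", run_id=None, content="Hello"),
    SimpleNamespace(id="m2", role="assistant", run_id="run_1", content="Hi!"),
    SimpleNamespace(id="m3", role="assistant", run_id="run_2", content="Anything else?"),
]
print([m.id for m in messages_for_run(demo, "run_1", role="assistant")])  # ['m2']
```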