Create a Knowledge Retrieval Agent

Set up an end-to-end system for intelligent document retrieval and conversation by indexing your files, generating embeddings, and configuring an agent to deliver context-aware responses.

Introduction

Welcome! This guide will get you up and running with SeekrFlow's Agents SDK. It covers how to:

  • Create and manage a vector database
  • Leverage an embeddings model
  • Define a SeekrAgent
  • Attach a FileSearch tool to the agent
  • Proactively search through your documents and accurately answer questions about them

If you're interested in a more general overview of agents and their capabilities, find it here.




To begin, install the latest SeekrAI package using pip install seekrai --upgrade. You will also need an API key: See the SeekrFlow Quickstart guide for more information on how to get one.


Async usage note

With a couple of exceptions, all examples in this guide use synchronous code. To use asynchronous functionality:

  1. Import the asyncio module and AsyncSeekrFlow class
  2. Initialize the client with AsyncSeekrFlow() instead of SeekrFlow()
  3. Add await before any client method call
  4. Ensure your code runs within an async function
  5. Use asyncio.run() to execute your async function from synchronous code

Example:

# Synchronous (as shown in examples)
from seekrai import SeekrFlow

client = SeekrFlow()
result = client.vector_database.create_ingestion_job(database_id, files, method)

# Asynchronous equivalent
import asyncio
from seekrai import AsyncSeekrFlow

async def main():
    client = AsyncSeekrFlow()
    result = await client.vector_database.create_ingestion_job(database_id, files, method)
    return result

# Execute the async function
result = asyncio.run(main())
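One benefit of the async client is running several operations concurrently with asyncio.gather. The sketch below uses stand-in coroutines rather than real SDK calls; with the actual AsyncSeekrFlow client you would await client methods inside each coroutine.

```python
import asyncio

# Stand-in coroutine: with the real SDK, this would await an
# AsyncSeekrFlow method such as client.files.upload(...)
async def upload_one(filename: str) -> str:
    await asyncio.sleep(0)  # placeholder for the awaited SDK call
    return f"uploaded-{filename}"

async def main() -> list:
    # gather() runs the coroutines concurrently and preserves order
    return await asyncio.gather(
        upload_one("documentation.pdf"),
        upload_one("policies.md"),
        upload_one("guidelines.docx"),
    )

results = asyncio.run(main())
print(results)
```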

Create a FileSearch index

Seekr's Vector Database SDK provides advanced semantic search capabilities by transforming text into vector embeddings, making it possible to perform semantic searches that focus on meaning and context. This approach provides a smarter and more intuitive way to retrieve documents compared to traditional keyword-based methods.

Start the agent creation process by creating a new vector database, specifying the embedding model and dimensions:

Supported embedding models

| Model Name | Dimensions | Best Used For |
|---|---|---|
| intfloat/e5-mistral-7b-instruct | 4096 | This model has some multilingual capability. However, since it was mainly trained on English data, we recommend using this model for English only. |

Create an empty vector database

Start by creating a new vector database with a specified embedding model.

from seekrai import SeekrFlow

# Initialize client
client = SeekrFlow(api_key="YOUR KEY HERE") # omit api_key if your key is stored as an environment variable

# Create vector database
vector_db = client.vector_database.create(
    model="intfloat/e5-mistral-7b-instruct",
    name="QuickStart_DB",
    description="Quick start example database"
)

database_id = vector_db.id
print(f"Created database: {vector_db.name} (ID: {database_id})")

Sample response:

Created database: QuickStart_DB (ID: b7123456789-09876-4567)

Upload files

Supported file types

  • PDF (.pdf)
  • Word documents (.docx)
  • Markdown (.md)

File size guidelines

Upload multiple files, up to 4GB each.
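Since each file must stay under the 4GB limit, a quick local pre-check can catch oversized files before an upload attempt. This is a minimal sketch; the helper name is ours, not part of the SDK:

```python
import os

MAX_BYTES = 4 * 1024**3  # 4 GB per-file limit

def partition_by_size(paths):
    """Split paths into (uploadable, too_large) against the 4 GB limit."""
    uploadable, too_large = [], []
    for path in paths:
        target = uploadable if os.path.getsize(path) <= MAX_BYTES else too_large
        target.append(path)
    return uploadable, too_large
```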

Markdown formatting guidelines

Before uploading, check that your Markdown files are properly formatted to avoid rejection:

  • All files must have correctly ordered headers (# followed by ##, and so on) with titles and meaningful content. For example:
# Customer service plan

Some content, separated from the header by a line. 

## Unaccompanied minor service

Some more content

### Etc.
  • Avoid using headers with more than 6 hashtags (e.g., ####### Pointlessly small md header)

Find some example Markdown files here.
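As a rough local pre-check (not the service's actual validation), the header rules above can be linted with a few lines of Python before you upload:

```python
import re

HEADER_RE = re.compile(r"^(#+)\s+\S")

def check_markdown_headers(text: str) -> list:
    """Flag headers deeper than 6 levels or that skip a level.
    A rough pre-check only; the service may validate differently."""
    problems = []
    prev_level = 0
    for lineno, line in enumerate(text.splitlines(), start=1):
        m = HEADER_RE.match(line)
        if not m:
            continue
        level = len(m.group(1))
        if level > 6:
            problems.append(f"line {lineno}: header deeper than 6 levels")
        elif level > prev_level + 1:
            problems.append(f"line {lineno}: skipped from level {prev_level} to {level}")
        prev_level = level
    return problems
```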


Upload a file for ingestion

Next, upload your files for processing.

Note: If you already have file_ids from a separate ingestion job, you can skip this step and use the same file_ids.

# Upload a file

# Keep whichever path style matches your OS and delete the other:

# Windows (use a raw string to avoid backslash issues):
file_path = r"C:\Users\username\Downloads\document.pdf" # Replace with your file path

# Mac/Linux:
file_path = "/Users/username/Downloads/document.pdf" # Replace with your file path

upload_response = client.files.upload(file_path, purpose="alignment")
file_id = upload_response.id
print(f"Uploaded file with ID: {file_id}")

Once uploaded, each file is given a unique file_id to use for ingestion.

Upload a batch of files for ingestion

The endpoint accepts an array of file_ids as input.

from seekrai import SeekrFlow
client = SeekrFlow()

bulk_resp = client.files.bulk_upload(
    ["documentation.pdf", "policies.md", "guidelines.docx"], purpose="alignment"
)
print("Upload complete")

# Access and print the ID of each uploaded file
print("\nFile IDs:")
for resp in bulk_resp:
    print(resp.id)

Sample response:

Uploading file documentation.pdf: 100%|██████████████████████████████████████████████████████████████████████| 7.16M/7.16M [00:08<00:00, 857kB/s]
Uploading file policies.md: 100%|█████████████████████████████████████████████████████████████████| 1.26M/1.26M [00:08<00:00, 151kB/s]
Uploading file guidelines.docx: 100%|███████████████████████████████████████████████████████████| 21.9k/21.9k [00:08<00:00, 2.62kB/s]
Upload complete

File IDs:
file-457989bc-2cf5-11f0-8b3b-56f95a5e9ef4
file-45f909da-2cf5-11f0-8b3b-56f95a5e9ef4
file-46226d20-2cf5-11f0-8b3b-56f95a5e9ef4

Begin vector database ingestion

Next, create a job to ingest documents into your vector database. This step converts the files and creates embeddings from them.

The token_count parameter specifies the target size of each chunk, ensuring each chunk is neither too large (risking truncation by model limits) nor too small (losing semantic coherence).

Best practices:

  1. Common ranges: For embedding and retrieval, 200–500 tokens per chunk is a widely used range, balancing context and efficiency. The example here uses a token count of 512.
  2. Adjust for document type: If your documents are dense or have complex structure (e.g., legal, technical), consider slightly larger chunks; for conversational or highly variable content, smaller chunks may work better.

The overlap_tokens parameter creates overlapping regions between adjacent chunks at chunk boundaries, reducing the risk of missing relevant information that spans two chunks.

Adjust chunking parameters based on document characteristics:

| Document Type | Recommended token_count | Recommended overlap_tokens |
|---|---|---|
| Technical documentation | 384-512 | 50-75 |
| Legal documents | 512-768 | 75-100 |
| Conversational content | 256-384 | 25-50 |
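Because each new chunk starts token_count - overlap_tokens tokens after the previous one, you can roughly estimate how many chunks a document will produce. This back-of-the-envelope helper is ours; the actual chunker may split on structural boundaries and produce somewhat different counts:

```python
import math

def estimate_chunks(total_tokens, token_count=512, overlap_tokens=50):
    """Rough chunk-count estimate: successive chunks advance by
    (token_count - overlap_tokens) tokens."""
    if total_tokens <= token_count:
        return 1
    stride = token_count - overlap_tokens
    return 1 + math.ceil((total_tokens - token_count) / stride)

print(estimate_chunks(10_000))  # a ~10k-token document at the defaults
```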

The method parameter determines how files are converted to text before chunking. The quality of this conversion directly affects chunking accuracy and downstream tasks. Setting it to best specifies the best available method for document conversion.

# Create ingestion job
ingestion_job = client.vector_database.create_ingestion_job(
    database_id=database_id,
    files=[file_id],
    method="best",
    token_count=512,
    overlap_tokens=50
)

job_id = ingestion_job.id
print(f"Created ingestion job: {job_id}")

Sample response:

Created ingestion job: ij-d80bd45a-4bb5-4bac-bbf3-7e3345409bc8

Monitor ingestion status

After starting an ingestion job, you can check its status until completion:

import time

timeout = 300  # 5 minutes timeout
interval = 5   # Check every 5 seconds
start_time = time.time() # Track start time

while True:
    job_status = client.vector_database.retrieve_ingestion_job(database_id, job_id)
    status = job_status.status
    print(f"Ingestion job status: {status}")

    if status == "completed":
        print("Vector database ready!")
        break
    elif status == "failed":
        error = getattr(job_status, "error_message", "Unknown error")
        print(f"Ingestion job failed: {error}")
        break

    # Check if timeout was exceeded
    elapsed_time = time.time() - start_time
    if elapsed_time >= timeout:
        print(f"Timeout reached after {timeout} seconds. Job status: {status}")
        break

    time.sleep(interval)

Sample response:

Ingestion job status: completed
Vector database ready!
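If you poll several jobs, the loop above can be factored into a reusable helper. The sketch below takes any status-returning callable, so it is not tied to the SDK; you might pass it something like lambda: client.vector_database.retrieve_ingestion_job(database_id, job_id).status:

```python
import time

def wait_for_completion(fetch_status, timeout=300, interval=5,
                        clock=time.monotonic, sleep=time.sleep):
    """Poll fetch_status() until it returns 'completed' or 'failed',
    or until timeout seconds elapse; returns the last status seen.
    clock and sleep are injectable so the helper is easy to test."""
    deadline = clock() + timeout
    while True:
        status = fetch_status()
        if status in ("completed", "failed"):
            return status
        if clock() >= deadline:
            return status
        sleep(interval)
```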

Complete example: Database creation > document ingestion

This example demonstrates the entire workflow for creating a vector database, adding files, and kicking off an ingestion job:

from seekrai import SeekrFlow
import time
import os

client = SeekrFlow()

# Step 1: Create vector database
print("Creating vector database...")
db_name = f"QuickStart_DB_{int(time.time())}"
vector_db = client.vector_database.create(
    model="intfloat/e5-mistral-7b-instruct",
    name=db_name,
    description="Quick start example database"
)
database_id = vector_db.id
print(f"Created database: {vector_db.name} (ID: {database_id})")

# Step 2: Upload file
print("Uploading file...")
file_path = "document.pdf"  # Replace with your file path
upload_response = client.files.upload(file_path, purpose="alignment")
file_id = upload_response.id
print(f"Uploaded file with ID: {file_id}")

# Step 3: Begin vector database ingestion
print("Creating ingestion job...")
ingestion_job = client.vector_database.create_ingestion_job(
    database_id=database_id,
    files=[file_id],
    method="best",
    token_count=512,
    overlap_tokens=50
)
job_id = ingestion_job.id
print(f"Created ingestion job with ID: {job_id}")

# Step 4: Wait for ingestion job to complete
print("Waiting for ingestion job to complete...")
timeout = 300  # 5 minutes timeout
interval = 5    # Check every 5 seconds

# Step 5: Monitor job status until completion
while True:
    job_status = client.vector_database.retrieve_ingestion_job(database_id, job_id)
    status = job_status.status
    print(f"Ingestion job status: {status}")
    
    if status == "completed":
        print(f"Vector database ready with ID: {database_id}")
        break
    elif status == "failed":
        error = getattr(job_status, "error_message", "Unknown error")
        print(f"Ingestion job failed: {error}")
        break
        
    time.sleep(interval)

print("Setup complete!")

Create an agent

Agents are stateful objects: once created, an agent remains available until you delete it. This lets you build an agent that not only responds to your requests, but can also call external tools in real time.

The example code below demonstrates the creation of an expert financial products recommendation agent using the FileSearch tool.

Note: Agents can be configured to use any base or fine-tuned model available on the SeekrFlow platform.

from seekrai import SeekrFlow 
from seekrai.types import CreateAgentRequest, FileSearch, FileSearchEnv

client = SeekrFlow()

# Create the agent
agent = client.agents.create(
    CreateAgentRequest(
        name="SeekrBot",
        instructions="You are SeekrBot, an expert in delivering concise, accurate, up-to-date recommendations. Respond only with data returned from the file_search tool.",
        model_id="meta-llama/Llama-3.1-8B-Instruct",  # or your choice of deployed model
        tools=[FileSearch(
            tool_env=FileSearchEnv(
                file_search_index=database_id,
                document_tool_desc="Search documents related to financial products including credit cards, payment/payroll services, accounting services, and lending.",
                top_k=5,
                score_threshold=0.5,
            )
        )],
    ))

# Retrieve the agent to get its details
agent_info = client.agents.retrieve(agent_id=agent.id)

# Print the agent ID and status
print(f"Agent ID: {agent.id}")
print(f"Agent Status: {agent_info.status}")

Sample response:

Agent ID: agent-8b77e8b3-011d-4210-8dae-003f907a3a29
Agent Status: AgentStatus.ACTIVE

How to use FileSearch parameters

file_search_index: Refers to the index created in the ingestion step.

document_tool_desc: Tool description with information for the agent. Write clear, concise descriptions that specify when the tool should be invoked (e.g., “Use this tool to search internal company policies when a user asks about HR procedures.”). Include example queries or scenarios to help the agent understand the tool’s intended use, and clearly define the scope and limitations of the tool (e.g., “This tool only searches technical documentation, not customer support tickets.”)

top_k: Number of results to return from the search. A higher top_k increases the likelihood of including relevant results but may introduce more noise. A lower top_k gives more focused results but risks missing relevant information. Common settings are between 3 and 10. For exploratory queries or when context is broad, consider higher values; for targeted tasks, use lower values.

score_threshold: Minimum score for a result to be returned. Use to filter out weak or irrelevant matches, improving the overall quality of search results. Start with a moderate threshold (e.g., 0.5–0.7 for cosine similarity) and adjust based on observed retrieval quality and user satisfaction.
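To build intuition for how top_k and score_threshold interact, here is a purely illustrative filter over made-up similarity scores (this is not how the service implements retrieval internally):

```python
def filter_results(scored_chunks, top_k=5, score_threshold=0.5):
    """Drop chunks below the threshold, then keep the top_k highest-scoring."""
    kept = [c for c in scored_chunks if c[1] >= score_threshold]
    kept.sort(key=lambda c: c[1], reverse=True)
    return kept[:top_k]

results = [("chunk-a", 0.91), ("chunk-b", 0.44),
           ("chunk-c", 0.73), ("chunk-d", 0.58)]
print(filter_results(results, top_k=2, score_threshold=0.5))
```

Raising score_threshold to 0.6 would drop chunk-d even before top_k applies, which is the usual lever for trimming weak matches.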


List agents and check their status

Agent creation deploys an agent, which may take a few minutes. It becomes usable when its status is Active. You can access all agents you've created along with their name and status using:

available_agents = client.agents.list_agents()

print("Available agents:")
for agent in available_agents:
    print(f"ID: {agent.id}, Name: {agent.name}, Status: {agent.status}")

Sample response:

ID: agent-8b77e8b3-011d-4210-8dae-003f907a3a29, Name: SeekrBot, Status: AgentStatus.ACTIVE
...

Create a thread

Once an agent has been created and has an Active status, it can be used to generate responses to user questions.

Agents run on top of threads, which are sequences of messages that represent a conversation. They store messages independently from agent execution, which provides flexibility in handling interactions and managing agent workflows. When a thread is run, the agent processes the messages in the thread to generate a new response, which is then appended to the thread.

Thread message structure

Each message within a thread has the following structure:

class ThreadMessage(BaseModel):
    id: str
    object: str = 'thread.message'
    created_at: datetime
    thread_id: str
    role: str  # e.g., 'user', 'assistant', 'system', 'tool'
    content: ThreadMessageContentType
    agent_id: Optional[str]
    run_id: Optional[str]
    meta_data: dict[str, Any]

Create a new thread

The following example creates a new thread:

from seekrai import SeekrFlow
client = SeekrFlow()

thread = client.agents.threads.create()
print("Thread created:", thread.id)

Sample response:

Thread created: f406664d-214c-42b1-8fff-3faa928ab816

Why threads are decoupled from agents:

Agents operate on threads, but are not tied directly to them; running an agent on a thread locks it temporarily.

Once the agent finishes, the thread unlocks and stores the resulting message(s). This allows for asynchronous, parallel execution; long-running tasks won't require maintaining open connections, and several different agents can operate sequentially on the same thread: an essential capability for multi-agent workflows.

Retrieve a specific thread

thread = client.agents.threads.retrieve(thread_id=thread.id)
print(f"Thread retrieved: ID={thread.id}, Status={thread.status}")

List threads

This example lists the 20 most recent threads with their status in descending order. Adjust as needed.

threads = client.agents.threads.list(limit=20, order="desc")
print("Available threads:")
for thread in threads:
    print(f"ID: {thread.id}, Status: {thread.status}")

Delete a thread

deleted_status = client.agents.threads.delete(thread_id=thread.id)
print(f"Thread {thread.id} deleted: {deleted_status}")

Add messages to the thread

Messages represent inputs from users or outputs from agents. A message must always contain a thread ID, role, and content, but can include optional metadata such as message ID, agent ID, run ID, and creation timestamp.

message = client.agents.threads.create_message(
    thread_id=thread.id,
    role="user",
    content="What is your name?"
)
print(f"Message created! ID: {message.id}, Content: {message.content}")

Sample response:

Message created! ID: 8340c65e-fdb9-48f0-b11c-d197ede1a6d1, Content: What is your name?

Retrieve a message

message = client.agents.threads.retrieve_message(
    thread_id=thread.id,
    message_id=message.id
)
print("Message retrieved!")
print(f"Thread ID: {message.thread_id}, Message ID: {message.id}") # If you're working with multiple threads, printing both IDs can help with tracking

Sample response:

Message retrieved!
Thread ID: 25689092-81eb-4ba8-84da-8af75d2f3685, Message ID: 8340c65e-fdb9-48f0-b11c-d197ede1a6d1

List messages

thread_id = "your thread id"
message_id = "your message id"

# Retrieve a single message
message = client.agents.threads.retrieve_message(
    thread_id=thread_id,
    message_id=message_id
)
print("Message retrieved!")
print(f"Message ID: {message.id}")
print(f"Role: {message.role}")
print(f"Content: {message.content}")

# List all messages in a thread
messages = client.agents.threads.list_messages(thread_id=thread_id)
print(f"Messages in thread {thread_id}:")
for message in messages:
    print(f"Message ID: {message.id}")
    print(f"Role: {message.role}")
    print(f"Content: {message.content}")
    print("-" * 10)

Sample response:

Messages in thread 25689092-81eb-4ba8-84da-8af75d2f3685:
Message ID: 3986ae4d-3963-4812-80ed-7fafd9e352d5
Role: user
Content: Who sang 9 to 5?
----------
Message ID: e443dfe0-bf8c-467f-81c5-9a0b490b2260
Role: user
Content: What is your name?
----------

Update a message

thread_id = "your thread id"
message_id = "your message id"

updated_message = client.agents.threads.update_message(
    thread_id=thread_id,
    message_id=message_id,
    content="What can you help me with?"
)
print(f"Message updated in thread {thread_id}!")
print(f"Message ID: {updated_message.id}")
print(f"New Content: {updated_message.content}")

Sample response:

Message updated in thread 25689092-81eb-4ba8-84da-8af75d2f3685!
Message ID: 8340c65e-fdb9-48f0-b11c-d197ede1a6d1
New Content: What can you help me with?

Delete a message

thread_id = "your thread id"
message_id = "your message id"

deleted_status = client.agents.threads.delete_message(
    thread_id=thread_id,
    message_id=message_id
)
print(f"Message {message_id} deleted from thread {thread_id}: {deleted_status}")

Run inference on your agent

Running a thread initiates the conversation flow, enabling your agent to process user messages using your knowledge base. The agent retrieves relevant information from your vector database and generates contextually appropriate responses.

Using RunSteps (StreamChunks)

RunSteps provide detailed visibility into an agent's internal processing states:

  • They capture reasoning steps, tool invocations, intermediate results, and model interactions
  • RunSteps are transient and not stored in the thread, preventing context length inflation
  • They're specifically designed for real-time streaming and consumption

Each RunStep (StreamChunk) contains execution information that may include:

  • Reasoning outputs
  • Tool requests and responses
  • Intermediate model outputs and inputs

Important: Since RunSteps are not persisted in the system, they should be streamed and consumed in real-time.

Accessing RunSteps

To access RunSteps, you need to create a run and then attach to its stream:

agent_id = "your agent id"
thread_id = "your thread id"

# Create a run
run = client.agents.runs.run(
    agent_id,
    thread_id=thread_id,
    stream=False  # Must be False
)
run_id = run.run_id

# Then attach to the stream to access RunSteps
stream = client.agents.runs.attach(run_id, thread_id)
for chunk in stream:
    # Process each RunStep here
    print(f"RunStep: {chunk}")

Complete example: Run a thread with RunStep streaming

The following example demonstrates the complete process of creating a thread, sending a message, running an agent, capturing RunSteps, and retrieving the final response:

client = SeekrFlow()

# Create a new thread or use an existing one
thread_id = None  # Replace with your thread ID if you have one
if not thread_id:
    print("Creating new thread...")
    thread = client.agents.threads.create()
    thread_id = thread.id
    print(f"New thread created with ID: {thread_id}")
else:
    print(f"Using existing thread with ID: {thread_id}")

# Create a user message in the thread
print("Creating a user message...")
message = client.agents.threads.create_message(
    thread_id=thread_id,
    role="user",
    content="What can you help me with today?"
)
print(f"Message created! Message ID: {message.id}")

# Start a run with the agent (without streaming to get run_id)
print("Starting run...")
agent_id = "your agent id"  # Replace with the ID of an active agent
run = client.agents.runs.run(
    agent_id,
    thread_id=thread_id,
    stream=False  # Important: must be False to get run_id
)
run_id = run.run_id
print(f"Run started! Run ID: {run_id}, Thread ID: {thread_id}")

# Attach to the stream to get RunSteps
print("Attaching to stream to receive intermediate outputs...")
stream = client.agents.runs.attach(run_id, thread_id)
print("Receiving stream chunks:")
chunk_count = 0
for chunk in stream:
    chunk_count += 1
    print(f"Chunk #{chunk_count}: {chunk}")
print(f"Streaming complete. Received {chunk_count} chunks total.")

# Retrieve the agent's response
print("Retrieving agent's response...")
messages = client.agents.threads.list_messages(thread_id)
# Find the agent's response (with newest-first ordering, the reply
# appears just before our user message in the list)
for i, msg in enumerate(messages):
    if msg.id == message.id and i > 0:
        agent_response = messages[i-1]
        print(f"Agent response retrieved. Message ID: {agent_response.id}")
        print(f"Response content: {agent_response.content}")
        break

Sample response:

Creating new thread...
New thread created with ID: 25689092-81eb-4ba8-84da-8af75d2f3685
Creating a user message...
Message created! Message ID: 256d4a0e-60e4-46ae-8d98-c788cb1b9661
Starting run...
Run started! Run ID: 897ff820-c54b-4c2d-922c-ecfd945c34d9, Thread ID: 25689092-81eb-4ba8-84da-8af75d2f3685
Attaching to stream to receive intermediate outputs...
Receiving stream chunks:
Chunk #1: ...
... 
Streaming complete. Received 1267 chunks total.
Retrieving agent's response...
Agent response retrieved. Message ID: 3f7a83d1-3626-4248-9bf2-dbfef348dce7
Response content: I can help you explore and decide which financial products are right for you, credit cards, payment/payroll services, accounting services, and lending.

Agent management

Here are some other operations you might need:

Promote an agent

Note: Agent create requests will promote an agent automatically. Agents only need to be promoted after they've been demoted and are in an inactive state.

agent = client.agents.promote(agent.id)
print(f"Agent promoted. Agent ID: {agent.id}")

Demote an agent

Demoting an agent moves it to an inactive state. This is useful when you want to retain the agent's definition without allowing it to handle inference requests.

agent = client.agents.demote(agent.id)
print(f"Agent demoted. Agent ID: {agent.id}")

Delete an agent

This permanently removes an agent from the SeekrFlow platform.

del_response = client.agents.delete(agent.id)
print(f"Agent deleted. Agent ID: {agent.id}")

Vector database management

List all vector databases

databases = client.vector_database.list()

for db in databases.data:
    print(f"ID: {db.id}, Name: {db.name}")

Get a specific vector database

# Get vector database details
db_details = client.vector_database.retrieve(database_id)

print(f"Name: {db_details.name}")
print(f"Last updated: {db_details.updated_at}")

Delete a vector database

# Delete a vector database
client.vector_database.delete(database_id)
print(f"Successfully deleted database {database_id}")

List all files in a vector database

# List files in vector database
db_files = client.vector_database.list_files(database_id)

for file in db_files.data:
    print(f"ID: {file.id}, Filename: {file.filename}")

Delete a file from a vector database

# Delete a file from vector database
client.vector_database.delete_file(database_id, file_id)
print(f"Successfully deleted file {file_id} from database {database_id}")

Miscellaneous file operations

List all uploaded files

# List all files
files_response = client.files.list()

for file in files_response.data:
    print(f"ID: {file.id}, Filename: {file.filename}")

Delete a file from the system

# Delete a file
client.files.delete(file_id)
print(f"Successfully deleted file {file_id}")

Troubleshooting

Document processing issues

| Issue | Possible cause | Solution |
|---|---|---|
| Files fail to upload | File exceeds size limit | Split large files or compress them |
| | Invalid file format | Ensure file extension matches actual format |
| | Network timeout | Implement retry logic with exponential backoff |
| Markdown parsing errors | Improper header hierarchy | Fix header structure (ensure proper nesting) |
| | Unsupported Markdown syntax | Use standard Markdown formatting |
| PDF extraction issues | Protected PDF | Remove password protection before uploading |
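For the network-timeout case, retry logic with exponential backoff can look like the generic sketch below; wrap whichever client call is failing, e.g. lambda: client.files.upload(file_path, purpose="alignment"):

```python
import time

def retry_with_backoff(fn, retries=4, base_delay=1.0,
                       exceptions=(TimeoutError, ConnectionError),
                       sleep=time.sleep):
    """Call fn(), retrying transient failures with delays of
    base_delay, 2x, 4x, ...; re-raises after the final attempt."""
    for attempt in range(retries):
        try:
            return fn()
        except exceptions:
            if attempt == retries - 1:
                raise
            sleep(base_delay * (2 ** attempt))
```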

Vector database issues

| Issue | Possible cause | Solution |
|---|---|---|
| Slow ingestion | Complex document structure | Adjust chunking parameters |
| | Resource constraints | Monitor system resources during ingestion |
| | Large batch size | Break into smaller batches |
| Failed ingestion job | Malformed content | Check files for compatibility issues |
| | Service timeout | Increase timeout settings |

Retrieval issues

| Issue | Possible cause | Solution |
|---|---|---|
| Irrelevant results | Poor chunking strategy | Adjust token_count and overlap_tokens |
| Missing information | Content spans chunks | Increase overlap_tokens |
| Information not in database | Document not ingested | Verify document ingestion status; list files in vector database |
| Agent provides content not found in your vector store | Instruction leakage | Update agent instructions to explicitly restrict responses to vector store content only; implement context-grounded fine-tuning |

Next