- Processes and indexes your documents
- Searches across multiple sources to answer questions
- Provides confidence ratings with explanations
- Cites specific sources for each answer
- SeekrFlow API key
- Documents in PDF, DOCX, or Markdown format
- Python 3.8+
1. Setup and Installation
1. Setup and Installation
Install the necessary packages to begin.
# Import required libraries
import os
import time
import re
from datetime import datetime
import pickle
from typing import List, Dict, Any
from seekrai import SeekrFlow
from seekrai.types import CreateAgentRequest, FileSearch, FileSearchEnv
2. Configure your Environment
2. Configure your Environment
Load your API key and initialize the SeekrFlow client.
# Initialize the SeekrFlow client
from seekrai import SeekrFlow

# Read the API key from the environment so it never has to be hard-coded.
# A KeyError here means SEEKR_API_KEY has not been exported in this shell.
seekr_api_key = os.environ["SEEKR_API_KEY"]

# Create the SeekrFlow client
client = SeekrFlow(api_key=seekr_api_key)
3. Create a vector database
3. Create a vector database
Create an empty vector database for your documents.
def create_vector_database(name="Onboarding_DB", description="Database for onboarding document QA"):
    """Create a new, empty vector database for document storage.

    Args:
        name: Name given to the new database.
        description: Description stored with the new database.

    Returns:
        The object returned by ``client.vector_database.create``; this
        function reads its ``name`` and ``id`` attributes.
    """
    # Forward the caller's name/description instead of fixed literals so the
    # parameters actually take effect.
    vector_db = client.vector_database.create(
        model="intfloat/e5-mistral-7b-instruct",  # Using SeekrFlow's recommended model
        name=name,
        description=description,
    )
    print(f"Created vector database: {vector_db.name} (ID: {vector_db.id})")
    return vector_db
# Create a new vector database and keep its ID in `database_id`,
# which the ingestion and agent-creation calls take as an argument.
vector_db = create_vector_database()
database_id = vector_db.id
4. Upload your documents
4. Upload your documents
Now, upload your documents to SeekrFlow’s AI-Ready Data Engine.
def upload_documents(file_paths):
    """Send each file to SeekrFlow and collect the resulting file IDs.

    Files are uploaded one at a time in the order given, with progress
    printed for each. Returns the list of file IDs in that same order.
    """
    uploaded_ids = []
    for path in file_paths:
        print(f"Uploading {path}...")
        uploaded = client.files.upload(path, purpose="alignment")
        new_id = uploaded.id
        uploaded_ids.append(new_id)
        print(f"Uploaded file with ID: {new_id}")
    return uploaded_ids
5. Process and ingest documents
5. Process and ingest documents
Next, begin the ingestion process, in which your documents are transformed into embeddings and stored in your new vector database.
def ingest_documents(database_id, file_ids, token_count=512, overlap_tokens=50):
    """Start an ingestion job for the given files and poll until it finishes.

    Args:
        database_id: ID of the vector database to ingest into.
        file_ids: IDs of previously uploaded files.
        token_count: Chunk size forwarded to the ingestion job.
        overlap_tokens: Chunk overlap forwarded to the ingestion job.

    Returns:
        The ingestion job ID. It is returned whether the job completed,
        failed, or was still running when the 5-minute polling window ended;
        the outcome is only reported via printed messages.
    """
    import time

    # Forward the chunking parameters rather than fixed values so callers
    # can actually tune them.
    ingestion_job = client.vector_database.create_ingestion_job(
        database_id=database_id,
        files=file_ids,
        method="best",
        token_count=token_count,
        overlap_tokens=overlap_tokens,
    )
    job_id = ingestion_job.id
    print(f"Created ingestion job: {job_id}")

    # Wait for ingestion to complete
    timeout = 300  # 5 minutes timeout
    interval = 5  # Check every 5 seconds
    start_time = time.time()
    while True:
        if time.time() - start_time > timeout:
            print("Timeout waiting for ingestion job to complete")
            break
        job_status = client.vector_database.retrieve_ingestion_job(database_id, job_id)
        status = job_status.status
        print(f"Ingestion job status: {status}")
        if status == "completed":
            print("Vector database ready!")
            break
        elif status == "failed":
            # Not every job object is guaranteed to carry an error message.
            error = getattr(job_status, "error_message", "Unknown error")
            print(f"Ingestion job failed: {error}")
            break
        time.sleep(interval)
    return job_id
file_paths = [
    "company-guidebook.pdf",
    "company-holidays.pdf",
    "company-payroll-schedule.pdf",
]

# Reuse the ID of the vector database created above. An empty placeholder
# here would discard that ID and point the ingestion job at no database.
# To ingest into a different, pre-existing database, assign its ID instead.
database_id = vector_db.id

# Upload documents and process them
file_ids = upload_documents(file_paths)
job_id = ingest_documents(database_id, file_ids)
6. Create your agent
6. Create your agent
Now create a SeekrFlow agent with citation capabilities built into the prompt, connect it to your vector database, and attach a FileSearch tool.
# Create the agent
def create_search_agent(database_id, agent_name="BennyBot"):
    """Create a SeekrFlow agent with a FileSearch tool bound to a vector database.

    Args:
        database_id: ID of the vector database, used as the tool's
            ``file_search_index``.
        agent_name: Name given to the new agent.

    Returns:
        The new agent's ID. It is returned even if the agent did not report
        an active status within the 5-minute polling window; that case is
        only reported via a printed message.
    """
    # Create the agent with the FileSearch tool
    agent = client.agents.create(
        CreateAgentRequest(
            name=agent_name,
            instructions="""You are an expert onboarding assistant that provides reliable answers based on document search results.
For each question:
1. Search the document repository using the file_search tool
2. Always analyze information from multiple sources when available
3. Rate your confidence ONLY when you find relevant information in the search results on a scale of 1-10, where:
1 = Complete guess
5 = Moderately confident
10 = Absolutely certain
4. After your answer, on a new line, start with "Confidence: [X/10]" and briefly explain your confidence rating
5. Always cite your sources, including specific document names
Do not provide confidence scores when no information is found.
Only use information found in the search results. If information cannot be found, respond with "I could not find this information in your documents" and do not provide a confidence score.""",
            model_id="meta-llama/Llama-3.1-8B-Instruct",
            tools=[
                FileSearch(
                    tool_env=FileSearchEnv(
                        file_search_index=database_id,
                        document_tool_desc="Search Seekr documents to answer employee onboarding questions accurately.",
                        top_k=6,  # Retrieve specified number of most relevant document chunks
                        score_threshold=0.5,  # Only return results with score above 0.5
                    )
                )
            ],
        )
    )

    # Wait for agent to become active
    print(f"Agent created with ID: {agent.id}")
    print("Waiting for agent to become active...")
    active = False
    timeout = 300  # 5 minutes
    start_time = time.time()
    while not active and time.time() - start_time < timeout:
        agent_info = client.agents.retrieve(agent_id=agent.id)
        status = agent_info.status
        print(f"Agent status: {status}")
        # Compare on the textual form of the status. A plain
        # endswith("ACTIVE") would also accept "...INACTIVE", so that
        # suffix is excluded explicitly.
        status_text = str(status).upper()
        if status_text.endswith("ACTIVE") and not status_text.endswith("INACTIVE"):
            active = True
            print("Agent is now active!")
            break
        time.sleep(10)  # Check every 10 seconds
    if not active:
        print("Timeout waiting for agent to become active.")
    return agent.id
# Reuse the ID of the vector database created earlier; a blank placeholder
# would leave the agent's FileSearch tool pointing at no database.
# Assign a different ID here only to target a pre-existing database.
database_id = vector_db.id
7. Create a thread and interact with your agent
7. Create a thread and interact with your agent
Now you can create a thread and add messages to it to test your agent. The parse_response function defined below extracts the confidence score and sources in addition to the answer.
# Create the agent first so that `agent_id` exists before it is printed
# and before it is passed to send_question below.
agent_id = create_search_agent(database_id, agent_name="BennyBot")
print(f"Final agent ID: {agent_id}")
def create_thread():
    """Open a fresh conversation thread and return its ID."""
    new_thread = client.agents.threads.create()
    new_thread_id = new_thread.id
    print(f"Thread created with ID: {new_thread_id}")
    return new_thread_id
def send_question(agent_id, thread_id, question):
    """Post a question to a thread, run the agent, and return its reply.

    The question is added to the thread as a user message, a non-streaming
    run is started, and the run is polled every 2 seconds for up to 5 minutes.

    Returns the content of the first assistant message in the thread's
    message list, or None if the run did not complete in time or the
    thread holds no assistant message.
    """
    # Add the user's question to the thread.
    posted = client.agents.threads.create_message(
        thread_id=thread_id,
        role="user",
        content=question,
    )
    print(f"Message sent with ID: {posted.id}")

    # Start a run; stream=False means no incremental output is requested.
    started_run = client.agents.runs.run(
        agent_id,
        thread_id=thread_id,
        stream=False,
    )
    run_id = started_run.run_id
    print(f"Run created with ID: {run_id}")

    # Poll until the run reports completion or the time budget is spent.
    max_wait = 300
    began = time.time()
    while time.time() - began < max_wait:
        current = client.agents.runs.retrieve(run_id=run_id, thread_id=thread_id)
        state = current.status
        print(f"Run status: {state}")
        if str(state).endswith("COMPLETED"):
            print("Run completed!")
            break
        time.sleep(2)
    else:
        # The loop ended without a break: the run never completed.
        print("Timeout waiting for run to complete.")
        return None

    # Pick the agent's reply out of the thread's messages.
    thread_messages = client.agents.threads.list_messages(thread_id=thread_id)
    replies = [msg for msg in thread_messages if msg.role == "assistant"]
    # NOTE(review): index 0 is treated as the latest reply — presumably the
    # API lists newest first; confirm against the SDK.
    return replies[0].content if replies else None
def parse_response(response):
    """Split an agent reply into answer, confidence, explanation, and sources.

    Args:
        response: Raw reply text, or a falsy value (``None`` / ``""``) when
            no reply was received.

    Returns:
        A dict with these keys:
            "answer": text before the first "Confidence:" marker, or the
                whole reply when there is no marker.
            "confidence": "<n>/10"; "Not provided" when the reply carries no
                rating; "0/10" when there was no reply at all.
            "explanation": text following the rating, or "" if absent.
            "sources": de-duplicated list of source strings found by the
                heuristic patterns below, in order of discovery.
    """
    # Imported unconditionally at function scope: the source scan at the
    # bottom needs `re` even when the reply has no "Confidence:" section.
    import re

    if not response:
        return {
            "answer": "No response received from agent.",
            "confidence": "0/10",
            "explanation": "Agent did not provide a response.",
            "sources": []
        }

    # Default values
    answer = response
    confidence = "Not provided"
    confidence_explanation = ""
    sources = []

    # Extract confidence information if present
    if "Confidence:" in response:
        parts = response.split("Confidence:")
        answer = parts[0].strip()
        confidence_part = parts[1].strip()

        rating_match = re.search(r'(\d+)(/10)?', confidence_part)
        if rating_match:
            confidence = f"{rating_match.group(1)}/10"
            # Strip a leading rating in any of the forms "8", "8/10",
            # "[8/10]" or "(8/10)", plus trailing separators, so only the
            # explanation remains. If the rating is not at the start,
            # nothing is removed.
            explanation_text = re.sub(
                r'^[\[(]?\s*\d+(?:\s*/\s*10)?\s*[\])]?[\s:;,.\-]*',
                '',
                confidence_part,
            ).strip()
            if explanation_text:
                confidence_explanation = explanation_text

    # Try to extract sources if mentioned. These are loose heuristics applied
    # to the full reply; each pattern captures up to the next blank line.
    source_patterns = [
        r'Source[s]?:(.+?)(?=\n\n|\Z)',
        r'From (.+?)(?=\n\n|\Z)',
        r'According to (.+?)(?=\n\n|\Z)',
        r'Based on (.+?)(?=\n\n|\Z)'
    ]
    for pattern in source_patterns:
        source_match = re.search(pattern, response, re.IGNORECASE | re.DOTALL)
        if source_match:
            source_text = source_match.group(1).strip()
            # Split multiple sources on commas/newlines, skipping blanks and
            # entries already collected by an earlier pattern.
            for candidate in re.split(r',|\n', source_text):
                candidate = candidate.strip()
                if candidate and candidate not in sources:
                    sources.append(candidate)

    return {
        "answer": answer,
        "confidence": confidence,
        "explanation": confidence_explanation,
        "sources": sources
    }
# Create a thread
thread_id = create_thread()

# Ask a question
question = "What is machine learning?"
response = send_question(agent_id, thread_id, question)

# Parse and display the response between two divider lines
result = parse_response(response)
print("\n" + "="*50)
print(f"Answer: {result['answer']}\n")
print(f"Confidence: {result['confidence']}")
if result.get('explanation'):
    print(f"Explanation: {result['explanation']}\n")
# An empty list is falsy, so no separate length check is needed.
if result.get('sources'):
    print("\nSources:")
    for i, source in enumerate(result['sources'], start=1):
        print(f"{i}. {source}")
# Closing divider, matching the opening one printed above.
print("="*50)
# Import required libraries
import os
import time
import re
from datetime import datetime
import pickle
from typing import List, Dict, Any
from seekrai import SeekrFlow
from seekrai.types import CreateAgentRequest, FileSearch, FileSearchEnv
# Initialize the SeekrFlow client
from seekrai import SeekrFlow

# Read the API key from the environment so it never has to be hard-coded.
# A KeyError here means SEEKR_API_KEY has not been exported in this shell.
seekr_api_key = os.environ["SEEKR_API_KEY"]

# Create the SeekrFlow client
client = SeekrFlow(api_key=seekr_api_key)
def create_vector_database(name="Onboarding_DB", description="Database for onboarding document QA"):
    """Create a new, empty vector database for document storage.

    Args:
        name: Name given to the new database.
        description: Description stored with the new database.

    Returns:
        The object returned by ``client.vector_database.create``; this
        function reads its ``name`` and ``id`` attributes.
    """
    # Forward the caller's name/description instead of fixed literals so the
    # parameters actually take effect.
    vector_db = client.vector_database.create(
        model="intfloat/e5-mistral-7b-instruct",  # Using SeekrFlow's recommended model
        name=name,
        description=description,
    )
    print(f"Created vector database: {vector_db.name} (ID: {vector_db.id})")
    return vector_db
# Create a new vector database and keep its ID in `database_id`,
# which the ingestion and agent-creation calls take as an argument.
vector_db = create_vector_database()
database_id = vector_db.id
def upload_documents(file_paths):
    """Send each file to SeekrFlow and collect the resulting file IDs.

    Files are uploaded one at a time in the order given, with progress
    printed for each. Returns the list of file IDs in that same order.
    """
    uploaded_ids = []
    for path in file_paths:
        print(f"Uploading {path}...")
        uploaded = client.files.upload(path, purpose="alignment")
        new_id = uploaded.id
        uploaded_ids.append(new_id)
        print(f"Uploaded file with ID: {new_id}")
    return uploaded_ids
def ingest_documents(database_id, file_ids, token_count=512, overlap_tokens=50):
    """Start an ingestion job for the given files and poll until it finishes.

    Args:
        database_id: ID of the vector database to ingest into.
        file_ids: IDs of previously uploaded files.
        token_count: Chunk size forwarded to the ingestion job.
        overlap_tokens: Chunk overlap forwarded to the ingestion job.

    Returns:
        The ingestion job ID. It is returned whether the job completed,
        failed, or was still running when the 5-minute polling window ended;
        the outcome is only reported via printed messages.
    """
    import time

    # Forward the chunking parameters rather than fixed values so callers
    # can actually tune them.
    ingestion_job = client.vector_database.create_ingestion_job(
        database_id=database_id,
        files=file_ids,
        method="best",
        token_count=token_count,
        overlap_tokens=overlap_tokens,
    )
    job_id = ingestion_job.id
    print(f"Created ingestion job: {job_id}")

    # Wait for ingestion to complete
    timeout = 300  # 5 minutes timeout
    interval = 5  # Check every 5 seconds
    start_time = time.time()
    while True:
        if time.time() - start_time > timeout:
            print("Timeout waiting for ingestion job to complete")
            break
        job_status = client.vector_database.retrieve_ingestion_job(database_id, job_id)
        status = job_status.status
        print(f"Ingestion job status: {status}")
        if status == "completed":
            print("Vector database ready!")
            break
        elif status == "failed":
            # Not every job object is guaranteed to carry an error message.
            error = getattr(job_status, "error_message", "Unknown error")
            print(f"Ingestion job failed: {error}")
            break
        time.sleep(interval)
    return job_id
file_paths = [
    "company-guidebook.pdf",
    "company-holidays.pdf",
    "company-payroll-schedule.pdf",
]

# Reuse the ID of the vector database created above. An empty placeholder
# here would discard that ID and point the ingestion job at no database.
# To ingest into a different, pre-existing database, assign its ID instead.
database_id = vector_db.id

# Upload documents and process them
file_ids = upload_documents(file_paths)
job_id = ingest_documents(database_id, file_ids)
# Create the agent
def create_search_agent(database_id, agent_name="BennyBot"):
    """Create a SeekrFlow agent with a FileSearch tool bound to a vector database.

    Args:
        database_id: ID of the vector database, used as the tool's
            ``file_search_index``.
        agent_name: Name given to the new agent.

    Returns:
        The new agent's ID. It is returned even if the agent did not report
        an active status within the 5-minute polling window; that case is
        only reported via a printed message.
    """
    # Create the agent with the FileSearch tool
    agent = client.agents.create(
        CreateAgentRequest(
            name=agent_name,
            instructions="""You are an expert onboarding assistant that provides reliable answers based on document search results.
For each question:
1. Search the document repository using the file_search tool
2. Always analyze information from multiple sources when available
3. Rate your confidence ONLY when you find relevant information in the search results on a scale of 1-10, where:
1 = Complete guess
5 = Moderately confident
10 = Absolutely certain
4. After your answer, on a new line, start with "Confidence: [X/10]" and briefly explain your confidence rating
5. Always cite your sources, including specific document names
Do not provide confidence scores when no information is found.
Only use information found in the search results. If information cannot be found, respond with "I could not find this information in your documents" and do not provide a confidence score.""",
            model_id="meta-llama/Llama-3.1-8B-Instruct",
            tools=[
                FileSearch(
                    tool_env=FileSearchEnv(
                        file_search_index=database_id,
                        document_tool_desc="Search Seekr documents to answer employee onboarding questions accurately.",
                        top_k=6,  # Retrieve specified number of most relevant document chunks
                        score_threshold=0.5,  # Only return results with score above 0.5
                    )
                )
            ],
        )
    )

    # Wait for agent to become active
    print(f"Agent created with ID: {agent.id}")
    print("Waiting for agent to become active...")
    active = False
    timeout = 300  # 5 minutes
    start_time = time.time()
    while not active and time.time() - start_time < timeout:
        agent_info = client.agents.retrieve(agent_id=agent.id)
        status = agent_info.status
        print(f"Agent status: {status}")
        # Compare on the textual form of the status. A plain
        # endswith("ACTIVE") would also accept "...INACTIVE", so that
        # suffix is excluded explicitly.
        status_text = str(status).upper()
        if status_text.endswith("ACTIVE") and not status_text.endswith("INACTIVE"):
            active = True
            print("Agent is now active!")
            break
        time.sleep(10)  # Check every 10 seconds
    if not active:
        print("Timeout waiting for agent to become active.")
    return agent.id
# Reuse the ID of the vector database created earlier; a blank placeholder
# would leave the agent's FileSearch tool pointing at no database.
# Assign a different ID here only to target a pre-existing database.
database_id = vector_db.id
agent_id = create_search_agent(database_id, agent_name="BennyBot")
print(f"Final agent ID: {agent_id}")
def create_thread():
    """Open a fresh conversation thread and return its ID."""
    new_thread = client.agents.threads.create()
    new_thread_id = new_thread.id
    print(f"Thread created with ID: {new_thread_id}")
    return new_thread_id
def send_question(agent_id, thread_id, question):
    """Post a question to a thread, run the agent, and return its reply.

    The question is added to the thread as a user message, a non-streaming
    run is started, and the run is polled every 2 seconds for up to 5 minutes.

    Returns the content of the first assistant message in the thread's
    message list, or None if the run did not complete in time or the
    thread holds no assistant message.
    """
    # Add the user's question to the thread.
    posted = client.agents.threads.create_message(
        thread_id=thread_id,
        role="user",
        content=question,
    )
    print(f"Message sent with ID: {posted.id}")

    # Start a run; stream=False means no incremental output is requested.
    started_run = client.agents.runs.run(
        agent_id,
        thread_id=thread_id,
        stream=False,
    )
    run_id = started_run.run_id
    print(f"Run created with ID: {run_id}")

    # Poll until the run reports completion or the time budget is spent.
    max_wait = 300
    began = time.time()
    while time.time() - began < max_wait:
        current = client.agents.runs.retrieve(run_id=run_id, thread_id=thread_id)
        state = current.status
        print(f"Run status: {state}")
        if str(state).endswith("COMPLETED"):
            print("Run completed!")
            break
        time.sleep(2)
    else:
        # The loop ended without a break: the run never completed.
        print("Timeout waiting for run to complete.")
        return None

    # Pick the agent's reply out of the thread's messages.
    thread_messages = client.agents.threads.list_messages(thread_id=thread_id)
    replies = [msg for msg in thread_messages if msg.role == "assistant"]
    # NOTE(review): index 0 is treated as the latest reply — presumably the
    # API lists newest first; confirm against the SDK.
    return replies[0].content if replies else None
def parse_response(response):
    """Split an agent reply into answer, confidence, explanation, and sources.

    Args:
        response: Raw reply text, or a falsy value (``None`` / ``""``) when
            no reply was received.

    Returns:
        A dict with these keys:
            "answer": text before the first "Confidence:" marker, or the
                whole reply when there is no marker.
            "confidence": "<n>/10"; "Not provided" when the reply carries no
                rating; "0/10" when there was no reply at all.
            "explanation": text following the rating, or "" if absent.
            "sources": de-duplicated list of source strings found by the
                heuristic patterns below, in order of discovery.
    """
    # Imported unconditionally at function scope: the source scan at the
    # bottom needs `re` even when the reply has no "Confidence:" section.
    import re

    if not response:
        return {
            "answer": "No response received from agent.",
            "confidence": "0/10",
            "explanation": "Agent did not provide a response.",
            "sources": []
        }

    # Default values
    answer = response
    confidence = "Not provided"
    confidence_explanation = ""
    sources = []

    # Extract confidence information if present
    if "Confidence:" in response:
        parts = response.split("Confidence:")
        answer = parts[0].strip()
        confidence_part = parts[1].strip()

        rating_match = re.search(r'(\d+)(/10)?', confidence_part)
        if rating_match:
            confidence = f"{rating_match.group(1)}/10"
            # Strip a leading rating in any of the forms "8", "8/10",
            # "[8/10]" or "(8/10)", plus trailing separators, so only the
            # explanation remains. If the rating is not at the start,
            # nothing is removed.
            explanation_text = re.sub(
                r'^[\[(]?\s*\d+(?:\s*/\s*10)?\s*[\])]?[\s:;,.\-]*',
                '',
                confidence_part,
            ).strip()
            if explanation_text:
                confidence_explanation = explanation_text

    # Try to extract sources if mentioned. These are loose heuristics applied
    # to the full reply; each pattern captures up to the next blank line.
    source_patterns = [
        r'Source[s]?:(.+?)(?=\n\n|\Z)',
        r'From (.+?)(?=\n\n|\Z)',
        r'According to (.+?)(?=\n\n|\Z)',
        r'Based on (.+?)(?=\n\n|\Z)'
    ]
    for pattern in source_patterns:
        source_match = re.search(pattern, response, re.IGNORECASE | re.DOTALL)
        if source_match:
            source_text = source_match.group(1).strip()
            # Split multiple sources on commas/newlines, skipping blanks and
            # entries already collected by an earlier pattern.
            for candidate in re.split(r',|\n', source_text):
                candidate = candidate.strip()
                if candidate and candidate not in sources:
                    sources.append(candidate)

    return {
        "answer": answer,
        "confidence": confidence,
        "explanation": confidence_explanation,
        "sources": sources
    }
# Open a conversation thread for this session
thread_id = create_thread()

# Put one question to the agent
question = "What is machine learning?"
response = send_question(agent_id, thread_id, question)

# Break the reply into its parts and print them between two divider lines
result = parse_response(response)
print("\n" + "=" * 50)
print("Answer: {}\n".format(result["answer"]))
print("Confidence: {}".format(result["confidence"]))
if result.get("explanation"):
    print("Explanation: {}\n".format(result["explanation"]))
# A non-empty list is truthy, which covers the explicit length check.
if result.get("sources"):
    print("\nSources:")
    for i, source in enumerate(result["sources"]):
        print("{}. {}".format(i + 1, source))
print("=" * 50)
{"success":true}