Overview
RDK integrates seamlessly with FastAPI. This guide shows how to set up tracing in a production FastAPI application.
Basic Setup
Use FastAPI's lifespan handler to initialize RDK on startup and shut it down cleanly on exit:
```python
import os
from contextlib import asynccontextmanager

from fastapi import FastAPI

from rdk import init, shutdown


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    init(
        endpoint=os.environ["RDK_ENDPOINT"],
        api_key=os.environ["RDK_API_KEY"],
        batch_size=10,
        flush_interval=5.0,
    )
    yield  # App runs here
    # Shutdown
    shutdown()


app = FastAPI(title="My LLM API", lifespan=lifespan)
```
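Once init() has run, any route handler can be traced with the @observe decorator used throughout this guide. A minimal sketch; the /ping endpoint and the trace name are illustrative:

```python
from rdk import observe


@app.get("/ping")
@observe(name="api-ping")  # illustrative name; records one trace per request
async def ping():
    return {"ok": True}
```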
Complete Example
Here’s a full FastAPI application with RDK tracing:
```python
import os
from contextlib import asynccontextmanager
from typing import Any

from fastapi import FastAPI, HTTPException
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage, ToolMessage
from langchain_core.tools import tool
from pydantic import BaseModel

from rdk import flush, init, observe, shutdown


@asynccontextmanager
async def lifespan(app: FastAPI):
    init(
        endpoint=os.environ["RDK_ENDPOINT"],
        api_key=os.environ["RDK_API_KEY"],
        batch_size=10,
        flush_interval=5.0,
    )
    yield
    shutdown()


app = FastAPI(title="LLM API with Tracing", lifespan=lifespan)


# Request/Response models
class ChatRequest(BaseModel):
    message: str
    model: str = "claude-sonnet-4-6"


class ChatResponse(BaseModel):
    response: str
    model: str


# Chat endpoint with tracing
@app.post("/chat", response_model=ChatResponse)
@observe(name="api-chat")
async def chat_endpoint(request: ChatRequest) -> ChatResponse:
    try:
        llm = ChatAnthropic(model=request.model)
        response = await llm.ainvoke(request.message)
        return ChatResponse(
            response=response.content,
            model=request.model,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# Health check
@app.get("/health")
async def health_check():
    return {"status": "healthy"}


# Manual flush endpoint (useful for debugging)
@app.post("/flush")
async def flush_traces():
    flush()
    return {"status": "flushed"}
```
Agent Endpoint
Add an endpoint with tool calling:
```python
class AgentRequest(BaseModel):
    question: str


class AgentResponse(BaseModel):
    answer: str
    tools_used: list[str]


@tool
def search(query: str) -> str:
    """Search for information."""
    return f"Search results for: {query}"


@tool
def calculate(expr: str) -> str:
    """Calculate a math expression."""
    # Demo only: eval() on untrusted input is unsafe in production
    return str(eval(expr))


tools = [search, calculate]
tool_map = {t.name: t for t in tools}


@app.post("/agent", response_model=AgentResponse)
@observe(name="api-agent")
async def agent_endpoint(request: AgentRequest) -> AgentResponse:
    llm = ChatAnthropic(model="claude-sonnet-4-6")
    llm_with_tools = llm.bind_tools(tools)
    messages: list[Any] = [HumanMessage(content=request.question)]
    tools_used: list[str] = []

    for _ in range(5):  # Max iterations
        response = await llm_with_tools.ainvoke(messages)
        messages.append(response)

        if not response.tool_calls:
            return AgentResponse(
                answer=response.content,
                tools_used=tools_used,
            )

        for tc in response.tool_calls:
            result = tool_map[tc["name"]].invoke(tc["args"])
            tools_used.append(tc["name"])
            messages.append(ToolMessage(
                content=result,
                tool_call_id=tc["id"],
            ))

    return AgentResponse(
        answer="Max iterations reached",
        tools_used=tools_used,
    )
```
Request Tracking
Add request IDs to traces for debugging:
```python
import uuid

from fastapi import Request

from rdk import observe, get_current_trace_id


@app.post("/chat")
@observe(name="api-chat")
async def chat_with_request_id(
    request: Request,
    body: ChatRequest,
) -> ChatResponse:
    request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
    trace_id = get_current_trace_id()
    # Correlate request_id with trace_id in logs
    ...
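```

One way to do that correlation is a structured log line that carries both identifiers. A minimal sketch using the standard library's logging module; the logger name, helper function, and message are illustrative:

```python
import logging

logger = logging.getLogger("llm-api")


def log_request_correlation(request_id: str, trace_id: str | None) -> None:
    # One structured record ties the HTTP request to its RDK trace,
    # so log aggregators can join on either field.
    logger.info(
        "chat request received",
        extra={"request_id": request_id, "rdk_trace_id": trace_id},
    )
```

Call the helper from chat_with_request_id once both identifiers are resolved.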
Error Handling
RDK captures errors automatically, but you can add context:
```python
from fastapi import HTTPException

from rdk import observe


@app.post("/chat")
@observe(name="api-chat", tags=["chat"])
async def chat_endpoint(request: ChatRequest) -> ChatResponse:
    try:
        # Your logic
        ...
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception:
        raise HTTPException(status_code=500, detail="Internal error")
```
Running the App
```bash
# Set environment variables
export RDK_ENDPOINT="https://collector.021labs.ai"
export RDK_API_KEY="your-api-key"
export ANTHROPIC_API_KEY="your-anthropic-key"

# Run with uvicorn
uvicorn app:app --reload
```
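To verify the setup end to end, send a request against the running server. A minimal sketch assuming uvicorn's default port (8000) and that the httpx client library is installed:

```python
import httpx

# Assumes the app is listening on uvicorn's default port
resp = httpx.post(
    "http://localhost:8000/chat",
    json={"message": "Hello!"},
    timeout=30.0,
)
resp.raise_for_status()
print(resp.json())  # {"response": "...", "model": "claude-sonnet-4-6"}
```

Each successful call should produce one trace named api-chat in your RDK backend.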
Testing
Disable tracing in tests by passing test_mode=True or enabled=False to init():
```python
import pytest

from rdk import init, shutdown


@pytest.fixture(autouse=True)
def rdk_test():
    init(
        endpoint="https://placeholder",
        api_key="test",
        test_mode=True,  # No HTTP calls, tool calls return "MOCKED"
    )
    yield
    shutdown()
```
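With tracing stubbed out, endpoints can be exercised with FastAPI's TestClient as usual. A minimal sketch, assuming the application module is named app:

```python
from fastapi.testclient import TestClient

from app import app  # the FastAPI instance defined above

# Creating the client outside a `with` block skips the lifespan handler,
# so the fixture's test-mode init() is the only configuration that runs.
client = TestClient(app)


def test_health_check():
    response = client.get("/health")
    assert response.status_code == 200
    assert response.json() == {"status": "healthy"}
```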
Production Considerations
Always call shutdown() in your lifespan handler to flush pending traces before the app exits.
- Set an appropriate batch size: higher values reduce network calls at the cost of buffering more traces before export.
- Configure the flush interval: balance export latency against data freshness (a tuning sketch follows this list).
- Use PII redaction where your compliance requirements demand it.
- Monitor the /flush endpoint: useful for debugging trace delivery.
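As a starting point, the batching parameters shown earlier can be tuned for production traffic. The values below are illustrative rather than recommendations, and any PII-redaction options depend on how your RDK deployment exposes them:

```python
import os

from rdk import init

init(
    endpoint=os.environ["RDK_ENDPOINT"],
    api_key=os.environ["RDK_API_KEY"],
    batch_size=100,       # larger batches: fewer network calls per trace exported
    flush_interval=10.0,  # longer interval: fewer flushes, slightly staler data
)
```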
See Also