Overview

RDK integrates seamlessly with FastAPI. This guide shows how to set up tracing in a production FastAPI application.

Basic Setup

Use FastAPI’s lifespan handler to initialize RDK on startup and shut it down cleanly on exit:
import os
from contextlib import asynccontextmanager
from fastapi import FastAPI
from rdk import init, shutdown

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    init(
        endpoint=os.environ["RDK_ENDPOINT"],
        api_key=os.environ["RDK_API_KEY"],
        batch_size=10,
        flush_interval=5.0,
    )

    yield  # App runs here

    # Shutdown
    shutdown()

app = FastAPI(title="My LLM API", lifespan=lifespan)

Complete Example

Here’s a full FastAPI application with RDK tracing:
import os
from contextlib import asynccontextmanager
from typing import Any

from fastapi import FastAPI, HTTPException
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage, ToolMessage
from langchain_core.tools import tool
from pydantic import BaseModel

from rdk import flush, init, observe, shutdown


@asynccontextmanager
async def lifespan(app: FastAPI):
    init(
        endpoint=os.environ["RDK_ENDPOINT"],
        api_key=os.environ["RDK_API_KEY"],
        batch_size=10,
        flush_interval=5.0,
    )
    yield
    shutdown()


app = FastAPI(title="LLM API with Tracing", lifespan=lifespan)


# Request/Response models
class ChatRequest(BaseModel):
    message: str
    model: str = "claude-sonnet-4-6"


class ChatResponse(BaseModel):
    response: str
    model: str


# Chat endpoint with tracing
@app.post("/chat", response_model=ChatResponse)
@observe(name="api-chat")
async def chat_endpoint(request: ChatRequest) -> ChatResponse:
    try:
        llm = ChatAnthropic(model=request.model)
        response = await llm.ainvoke(request.message)

        return ChatResponse(
            response=response.content,
            model=request.model,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# Health check
@app.get("/health")
async def health_check():
    return {"status": "healthy"}


# Manual flush endpoint (useful for debugging)
@app.post("/flush")
async def flush_traces():
    flush()
    return {"status": "flushed"}

Agent Endpoint

Add an endpoint with tool calling:
class AgentRequest(BaseModel):
    question: str


class AgentResponse(BaseModel):
    answer: str
    tools_used: list[str]


@tool
def search(query: str) -> str:
    """Search for information."""
    return f"Search results for: {query}"


@tool
def calculate(expr: str) -> str:
    """Calculate math expression."""
    return str(eval(expr))


tools = [search, calculate]
tool_map = {t.name: t for t in tools}


@app.post("/agent", response_model=AgentResponse)
@observe(name="api-agent")
async def agent_endpoint(request: AgentRequest) -> AgentResponse:
    llm = ChatAnthropic(model="claude-sonnet-4-6")
    llm_with_tools = llm.bind_tools(tools)

    messages: list[Any] = [HumanMessage(content=request.question)]
    tools_used = []

    for _ in range(5):  # Max iterations
        response = await llm_with_tools.ainvoke(messages)
        messages.append(response)

        if not response.tool_calls:
            return AgentResponse(
                answer=response.content,
                tools_used=tools_used,
            )

        for tc in response.tool_calls:
            result = tool_map[tc["name"]].invoke(tc["args"])
            tools_used.append(tc["name"])
            messages.append(ToolMessage(
                content=result,
                tool_call_id=tc["id"]
            ))

    return AgentResponse(
        answer="Max iterations reached",
        tools_used=tools_used,
    )

Request Tracking

Add request IDs to traces for debugging:
import uuid
from fastapi import Request
from rdk import observe, get_current_trace_id

@app.post("/chat")
@observe(name="api-chat")
async def chat_with_request_id(
    request: Request,
    body: ChatRequest
) -> ChatResponse:
    request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
    trace_id = get_current_trace_id()
    # Correlate request_id with trace_id in logs
    ...
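
For example, with the standard logging module the correlation could be emitted as structured fields (a sketch; the logger name and log message are illustrative):
import logging

logger = logging.getLogger("llm-api")

# Inside the handler above, once request_id and trace_id are resolved:
logger.info(
    "chat request",
    extra={"request_id": request_id, "trace_id": trace_id},
)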

Error Handling

RDK captures errors raised inside traced endpoints automatically, but you can still map them to meaningful HTTP status codes:
from fastapi import HTTPException
from rdk import observe

@app.post("/chat")
@observe(name="api-chat", tags=["chat"])
async def chat_endpoint(request: ChatRequest) -> ChatResponse:
    try:
        # Your logic
        ...
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception:
        raise HTTPException(status_code=500, detail="Internal error")
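
If you would rather not repeat this try/except in every endpoint, FastAPI’s exception handlers can centralize the mapping. This is a sketch using only FastAPI APIs; the exception still propagates out of the @observe-decorated function, so RDK records it as usual:
from fastapi import Request
from fastapi.responses import JSONResponse

@app.exception_handler(ValueError)
async def value_error_handler(request: Request, exc: ValueError):
    # Map validation-style errors to 400 without per-endpoint try/except
    return JSONResponse(status_code=400, content={"detail": str(exc)})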

Running the App

# Set environment variables
export RDK_ENDPOINT="https://collector.021labs.ai"
export RDK_API_KEY="your-api-key"
export ANTHROPIC_API_KEY="your-anthropic-key"

# Run with uvicorn
uvicorn app:app --reload

Testing

Disable tracing in tests by passing test_mode=True or enabled=False to init():
import pytest
from rdk import init, shutdown

@pytest.fixture(autouse=True)
def rdk_test():
    init(
        endpoint="https://placeholder",
        api_key="test",
        test_mode=True,   # No HTTP calls, tool calls return "MOCKED"
    )
    yield
    shutdown()
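
Alternatively, the enabled flag mentioned above can be tied to an environment variable so tracing stays off unless explicitly requested. This is a sketch; it assumes enabled is accepted by init() the same way test_mode is, and the RDK_ENABLED variable name is only a convention for this example:
import os

import pytest
from rdk import init, shutdown

@pytest.fixture(autouse=True)
def rdk_disabled():
    init(
        endpoint="https://placeholder",
        api_key="test",
        enabled=os.environ.get("RDK_ENABLED", "false").lower() == "true",
    )
    yield
    shutdown()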

Production Considerations

Always call shutdown() in your lifespan handler to flush pending traces before the app exits.
  1. Set an appropriate batch size: higher values reduce network calls, at the cost of traces sitting in memory longer.
  2. Configure the flush interval: balance delivery latency against data freshness (see the sketch after this list).
  3. Use PII redaction when prompts or responses may contain personal data; this is often a compliance requirement.
  4. Monitor the /flush endpoint; it is useful for debugging trace delivery.
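
For example, a production-leaning configuration using only the init() parameters already shown might look like this (the numbers are illustrative, not recommendations, and PII redaction options depend on your RDK configuration):
import os

from rdk import init

init(
    endpoint=os.environ["RDK_ENDPOINT"],
    api_key=os.environ["RDK_API_KEY"],
    batch_size=50,        # larger batches mean fewer collector round-trips
    flush_interval=10.0,  # a longer interval trades freshness for fewer flushes
)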

See Also