
Traces and Spans

RDK uses a hierarchical tracing model based on two core concepts:

Traces

A trace represents a complete operation in your application, such as handling a user request or processing a task. Each trace has:
  • A unique ID
  • A name (from your @observe decorator or trace() context manager)
  • A start time and optional end time
  • A collection of spans
  • Optional metadata (tags, user_id, session_id, version)

Spans

A span represents a single operation within a trace, such as an LLM call or tool execution. Each span has:
  • A unique ID
  • A reference to its parent trace
  • A type (LLM, CHAIN, TOOL, FUNCTION)
  • Input and output data
  • Timing information
  • Token usage (for LLM spans)
  • Status (RUNNING, SUCCESS, ERROR)
For example, a single support interaction might produce:

Trace: "customer-support-chat"
├── Span: anthropic.messages.create (LLM)
├── Span: search_database (TOOL)
├── Span: anthropic.messages.create (LLM)
└── Span: send_email (TOOL)
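
The trace and span fields above can be sketched as a simple data model. This is illustrative only, assuming plain dataclasses; it is not RDK's actual internal representation.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional
import time
import uuid

class SpanType(Enum):
    LLM = "LLM"
    CHAIN = "CHAIN"
    TOOL = "TOOL"
    FUNCTION = "FUNCTION"

@dataclass
class Span:
    trace_id: str                       # reference to the parent trace
    span_type: SpanType                 # LLM, CHAIN, TOOL, or FUNCTION
    input_data: Any = None
    output_data: Any = None
    status: str = "RUNNING"             # RUNNING, SUCCESS, or ERROR
    token_usage: Optional[dict] = None  # populated for LLM spans
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    start_time: float = field(default_factory=time.time)
    end_time: Optional[float] = None

@dataclass
class Trace:
    name: str
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    spans: list = field(default_factory=list)
    tags: list = field(default_factory=list)
    metadata: dict = field(default_factory=dict)

# Build the hierarchy from the diagram above
trace = Trace(name="customer-support-chat")
trace.spans.append(Span(trace_id=trace.id, span_type=SpanType.LLM))
trace.spans.append(Span(trace_id=trace.id, span_type=SpanType.TOOL))
print(len(trace.spans), trace.spans[0].status)  # 2 RUNNING
```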

Span Types

RDK categorizes spans into four types:
Type        Description                Examples
LLM         Language model calls       Claude, GPT-4, Gemini
CHAIN       Orchestration logic        LangChain chains, agents
TOOL        Tool/function execution    Database queries, API calls
FUNCTION    Custom function spans      Your business logic

Auto-instrumentation

When you initialize RDK, it automatically patches supported LLM SDKs to capture calls:
from rdk import init

init(endpoint="...", api_key="...")
# Anthropic, OpenAI, LangChain, Gemini are now instrumented
This means you don’t need to modify your existing LLM calls. RDK intercepts them and creates spans automatically.
BAML is not auto-instrumented. You must call b = instrument_baml(b) after init(). See BAML Integration.

How It Works

  1. RDK patches the SDK’s request methods
  2. When you call an LLM, RDK creates a span before the call
  3. The original call executes normally
  4. RDK captures the response, token usage, and timing
  5. The span is queued for batching
# Your code stays the same
response = client.messages.create(
    model="claude-sonnet-4-6",
    messages=[{"role": "user", "content": "Hello"}]
)

# But RDK has captured:
# - Model and parameters
# - Token usage
# - Duration
# - Status
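
The five steps above can be sketched with ordinary monkey-patching. This is a minimal, self-contained illustration of the mechanism, with a stand-in client instead of a real SDK; RDK's actual patching is more involved.

```python
import functools
import time

class FakeClient:
    """Stand-in for an LLM SDK client (hypothetical, not a real SDK)."""
    def create(self, model, messages):
        return {"content": "Hello!", "usage": {"tokens": 5}}

captured_spans = []  # stand-in for the span queue

def patch_create(client):
    """1. Patch the client's request method, as an auto-instrumentor would."""
    original = client.create

    @functools.wraps(original)
    def wrapper(*args, **kwargs):
        span = {"status": "RUNNING"}            # 2. create a span before the call
        start = time.monotonic()
        try:
            response = original(*args, **kwargs)  # 3. original call runs normally
            span["usage"] = response.get("usage")  # 4. capture token usage
            span["status"] = "SUCCESS"
            return response
        except Exception:
            span["status"] = "ERROR"
            raise
        finally:
            span["duration"] = time.monotonic() - start  # 4. capture timing
            captured_spans.append(span)                  # 5. queue for batching

    client.create = wrapper

client = FakeClient()
patch_create(client)
client.create(model="claude-sonnet-4-6",
              messages=[{"role": "user", "content": "Hi"}])
print(captured_spans[0]["status"])  # SUCCESS
```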

Creating Traces

With @observe (for functions)

from rdk import observe

@observe(name="my-operation", tags=["production"])
def my_function(input: str) -> str:
    # All LLM calls here are grouped into this trace
    ...

With trace() (for scripts and notebooks)

from rdk import trace

with trace("my-operation", tags=["dev"]) as t:
    print(f"Trace ID: {t.id}")
    # All LLM calls here are grouped into this trace

Decorator Options

Option          Type             Default         Description
name            str              function name   Name for the trace
trace_id        str or callable  auto-generated  Custom trace ID
tags            list[str]        []              Tags to attach to spans
metadata        dict             {}              Custom metadata
user_id         str or callable  None            User identifier
session_id      str or callable  None            Session identifier
version         str              None            Version string
capture_input   bool             False           Store function args in metadata
capture_output  bool             False           Store return value in metadata

Manual Spans

For custom code that isn’t automatically captured, use span():
from rdk import observe, span
from rdk.models import SpanType

@observe()
def pipeline(query: str):
    with span("vector-search", span_type=SpanType.TOOL, input_data={"query": query}) as s:
        results = vector_db.search(query)
        s.metadata["result_count"] = len(results)
See span() and the Manual Tracing guide.

Batching and Transport

RDK batches spans to reduce network overhead:
Spans Created → Queue → Batch (10 spans) → HTTP POST

              Flush Timer (5s) → HTTP POST
Configure batching with:
init(
    endpoint="...",
    api_key="...",
    batch_size=10,      # Max spans per batch
    flush_interval=5.0  # Max seconds between flushes
)
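
The size-triggered half of the diagram above can be sketched as follows. This is a minimal illustration of batch-on-size behavior, assuming the semantics described (flush when the queue reaches batch_size); it is not RDK's transport code, and the timer-based flush is omitted for brevity.

```python
class SpanBatcher:
    """Minimal sketch: flush the queue whenever it reaches batch_size."""
    def __init__(self, batch_size=10, send=print):
        self.batch_size = batch_size
        self.send = send   # stand-in for the HTTP POST
        self.queue = []

    def add(self, span):
        self.queue.append(span)
        if len(self.queue) >= self.batch_size:
            self.flush()   # size trigger: batch is full

    def flush(self):
        # In the real client a timer also calls this every flush_interval seconds
        if self.queue:
            self.send(self.queue)
            self.queue = []

sent = []
batcher = SpanBatcher(batch_size=3, send=sent.append)
for i in range(7):
    batcher.add({"span": i})
batcher.flush()  # drain the remainder, as a shutdown or timer flush would
print([len(batch) for batch in sent])  # [3, 3, 1]
```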

Thread Safety

RDK uses contextvars for thread-safe trace context:
  • Each thread/task has its own trace context
  • Async operations preserve context correctly
  • You can safely use RDK in multi-threaded applications
import asyncio
from rdk import observe

@observe(name="task-1")
async def task1():
    # Has its own trace
    ...

@observe(name="task-2")
async def task2():
    # Has its own trace
    ...

async def main():
    # Both run with separate traces
    await asyncio.gather(task1(), task2())

asyncio.run(main())
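
The isolation described above comes from how asyncio copies the context into each task. A minimal sketch of the mechanism, using contextvars directly (the variable name here is illustrative, not an RDK internal):

```python
import asyncio
import contextvars

# Each asyncio task gets its own copy of the context, so setting this
# variable in one task is invisible to the other.
current_trace = contextvars.ContextVar("current_trace", default=None)

async def traced(name):
    current_trace.set(name)    # set this task's trace context
    await asyncio.sleep(0)     # yield so the other task runs in between
    return current_trace.get() # still sees its own value, not the other's

async def main():
    return await asyncio.gather(traced("task-1"), traced("task-2"))

print(asyncio.run(main()))  # ['task-1', 'task-2']
```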