Skip to content

Python SDK: Usage

Initialize the SDK once at application startup:

import aires

# Initialize the global instance exactly once at startup; a second call
# raises RuntimeError("aires already initialized").
aires.init(
    "my-service",
    "http://localhost:4317",
    environment="production",
    batch_size=256,
    queue_capacity=8192,
    tls=True,
    api_key="sk-aires-xxxx",
)
| Parameter        | Type   | Required | Default        | Description              |
|------------------|--------|----------|----------------|--------------------------|
| `service`        | `str`  | Yes      | —              | Service name             |
| `endpoint`       | `str`  | Yes      | —              | Collector gRPC endpoint  |
| `environment`    | `str`  | No       | `"production"` | Environment name         |
| `batch_size`     | `int`  | No       | `256`          | Events per batch         |
| `queue_capacity` | `int`  | No       | `8192`         | Max buffered events      |
| `tls`            | `bool` | No       | `True`         | Enable TLS               |
| `api_key`        | `str`  | No       | `None`         | API key                  |

init() can only be called once. Calling it a second time raises RuntimeError("aires already initialized").

Six severity levels are available as module-level functions:

import aires

# The six severity levels, each exposed as a module-level function.
aires.trace("entering inner loop")
aires.debug("parsed 42 config entries")
aires.info("server listening on :8000")
aires.warn("connection pool at 80%")
aires.error("request handler raised exception")
aires.fatal("database unreachable")

All logging functions accept the message as the first positional argument and optional keyword arguments for context.

Any keyword argument passed to a logging function is processed as follows:

  • Known keys (trace_id, span_id, session_id, user_id, agent_id, category) are mapped to their corresponding event fields
  • All other keys are stored as string attributes
# Known keys are mapped to event fields
aires.info("user logged in",
    trace_id="trace-abc-123",
    span_id="span-001",
    session_id="sess-789",
    user_id="user-42",
    agent_id="planner-v2",
    category="auth",
)

# Unknown keys become attributes
# (note: all keyword values must be strings)
aires.info("order processed",
    order_id="ord-456",
    amount="14999",
    currency="usd",
    payment_method="stripe",
)

All values must be strings. The SDK passes keyword arguments as HashMap<String, String> to the Rust core.

import aires

aires.init("order-service", "http://localhost:4317", environment="production")

# Simple log
aires.info("service started", port="8000", version="1.2.0")

# With tracing context
aires.info("processing order",
    trace_id="trace-abc-123",
    span_id="span-001",
    category="payment",
    order_id="ord-456",
    customer="cust-789",
)

# Error with context
try:
    result = db.execute("SELECT ...")
except Exception as e:
    # Attach the exception type name as a string attribute for filtering.
    aires.error(f"database query failed: {e}",
        trace_id="trace-abc-123",
        category="db",
        error_type=type(e).__name__,
        query="SELECT ...",
    )

Record metric values with aires.metric():

# Metric names are dot-separated; attribute values are strings.
aires.metric("http.request.duration_ms", 47.2,
    method="POST",
    path="/api/tasks",
    status="201",
)

aires.metric("db.connections.active", 42.0,
    pool="primary",
)

aires.metric("queue.depth", 1523.0,
    queue="task-processing",
)
| Parameter  | Type    | Required | Description                             |
|------------|---------|----------|-----------------------------------------|
| `name`     | `str`   | Yes      | Metric name (dot-separated)             |
| `value`    | `float` | Yes      | Numeric value                           |
| `**kwargs` | `str`   | No       | Additional attributes (key-value pairs) |
# settings.py
import aires

# Initialize once when Django loads settings, before any request is served.
aires.init(
    "django-app",
    "http://localhost:4317",
    environment="production",
)

# middleware.py
import time
import uuid
import aires

class AiresMiddleware:
    """Django middleware that emits an Aires log line and a latency metric
    for every request passing through the stack."""

    def __init__(self, get_response):
        # Standard Django middleware contract: keep the next handler.
        self.get_response = get_response

    def __call__(self, request):
        # Honor an upstream X-Trace-Id header; mint a fresh UUID otherwise.
        supplied = request.headers.get("X-Trace-Id")
        trace_id = supplied if supplied is not None else str(uuid.uuid4())

        began = time.perf_counter()
        response = self.get_response(request)
        elapsed_ms = (time.perf_counter() - began) * 1000

        status = str(response.status_code)
        aires.info(
            f"{request.method} {request.path}",
            trace_id=trace_id,
            category="http",
            method=request.method,
            path=request.path,
            status=status,
            duration_ms=f"{elapsed_ms:.1f}",
        )

        aires.metric(
            "http.request.duration_ms",
            elapsed_ms,
            method=request.method,
            path=request.path,
            status=status,
        )

        return response
import time
import uuid
import aires
from fastapi import FastAPI, Request

app = FastAPI()

# Initialize Aires at module import, before the app starts serving requests.
aires.init("fastapi-app", "http://localhost:4317", environment="production")

@app.middleware("http")
async def aires_middleware(request: Request, call_next):
    """Log every HTTP request and record its latency as an Aires metric."""
    # Prefer a caller-supplied x-trace-id header; generate one if absent.
    incoming = request.headers.get("x-trace-id")
    trace_id = incoming if incoming is not None else str(uuid.uuid4())

    began = time.perf_counter()
    response = await call_next(request)
    elapsed_ms = (time.perf_counter() - began) * 1000

    status = str(response.status_code)
    aires.info(
        f"{request.method} {request.url.path}",
        trace_id=trace_id,
        category="http",
        method=request.method,
        path=request.url.path,
        status=status,
        duration_ms=f"{elapsed_ms:.1f}",
    )

    aires.metric(
        "http.request.duration_ms",
        elapsed_ms,
        method=request.method,
        path=request.url.path,
        status=status,
    )

    return response
import time
import uuid
import aires
from flask import Flask, request, g

app = Flask(__name__)

# Initialize Aires at module import, before the app starts serving requests.
aires.init("flask-app", "http://localhost:4317", environment="production")

@app.before_request
def before_request():
    """Stash the trace id and start timestamp on flask.g for after_request."""
    # Mark when handling began so after_request can compute the latency.
    g.request_start = time.perf_counter()
    # Reuse a caller-supplied X-Trace-Id header; otherwise mint a UUID.
    supplied = request.headers.get("X-Trace-Id")
    g.trace_id = supplied if supplied is not None else str(uuid.uuid4())

@app.after_request
def after_request(response):
    """Emit an Aires log line and latency metric for the finished request."""
    elapsed_ms = (time.perf_counter() - g.request_start) * 1000
    status = str(response.status_code)

    aires.info(
        f"{request.method} {request.path}",
        trace_id=g.trace_id,
        category="http",
        method=request.method,
        path=request.path,
        status=status,
        duration_ms=f"{elapsed_ms:.1f}",
    )

    aires.metric(
        "http.request.duration_ms",
        elapsed_ms,
        method=request.method,
        path=request.path,
        status=status,
    )

    return response

The Python SDK uses a OnceLock<Aires> in Rust — the global instance is thread-safe. You can call aires.info(), aires.error(), etc. from any thread (including ThreadPoolExecutor workers) without synchronization.

from concurrent.futures import ThreadPoolExecutor
import aires

# Initialize once in the main thread; logging from worker threads is safe.
aires.init("worker", "http://localhost:4317")

def process_task(task_id):
    """Process one task, logging start and completion via Aires.

    Args:
        task_id: Integer task identifier. It is converted to str for the
            keyword argument, since all keyword values must be strings
            (the SDK passes them as HashMap<String, String>).
    """
    aires.info(f"processing task {task_id}", task_id=str(task_id))
    # ... work ...
    aires.info(f"completed task {task_id}", task_id=str(task_id))

with ThreadPoolExecutor(max_workers=8) as pool:
    # Logging from the 8 worker threads needs no extra synchronization;
    # the with-block waits for all submitted tasks on exit.
    pool.map(process_task, range(100))