
Collector: Configuration

The collector is configured entirely through environment variables.

| Variable | Default | Description |
| --- | --- | --- |
| `GRPC_PORT` | `4317` | Port for the gRPC server. The collector listens on all interfaces (`0.0.0.0`). |
| Variable | Default | Description |
| --- | --- | --- |
| `CLICKHOUSE_HOST` | `localhost` | ClickHouse server hostname or IP. |
| `CLICKHOUSE_PORT` | `8123` | ClickHouse HTTP interface port. The collector uses the HTTP protocol (not native TCP 9000). |
| `CLICKHOUSE_DATABASE` | `aires` | Database name. Must exist before the collector starts. |
| `CLICKHOUSE_USER` | `default` | ClickHouse username. |
| `CLICKHOUSE_PASSWORD` | `""` (empty) | ClickHouse password. |
```shell
# Local development
export GRPC_PORT=4317
export CLICKHOUSE_HOST=localhost
export CLICKHOUSE_PORT=8123
export CLICKHOUSE_DATABASE=aires
export CLICKHOUSE_USER=default
export CLICKHOUSE_PASSWORD=""

# Start the collector
mix run --no-halt
```
```shell
# Production (Docker)
docker run -d \
  -p 4317:4317 \
  -e GRPC_PORT=4317 \
  -e CLICKHOUSE_HOST=clickhouse.prod.internal \
  -e CLICKHOUSE_PORT=8123 \
  -e CLICKHOUSE_DATABASE=aires \
  -e CLICKHOUSE_USER=aires_writer \
  -e CLICKHOUSE_PASSWORD=secretpassword \
  ghcr.io/secondary-inc/aires-collector:latest
```

The Broadway pipeline controls how events are batched before insertion into ClickHouse. The default configuration is in `AiresCollector.Pipeline`:

```elixir
batchers: [
  clickhouse: [
    concurrency: 4,
    batch_size: 1000,
    batch_timeout: 500
  ]
]
```
| Parameter | Default | Description |
| --- | --- | --- |
| Processor concurrency | `System.schedulers_online()` | Number of concurrent processors. Defaults to the number of CPU cores. Each processor transforms events from Protobuf to row format. |
| Batcher concurrency | 4 | Number of concurrent batcher processes. Each batcher accumulates rows and triggers ClickHouse inserts. More batchers = more parallel inserts. |
| Batch size | 1000 | Maximum rows per ClickHouse INSERT. When the batcher accumulates this many rows, it flushes immediately. |
| Batch timeout | 500 ms | Maximum time to wait before flushing a partial batch. Even if `batch_size` hasn't been reached, a flush happens after this interval. |

For collectors handling > 100K events/sec:

```elixir
# In pipeline.ex
batchers: [
  clickhouse: [
    concurrency: 8,       # more parallel inserts
    batch_size: 5000,     # larger batches = fewer INSERT statements
    batch_timeout: 200    # flush sooner under high load
  ]
]
```

Larger batch sizes are more efficient for ClickHouse (fewer INSERT statements, better compression), but increase latency before events become queryable.
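As a rough back-of-envelope check (illustrative arithmetic, not code from the collector), `batch_size` translates directly into INSERT frequency at a sustained event rate, while `batch_timeout` bounds how long a partial batch can sit before flushing:

```elixir
# Illustrative sketch: assumes a sustained 100_000 events/sec.
events_per_sec = 100_000

default_inserts_per_sec = div(events_per_sec, 1000)  # batch_size: 1000 -> 100 INSERTs/sec
tuned_inserts_per_sec = div(events_per_sec, 5000)    # batch_size: 5000 -> 20 INSERTs/sec

# Worst-case delay before a partial batch reaches ClickHouse is batch_timeout.
IO.puts("default: #{default_inserts_per_sec} INSERTs/sec, <= 500 ms flush delay")
IO.puts("tuned:   #{tuned_inserts_per_sec} INSERTs/sec, <= 200 ms flush delay")
```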

For real-time dashboards where you need events queryable within seconds:

```elixir
batchers: [
  clickhouse: [
    concurrency: 4,
    batch_size: 100,      # smaller batches
    batch_timeout: 100    # flush every 100ms
  ]
]
```

Smaller batches and shorter timeouts reduce the time between event ingestion and queryability, at the cost of more frequent (and less efficient) ClickHouse inserts.

The `AiresCollector.Store` GenServer maintains a single connection to ClickHouse via the `Ch` library, which supports connection pooling through `db_connection`. For high-throughput deployments, increase the pool size in the Store initialization:

```elixir
# In store.ex
{:ok, conn} =
  Ch.start_link(
    hostname: System.get_env("CLICKHOUSE_HOST", "localhost"),
    port: String.to_integer(System.get_env("CLICKHOUSE_PORT", "8123")),
    database: System.get_env("CLICKHOUSE_DATABASE", "aires"),
    username: System.get_env("CLICKHOUSE_USER", "default"),
    password: System.get_env("CLICKHOUSE_PASSWORD", ""),
    pool_size: 10  # number of connections in the pool
  )
```
| Deployment | Pool size | Notes |
| --- | --- | --- |
| Development | 1–2 | Default is sufficient |
| Staging | 5 | Moderate load |
| Production (single collector) | 10–20 | Match batcher concurrency |
| Production (multiple collectors) | 5–10 per instance | Avoid overwhelming ClickHouse |
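To switch pool sizes per deployment without a code change, the pool size can follow the same environment-variable pattern as the rest of the configuration. The `CLICKHOUSE_POOL_SIZE` variable below is a suggestion, not something the collector currently reads:

```elixir
# In store.ex -- hypothetical CLICKHOUSE_POOL_SIZE variable, shown as a sketch
pool_size = String.to_integer(System.get_env("CLICKHOUSE_POOL_SIZE", "10"))

{:ok, conn} =
  Ch.start_link(
    hostname: System.get_env("CLICKHOUSE_HOST", "localhost"),
    port: String.to_integer(System.get_env("CLICKHOUSE_PORT", "8123")),
    database: System.get_env("CLICKHOUSE_DATABASE", "aires"),
    username: System.get_env("CLICKHOUSE_USER", "default"),
    password: System.get_env("CLICKHOUSE_PASSWORD", ""),
    pool_size: pool_size
  )
```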

The gRPC server is configured through the `GRPC.Server.Supervisor`:

```elixir
# In application.ex
{GRPC.Server.Supervisor,
 endpoint: AiresCollector.Endpoint,
 port: port(),
 start_server: true}
```

The server uses grpc-elixir with these defaults:

- Maximum message size: 4 MB (the gRPC default)
- Keepalive: enabled
- Interceptors: `GRPC.Server.Interceptors.Logger` (logs every RPC call)

In high-throughput environments, the gRPC logger interceptor can be noisy. Remove it from the endpoint:

```elixir
# In endpoint.ex
defmodule AiresCollector.Endpoint do
  use GRPC.Endpoint

  # Remove or comment out for production:
  # intercept(GRPC.Server.Interceptors.Logger)

  run(AiresCollector.Server)
end
```

For production deployments, tune the Erlang VM:

```shell
# Set in environment or release config
export ERL_AFLAGS="+P 1000000"                    # max processes (default: 262144)
export ERL_AFLAGS="$ERL_AFLAGS +Q 65536"          # max ports
export ERL_AFLAGS="$ERL_AFLAGS +S 8:8"            # schedulers (match CPU cores)
export ERL_AFLAGS="$ERL_AFLAGS +sbwt very_short"  # scheduler busy-wait threshold
```
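To confirm the flags took effect, the limits can be inspected from a running node (e.g. `iex -S mix` or a remote console) using standard `:erlang.system_info/1` calls:

```elixir
# Verify the VM flags from an attached shell.
IO.inspect(:erlang.system_info(:process_limit), label: "process limit")    # set by +P
IO.inspect(:erlang.system_info(:port_limit), label: "port limit")          # set by +Q
IO.inspect(:erlang.system_info(:schedulers_online), label: "schedulers")   # set by +S
```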

The collector exposes telemetry events that can be consumed by Prometheus, StatsD, or any :telemetry-compatible backend:

| Event | Measurements | Metadata |
| --- | --- | --- |
| `[:aires, :ingest, :start]` | `%{batch_size: n}` | |
| `[:aires, :ingest, :stop]` | `%{duration: ns}` | `%{accepted: n, rejected: n}` |
| `[:aires, :insert, :start]` | `%{row_count: n}` | |
| `[:aires, :insert, :stop]` | `%{duration: ns}` | `%{row_count: n}` |
| `[:aires, :insert, :exception]` | `%{duration: ns}` | `%{reason: term}` |

Example telemetry handler:

```elixir
# Assumes `require Logger` in the surrounding module.
:telemetry.attach("aires-logger", [:aires, :insert, :stop], fn
  _event, %{duration: duration}, %{row_count: count}, _config ->
    ms = System.convert_time_unit(duration, :native, :millisecond)
    Logger.info("inserted #{count} rows in #{ms}ms")
end, nil)
```
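To observe several of the events above with a single handler, `:telemetry.attach_many/4` accepts a list of event names. A minimal sketch covering both insert outcomes (the handler id is illustrative):

```elixir
# Illustrative: one handler for both successful and failed inserts.
# Assumes `require Logger` in the surrounding module.
:telemetry.attach_many(
  "aires-insert-metrics",
  [[:aires, :insert, :stop], [:aires, :insert, :exception]],
  fn
    [:aires, :insert, :stop], _meas, %{row_count: count}, _config ->
      Logger.debug("insert ok: #{count} rows")

    [:aires, :insert, :exception], _meas, %{reason: reason}, _config ->
      Logger.error("insert failed: #{inspect(reason)}")
  end,
  nil
)
```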