OpenTelemetry Sampling [Rust]
What is sampling?
Sampling is a process that restricts the number of traces generated by a system. In high-volume Rust applications, collecting 100% of traces can be expensive and unnecessary. Sampling lets you collect a representative subset of traces while reducing cost and performance overhead, which matters in Rust services where low runtime overhead is usually a design goal.
Rust sampling
OpenTelemetry Rust SDK provides head-based sampling capabilities where the sampling decision is made at the beginning of a trace. By default, the tracer provider uses a ParentBased sampler with the AlwaysOnSampler. A sampler can be configured when creating the tracer provider.
Built-in samplers
AlwaysOnSampler
Samples every trace. Useful for development environments but be careful in production with significant traffic:
use opentelemetry_sdk::{trace, runtime::Tokio};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint("https://api.uptrace.dev:4317")
)
.with_trace_config(
trace::Config::default()
.with_sampler(trace::Sampler::AlwaysOn)
)
.install_batch(Tokio)?;
// All traces will be sampled
run_application().await?;
opentelemetry::global::shutdown_tracer_provider();
Ok(())
}
AlwaysOffSampler
Samples no traces. Useful for completely disabling tracing:
use opentelemetry_sdk::trace;
// AlwaysOff drops every span, so nothing is recorded or exported.
let tracer = opentelemetry_otlp::new_pipeline()
    .tracing()
    // The OTLP pipeline cannot be installed without an exporter, even when nothing is sampled.
    .with_exporter(opentelemetry_otlp::new_exporter().tonic())
    .with_trace_config(trace::Config::default().with_sampler(trace::Sampler::AlwaysOff))
    .install_batch(opentelemetry_sdk::runtime::Tokio)?;
TraceIdRatioBasedSampler
Samples a fraction of traces. The decision is derived from the trace ID, so every span of a given trace gets the same outcome. The fraction must be between 0.0 and 1.0:
use opentelemetry_sdk::trace;
// Sample 10% of traces
let tracer = opentelemetry_otlp::new_pipeline()
    .tracing()
    // The OTLP pipeline cannot be installed without an exporter.
    .with_exporter(opentelemetry_otlp::new_exporter().tonic())
    .with_trace_config(
        trace::Config::default().with_sampler(trace::Sampler::TraceIdRatioBased(0.1)),
    )
    .install_batch(opentelemetry_sdk::runtime::Tokio)?;

// Sample 50% of traces (alternative to the configuration above; this binding shadows it)
let tracer = opentelemetry_otlp::new_pipeline()
    .tracing()
    .with_exporter(opentelemetry_otlp::new_exporter().tonic())
    .with_trace_config(
        trace::Config::default().with_sampler(trace::Sampler::TraceIdRatioBased(0.5)),
    )
    .install_batch(opentelemetry_sdk::runtime::Tokio)?;
ParentBasedSampler
A sampler decorator that behaves differently based on the parent of the span. If the span has no parent, the root sampler is used to make the sampling decision:
use opentelemetry_sdk::trace;
// ParentBased with TraceIdRatioBased root sampler:
// spans with a parent follow the parent's decision, root spans are sampled at 10%.
let tracer = opentelemetry_otlp::new_pipeline()
    .tracing()
    // The OTLP pipeline cannot be installed without an exporter.
    .with_exporter(opentelemetry_otlp::new_exporter().tonic())
    .with_trace_config(trace::Config::default().with_sampler(
        trace::Sampler::ParentBased(Box::new(trace::Sampler::TraceIdRatioBased(0.1))),
    ))
    .install_batch(opentelemetry_sdk::runtime::Tokio)?;

// ParentBased with AlwaysOnSampler root sampler (default behavior)
let tracer = opentelemetry_otlp::new_pipeline()
    .tracing()
    .with_exporter(opentelemetry_otlp::new_exporter().tonic())
    .with_trace_config(
        trace::Config::default()
            .with_sampler(trace::Sampler::ParentBased(Box::new(trace::Sampler::AlwaysOn))),
    )
    .install_batch(opentelemetry_sdk::runtime::Tokio)?;
Configuration in Rust
Environment variables
You can configure sampling using environment variables:
# The four groups below are alternatives: export only ONE of them.
# Each export of OTEL_TRACES_SAMPLER replaces the previous value, so running this
# snippet top to bottom leaves "always_off" as the value in effect.
# NOTE(review): a sampler set in code via `.with_sampler(...)` presumably takes
# precedence over these variables - confirm against the SDK version in use.
# Option 1: TraceIdRatio sampler with 50% sampling (the ratio is passed via OTEL_TRACES_SAMPLER_ARG)
export OTEL_TRACES_SAMPLER="traceidratio"
export OTEL_TRACES_SAMPLER_ARG="0.5"
# Option 2: ParentBased with TraceIdRatio, ratio 0.1 passed via OTEL_TRACES_SAMPLER_ARG
export OTEL_TRACES_SAMPLER="parentbased_traceidratio"
export OTEL_TRACES_SAMPLER_ARG="0.1"
# Option 3: always sample (no argument exported)
export OTEL_TRACES_SAMPLER="always_on"
# Option 4: never sample (no argument exported)
export OTEL_TRACES_SAMPLER="always_off"
Programmatic config
use opentelemetry::{global, KeyValue};
use opentelemetry_sdk::{trace, Resource, runtime::Tokio};
struct TracingSetup;
impl TracingSetup {
pub async fn configure() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
// Create OTLP exporter
let exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint("https://api.uptrace.dev:4317")
.with_metadata({
let mut metadata = tonic::metadata::MetadataMap::new();
if let Ok(dsn) = std::env::var("UPTRACE_DSN") {
metadata.insert("uptrace-dsn", dsn.parse().unwrap());
}
metadata
});
// Create resource
let resource = Resource::new(vec![
KeyValue::new("service.name", "my-rust-service"),
KeyValue::new("service.version", "1.0.0"),
]);
// Configure sampler based on environment
let sampler = Self::sampler_for_environment(&std::env::var("APP_ENV").unwrap_or_default());
// Create tracer provider
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(exporter)
.with_trace_config(
trace::Config::default()
.with_resource(resource)
.with_sampler(sampler)
)
.install_batch(Tokio)?;
global::set_tracer_provider(tracer);
Ok(())
}
fn sampler_for_environment(env: &str) -> trace::Sampler {
match env {
"development" => trace::Sampler::AlwaysOn,
"production" => trace::Sampler::ParentBased(Box::new(
trace::Sampler::TraceIdRatioBased(0.1) // 10% sampling
)),
"testing" => trace::Sampler::AlwaysOff,
_ => trace::Sampler::ParentBased(Box::new(
trace::Sampler::TraceIdRatioBased(0.25) // 25% sampling
)),
}
}
}
// Usage
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
TracingSetup::configure().await?;
let tracer = global::tracer("my_app");
let span = tracer.span_builder("main-operation").start(&tracer);
let _guard = span.activate();
println!("Application running with configured sampling");
Ok(())
}
Env-based configuration
use opentelemetry_sdk::trace;
/// Maps deployment environment names to sampling policies.
pub struct EnvironmentBasedSampling;

impl EnvironmentBasedSampling {
    /// Sampling rate and human-readable rationale per known environment name.
    const SAMPLING_CONFIGS: &'static [(&'static str, SamplingConfig)] = &[
        ("production", SamplingConfig { rate: 0.01, description: "Conservative sampling for production load" }),
        ("staging", SamplingConfig { rate: 0.1, description: "Moderate sampling for staging validation" }),
        ("development", SamplingConfig { rate: 1.0, description: "Full sampling for development debugging" }),
        ("test", SamplingConfig { rate: 0.0, description: "Disabled sampling for test performance" }),
    ];

    /// Returns the sampler for `env`, printing which policy was selected.
    ///
    /// Unknown environment names fall back to full sampling. Rates at or below 0.0
    /// map to `AlwaysOff`, rates at or above 1.0 to `AlwaysOn`, and anything in
    /// between to a parent-based ratio sampler.
    pub fn sampler_for_environment(env: &str) -> trace::Sampler {
        let config = Self::SAMPLING_CONFIGS
            .iter()
            .find(|(name, _)| *name == env)
            .map(|(_, config)| config)
            .unwrap_or(&SamplingConfig { rate: 1.0, description: "Default full sampling" });
        println!("Configuring OpenTelemetry sampling for {}: {}", env, config.description);

        let rate = config.rate;
        if rate <= 0.0 {
            trace::Sampler::AlwaysOff
        } else if rate >= 1.0 {
            trace::Sampler::AlwaysOn
        } else {
            trace::Sampler::ParentBased(Box::new(trace::Sampler::TraceIdRatioBased(rate)))
        }
    }

    /// Installs the OTLP pipeline with the sampler selected by `RUST_ENV`
    /// (defaulting to "development") and registers the provider globally.
    ///
    /// # Errors
    ///
    /// Fails when the OTLP pipeline cannot be installed.
    pub async fn configure_for_application() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
        let env = std::env::var("RUST_ENV").unwrap_or_else(|_| "development".to_string());
        let sampler = Self::sampler_for_environment(&env);

        // Fully qualified paths: this snippet only imports `opentelemetry_sdk::trace`.
        let resource = opentelemetry_sdk::Resource::new(vec![
            opentelemetry::KeyValue::new("service.name", "rust-app"),
            opentelemetry::KeyValue::new("deployment.environment", env),
        ]);

        let provider = opentelemetry_otlp::new_pipeline()
            .tracing()
            // The OTLP pipeline cannot be installed without an exporter.
            .with_exporter(opentelemetry_otlp::new_exporter().tonic())
            .with_trace_config(
                trace::Config::default()
                    .with_sampler(sampler)
                    .with_resource(resource),
            )
            .install_batch(opentelemetry_sdk::runtime::Tokio)?;
        opentelemetry::global::set_tracer_provider(provider);
        Ok(())
    }
}

/// One row of the environment table: the sampling rate and why it was chosen.
struct SamplingConfig {
    rate: f64,
    description: &'static str,
}
// Usage in main.rs
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
EnvironmentBasedSampling::configure_for_application().await?;
// Your application code
run_app().await?;
Ok(())
}
Custom sampler
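You can create custom sampling logic by implementing the ShouldSample trait from opentelemetry_sdk::trace (Sampler is the enum of built-in samplers, not a trait). The implementing type must also be Clone, Debug, Send and Sync. Note: the trait's signature has changed between SDK versions, so check it against the version you depend on: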
use opentelemetry::{
trace::{SpanKind, TraceId},
Context, KeyValue,
};
use opentelemetry_sdk::trace::{SamplingDecision, SamplingResult, Sampler};
/// Sampler configuration with one rate for high-priority spans and one for everything else.
///
/// `Debug` and `Clone` are derived because a type used as a sampler has to provide both.
#[derive(Debug, Clone)]
pub struct CustomSampler {
    /// Sampling rate applied to spans marked as high priority.
    high_priority_rate: f64,
    /// Sampling rate applied to all other spans.
    default_rate: f64,
}

impl CustomSampler {
    /// Creates a sampler configuration from the two rates.
    pub fn new(high_priority_rate: f64, default_rate: f64) -> Self {
        Self {
            high_priority_rate,
            default_rate,
        }
    }
}
impl Sampler for CustomSampler {
fn should_sample(
&self,
_parent_context: Option<&Context>,
trace_id: TraceId,
_name: &str,
_span_kind: &SpanKind,
attributes: &[KeyValue],
_links: &[opentelemetry::trace::Link],
) -> SamplingResult {
// Sample high-priority operations at higher rate
let rate = if attributes.iter().any(|attr| {
attr.key.as_str() == "priority" && attr.value.as_str() == "high"
}) {
self.high_priority_rate
} else {
self.default_rate
};
// Use trace_id for deterministic sampling
let trace_id_bytes = trace_id.to_bytes();
let trace_id_u64 = u64::from_be_bytes([
trace_id_bytes[0], trace_id_bytes[1], trace_id_bytes[2], trace_id_bytes[3],
trace_id_bytes[4], trace_id_bytes[5], trace_id_bytes[6], trace_id_bytes[7],
]);
let sampling_threshold = (rate * (u64::MAX as f64)) as u64;
let decision = if trace_id_u64 < sampling_threshold {
SamplingDecision::RecordAndSample
} else {
SamplingDecision::Drop
};
SamplingResult {
decision,
attributes: Vec::new(),
trace_state: opentelemetry::trace::TraceState::default(),
}
}
}
// Usage - custom samplers are passed directly to `Config::with_sampler`
/// Installs an OTLP pipeline that uses `CustomSampler` and registers it globally.
///
/// # Errors
///
/// Fails when the OTLP pipeline cannot be installed.
async fn setup_custom_sampling() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    let custom_sampler = CustomSampler::new(1.0, 0.1); // 100% for high priority, 10% for others
    let provider = opentelemetry_otlp::new_pipeline()
        .tracing()
        // The OTLP pipeline cannot be installed without an exporter.
        .with_exporter(opentelemetry_otlp::new_exporter().tonic())
        .with_trace_config(
            // The `Sampler` enum has no `Custom` variant: `with_sampler` accepts any
            // `ShouldSample` implementation directly.
            opentelemetry_sdk::trace::Config::default().with_sampler(custom_sampler),
        )
        .install_batch(opentelemetry_sdk::runtime::Tokio)?;
    opentelemetry::global::set_tracer_provider(provider);
    Ok(())
}
Rate limit sampler
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
/// Sampler configuration that admits at most a fixed number of sampling decisions per second.
///
/// `Debug` and `Clone` are derived because a type used as a sampler has to provide both.
/// Clones share the same counter through the `Arc`, so the limit applies across all copies.
#[derive(Debug, Clone)]
pub struct RateLimitSampler {
    /// Number of positive sampling decisions allowed per one-second window.
    max_traces_per_second: u32,
    /// Counter and window start, shared between clones.
    state: Arc<Mutex<RateLimitState>>,
}

/// Mutable bookkeeping for the current one-second window.
#[derive(Debug)]
struct RateLimitState {
    /// Positive decisions made since `last_reset`.
    trace_count: u32,
    /// Start of the current window.
    last_reset: Instant,
}

impl RateLimitSampler {
    /// Creates a limiter whose first window starts now with an empty counter.
    pub fn new(max_traces_per_second: u32) -> Self {
        Self {
            max_traces_per_second,
            state: Arc::new(Mutex::new(RateLimitState {
                trace_count: 0,
                last_reset: Instant::now(),
            })),
        }
    }
}
impl Sampler for RateLimitSampler {
fn should_sample(
&self,
_parent_context: Option<&Context>,
_trace_id: TraceId,
_name: &str,
_span_kind: &SpanKind,
_attributes: &[KeyValue],
_links: &[opentelemetry::trace::Link],
) -> SamplingResult {
let mut state = self.state.lock().unwrap();
let current_time = Instant::now();
// Reset counter every second
if current_time.duration_since(state.last_reset) >= Duration::from_secs(1) {
state.trace_count = 0;
state.last_reset = current_time;
}
let decision = if state.trace_count < self.max_traces_per_second {
state.trace_count += 1;
SamplingDecision::RecordAndSample
} else {
SamplingDecision::Drop
};
SamplingResult {
decision,
attributes: Vec::new(),
trace_state: opentelemetry::trace::TraceState::default(),
}
}
}
// Usage
/// Installs an OTLP pipeline limited to 50 new traces per second and registers it globally.
///
/// # Errors
///
/// Fails when the OTLP pipeline cannot be installed.
async fn setup_rate_limit_sampling() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    let rate_limit_sampler = RateLimitSampler::new(50); // Max 50 traces per second
    let provider = opentelemetry_otlp::new_pipeline()
        .tracing()
        // The OTLP pipeline cannot be installed without an exporter.
        .with_exporter(opentelemetry_otlp::new_exporter().tonic())
        .with_trace_config(
            // The `Sampler` enum has no `Custom` variant. Wrapping the limiter in
            // `ParentBased` makes it decide only for root spans, so the limit counts
            // traces (not individual spans) and sampled traces stay complete.
            opentelemetry_sdk::trace::Config::default().with_sampler(
                opentelemetry_sdk::trace::Sampler::ParentBased(Box::new(rate_limit_sampler)),
            ),
        )
        .install_batch(opentelemetry_sdk::runtime::Tokio)?;
    opentelemetry::global::set_tracer_provider(provider);
    Ok(())
}
Debugging sampling
Check sampling
use opentelemetry::{global, trace::TraceContextExt};
async fn log_sampling_decision(span_name: &str) {
let tracer = global::tracer("debug-sampler");
let span = tracer.span_builder(span_name).start(&tracer);
let _guard = span.activate();
let span_context = span.span_context();
if span_context.is_valid() {
println!("Span '{}' - Trace ID: {}", span_name, span_context.trace_id());
println!("Sampled: {}", span_context.trace_flags().is_sampled());
println!("Recording: {}", span.is_recording());
} else {
println!("Span '{}' - Invalid context (likely dropped)", span_name);
}
}
// Test sampling
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
// Setup sampling (e.g., 50% rate)
setup_sampling().await?;
for i in 0..10 {
log_sampling_decision(&format!("test-span-{}", i)).await;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
Ok(())
}
Monitor sampling rates
use std::sync::{Arc, Mutex};
use std::time::Instant;
/// Running tally of how many spans were observed and how many of them were sampled.
#[derive(Default)]
pub struct SamplingMetrics {
    total_spans: u64,
    sampled_spans: u64,
    start_time: Option<Instant>,
}

impl SamplingMetrics {
    /// Creates an empty, shareable tally whose clock starts now.
    pub fn new() -> Arc<Mutex<Self>> {
        let metrics = Self {
            total_spans: 0,
            sampled_spans: 0,
            start_time: Some(Instant::now()),
        };
        Arc::new(Mutex::new(metrics))
    }

    /// Counts one span, and counts it as sampled when `is_sampled` is true.
    pub fn record_span(&mut self, is_sampled: bool) {
        self.total_spans += 1;
        self.sampled_spans += u64::from(is_sampled);
    }

    /// Fraction of recorded spans that were sampled; 0.0 before any span is recorded.
    pub fn sampling_rate(&self) -> f64 {
        match self.total_spans {
            0 => 0.0,
            total => self.sampled_spans as f64 / total as f64,
        }
    }

    /// Snapshot of the counters together with elapsed time and throughput.
    ///
    /// Elapsed time and throughput are reported as 0.0 when no start time is set.
    pub fn stats(&self) -> SamplingStats {
        let elapsed_seconds = match self.start_time {
            Some(started) => started.elapsed().as_secs_f64(),
            None => 0.0,
        };
        let spans_per_second = if elapsed_seconds > 0.0 {
            self.total_spans as f64 / elapsed_seconds
        } else {
            0.0
        };

        SamplingStats {
            total_spans: self.total_spans,
            sampled_spans: self.sampled_spans,
            sampling_rate: self.sampling_rate(),
            elapsed_seconds,
            spans_per_second,
        }
    }
}

/// Point-in-time view of a `SamplingMetrics` tally.
#[derive(Debug)]
pub struct SamplingStats {
    pub total_spans: u64,
    pub sampled_spans: u64,
    pub sampling_rate: f64,
    pub elapsed_seconds: f64,
    pub spans_per_second: f64,
}
// Usage
async fn monitor_sampling_example() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let metrics = SamplingMetrics::new();
let tracer = global::tracer("monitored-service");
for i in 0..100 {
let span = tracer.span_builder(&format!("test-span-{}", i)).start(&tracer);
let _guard = span.activate();
let is_sampled = span.span_context().trace_flags().is_sampled();
metrics.lock().unwrap().record_span(is_sampled);
// Simulate work
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
let stats = metrics.lock().unwrap().stats();
println!("Sampling statistics: {:?}", stats);
Ok(())
}
Sampling middleware
use axum::{extract::Request, http::StatusCode, middleware::Next, response::Response};
/// Axum middleware that records, after each request, whether the current span was
/// sampled, and logs aggregate statistics every 100 recorded spans.
///
/// Requests without a valid current span are passed through without being counted.
pub async fn sampling_debug_middleware(request: Request, next: Next) -> Result<Response, StatusCode> {
    // `Context::span` is a method of this trait.
    use opentelemetry::trace::TraceContextExt as _;

    static METRICS: std::sync::OnceLock<Arc<Mutex<SamplingMetrics>>> = std::sync::OnceLock::new();
    let metrics = METRICS.get_or_init(SamplingMetrics::new);

    // Process request
    let response = next.run(request).await;

    // Check current span
    let current_context = opentelemetry::Context::current();
    let span = current_context.span();
    let span_context = span.span_context();
    if span_context.is_valid() {
        let is_sampled = span_context.trace_flags().is_sampled();

        // Record and snapshot under a single lock acquisition. The tally is plain
        // counters, so it remains usable after a panic in another holder.
        let stats = {
            let mut tally = metrics
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            tally.record_span(is_sampled);
            tally.stats()
        };

        tracing::debug!(
            trace_id = %span_context.trace_id(),
            sampled = is_sampled,
            recording = span.is_recording(),
            "Request sampling info"
        );

        // Log stats periodically. Checking only right after a span was recorded
        // avoids logging on every request while the count is still 0 or is stuck
        // on a multiple of 100.
        if stats.total_spans % 100 == 0 {
            tracing::info!("Sampling stats: {:?}", stats);
        }
    }

    Ok(response)
}
Production considerations
Sampling in microservices
In a microservices architecture, sampling decisions should be made at the root of the trace and propagated to all services. This ensures consistent sampling across the entire distributed trace.
use opentelemetry_sdk::trace;
// Configuration for microservices
/// Namespace type for the tracing setup shared by all services.
pub struct MicroserviceTracingConfig;

impl MicroserviceTracingConfig {
    /// Installs a parent-based 10% sampler for `service_name` and registers the provider globally.
    ///
    /// # Errors
    ///
    /// Fails when the OTLP pipeline cannot be installed.
    pub async fn setup_for_service(service_name: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
        // Fully qualified paths: this snippet only imports `opentelemetry_sdk::trace`.
        let resource = opentelemetry_sdk::Resource::new(vec![opentelemetry::KeyValue::new(
            "service.name",
            service_name.to_string(),
        )]);

        let provider = opentelemetry_otlp::new_pipeline()
            .tracing()
            // The OTLP pipeline cannot be installed without an exporter.
            .with_exporter(opentelemetry_otlp::new_exporter().tonic())
            .with_trace_config(
                trace::Config::default()
                    // Root services (entry points) use TraceIdRatioBased
                    // Downstream services respect parent sampling decision
                    .with_sampler(trace::Sampler::ParentBased(Box::new(
                        trace::Sampler::TraceIdRatioBased(0.1), // 10% at root
                    )))
                    .with_resource(resource),
            )
            .install_batch(opentelemetry_sdk::runtime::Tokio)?;
        opentelemetry::global::set_tracer_provider(provider);
        Ok(())
    }
}
// Gateway service (entry point)
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let service_name = std::env::var("SERVICE_NAME").unwrap_or_else(|_| "api-gateway".to_string());
MicroserviceTracingConfig::setup_for_service(&service_name).await?;
// Run service
run_service().await?;
Ok(())
}
Performance impact
Sampling reduces the performance overhead of tracing in Rust applications:
- CPU usage: Fewer spans to process and export
- Memory usage: Smaller trace buffers, less heap allocation
- Network usage: Reduced data sent to backend
- Storage costs: Lower storage requirements
Cross-platform memory monitoring
use std::sync::Arc;
/// Sampler configuration holding a base rate and the memory threshold at which
/// sampling should be reduced.
///
/// `Debug` and `Clone` are derived because a type used as a sampler has to provide both.
#[derive(Debug, Clone)]
pub struct CrossPlatformMemorySampler {
    /// Sampling rate used while memory usage is at or below the threshold.
    base_rate: f64,
    /// Resident memory, in bytes, above which the rate is reduced.
    memory_threshold_bytes: u64,
}

impl CrossPlatformMemorySampler {
    /// Creates a sampler configuration from a base rate and a threshold in mebibytes.
    ///
    /// The threshold saturates at `u64::MAX` bytes instead of overflowing.
    pub fn new(base_rate: f64, memory_threshold_mb: u64) -> Self {
        Self {
            base_rate,
            memory_threshold_bytes: memory_threshold_mb.saturating_mul(1024 * 1024),
        }
    }

    /// Returns the process's current memory usage in bytes.
    ///
    /// On Linux this reads `VmRSS` from `/proc/self/status`. macOS and Windows have
    /// no real measurement yet and report a fixed 128 MiB placeholder. Any other
    /// platform, or a failed read on Linux, reports 64 MiB.
    fn get_memory_usage_bytes(&self) -> u64 {
        const MIB: u64 = 1024 * 1024;

        // Cross-platform memory usage detection
        #[cfg(target_os = "linux")]
        {
            if let Ok(status) = std::fs::read_to_string("/proc/self/status") {
                if let Some(bytes) = Self::parse_vm_rss_bytes(&status) {
                    return bytes;
                }
            }
        }

        if cfg!(any(target_os = "macos", target_os = "windows")) {
            // Simplified fallback until task_info / GetProcessMemoryInfo is wired in.
            128 * MIB
        } else {
            // Fallback for other platforms
            64 * MIB
        }
    }

    /// Extracts the `VmRSS` value from the text of `/proc/self/status` and converts
    /// it from kilobytes to bytes.
    #[cfg(target_os = "linux")]
    fn parse_vm_rss_bytes(status: &str) -> Option<u64> {
        status
            .lines()
            .find_map(|line| line.strip_prefix("VmRSS:"))
            .and_then(|rest| rest.split_whitespace().next())
            .and_then(|kb| kb.parse::<u64>().ok())
            .map(|kb| kb.saturating_mul(1024))
    }
}
impl Sampler for CrossPlatformMemorySampler {
fn should_sample(
&self,
_parent_context: Option<&Context>,
trace_id: TraceId,
_name: &str,
_span_kind: &SpanKind,
_attributes: &[KeyValue],
_links: &[opentelemetry::trace::Link],
) -> SamplingResult {
// Adjust sampling rate based on memory usage
let current_memory = self.get_memory_usage_bytes();
let rate = if current_memory > self.memory_threshold_bytes {
self.base_rate * 0.5 // Reduce sampling when memory is high
} else {
self.base_rate
};
// Use trace_id for deterministic sampling
let trace_id_bytes = trace_id.to_bytes();
let trace_id_u64 = u64::from_be_bytes([
trace_id_bytes[0], trace_id_bytes[1], trace_id_bytes[2], trace_id_bytes[3],
trace_id_bytes[4], trace_id_bytes[5], trace_id_bytes[6], trace_id_bytes[7],
]);
let sampling_threshold = (rate * (u64::MAX as f64)) as u64;
let decision = if trace_id_u64 < sampling_threshold {
SamplingDecision::RecordAndSample
} else {
SamplingDecision::Drop
};
SamplingResult {
decision,
attributes: Vec::new(),
trace_state: opentelemetry::trace::TraceState::default(),
}
}
}
// Usage
/// Installs an OTLP pipeline that uses the memory-aware sampler and registers it globally.
///
/// # Errors
///
/// Fails when the OTLP pipeline cannot be installed.
async fn setup_memory_aware_sampling() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    let memory_sampler = CrossPlatformMemorySampler::new(0.1, 400); // 10% base rate, 400MB threshold
    let provider = opentelemetry_otlp::new_pipeline()
        .tracing()
        // The OTLP pipeline cannot be installed without an exporter.
        .with_exporter(opentelemetry_otlp::new_exporter().tonic())
        .with_trace_config(
            // The `Sampler` enum has no `Custom` variant: `with_sampler` accepts any
            // `ShouldSample` implementation directly.
            opentelemetry_sdk::trace::Config::default().with_sampler(memory_sampler),
        )
        .install_batch(opentelemetry_sdk::runtime::Tokio)?;
    opentelemetry::global::set_tracer_provider(provider);
    Ok(())
}
Conditional instrumentation
For performance-critical Rust code, combine sampling with conditional compilation:
// Conditional tracing based on sampling and build configuration
/// Awaits `$body`, wrapped in a span named `$name` when the `tracing` feature is enabled.
///
/// * `$tracer` - a tracer (or a reference to one) used to start the span.
/// * `$name` - the span name; must be `'static` or owned.
/// * `$body` - an expression evaluating to a future; its output is the macro's value.
///
/// The expansion is one block expression, so the macro can be used wherever an
/// expression is expected. Without the `tracing` feature it reduces to `$body.await`.
macro_rules! traced_operation {
    ($tracer:expr, $name:expr, $body:expr) => {{
        #[cfg(feature = "tracing")]
        let otel_cx = {
            use opentelemetry::trace::{Span as _, TraceContextExt as _, Tracer as _};
            let mut span = $tracer.start($name);
            if span.is_recording() {
                // Only add attributes if actually recording
                span.set_attribute(opentelemetry::KeyValue::new("rust.optimized", true));
            }
            opentelemetry::Context::current_with_span(span)
        };

        let operation = $body;

        // Attach the span's context to the future instead of holding a thread-bound
        // guard across the await, so the surrounding future can stay `Send`.
        #[cfg(feature = "tracing")]
        let operation = opentelemetry::trace::FutureExt::with_context(operation, otel_cx);

        operation.await
    }};
}
// High-performance service with conditional tracing
/// Service whose critical path is traced only when the `tracing` feature is enabled.
pub struct HighPerformanceService {
    // `Tracer` is not object-safe, so it cannot be stored as `Box<dyn Tracer>`;
    // `global::tracer` returns the concrete `BoxedTracer` type instead.
    tracer: opentelemetry::global::BoxedTracer,
}

impl HighPerformanceService {
    /// Creates the service with a tracer obtained from the global provider.
    pub fn new() -> Self {
        Self {
            tracer: opentelemetry::global::tracer("high-perf-service"),
        }
    }

    /// Processes `data`, wrapped in a "critical-operation" span when tracing is compiled in.
    pub async fn critical_operation(&self, data: &[u8]) -> Result<Vec<u8>, ProcessingError> {
        traced_operation!(
            &self.tracer,
            "critical-operation",
            self.process_data_inner(data)
        )
    }

    /// Returns a copy of `data`; stands in for the real processing logic.
    async fn process_data_inner(&self, data: &[u8]) -> Result<Vec<u8>, ProcessingError> {
        // High-performance processing without tracing overhead when disabled
        Ok(data.to_vec())
    }
}

impl Default for HighPerformanceService {
    fn default() -> Self {
        Self::new()
    }
}
/// Error returned when processing fails.
///
/// Implements `Display` and `std::error::Error` so it can be propagated with `?`
/// into `Box<dyn Error>` and printed for users.
#[derive(Debug)]
pub struct ProcessingError;

impl std::fmt::Display for ProcessingError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("data processing failed")
    }
}

impl std::error::Error for ProcessingError {}