OpenTelemetry Rust Metrics API
This document teaches you how to use the OpenTelemetry Rust Metrics API to measure application performance with metrics. To learn how to install and configure the OpenTelemetry Rust SDK, see Getting started with OpenTelemetry Rust.
If you are not familiar with metrics terminology such as timeseries or additive/synchronous/asynchronous instruments, read the introduction to OpenTelemetry Metrics first.
The OpenTelemetry Rust Metrics API is stable and ready for production use. The core concepts and APIs are well-established.
Prerequisites
Before using the Metrics API, ensure the required crates are in your Cargo.toml. Some examples in this guide also use the rand and chrono crates to simulate data:
[dependencies]
opentelemetry = "0.21"
opentelemetry_sdk = { version = "0.21", features = ["rt-tokio", "metrics"] }
opentelemetry-otlp = { version = "0.14", features = ["metrics"] }
tokio = { version = "1.0", features = ["full"] }
rand = "0.8"    # only needed for the simulated measurements in the examples
chrono = "0.4"  # only needed for the business metrics example
Depending on the release you install, instrument builders finish with either .build() or .init(); the examples below use .build(), so adjust the call if your version differs.
Getting Started
To get started with metrics, you need to create a meter:
use opentelemetry::{global, KeyValue};
let meter = global::meter("my-rust-app");
Using the meter, you can create instruments to measure performance. The simplest Counter instrument looks like this:
use opentelemetry::{global, KeyValue};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let meter = global::meter("my-rust-app");
let counter = meter.u64_counter("requests_total")
.with_description("Total number of requests processed")
.build();
for i in 0..1000 {
counter.add(1, &[
KeyValue::new("status", "success"),
KeyValue::new("method", "GET"),
]);
if i % 10 == 0 {
// Allow metrics to be collected
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
}
Ok(())
}
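In a real service you typically create each instrument once and reuse it for every measurement instead of rebuilding it per request. One convenient pattern is a lazily initialized static; this is a sketch assuming the builder API shown above, and the requests_total helper name is hypothetical:
use std::sync::OnceLock;
use opentelemetry::{global, KeyValue, metrics::Counter};

// Build the counter on first use, then hand out the same instance everywhere.
fn requests_total() -> &'static Counter<u64> {
    static COUNTER: OnceLock<Counter<u64>> = OnceLock::new();
    COUNTER.get_or_init(|| {
        global::meter("my-rust-app")
            .u64_counter("requests_total")
            .with_description("Total number of requests processed")
            .build()
    })
}

fn record_one_request() {
    requests_total().add(1, &[KeyValue::new("status", "success")]);
}
Any module can then call requests_total().add(...) without threading the counter through function arguments.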
Metric Instruments
OpenTelemetry provides several types of instruments to capture different kinds of measurements. Each instrument serves a specific purpose and has distinct characteristics.
Counter
Counter is a synchronous instrument that measures additive non-decreasing values, representing cumulative totals like the number of requests, errors, or completed tasks.
use opentelemetry::{global, KeyValue};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let meter = global::meter("http-server");
let http_requests_counter = meter.u64_counter("http_requests_total")
.with_description("Total number of HTTP requests")
.build();
let error_counter = meter.u64_counter("http_errors_total")
.with_description("Total number of HTTP errors")
.build();
// Handle requests
handle_request("GET", "/api/users", 200, &http_requests_counter, &error_counter).await;
handle_request("POST", "/api/users", 201, &http_requests_counter, &error_counter).await;
handle_request("GET", "/api/users/999", 404, &http_requests_counter, &error_counter).await;
Ok(())
}
async fn handle_request(
method: &str,
endpoint: &str,
status_code: u16,
requests_counter: &opentelemetry::metrics::Counter<u64>,
errors_counter: &opentelemetry::metrics::Counter<u64>,
) {
// Record successful request
requests_counter.add(1, &[
KeyValue::new("method", method.to_string()),
KeyValue::new("endpoint", endpoint.to_string()),
KeyValue::new("status_code", status_code.to_string()),
]);
// Record error if applicable
if status_code >= 400 {
let error_type = if status_code < 500 { "client_error" } else { "server_error" };
errors_counter.add(1, &[
KeyValue::new("method", method.to_string()),
KeyValue::new("endpoint", endpoint.to_string()),
KeyValue::new("error_type", error_type),
]);
}
}
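When attribute values are fixed, you can also build the attribute array once and reuse the slice, avoiding the per-call to_string() allocations used above. A minimal sketch:
use opentelemetry::{global, KeyValue};

fn record_static_attributes() {
    let meter = global::meter("http-server");
    let http_requests_counter = meter.u64_counter("http_requests_total").build();
    // Fixed attribute values can be &'static str and built a single time.
    let get_ok = [
        KeyValue::new("method", "GET"),
        KeyValue::new("endpoint", "/api/users"),
        KeyValue::new("status_code", "200"),
    ];
    for _ in 0..100 {
        http_requests_counter.add(1, &get_ok); // no allocation per call
    }
}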
UpDownCounter
UpDownCounter is a synchronous instrument that measures additive values that can both increase and decrease, such as the number of active connections or items in a queue.
use opentelemetry::{global, KeyValue};
use std::sync::Arc;
struct ConnectionPool {
active_connections: Arc<opentelemetry::metrics::UpDownCounter<i64>>,
pool_name: String,
}
impl ConnectionPool {
fn new(pool_name: String) -> Self {
let meter = global::meter("connection-pool");
let active_connections = meter.i64_up_down_counter("connections_active")
.with_description("Number of active database connections")
.build();
Self {
active_connections: Arc::new(active_connections),
pool_name,
}
}
async fn acquire_connection<T, F, Fut>(&self, database_name: &str, operation: F) -> T
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = T>,
{
// Connection established
self.active_connections.add(1, &[
KeyValue::new("database", database_name.to_string()),
KeyValue::new("pool", self.pool_name.clone()),
]);
let result = operation().await;
// Connection released
self.active_connections.add(-1, &[
KeyValue::new("database", database_name.to_string()),
KeyValue::new("pool", self.pool_name.clone()),
]);
result
}
}
struct TaskQueue {
queue_size: Arc<opentelemetry::metrics::UpDownCounter<i64>>,
queue_name: String,
}
impl TaskQueue {
fn new(queue_name: String) -> Self {
let meter = global::meter("task-queue");
let queue_size = meter.i64_up_down_counter("queue_size")
.with_description("Number of items in the task queue")
.build();
Self {
queue_size: Arc::new(queue_size),
queue_name,
}
}
async fn enqueue(&self, task: Task, priority: &str) {
// Add task to queue (implementation)
self.perform_enqueue(task).await;
self.queue_size.add(1, &[
KeyValue::new("queue", self.queue_name.clone()),
KeyValue::new("priority", priority.to_string()),
]);
}
async fn dequeue(&self, priority: &str) -> Option<Task> {
let task = self.perform_dequeue().await;
if task.is_some() {
self.queue_size.add(-1, &[
KeyValue::new("queue", self.queue_name.clone()),
KeyValue::new("priority", priority.to_string()),
]);
}
task
}
async fn perform_enqueue(&self, _task: Task) {
// Queue implementation
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
}
async fn perform_dequeue(&self) -> Option<Task> {
// Queue implementation
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
Some(Task { id: 1 })
}
}
// Mock task
struct Task {
id: u64,
}
// Usage example
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = ConnectionPool::new("main".to_string());
pool.acquire_connection("users_db", || async {
// Database operations
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
"Query result"
}).await;
let queue = TaskQueue::new("tasks".to_string());
let task = Task { id: 1 };
queue.enqueue(task, "high").await;
let _processed_task = queue.dequeue("high").await;
Ok(())
}
Histogram
Histogram is a synchronous instrument that measures the statistical distribution of values, such as request latencies or response sizes, grouping them into buckets.
use opentelemetry::{global, KeyValue};
use std::sync::Arc;
use std::time::Instant;
struct HttpHandler {
request_duration: Arc<opentelemetry::metrics::Histogram<f64>>,
response_size: Arc<opentelemetry::metrics::Histogram<u64>>,
}
impl HttpHandler {
fn new() -> Self {
let meter = global::meter("http-handler");
let request_duration = meter.f64_histogram("request_duration_seconds")
.with_description("HTTP request duration")
.with_unit("s")
.build();
let response_size = meter.u64_histogram("response_size_bytes")
.with_description("HTTP response size")
.with_unit("By")
.build();
Self {
request_duration: Arc::new(request_duration),
response_size: Arc::new(response_size),
}
}
async fn handle_request(&self, method: &str, endpoint: &str) -> Result<HttpResponse, HttpError> {
let start_time = Instant::now();
let result = self.process_request(method, endpoint).await;
// Record metrics
let duration = start_time.elapsed().as_secs_f64();
let (status_code, response_size) = match &result {
Ok(response) => (response.status_code, response.body.len() as u64),
Err(_) => (500, 0),
};
self.request_duration.record(duration, &[
KeyValue::new("method", method.to_string()),
KeyValue::new("endpoint", endpoint.to_string()),
KeyValue::new("status_code", status_code.to_string()),
]);
self.response_size.record(response_size, &[
KeyValue::new("method", method.to_string()),
KeyValue::new("endpoint", endpoint.to_string()),
KeyValue::new("content_type", "application/json"),
]);
result
}
async fn process_request(&self, _method: &str, _endpoint: &str) -> Result<HttpResponse, HttpError> {
// Simulate request processing
let processing_time = tokio::time::Duration::from_millis(rand::random::<u64>() % 500 + 10);
tokio::time::sleep(processing_time).await;
// Simulate response
let response_data = "x".repeat((rand::random::<usize>() % 5000) + 100);
Ok(HttpResponse {
status_code: 200,
body: response_data,
})
}
}
struct HttpResponse {
status_code: u16,
body: String,
}
#[derive(Debug)]
struct HttpError;
// Usage
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let handler = HttpHandler::new();
// Process multiple requests
for _ in 0..10 {
let _ = handler.handle_request("GET", "/api/users").await;
let _ = handler.handle_request("POST", "/api/users").await;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
Ok(())
}
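The bucket boundaries themselves are chosen by the SDK's explicit-bucket histogram aggregation. If the defaults do not match your latency range, newer releases accept a boundary hint on the builder; the with_boundaries call below is an assumption about your installed version (older SDKs configure buckets through a View on the meter provider instead):
use opentelemetry::global;
use opentelemetry::metrics::Histogram;

// Sketch: suggest explicit bucket boundaries for a latency histogram.
// `with_boundaries` is assumed to exist in your release; check its docs.
fn latency_histogram() -> Histogram<f64> {
    global::meter("http-handler")
        .f64_histogram("request_duration_seconds")
        .with_unit("s")
        .with_boundaries(vec![0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0])
        .build()
}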
Observable Gauge
Observable Gauge is an asynchronous instrument that measures non-additive values at a point in time, such as CPU usage, memory consumption, or temperature readings.
use opentelemetry::{global, KeyValue};
use std::sync::Arc;
use tokio::sync::Mutex;
struct SystemMetrics {
_gauge_registration: opentelemetry::metrics::ObservableGauge<f64>,
system_stats: Arc<Mutex<SystemStats>>,
}
#[derive(Clone)]
struct SystemStats {
cpu_usage: f64,
memory_usage_mb: f64,
active_threads: f64,
objects_total: f64,
}
impl SystemMetrics {
fn new() -> Self {
let meter = global::meter("system-metrics");
let system_stats = Arc::new(Mutex::new(SystemStats {
cpu_usage: 0.0,
memory_usage_mb: 0.0,
active_threads: 0.0,
objects_total: 0.0,
}));
let stats_for_callback = system_stats.clone();
let system_gauge = meter.f64_observable_gauge("system_resource_usage")
.with_description("System resource utilization")
.with_callback(move |observer| {
// Note: Observable callbacks are sync, so we use try_lock
if let Ok(stats) = stats_for_callback.try_lock() {
observer.observe(stats.cpu_usage, &[
KeyValue::new("resource", "cpu"),
KeyValue::new("unit", "percent"),
]);
observer.observe(stats.memory_usage_mb, &[
KeyValue::new("resource", "memory"),
KeyValue::new("unit", "megabytes"),
]);
observer.observe(stats.active_threads, &[
KeyValue::new("resource", "threads"),
KeyValue::new("unit", "count"),
]);
observer.observe(stats.objects_total, &[
KeyValue::new("resource", "objects"),
KeyValue::new("unit", "count"),
]);
}
})
.build();
Self {
_gauge_registration: system_gauge,
system_stats,
}
}
async fn update_stats(&self) {
let mut stats = self.system_stats.lock().await;
// Simulate reading system metrics
stats.cpu_usage = rand::random::<f64>() * 100.0;
stats.memory_usage_mb = 512.0 + rand::random::<f64>() * 1024.0;
stats.active_threads = (rand::random::<u8>() % 50 + 10) as f64;
stats.objects_total = (rand::random::<u32>() % 10000 + 5000) as f64;
}
}
// Usage
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let system_metrics = SystemMetrics::new();
// Simulate system running and updating metrics
for _ in 0..10 {
system_metrics.update_stats().await;
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
}
Ok(())
}
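Because observable callbacks run synchronously during collection, a held Mutex means a skipped observation (the try_lock above silently does nothing in that case). When a value fits in an integer, sharing a plain atomic with the callback avoids the problem; the sketch below stores CPU usage as milli-percent in an AtomicU64, with names and scaling that are purely illustrative:
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use opentelemetry::{global, KeyValue};
use opentelemetry::metrics::ObservableGauge;

// Share an atomic with the callback so collection never skips a snapshot.
fn register_cpu_gauge(cpu_millipercent: Arc<AtomicU64>) -> ObservableGauge<f64> {
    let meter = global::meter("system-metrics");
    meter
        .f64_observable_gauge("system_cpu_usage")
        .with_unit("%")
        .with_callback(move |observer| {
            // The producer stores CPU usage multiplied by 1000 (milli-percent).
            let value = cpu_millipercent.load(Ordering::Relaxed) as f64 / 1000.0;
            observer.observe(value, &[KeyValue::new("resource", "cpu")]);
        })
        .build()
}

// The application keeps its own Arc clone and updates the value from anywhere.
fn update_cpu_usage(cpu_millipercent: &AtomicU64, percent: f64) {
    cpu_millipercent.store((percent * 1000.0) as u64, Ordering::Relaxed);
}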
Observable Counter
Observable Counter is an asynchronous instrument that measures monotonically increasing values, such as total bytes read or CPU time consumed.
use opentelemetry::{global, KeyValue};
use std::sync::Arc;
use tokio::sync::Mutex;
struct ProcessMetrics {
_counter_registration: opentelemetry::metrics::ObservableCounter<u64>,
process_stats: Arc<Mutex<ProcessStats>>,
}
#[derive(Clone)]
struct ProcessStats {
user_cpu_time: u64,
system_cpu_time: u64,
gc_runs: u64,
allocated_objects: u64,
loaded_features: u64,
}
impl ProcessMetrics {
fn new() -> Self {
let meter = global::meter("process-metrics");
let process_stats = Arc::new(Mutex::new(ProcessStats {
user_cpu_time: 0,
system_cpu_time: 0,
gc_runs: 0,
allocated_objects: 0,
loaded_features: 100, // Initial libraries loaded
}));
let stats_for_callback = process_stats.clone();
let process_counter = meter.u64_observable_counter("process_resource_usage")
.with_description("Process resource usage counters")
.with_callback(move |observer| {
if let Ok(stats) = stats_for_callback.try_lock() {
observer.observe(stats.user_cpu_time, &[
KeyValue::new("cpu_type", "user"),
KeyValue::new("unit", "microseconds"),
]);
observer.observe(stats.system_cpu_time, &[
KeyValue::new("cpu_type", "system"),
KeyValue::new("unit", "microseconds"),
]);
observer.observe(stats.gc_runs, &[
KeyValue::new("resource", "gc_runs"),
KeyValue::new("unit", "count"),
]);
observer.observe(stats.allocated_objects, &[
KeyValue::new("resource", "allocated_objects"),
KeyValue::new("unit", "count"),
]);
observer.observe(stats.loaded_features, &[
KeyValue::new("resource", "loaded_features"),
KeyValue::new("unit", "count"),
]);
}
})
.build();
Self {
_counter_registration: process_counter,
process_stats,
}
}
async fn update_stats(&self) {
let mut stats = self.process_stats.lock().await;
// Simulate accumulating process metrics
stats.user_cpu_time += rand::random::<u64>() % 1000 + 100;
stats.system_cpu_time += rand::random::<u64>() % 500 + 50;
stats.gc_runs += if rand::random::<u8>() % 10 == 0 { 1 } else { 0 };
stats.allocated_objects += rand::random::<u64>() % 10000 + 1000;
stats.loaded_features += if rand::random::<u8>() % 20 == 0 { 1 } else { 0 };
}
}
// Usage
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let process_metrics = ProcessMetrics::new();
// Simulate process running and accumulating metrics
for _ in 0..10 {
process_metrics.update_stats().await;
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
Ok(())
}
Observable UpDownCounter
Observable UpDownCounter is an asynchronous instrument for additive values that can both increase and decrease, reported at observation time.
use opentelemetry::{global, KeyValue};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
struct QueueMetrics {
_gauge_registration: opentelemetry::metrics::ObservableUpDownCounter<i64>,
queue_stats: Arc<Mutex<HashMap<String, QueueStats>>>,
}
#[derive(Clone)]
struct QueueStats {
size: i64,
workers: i64,
}
impl QueueMetrics {
fn new() -> Self {
let meter = global::meter("queue-metrics");
let queue_stats = Arc::new(Mutex::new(HashMap::from([
("email".to_string(), QueueStats { size: 0, workers: 0 }),
("sms".to_string(), QueueStats { size: 0, workers: 0 }),
("push".to_string(), QueueStats { size: 0, workers: 0 }),
])));
let stats_for_callback = queue_stats.clone();
let queue_gauge = meter.i64_observable_up_down_counter("message_queue_status")
.with_description("Message queue status metrics")
.with_callback(move |observer| {
if let Ok(stats) = stats_for_callback.try_lock() {
for (queue_name, queue_stats) in stats.iter() {
observer.observe(queue_stats.size, &[
KeyValue::new("queue", queue_name.clone()),
KeyValue::new("metric", "size"),
]);
observer.observe(queue_stats.workers, &[
KeyValue::new("queue", queue_name.clone()),
KeyValue::new("metric", "workers"),
]);
}
}
})
.build();
Self {
_gauge_registration: queue_gauge,
queue_stats,
}
}
async fn update_queue_size(&self, queue_name: &str, change: i64) {
let mut stats = self.queue_stats.lock().await;
if let Some(queue_stats) = stats.get_mut(queue_name) {
queue_stats.size += change;
}
}
async fn update_workers(&self, queue_name: &str, change: i64) {
let mut stats = self.queue_stats.lock().await;
if let Some(queue_stats) = stats.get_mut(queue_name) {
queue_stats.workers += change;
}
}
}
// Usage
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let queue_metrics = QueueMetrics::new();
// Simulate queue operations
queue_metrics.update_queue_size("email", 5).await;
queue_metrics.update_workers("email", 2).await;
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
queue_metrics.update_queue_size("email", -2).await;
queue_metrics.update_workers("sms", 1).await;
// Allow metrics to be collected
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
Ok(())
}
Working with Attributes
Attributes provide contextual information that makes metrics more useful for analysis and filtering.
Adding Attributes to Measurements
use opentelemetry::{global, KeyValue};
use std::sync::Arc;
struct ApiHandler {
api_requests: Arc<opentelemetry::metrics::Counter<u64>>,
request_duration: Arc<opentelemetry::metrics::Histogram<f64>>,
}
impl ApiHandler {
fn new() -> Self {
let meter = global::meter("api-handler");
let api_requests = meter.u64_counter("api_requests_total")
.with_description("Total API requests")
.build();
let request_duration = meter.f64_histogram("request_duration_seconds")
.with_description("Request duration")
.build();
Self {
api_requests: Arc::new(api_requests),
request_duration: Arc::new(request_duration),
}
}
async fn handle_request(
&self,
method: &str,
endpoint: &str,
user_type: &str,
region: &str,
) -> Result<ApiResponse, ApiError> {
let start_time = std::time::Instant::now();
let result = self.process_request(method, endpoint).await;
let status_code = match &result {
Ok(_) => "200",
Err(_) => "500",
};
// Record the request with detailed attributes
self.api_requests.add(1, &[
KeyValue::new("method", method.to_string()),
KeyValue::new("endpoint", endpoint.to_string()),
KeyValue::new("status_code", status_code),
KeyValue::new("user_type", user_type.to_string()),
KeyValue::new("region", region.to_string()),
KeyValue::new("cache_hit", "false"),
]);
// Record duration
let duration = start_time.elapsed().as_secs_f64();
self.request_duration.record(duration, &[
KeyValue::new("method", method.to_string()),
KeyValue::new("endpoint", endpoint.to_string()),
KeyValue::new("status_code", status_code),
]);
result
}
async fn process_request(&self, _method: &str, _endpoint: &str) -> Result<ApiResponse, ApiError> {
// Simulate request processing
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
Ok(ApiResponse { message: "Success".to_string() })
}
}
struct ApiResponse {
message: String,
}
#[derive(Debug)]
struct ApiError;
// Example usage
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let handler = ApiHandler::new();
let _ = handler.handle_request("GET", "/api/users", "premium", "us-east-1").await;
let _ = handler.handle_request("POST", "/api/orders", "free", "eu-west-1").await;
Ok(())
}
Attribute Best Practices
Use meaningful attributes that provide valuable differentiation without creating excessive cardinality:
use opentelemetry::{global, KeyValue};
// Good: Low cardinality attributes
fn record_request_good(method: &str, status_code: u16, endpoint_category: &str) {
let meter = global::meter("http-server");
let http_requests = meter.u64_counter("http_requests_total").build();
http_requests.add(1, &[
KeyValue::new("method", method.to_string()), // Limited values: GET, POST, PUT, DELETE
KeyValue::new("status_class", format!("{}xx", status_code / 100)), // Grouped: 2xx, 3xx, 4xx, 5xx
KeyValue::new("endpoint_category", endpoint_category.to_string()), // Grouped: api, static, health
]);
}
fn categorize_endpoint(endpoint: &str) -> &'static str {
match endpoint {
path if path.starts_with("/api/") => "api",
path if path.starts_with("/static/") => "static",
"/health" => "health",
path if path.starts_with("/admin/") => "admin",
_ => "other",
}
}
// Example usage
fn example_usage() {
record_request_good("GET", 200, "api");
record_request_good("POST", 201, "api");
}
// Avoid: High cardinality attributes
fn record_request_bad(method: &str, status_code: u16, full_url: &str, user_id: &str) {
let meter = global::meter("http-server");
let http_requests = meter.u64_counter("http_requests_total").build();
// DON'T DO THIS - creates too many unique metric series
http_requests.add(1, &[
KeyValue::new("method", method.to_string()),
KeyValue::new("status_code", status_code.to_string()), // 50+ possible values
KeyValue::new("full_url", full_url.to_string()), // Thousands of unique URLs
KeyValue::new("user_id", user_id.to_string()), // Thousands of users
]);
// This could create millions of unique metric series!
}
Practical Examples
HTTP Server Metrics
use opentelemetry::{global, KeyValue};
use std::sync::Arc;
use std::time::Instant;
pub struct HttpServerMetrics {
request_counter: Arc<opentelemetry::metrics::Counter<u64>>,
request_duration: Arc<opentelemetry::metrics::Histogram<f64>>,
active_requests: Arc<opentelemetry::metrics::UpDownCounter<i64>>,
response_size: Arc<opentelemetry::metrics::Histogram<u64>>,
error_counter: Arc<opentelemetry::metrics::Counter<u64>>,
}
impl HttpServerMetrics {
pub fn new() -> Self {
let meter = global::meter("http-server");
Self {
request_counter: Arc::new(meter.u64_counter("http_requests_total")
.with_description("Total HTTP requests")
.build()),
request_duration: Arc::new(meter.f64_histogram("http_request_duration_seconds")
.with_description("HTTP request duration")
.with_unit("s")
.build()),
active_requests: Arc::new(meter.i64_up_down_counter("http_requests_active")
.with_description("Active HTTP requests")
.build()),
response_size: Arc::new(meter.u64_histogram("http_response_size_bytes")
.with_description("HTTP response size")
.with_unit("By")
.build()),
error_counter: Arc::new(meter.u64_counter("http_errors_total")
.with_description("Total HTTP errors")
.build()),
}
}
pub fn record_request(&self, method: &str, route: &str, status_code: u16, duration: f64, response_size: u64) {
let attributes = [
KeyValue::new("method", method.to_string()),
KeyValue::new("route", route.to_string()),
KeyValue::new("status_code", status_code.to_string()),
];
// Record request count
self.request_counter.add(1, &attributes);
// Record duration
self.request_duration.record(duration, &attributes);
// Record response size
self.response_size.record(response_size, &attributes);
// Record errors
if status_code >= 400 {
let error_type = if status_code < 500 { "client_error" } else { "server_error" };
self.error_counter.add(1, &[
KeyValue::new("method", method.to_string()),
KeyValue::new("route", route.to_string()),
KeyValue::new("error_type", error_type),
]);
}
}
pub fn start_request(&self, method: &str, route: &str) {
self.active_requests.add(1, &[
KeyValue::new("method", method.to_string()),
KeyValue::new("route", route.to_string()),
]);
}
pub fn end_request(&self, method: &str, route: &str) {
self.active_requests.add(-1, &[
KeyValue::new("method", method.to_string()),
KeyValue::new("route", route.to_string()),
]);
}
}
// Usage example
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let metrics = HttpServerMetrics::new();
// Simulate handling requests
for i in 0..10 {
let method = if i % 2 == 0 { "GET" } else { "POST" };
let route = "/api/users";
metrics.start_request(method, route);
let start = Instant::now();
// Simulate request processing
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
let duration = start.elapsed().as_secs_f64();
let status_code = if i == 7 { 404 } else { 200 };
let response_size = 1024;
metrics.record_request(method, route, status_code, duration, response_size);
metrics.end_request(method, route);
}
Ok(())
}
Business Metrics
use chrono::Timelike; // brings DateTime::hour() into scope
use opentelemetry::{global, KeyValue};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
pub struct BusinessMetrics {
user_registrations: Arc<opentelemetry::metrics::Counter<u64>>,
order_value: Arc<opentelemetry::metrics::Histogram<f64>>,
_subscription_gauge: opentelemetry::metrics::ObservableUpDownCounter<i64>,
_revenue_gauge: opentelemetry::metrics::ObservableGauge<f64>,
subscription_counts: Arc<Mutex<HashMap<String, i64>>>,
revenue_data: Arc<Mutex<HashMap<String, f64>>>,
}
impl BusinessMetrics {
pub fn new() -> Self {
let meter = global::meter("business");
let subscription_counts = Arc::new(Mutex::new(HashMap::from([
("basic".to_string(), 1250),
("premium".to_string(), 340),
("enterprise".to_string(), 45),
])));
let revenue_data = Arc::new(Mutex::new(HashMap::from([
("monthly_recurring".to_string(), 50000.0),
("one_time".to_string(), 15000.0),
("total".to_string(), 65000.0),
])));
let sub_counts_for_callback = subscription_counts.clone();
let subscription_gauge = meter.i64_observable_up_down_counter("subscriptions_active")
.with_description("Active subscriptions by plan")
.with_callback(move |observer| {
if let Ok(counts) = sub_counts_for_callback.try_lock() {
for (plan, count) in counts.iter() {
observer.observe(*count, &[KeyValue::new("plan", plan.clone())]);
}
}
})
.build();
let revenue_for_callback = revenue_data.clone();
let revenue_gauge = meter.f64_observable_gauge("revenue_metrics")
.with_description("Revenue metrics")
.with_unit("USD")
.with_callback(move |observer| {
if let Ok(revenue) = revenue_for_callback.try_lock() {
for (revenue_type, amount) in revenue.iter() {
observer.observe(*amount, &[KeyValue::new("type", revenue_type.clone())]);
}
}
})
.build();
Self {
user_registrations: Arc::new(meter.u64_counter("user_registrations_total")
.with_description("Total user registrations")
.build()),
order_value: Arc::new(meter.f64_histogram("order_value_usd")
.with_description("Order value in USD")
.with_unit("USD")
.build()),
_subscription_gauge: subscription_gauge,
_revenue_gauge: revenue_gauge,
subscription_counts,
revenue_data,
}
}
pub async fn record_user_registration(&self, source: &str, plan: &str) {
self.user_registrations.add(1, &[
KeyValue::new("source", source.to_string()),
KeyValue::new("plan", plan.to_string()),
KeyValue::new("hour", chrono::Utc::now().hour().to_string()),
]);
// Update subscription count
let mut counts = self.subscription_counts.lock().await;
*counts.entry(plan.to_string()).or_insert(0) += 1;
}
pub async fn record_order(&self, value: f64, currency: &str, category: &str) {
// Convert to USD for consistent reporting
let usd_value = self.convert_to_usd(value, currency);
self.order_value.record(usd_value, &[
KeyValue::new("category", category.to_string()),
KeyValue::new("currency", currency.to_string()),
KeyValue::new("value_range", self.get_value_range(usd_value)),
]);
// Update revenue
let mut revenue = self.revenue_data.lock().await;
*revenue.entry("one_time".to_string()).or_insert(0.0) += usd_value;
*revenue.entry("total".to_string()).or_insert(0.0) += usd_value;
}
fn convert_to_usd(&self, value: f64, currency: &str) -> f64 {
let rates = HashMap::from([
("USD".to_string(), 1.0),
("EUR".to_string(), 1.1),
("GBP".to_string(), 1.3),
("JPY".to_string(), 0.007),
]);
value * rates.get(currency).unwrap_or(&1.0)
}
fn get_value_range(&self, value: f64) -> String {
match value {
v if v < 10.0 => "small".to_string(),
v if v < 100.0 => "medium".to_string(),
v if v < 1000.0 => "large".to_string(),
_ => "premium".to_string(),
}
}
}
// Usage example
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let business_metrics = BusinessMetrics::new();
// Record business events
business_metrics.record_user_registration("google_ads", "premium").await;
business_metrics.record_order(99.99, "USD", "electronics").await;
business_metrics.record_order(49.99, "EUR", "books").await;
business_metrics.record_order(199.99, "USD", "clothing").await;
// Allow metrics to be collected
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
Ok(())
}
Configuration and Performance
Environment Variables
Configure metrics behavior using environment variables:
# Metric export settings
export OTEL_METRICS_EXPORTER=otlp
export OTEL_EXPORTER_OTLP_METRICS_ENDPOINT=https://api.uptrace.dev/v1/metrics
export OTEL_EXPORTER_OTLP_METRICS_HEADERS="uptrace-dsn=YOUR_DSN"
# Collection interval (milliseconds)
export OTEL_METRIC_EXPORT_INTERVAL=60000
# Resource attributes
export OTEL_RESOURCE_ATTRIBUTES="service.name=my-service,service.version=1.0.0"
Memory Management
For long-running Rust applications, it is worth exposing the process's own memory usage as metrics so you can spot leaks and growth over time:
use opentelemetry::{global, KeyValue};
use std::sync::Arc;
use tokio::sync::Mutex;
pub struct MemoryAwareMetrics {
_memory_gauge: opentelemetry::metrics::ObservableGauge<u64>,
memory_stats: Arc<Mutex<MemoryStats>>,
}
#[derive(Default)]
struct MemoryStats {
heap_allocated: u64,
heap_free: u64,
rss: u64,
}
impl MemoryAwareMetrics {
pub fn new() -> Self {
let meter = global::meter("memory-metrics");
let memory_stats = Arc::new(Mutex::new(MemoryStats::default()));
let stats_for_callback = memory_stats.clone();
let memory_gauge = meter.u64_observable_gauge("rust_memory_usage")
.with_description("Rust memory usage metrics")
.with_unit("By")
.with_callback(move |observer| {
if let Ok(mut stats) = stats_for_callback.try_lock() {
// Update memory stats
stats.heap_allocated = Self::get_heap_allocated();
stats.rss = Self::get_rss_memory();
observer.observe(stats.heap_allocated, &[KeyValue::new("type", "heap_allocated")]);
observer.observe(stats.rss, &[KeyValue::new("type", "rss")]);
}
})
.build();
Self {
_memory_gauge: memory_gauge,
memory_stats,
}
}
fn get_heap_allocated() -> u64 {
// Simulate heap memory calculation
// In real implementation, you might use jemalloc stats or similar
1024 * 1024 * 64 // 64MB
}
fn get_rss_memory() -> u64 {
// Simulate RSS memory reading
// In real implementation, you might read /proc/self/status
1024 * 1024 * 128 // 128MB
}
}
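On Linux, the get_rss_memory stub can be replaced with a real reading by parsing /proc/self/status; this sketch uses only the standard library and falls back to 0 if the field is unavailable (other platforms need a different source):
use std::fs;

// Sketch: parse VmRSS (reported in kB) from /proc/self/status on Linux.
fn get_rss_memory() -> u64 {
    fs::read_to_string("/proc/self/status")
        .ok()
        .and_then(|status| {
            status
                .lines()
                .find(|line| line.starts_with("VmRSS:"))
                .and_then(|line| line.split_whitespace().nth(1))
                .and_then(|kb| kb.parse::<u64>().ok())
        })
        .map(|kb| kb * 1024)
        .unwrap_or(0)
}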
Attribute Optimization
Optimize attribute usage to prevent cardinality explosion:
use opentelemetry::{global, KeyValue};
use std::sync::Arc;
pub struct OptimizedMetrics {
http_requests: Arc<opentelemetry::metrics::Counter<u64>>,
}
impl OptimizedMetrics {
pub fn new() -> Self {
let meter = global::meter("optimized");
Self {
http_requests: Arc::new(meter.u64_counter("http_requests_total").build()),
}
}
pub fn record_request(&self, method: &str, status_code: u16, endpoint: &str) {
// Use status classes instead of exact codes
let status_class = format!("{}xx", status_code / 100);
// Categorize endpoints to reduce cardinality
let endpoint_category = self.categorize_endpoint(endpoint);
self.http_requests.add(1, &[
KeyValue::new("method", method.to_string()), // ~10 possible values
KeyValue::new("status_class", status_class), // 5 possible values (2xx, 3xx, 4xx, 5xx)
KeyValue::new("endpoint_category", endpoint_category), // ~5 categories
]);
// Total cardinality: 10 × 5 × 5 = 250 series
}
fn categorize_endpoint(&self, endpoint: &str) -> String {
match endpoint {
path if path.starts_with("/api/") => "api".to_string(),
path if path.starts_with("/static/") => "static".to_string(),
"/health" => "health".to_string(),
path if path.starts_with("/admin/") => "admin".to_string(),
_ => "other".to_string(),
}
}
}
Best Practices
Instrument Naming
Follow OpenTelemetry naming conventions:
// Good: Descriptive, hierarchical names
let meter = global::meter("my-app");
meter.u64_counter("http.requests.total").build();
meter.f64_histogram("http.request.duration").build();
meter.f64_observable_gauge("system.memory.usage").build();
// Avoid: Generic or unclear names
// meter.u64_counter("requests").build();
// meter.f64_histogram("time").build();
// meter.f64_observable_gauge("memory").build();
Unit Specification
Always specify appropriate units:
let meter = global::meter("my-app");
meter.f64_histogram("request.duration")
.with_unit("s") // seconds
.build();
meter.u64_observable_gauge("memory.usage")
.with_unit("By") // bytes
.build();
meter.u64_counter("requests.total")
.with_unit("1") // dimensionless
.build();
meter.f64_observable_gauge("temperature")
.with_unit("Cel") // Celsius
.build();
Error Handling
Handle metric recording errors gracefully:
use opentelemetry::{global, KeyValue};
use std::sync::Arc;
pub struct SafeMetrics {
counter: Arc<opentelemetry::metrics::Counter<u64>>,
}
impl SafeMetrics {
pub fn new() -> Self {
let meter = global::meter("safe-metrics");
Self {
counter: Arc::new(meter.u64_counter("safe_requests_total").build()),
}
}
pub fn safe_record_metric(&self, value: u64, attributes: &[KeyValue]) {
// Metrics recording in Rust is generally safe, but you can add
// additional validation or error handling as needed
self.counter.add(value, attributes);
// Optional debug logging (requires the `tracing` crate as a dependency)
tracing::debug!("Recorded metric: {} with {} attributes", value, attributes.len());
}
}
Testing Metrics
Use dependency injection for testing:
use opentelemetry::{global, KeyValue};
use std::sync::Arc;
pub trait MetricsProvider {
fn record_request(&self, method: &str, status_code: u16);
}
pub struct ProductionMetrics {
counter: Arc<opentelemetry::metrics::Counter<u64>>,
}
impl ProductionMetrics {
pub fn new() -> Self {
let meter = global::meter("production");
Self {
counter: Arc::new(meter.u64_counter("requests_total").build()),
}
}
}
impl MetricsProvider for ProductionMetrics {
fn record_request(&self, method: &str, status_code: u16) {
self.counter.add(1, &[
KeyValue::new("method", method.to_string()),
KeyValue::new("status_code", status_code.to_string()),
]);
}
}
pub struct TestMetrics {
pub recorded_calls: Arc<std::sync::Mutex<Vec<(String, u16)>>>,
}
impl TestMetrics {
pub fn new() -> Self {
Self {
recorded_calls: Arc::new(std::sync::Mutex::new(Vec::new())),
}
}
}
impl MetricsProvider for TestMetrics {
fn record_request(&self, method: &str, status_code: u16) {
let mut calls = self.recorded_calls.lock().unwrap();
calls.push((method.to_string(), status_code));
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_metrics_recording() {
let test_metrics = TestMetrics::new();
test_metrics.record_request("GET", 200);
test_metrics.record_request("POST", 201);
let calls = test_metrics.recorded_calls.lock().unwrap();
assert_eq!(calls.len(), 2);
assert_eq!(calls[0], ("GET".to_string(), 200));
assert_eq!(calls[1], ("POST".to_string(), 201));
}
}
OpenTelemetry APM
Uptrace is a DataDog alternative that supports distributed tracing, metrics, and logs. You can use it to monitor applications and troubleshoot issues.

Uptrace comes with an intuitive query builder, rich dashboards, alerting rules with notifications, and integrations for most languages and frameworks.
Uptrace can process billions of spans and metrics on a single server and allows you to monitor your applications at 10x lower cost.
In just a few minutes, you can try Uptrace by visiting the cloud demo (no login required) or running it locally with Docker. The source code is available on GitHub.