OpenTelemetry Python Metrics API
This document teaches you how to use the OpenTelemetry Python Metrics API to measure application performance with metrics. To learn how to install and configure the OpenTelemetry Python SDK, see Getting started with OpenTelemetry Python.
If you are not familiar with metrics terminology such as timeseries or additive/synchronous/asynchronous instruments, read the introduction to OpenTelemetry Metrics first.
Prerequisites
Before using the Metrics API, ensure you have the required packages installed:
pip install opentelemetry-api opentelemetry-sdk
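Several examples below also read system statistics with the third-party psutil package; install it as well if you want to run them as written:
pip install psutil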
Getting Started
To get started with metrics, you need to create a meter:
import time

from opentelemetry import metrics

meter = metrics.get_meter("app_or_package_name", "1.0.0")
Using the meter, you can create instruments to measure performance. The simplest instrument is a Counter:
counter = meter.create_counter(
    name="requests_total",
    description="Total number of requests processed",
    unit="1",
)

for i in range(1000):
    counter.add(1, {"status": "success", "method": "GET"})
    if i % 10 == 0:
        # Slow the loop so the periodic reader can export intermediate values
        time.sleep(0.1)
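Note that the Metrics API on its own is a no-op: measurements are only collected once an SDK MeterProvider with a reader is installed. As a minimal sketch for experimenting locally, assuming console output is enough, you can wire one up early in startup:

from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import (
    ConsoleMetricExporter,
    PeriodicExportingMetricReader,
)

# Print collected metrics to stdout every 5 seconds; swap in an OTLP
# exporter (shown later in this document) for a real backend.
reader = PeriodicExportingMetricReader(
    ConsoleMetricExporter(), export_interval_millis=5000
)
metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))

Configure the provider once, during application startup, before creating meters.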
Metric Instruments
OpenTelemetry provides several types of instruments to capture different kinds of measurements. Each instrument serves a specific purpose and has distinct characteristics.
Counter
Counter is a synchronous instrument that measures additive, non-decreasing values, representing cumulative totals such as the number of requests, errors, or completed tasks. Values passed to add() must be non-negative.
from opentelemetry import metrics

meter = metrics.get_meter("app_or_package_name", "1.0.0")

http_requests_counter = meter.create_counter(
    name="http_requests_total",
    description="Total number of HTTP requests",
    unit="1",
)

error_counter = meter.create_counter(
    name="http_errors_total",
    description="Total number of HTTP errors",
    unit="1",
)

def handle_request(method, endpoint, status_code):
    # Record the request
    http_requests_counter.add(1, {
        "method": method,
        "endpoint": endpoint,
        "status_code": str(status_code),
    })

    # Record an error if applicable
    if status_code >= 400:
        error_counter.add(1, {
            "method": method,
            "endpoint": endpoint,
            "error_type": "client_error" if status_code < 500 else "server_error",
        })

# Example usage
handle_request("GET", "/api/users", 200)
handle_request("POST", "/api/users", 201)
handle_request("GET", "/api/users/999", 404)
UpDownCounter
UpDownCounter is a synchronous instrument that measures additive values that can both increase and decrease, such as the number of active connections or items in a queue.
import random
import time

active_connections = meter.create_up_down_counter(
    name="database_connections_active",
    description="Number of active database connections",
    unit="1",
)

queue_size = meter.create_up_down_counter(
    name="task_queue_size",
    description="Number of items in the task queue",
    unit="1",
)

def simulate_connections():
    """Simulate database connection management"""
    for i in range(20):
        # Connection established
        active_connections.add(1, {"database": "users", "pool": "main"})

        # Simulate work
        time.sleep(0.1)

        # Connection closed
        active_connections.add(-1, {"database": "users", "pool": "main"})

def simulate_queue_operations():
    """Simulate queue operations"""
    while True:
        # Add items to the queue
        items_added = random.randint(1, 5)
        queue_size.add(items_added, {"queue": "email", "priority": "high"})

        # Process items from the queue
        items_processed = random.randint(1, 3)
        queue_size.add(-items_processed, {"queue": "email", "priority": "high"})

        time.sleep(1)
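UpDownCounter calls must stay balanced: each add(1) needs a matching add(-1), and an exception between them leaks an increment. One way to guarantee the pairing is a small context manager; track_connection below is a hypothetical helper, not part of the OpenTelemetry API:

from contextlib import contextmanager

@contextmanager
def track_connection(counter, attributes):
    """Pair the +1/-1 so the decrement runs even if the body raises."""
    counter.add(1, attributes)
    try:
        yield
    finally:
        counter.add(-1, attributes)

# Usage
with track_connection(active_connections, {"database": "users", "pool": "main"}):
    time.sleep(0.1)  # do work while holding the connection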
Histogram
Histogram is a synchronous instrument that measures the statistical distribution of values, such as request latencies or response sizes, grouping them into buckets.
import random
import time

request_duration = meter.create_histogram(
    name="http_request_duration_seconds",
    description="HTTP request duration in seconds",
    unit="s",
)

response_size = meter.create_histogram(
    name="http_response_size_bytes",
    description="HTTP response size in bytes",
    unit="By",
)

def handle_http_request(method, endpoint):
    """Handle an HTTP request with timing and size measurement"""
    start_time = time.time()

    try:
        # Simulate request processing
        processing_time = random.uniform(0.01, 0.5)
        time.sleep(processing_time)

        # Simulate a response
        response_data = "x" * random.randint(100, 5000)

        # Record metrics
        duration = time.time() - start_time
        request_duration.record(duration, {
            "method": method,
            "endpoint": endpoint,
            "status_code": "200",
        })
        response_size.record(len(response_data), {
            "method": method,
            "endpoint": endpoint,
            "content_type": "application/json",
        })

        return response_data
    except Exception:
        duration = time.time() - start_time
        request_duration.record(duration, {
            "method": method,
            "endpoint": endpoint,
            "status_code": "500",
        })
        raise

# Example usage
handle_http_request("GET", "/api/users")
handle_http_request("POST", "/api/users")
Observable Gauge
Observable Gauge is an asynchronous instrument that measures non-additive values at a point in time, such as CPU usage, memory consumption, or temperature readings. Its callbacks run each time the SDK collects metrics.
import threading
import time

import psutil

from opentelemetry.metrics import CallbackOptions, Observation

def get_system_metrics(options: CallbackOptions):
    """Callback function to collect system metrics"""
    # CPU usage
    cpu_usage = psutil.cpu_percent(interval=None)
    yield Observation(cpu_usage, {"resource": "cpu", "unit": "percent"})

    # Memory usage
    memory = psutil.virtual_memory()
    yield Observation(memory.percent, {"resource": "memory", "unit": "percent"})
    yield Observation(memory.used, {"resource": "memory", "unit": "bytes", "type": "used"})
    yield Observation(memory.available, {"resource": "memory", "unit": "bytes", "type": "available"})

    # Disk usage
    disk = psutil.disk_usage("/")
    disk_usage_percent = (disk.used / disk.total) * 100
    yield Observation(disk_usage_percent, {"resource": "disk", "unit": "percent", "mount": "/"})

# Create the observable gauge
system_metrics = meter.create_observable_gauge(
    name="system_resource_usage",
    description="System resource utilization",
    unit="1",
    callbacks=[get_system_metrics],
)

def get_application_metrics(options: CallbackOptions):
    """Callback function to collect application-specific metrics"""
    # Current timestamp
    yield Observation(time.time(), {"metric": "last_update", "unit": "timestamp"})

    # Active threads
    active_threads = threading.active_count()
    yield Observation(active_threads, {"metric": "active_threads", "unit": "count"})

app_metrics = meter.create_observable_gauge(
    name="application_metrics",
    description="Application-specific metrics",
    unit="1",
    callbacks=[get_application_metrics],
)
Observable Counter
Observable Counter is an asynchronous instrument that reports monotonically increasing values, such as total bytes read or CPU time consumed. Each callback should report the current cumulative total, not a delta; the SDK computes changes between collections.
import psutil

from opentelemetry.metrics import CallbackOptions, Observation

def get_process_metrics(options: CallbackOptions):
    """Callback function to collect process metrics"""
    process = psutil.Process()

    # Process CPU time (monotonically increasing)
    cpu_times = process.cpu_times()
    yield Observation(cpu_times.user, {"cpu_type": "user", "unit": "seconds"})
    yield Observation(cpu_times.system, {"cpu_type": "system", "unit": "seconds"})

    # Process memory info
    # Note: RSS/VMS can shrink, so in practice they fit an observable gauge better
    memory_info = process.memory_info()
    yield Observation(memory_info.rss, {"memory_type": "rss", "unit": "bytes"})
    yield Observation(memory_info.vms, {"memory_type": "vms", "unit": "bytes"})

    # File descriptor count (Unix-like systems)
    try:
        num_fds = process.num_fds()
        yield Observation(num_fds, {"resource": "file_descriptors", "unit": "count"})
    except AttributeError:
        # Windows doesn't have num_fds
        pass

process_metrics = meter.create_observable_counter(
    name="process_resource_usage",
    description="Process resource usage counters",
    unit="1",
    callbacks=[get_process_metrics],
)

def get_io_metrics(options: CallbackOptions):
    """Callback function to collect I/O metrics"""
    try:
        io_counters = psutil.Process().io_counters()
        yield Observation(io_counters.read_bytes, {"io_type": "read", "unit": "bytes"})
        yield Observation(io_counters.write_bytes, {"io_type": "write", "unit": "bytes"})
        yield Observation(io_counters.read_count, {"io_type": "read", "unit": "operations"})
        yield Observation(io_counters.write_count, {"io_type": "write", "unit": "operations"})
    except AttributeError:
        # Some platforms don't support I/O counters
        pass

io_metrics = meter.create_observable_counter(
    name="process_io_usage",
    description="Process I/O usage counters",
    unit="1",
    callbacks=[get_io_metrics],
)
Observable UpDownCounter
Observable UpDownCounter is an asynchronous instrument that reports additive values that can increase or decrease, such as queue sizes or pool usage, observed at collection time.
import queue

# Global state for demonstration
message_queues = {
    "email": queue.Queue(),
    "sms": queue.Queue(),
    "push": queue.Queue(),
}
active_workers = {"email": 0, "sms": 0, "push": 0}

def get_queue_metrics(options: CallbackOptions):
    """Callback function to collect queue metrics"""
    for queue_name, q in message_queues.items():
        # Queue size (can go up and down)
        yield Observation(q.qsize(), {"queue": queue_name, "metric": "size"})

        # Active workers (can go up and down)
        workers = active_workers.get(queue_name, 0)
        yield Observation(workers, {"queue": queue_name, "metric": "active_workers"})

queue_metrics = meter.create_observable_up_down_counter(
    name="message_queue_status",
    description="Message queue status metrics",
    unit="1",
    callbacks=[get_queue_metrics],
)

def get_connection_pool_metrics(options: CallbackOptions):
    """Callback function for connection pool metrics"""
    # Simulate connection pool status
    pools = {
        "database": {"active": 5, "idle": 3, "max": 10},
        "redis": {"active": 2, "idle": 8, "max": 10},
        "elasticsearch": {"active": 1, "idle": 4, "max": 5},
    }

    for pool_name, stats in pools.items():
        yield Observation(stats["active"], {"pool": pool_name, "state": "active"})
        yield Observation(stats["idle"], {"pool": pool_name, "state": "idle"})
        yield Observation(stats["max"], {"pool": pool_name, "state": "max"})

connection_pool_metrics = meter.create_observable_up_down_counter(
    name="connection_pool_status",
    description="Connection pool status",
    unit="1",
    callbacks=[get_connection_pool_metrics],
)
Working with Attributes
Attributes provide contextual information that makes metrics more useful for analysis and filtering.
Adding Attributes to Measurements
# Create counters and histograms
api_requests = meter.create_counter("api_requests_total", description="Total API requests")
request_duration = meter.create_histogram("request_duration_seconds", description="Request duration")

def handle_api_request(method, endpoint, user_type, region):
    """Handle an API request with detailed attributes"""
    start_time = time.time()

    try:
        # Simulate request processing
        processing_time = random.uniform(0.01, 0.3)
        time.sleep(processing_time)

        # Record the successful request with detailed attributes
        api_requests.add(1, {
            "method": method,
            "endpoint": endpoint,
            "status_code": "200",
            "user_type": user_type,
            "region": region,
            "cache_hit": "false",
        })

        # Record the duration
        duration = time.time() - start_time
        request_duration.record(duration, {
            "method": method,
            "endpoint": endpoint,
            "status_code": "200",
        })
    except Exception as e:
        # Record the error
        api_requests.add(1, {
            "method": method,
            "endpoint": endpoint,
            "status_code": "500",
            "user_type": user_type,
            "region": region,
            "error_type": type(e).__name__,
        })

        duration = time.time() - start_time
        request_duration.record(duration, {
            "method": method,
            "endpoint": endpoint,
            "status_code": "500",
        })
        raise

# Example usage
handle_api_request("GET", "/api/users", "premium", "us-east-1")
handle_api_request("POST", "/api/orders", "free", "eu-west-1")
Attribute Best Practices
Use meaningful attributes that provide valuable differentiation without creating excessive cardinality:
# Good: Low cardinality attributes
http_requests = meter.create_counter("http_requests_total")

def record_request(method, status_code, endpoint_category):
    """Record a request with low-cardinality attributes"""
    http_requests.add(1, {
        "method": method,  # Limited values: GET, POST, PUT, DELETE
        "status_class": f"{status_code // 100}xx",  # Grouped: 2xx, 3xx, 4xx, 5xx
        "endpoint_category": endpoint_category,  # Grouped: api, static, health
    })

# Avoid: High cardinality attributes
def bad_example(method, status_code, user_id, session_id, timestamp):
    """Example of what NOT to do - high-cardinality attributes"""
    # DON'T DO THIS - creates too many unique metric series
    http_requests.add(1, {
        "method": method,
        "status_code": status_code,
        "user_id": user_id,  # Unique per user - high cardinality
        "session_id": session_id,  # Unique per session - high cardinality
        "timestamp": timestamp,  # Unique per request - very high cardinality
    })
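If you do need some per-user signal, a common workaround is to fold the unbounded identifier into a small fixed set of labels. The cohort scheme below is an illustrative assumption rather than an OpenTelemetry feature:

import hashlib

def user_cohort(user_id, buckets=16):
    """Map a unique user id onto one of `buckets` stable cohort labels."""
    digest = hashlib.sha256(user_id.encode()).digest()
    return f"cohort-{digest[0] % buckets}"

# 16 cohort series instead of one series per user
http_requests.add(1, {"method": "GET", "user_cohort": user_cohort("user-12345")})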
Recording Measurements
Synchronous Measurements
Synchronous instruments are recorded inline with application logic:
# Create instruments
operation_counter = meter.create_counter("operations_total")
operation_duration = meter.create_histogram("operation_duration_seconds")
error_counter = meter.create_counter("operation_errors_total")

def perform_operation(operation_type):
    """Perform an operation with comprehensive metrics"""
    start_time = time.time()

    try:
        # Count the operation start
        operation_counter.add(1, {
            "operation": operation_type,
            "status": "started",
        })

        # Simulate the operation
        processing_time = random.uniform(0.1, 1.0)
        time.sleep(processing_time)

        # Simulate a potential failure
        if random.random() < 0.1:  # 10% failure rate
            raise ValueError("Operation failed")

        # Record successful completion
        operation_counter.add(1, {
            "operation": operation_type,
            "status": "completed",
        })

        return f"Operation {operation_type} completed"
    except Exception as e:
        # Record the error
        error_counter.add(1, {
            "operation": operation_type,
            "error_type": type(e).__name__,
            "error_message": str(e)[:50],  # Truncate to limit cardinality
        })
        operation_counter.add(1, {
            "operation": operation_type,
            "status": "failed",
        })
        raise
    finally:
        # Always record the duration
        duration = time.time() - start_time
        operation_duration.record(duration, {
            "operation": operation_type,
        })

# Example usage
try:
    result = perform_operation("user_registration")
    print(result)
except Exception as e:
    print(f"Operation failed: {e}")
Asynchronous Measurements
Asynchronous instruments use callbacks that are invoked during metric collection:
# Global state for demonstration
system_stats = {
    "cpu_usage": 0.0,
    "memory_usage": 0.0,
    "disk_usage": 0.0,
    "network_connections": 0,
}

queue_stats = {
    "email": {"size": 0, "processed": 0},
    "sms": {"size": 0, "processed": 0},
    "push": {"size": 0, "processed": 0},
}

def collect_system_metrics(options: CallbackOptions):
    """Callback to collect system metrics"""
    # Update system stats (in a real app, this would call actual system APIs)
    system_stats["cpu_usage"] = psutil.cpu_percent()
    system_stats["memory_usage"] = psutil.virtual_memory().percent
    system_stats["disk_usage"] = psutil.disk_usage("/").percent
    system_stats["network_connections"] = len(psutil.net_connections())

    # Yield observations
    for metric_name, value in system_stats.items():
        yield Observation(value, {"metric": metric_name})

def collect_queue_metrics(options: CallbackOptions):
    """Callback to collect queue metrics"""
    for queue_name, stats in queue_stats.items():
        # Queue size (up/down counter)
        yield Observation(stats["size"], {"queue": queue_name, "metric": "size"})

        # Total processed (counter)
        yield Observation(stats["processed"], {"queue": queue_name, "metric": "processed"})

# Create observable instruments
system_gauge = meter.create_observable_gauge(
    name="system_metrics",
    description="System performance metrics",
    unit="1",
    callbacks=[collect_system_metrics],
)

queue_counter = meter.create_observable_up_down_counter(
    name="queue_metrics",
    description="Queue status metrics",
    unit="1",
    callbacks=[collect_queue_metrics],
)

# Simulate queue operations
def simulate_queue_activity():
    """Simulate queue activity to generate metrics"""
    while True:
        for queue_name in queue_stats:
            # Add items to the queue
            new_items = random.randint(0, 5)
            queue_stats[queue_name]["size"] += new_items

            # Process items from the queue
            processed = min(queue_stats[queue_name]["size"], random.randint(0, 3))
            queue_stats[queue_name]["size"] -= processed
            queue_stats[queue_name]["processed"] += processed

        time.sleep(2)

# In a real application, run this in a separate thread:
# threading.Thread(target=simulate_queue_activity, daemon=True).start()
Practical Examples
HTTP Server Metrics
import random
import time

from opentelemetry import metrics

class HTTPServerMetrics:
    """Comprehensive HTTP server metrics collection"""

    def __init__(self, meter):
        self.request_counter = meter.create_counter(
            "http_requests_total",
            description="Total HTTP requests",
            unit="1",
        )
        self.request_duration = meter.create_histogram(
            "http_request_duration_seconds",
            description="HTTP request duration",
            unit="s",
        )
        self.active_requests = meter.create_up_down_counter(
            "http_requests_active",
            description="Active HTTP requests",
            unit="1",
        )
        self.response_size = meter.create_histogram(
            "http_response_size_bytes",
            description="HTTP response size",
            unit="By",
        )
        self.error_counter = meter.create_counter(
            "http_errors_total",
            description="Total HTTP errors",
            unit="1",
        )

    def record_request(self, method, route, status_code, duration, response_size):
        """Record metrics for an HTTP request"""
        attributes = {
            "method": method,
            "route": route,
            "status_code": str(status_code),
        }

        # Record the request count, duration, and response size
        self.request_counter.add(1, attributes)
        self.request_duration.record(duration, attributes)
        self.response_size.record(response_size, attributes)

        # Record errors
        if status_code >= 400:
            error_attributes = {
                "method": method,
                "route": route,
                "status_code": str(status_code),
                "error_type": "client_error" if status_code < 500 else "server_error",
            }
            self.error_counter.add(1, error_attributes)

    def start_request(self, method, route):
        """Track the start of an active request"""
        self.active_requests.add(1, {"method": method, "route": route})

    def end_request(self, method, route):
        """Track the end of an active request"""
        self.active_requests.add(-1, {"method": method, "route": route})

# Usage example
meter = metrics.get_meter("http_server", "1.0.0")
server_metrics = HTTPServerMetrics(meter)

def handle_request(method, route):
    """Simulate handling an HTTP request"""
    start_time = time.time()

    # Track the active request
    server_metrics.start_request(method, route)

    try:
        # Simulate processing
        processing_time = random.uniform(0.01, 0.5)
        time.sleep(processing_time)

        # Simulate a response
        status_code = random.choices([200, 404, 500], weights=[85, 10, 5])[0]
        response_size = random.randint(100, 10000)

        # Record metrics
        duration = time.time() - start_time
        server_metrics.record_request(method, route, status_code, duration, response_size)

        return status_code, response_size
    finally:
        # Always end active-request tracking
        server_metrics.end_request(method, route)

# Example usage
handle_request("GET", "/api/users")
handle_request("POST", "/api/orders")
handle_request("GET", "/api/products")
Database Connection Pool Metrics
import random
import threading
import time

from opentelemetry.metrics import CallbackOptions, Observation

class DatabasePoolMetrics:
    """Database connection pool metrics"""

    def __init__(self, meter):
        self.query_counter = meter.create_counter(
            "db_queries_total",
            description="Total database queries",
            unit="1",
        )
        self.query_duration = meter.create_histogram(
            "db_query_duration_seconds",
            description="Database query duration",
            unit="s",
        )
        self.connection_pool_gauge = meter.create_observable_gauge(
            "db_connection_pool_status",
            description="Database connection pool status",
            unit="1",
            callbacks=[self._collect_pool_metrics],
        )

        # Simulated connection pool state
        self.pool_stats = {
            "active": 0,
            "idle": 10,
            "max": 20,
            "total_created": 10,
            "total_closed": 0,
        }
        self.lock = threading.Lock()

    def _collect_pool_metrics(self, options: CallbackOptions):
        """Collect connection pool metrics"""
        with self.lock:
            yield Observation(self.pool_stats["active"], {"state": "active"})
            yield Observation(self.pool_stats["idle"], {"state": "idle"})
            yield Observation(self.pool_stats["max"], {"state": "max"})
            yield Observation(self.pool_stats["total_created"], {"state": "total_created"})
            yield Observation(self.pool_stats["total_closed"], {"state": "total_closed"})

    def execute_query(self, query_type, table):
        """Execute a database query with metrics"""
        start_time = time.time()

        # Simulate taking a connection from the pool
        acquired = False
        with self.lock:
            if self.pool_stats["idle"] > 0:
                self.pool_stats["idle"] -= 1
                self.pool_stats["active"] += 1
                acquired = True
            # A real pool would wait for a free connection or create a new one here

        try:
            # Simulate query execution
            execution_time = random.uniform(0.001, 0.1)
            time.sleep(execution_time)

            # Record metrics
            duration = time.time() - start_time
            attributes = {
                "query_type": query_type,
                "table": table,
            }
            self.query_counter.add(1, attributes)
            self.query_duration.record(duration, attributes)

            return f"Query {query_type} on {table} completed"
        finally:
            # Return the connection to the pool only if we actually took one
            if acquired:
                with self.lock:
                    self.pool_stats["active"] -= 1
                    self.pool_stats["idle"] += 1

# Usage example
meter = metrics.get_meter("database", "1.0.0")
db_metrics = DatabasePoolMetrics(meter)

# Simulate database operations
db_metrics.execute_query("SELECT", "users")
db_metrics.execute_query("INSERT", "orders")
db_metrics.execute_query("UPDATE", "products")
Business Metrics
import random
import time

from opentelemetry import metrics
from opentelemetry.metrics import CallbackOptions, Observation

class BusinessMetrics:
    """Business-specific metrics collection"""

    def __init__(self, meter):
        self.user_registrations = meter.create_counter(
            "user_registrations_total",
            description="Total user registrations",
            unit="1",
        )
        self.order_value = meter.create_histogram(
            "order_value_usd",
            description="Order value in USD",
            unit="USD",
        )
        self.subscription_status = meter.create_observable_up_down_counter(
            "subscriptions_active",
            description="Active subscriptions by plan",
            unit="1",
            callbacks=[self._collect_subscription_metrics],
        )
        self.revenue_gauge = meter.create_observable_gauge(
            "revenue_metrics",
            description="Revenue metrics",
            unit="USD",
            callbacks=[self._collect_revenue_metrics],
        )

        # Simulated business data
        self.subscription_counts = {
            "basic": 1250,
            "premium": 340,
            "enterprise": 45,
        }
        self.revenue_data = {
            "monthly_recurring": 50000,
            "one_time": 15000,
            "total": 65000,
        }

    def _collect_subscription_metrics(self, options: CallbackOptions):
        """Collect subscription metrics"""
        for plan, count in self.subscription_counts.items():
            yield Observation(count, {"plan": plan})

    def _collect_revenue_metrics(self, options: CallbackOptions):
        """Collect revenue metrics"""
        for revenue_type, amount in self.revenue_data.items():
            yield Observation(amount, {"type": revenue_type})

    def record_user_registration(self, source, plan):
        """Record a new user registration"""
        self.user_registrations.add(1, {
            "source": source,
            "plan": plan,
            "hour": str(time.localtime().tm_hour),  # Hour bucket for analysis
        })

        # Update the subscription count
        self.subscription_counts[plan] += 1

    def record_order(self, value, currency, category):
        """Record an order"""
        # Convert to USD for consistent reporting
        usd_value = self._convert_to_usd(value, currency)

        self.order_value.record(usd_value, {
            "category": category,
            "currency": currency,
            "value_range": self._get_value_range(usd_value),
        })

        # Update revenue
        self.revenue_data["one_time"] += usd_value
        self.revenue_data["total"] += usd_value

    def _convert_to_usd(self, value, currency):
        """Convert a currency amount to USD (simplified)"""
        rates = {"USD": 1.0, "EUR": 1.1, "GBP": 1.3, "JPY": 0.007}
        return value * rates.get(currency, 1.0)

    def _get_value_range(self, value):
        """Categorize the order value"""
        if value < 10:
            return "small"
        elif value < 100:
            return "medium"
        elif value < 1000:
            return "large"
        else:
            return "premium"

# Usage example
meter = metrics.get_meter("business", "1.0.0")
business_metrics = BusinessMetrics(meter)

# Record business events
business_metrics.record_user_registration("google_ads", "premium")
business_metrics.record_order(99.99, "USD", "electronics")
business_metrics.record_order(49.99, "EUR", "books")
business_metrics.record_order(199.99, "USD", "clothing")
Configuration and Performance
Metric Reader Configuration
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import (
    ConsoleMetricExporter,
    PeriodicExportingMetricReader,
)
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter

# Configure the OTLP exporter
exporter = OTLPMetricExporter(
    endpoint="https://api.uptrace.dev:4317",
    headers={"uptrace-dsn": "your_dsn_here"},
)

# Create a reader with a custom export interval
reader = PeriodicExportingMetricReader(
    exporter=exporter,
    export_interval_millis=60000,  # Export every 60 seconds
    export_timeout_millis=30000,   # 30-second timeout
)

# Create a meter provider with the reader
provider = MeterProvider(metric_readers=[reader])
metrics.set_meter_provider(provider)

# Multiple readers can export to different destinations
console_reader = PeriodicExportingMetricReader(
    ConsoleMetricExporter(),
    export_interval_millis=10000,  # More frequent for debugging
)

provider = MeterProvider(metric_readers=[reader, console_reader])
Memory Management
For long-running applications, keep an eye on process memory; unbounded attribute cardinality is a common cause of growth in the metrics pipeline:
import gc
import threading
import time

import psutil

def monitor_memory_usage():
    """Monitor and manage memory usage"""
    while True:
        # Force garbage collection periodically
        gc.collect()

        # Check the process's resident memory
        memory_usage = psutil.Process().memory_info().rss / 1024 / 1024  # MB
        if memory_usage > 500:  # More than 500 MB
            print(f"Warning: High memory usage: {memory_usage:.2f} MB")

        time.sleep(30)

# Run in a background thread
threading.Thread(target=monitor_memory_usage, daemon=True).start()
Attribute Optimization
Optimize attribute usage to prevent cardinality explosion:
# Good: Limited attribute values
http_requests = meter.create_counter("http_requests_total")

def record_request(method, status_code, endpoint):
    """Record an HTTP request with optimized attributes"""
    # Use status classes instead of exact codes
    status_class = f"{status_code // 100}xx"

    # Categorize endpoints to reduce cardinality
    endpoint_category = categorize_endpoint(endpoint)

    http_requests.add(1, {
        "method": method,  # ~10 possible values
        "status_class": status_class,  # 5 possible values (1xx-5xx)
        "endpoint_category": endpoint_category,  # ~5 categories
    })
    # Total cardinality: 10 × 5 × 5 = 250 series

def categorize_endpoint(endpoint):
    """Categorize endpoints to reduce cardinality"""
    if endpoint.startswith("/api/"):
        return "api"
    elif endpoint.startswith("/static/"):
        return "static"
    elif endpoint == "/health":
        return "health"
    elif endpoint.startswith("/admin/"):
        return "admin"
    else:
        return "other"

# Avoid: High cardinality attributes
def bad_example(method, status_code, full_url, user_id, timestamp):
    """Example of what NOT to do"""
    # DON'T DO THIS - creates millions of unique metric series
    http_requests.add(1, {
        "method": method,
        "status_code": str(status_code),  # 50+ possible values
        "full_url": full_url,  # Thousands of unique URLs
        "user_id": user_id,  # Thousands of users
        "timestamp": str(timestamp),  # Unbounded unique values
    })
Environment Variables
Configure metrics behavior using environment variables:
# Metric export settings
export OTEL_METRICS_EXPORTER=otlp
export OTEL_EXPORTER_OTLP_METRICS_ENDPOINT=https://api.uptrace.dev:4317
export OTEL_EXPORTER_OTLP_METRICS_HEADERS="uptrace-dsn=YOUR_DSN"
# Collection interval (milliseconds)
export OTEL_METRIC_EXPORT_INTERVAL=60000
# Resource attributes
export OTEL_RESOURCE_ATTRIBUTES="service.name=my-service,service.version=1.0.0"
Use environment variables in your application:
import os

from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter

def setup_metrics():
    """Set up metrics using environment variables"""
    # Read configuration from the environment
    endpoint = os.getenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT")
    headers_str = os.getenv("OTEL_EXPORTER_OTLP_METRICS_HEADERS", "")
    export_interval = int(os.getenv("OTEL_METRIC_EXPORT_INTERVAL", "60000"))

    # Parse headers of the form "key1=value1,key2=value2"
    headers = {}
    for header in headers_str.split(","):
        if "=" in header:
            key, value = header.split("=", 1)
            headers[key.strip()] = value.strip()

    # Create the exporter
    exporter = OTLPMetricExporter(
        endpoint=endpoint,
        headers=headers,
    )

    # Create the reader
    reader = PeriodicExportingMetricReader(
        exporter=exporter,
        export_interval_millis=export_interval,
    )

    # Create and install the meter provider
    provider = MeterProvider(metric_readers=[reader])
    metrics.set_meter_provider(provider)

    return provider

# Usage
if __name__ == "__main__":
    setup_metrics()
    meter = metrics.get_meter("my_app", "1.0.0")
Best Practices
Instrument Naming
Follow OpenTelemetry naming conventions:
# Good: Descriptive, hierarchical names
meter.create_counter("http.requests.total")
meter.create_histogram("http.request.duration")
meter.create_observable_gauge("system.memory.usage")
# Avoid: Generic or unclear names
meter.create_counter("requests")
meter.create_histogram("time")
meter.create_observable_gauge("memory")
Unit Specification
Always specify appropriate units:
meter.create_histogram("request.duration", unit="s") # seconds
meter.create_observable_gauge("memory.usage", unit="By") # bytes
meter.create_counter("requests.total", unit="1") # dimensionless
meter.create_histogram("file.size", unit="By") # bytes
meter.create_observable_gauge("temperature", unit="Cel") # Celsius
Error Handling
Handle metric recording errors gracefully:
import logging

logger = logging.getLogger(__name__)

def safe_record_metric(counter, value, attributes):
    """Safely record a metric with error handling"""
    try:
        counter.add(value, attributes)
    except Exception as e:
        # Log the error, but don't let metrics break your application
        logger.error(f"Failed to record metric: {e}")

# Usage
safe_record_metric(request_counter, 1, {"status": "success"})
Testing Metrics
Use the SDK's in-memory reader to verify metrics in unit tests:
import unittest

from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader

class MetricsTestCase(unittest.TestCase):
    def setUp(self):
        """Set up the test environment"""
        self.reader = InMemoryMetricReader()
        self.provider = MeterProvider(metric_readers=[self.reader])
        self.meter = self.provider.get_meter("test_meter", "1.0.0")

    def get_metrics(self):
        """Get the collected metrics"""
        return self.reader.get_metrics_data()

    def test_counter(self):
        """Test counter functionality"""
        counter = self.meter.create_counter("test_counter")
        counter.add(1, {"key": "value"})
        counter.add(2, {"key": "value"})

        metrics = self.get_metrics()

        # Assert metrics were recorded
        self.assertEqual(len(metrics.resource_metrics), 1)

    def tearDown(self):
        """Clean up after the test"""
        self.provider.shutdown()
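To assert on recorded values rather than just the shape of the result, you can walk the exported data model. A sketch of a helper (get_counter_value is a hypothetical name) that sums the data points for one metric:

def get_counter_value(metrics_data, name):
    """Find a metric by name and sum its data point values."""
    for resource_metrics in metrics_data.resource_metrics:
        for scope_metrics in resource_metrics.scope_metrics:
            for metric in scope_metrics.metrics:
                if metric.name == name:
                    return sum(point.value for point in metric.data.data_points)
    return None

# In test_counter above, the two adds with identical attributes
# aggregate into a single data point with value 3:
# self.assertEqual(get_counter_value(self.get_metrics(), "test_counter"), 3)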
Performance Monitoring
Monitor the performance impact of metrics collection:
import functools
import time

def time_metric_operation(func):
    """Decorator to measure the overhead of a metric operation"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        duration = time.perf_counter() - start_time

        # Only log if the operation takes too long
        if duration > 0.001:  # 1 ms threshold
            print(f"Metric operation {func.__name__} took {duration:.4f}s")

        return result
    return wrapper

# Usage (business_counter is any counter created earlier)
@time_metric_operation
def record_business_metric():
    business_counter.add(1, {"type": "important"})
OpenTelemetry APM
Uptrace is a DataDog alternative that supports distributed tracing, metrics, and logs. You can use it to monitor applications and troubleshoot issues.
Uptrace comes with an intuitive query builder, rich dashboards, alerting rules with notifications, and integrations for most languages and frameworks.
Uptrace can process billions of spans and metrics on a single server and allows you to monitor your applications at 10x lower cost.
In just a few minutes, you can try Uptrace by visiting the cloud demo (no login required) or running it locally with Docker. The source code is available on GitHub.
What's Next?
Now that you understand the OpenTelemetry Python Metrics API, explore these related topics: