OpenTelemetry Python Metrics API

This document teaches you how to use the OpenTelemetry Python Metrics API to measure application performance with metrics. To learn how to install and configure the OpenTelemetry Python SDK, see Getting started with OpenTelemetry Python.

If you are not familiar with metrics terminology such as timeseries or additive/synchronous/asynchronous instruments, read the introduction to OpenTelemetry Metrics first.

Prerequisites

Before using the Metrics API, ensure you have the required packages installed:

bash
pip install opentelemetry-api opentelemetry-sdk
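
Several of the asynchronous examples below also read system statistics with the psutil package. It is not required by OpenTelemetry itself, only by those examples:

bash
pip install psutil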

Getting Started

To get started with metrics, you need to create a meter:

python
from opentelemetry import metrics

meter = metrics.get_meter("app_or_package_name", "1.0.0")

Using the meter, you can create instruments to measure performance. The simplest Counter instrument looks like this:

python
import time

counter = meter.create_counter(
    name="requests_total",
    description="Total number of requests processed",
    unit="1",
)

for i in range(1000):
    counter.add(1, {"status": "success", "method": "GET"})

    if i % 10 == 0:
        # Pause briefly so the periodic reader gets a chance to export
        time.sleep(0.1)
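
Note that the API on its own is a no-op: until an SDK MeterProvider is installed, measurements are silently dropped. A minimal setup that prints metrics to the console, using the SDK's ConsoleMetricExporter, looks roughly like this:

python
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import (
    ConsoleMetricExporter,
    PeriodicExportingMetricReader,
)

# Print collected metrics to stdout every second
reader = PeriodicExportingMetricReader(
    ConsoleMetricExporter(),
    export_interval_millis=1000,
)
metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))

See the Configuration and Performance section below for production-grade exporter configuration.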

Metric Instruments

OpenTelemetry provides several types of instruments to capture different kinds of measurements. Each instrument serves a specific purpose and has distinct characteristics.

Counter

Counter is a synchronous instrument that measures additive non-decreasing values, representing cumulative totals like the number of requests, errors, or completed tasks.

python
from opentelemetry import metrics

meter = metrics.get_meter("app_or_package_name", "1.0.0")

http_requests_counter = meter.create_counter(
    name="http_requests_total",
    description="Total number of HTTP requests",
    unit="1"
)

error_counter = meter.create_counter(
    name="http_errors_total",
    description="Total number of HTTP errors",
    unit="1"
)

def handle_request(method, endpoint, status_code):
    # Record successful request
    http_requests_counter.add(1, {
        "method": method,
        "endpoint": endpoint,
        "status_code": str(status_code)
    })

    # Record error if applicable
    if status_code >= 400:
        error_counter.add(1, {
            "method": method,
            "endpoint": endpoint,
            "error_type": "client_error" if status_code < 500 else "server_error"
        })

# Example usage
handle_request("GET", "/api/users", 200)
handle_request("POST", "/api/users", 201)
handle_request("GET", "/api/users/999", 404)

UpDownCounter

UpDownCounter is a synchronous instrument that measures additive values that can both increase and decrease, such as the number of active connections or items in a queue.

python
import random
import time

active_connections = meter.create_up_down_counter(
    name="database_connections_active",
    description="Number of active database connections",
    unit="1"
)

queue_size = meter.create_up_down_counter(
    name="task_queue_size",
    description="Number of items in the task queue",
    unit="1"
)

def simulate_connections():
    """Simulate database connection management"""
    for i in range(20):
        # Connection established
        active_connections.add(1, {"database": "users", "pool": "main"})

        # Simulate work
        time.sleep(0.1)

        # Connection closed
        active_connections.add(-1, {"database": "users", "pool": "main"})

def simulate_queue_operations():
    """Simulate queue operations"""
    while True:
        # Add items to queue
        items_added = random.randint(1, 5)
        queue_size.add(items_added, {"queue": "email", "priority": "high"})

        # Process items from queue
        items_processed = random.randint(1, 3)
        queue_size.add(-items_processed, {"queue": "email", "priority": "high"})

        time.sleep(1)
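
Because every add(1) must eventually be paired with a matching add(-1), it can help to wrap the pattern in a context manager so the decrement runs even when the work raises. A small sketch (track_connection is our own helper, not part of the API):

python
from contextlib import contextmanager

@contextmanager
def track_connection(counter, attributes):
    """Increment on entry, decrement on exit, even on errors."""
    counter.add(1, attributes)
    try:
        yield
    finally:
        counter.add(-1, attributes)

# Usage
with track_connection(active_connections, {"database": "users", "pool": "main"}):
    time.sleep(0.1)  # work happens while the connection counts as active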

Histogram

Histogram is a synchronous instrument that measures the statistical distribution of values, such as request latencies or response sizes, grouping them into buckets.

python
import time
import random

request_duration = meter.create_histogram(
    name="http_request_duration_seconds",
    description="HTTP request duration in seconds",
    unit="s"
)

response_size = meter.create_histogram(
    name="http_response_size_bytes",
    description="HTTP response size in bytes",
    unit="by"
)

def handle_http_request(method, endpoint):
    """Handle HTTP request with timing and size measurement"""
    start_time = time.time()

    try:
        # Simulate request processing
        processing_time = random.uniform(0.01, 0.5)
        time.sleep(processing_time)

        # Simulate response
        response_data = "x" * random.randint(100, 5000)

        # Record metrics
        duration = time.time() - start_time
        request_duration.record(duration, {
            "method": method,
            "endpoint": endpoint,
            "status_code": "200"
        })

        response_size.record(len(response_data), {
            "method": method,
            "endpoint": endpoint,
            "content_type": "application/json"
        })

        return response_data

    except Exception as e:
        duration = time.time() - start_time
        request_duration.record(duration, {
            "method": method,
            "endpoint": endpoint,
            "status_code": "500"
        })
        raise

# Example usage
handle_http_request("GET", "/api/users")
handle_http_request("POST", "/api/users")

Observable Gauge

Observable Gauge is an asynchronous instrument that measures non-additive values at a point in time, such as CPU usage, memory consumption, or temperature readings.

python
import psutil
import time
from opentelemetry.metrics import CallbackOptions, Observation

def get_system_metrics(options: CallbackOptions):
    """Callback function to collect system metrics"""
    # CPU usage
    cpu_usage = psutil.cpu_percent(interval=None)
    yield Observation(cpu_usage, {"resource": "cpu", "unit": "percent"})

    # Memory usage
    memory = psutil.virtual_memory()
    yield Observation(memory.percent, {"resource": "memory", "unit": "percent"})
    yield Observation(memory.used, {"resource": "memory", "unit": "bytes", "type": "used"})
    yield Observation(memory.available, {"resource": "memory", "unit": "bytes", "type": "available"})

    # Disk usage
    disk = psutil.disk_usage('/')
    disk_usage_percent = (disk.used / disk.total) * 100
    yield Observation(disk_usage_percent, {"resource": "disk", "unit": "percent", "mount": "/"})

# Create observable gauge
system_metrics = meter.create_observable_gauge(
    name="system_resource_usage",
    description="System resource utilization",
    unit="1",
    callbacks=[get_system_metrics]
)

def get_application_metrics(options: CallbackOptions):
    """Callback function to collect application-specific metrics"""
    # Current timestamp
    yield Observation(time.time(), {"metric": "last_update", "unit": "timestamp"})

    # Active threads
    import threading
    active_threads = threading.active_count()
    yield Observation(active_threads, {"metric": "active_threads", "unit": "count"})

app_metrics = meter.create_observable_gauge(
    name="application_metrics",
    description="Application-specific metrics",
    unit="1",
    callbacks=[get_application_metrics]
)

Observable Counter

Observable Counter is an asynchronous instrument that measures monotonically increasing values, such as total bytes read or CPU time consumed.

python
import psutil

def get_process_metrics(options: CallbackOptions):
    """Callback function to collect process metrics"""
    # Process CPU time
    cpu_times = psutil.Process().cpu_times()
    yield Observation(cpu_times.user, {"cpu_type": "user", "unit": "seconds"})
    yield Observation(cpu_times.system, {"cpu_type": "system", "unit": "seconds"})

    # Process memory info
    memory_info = psutil.Process().memory_info()
    yield Observation(memory_info.rss, {"memory_type": "rss", "unit": "bytes"})
    yield Observation(memory_info.vms, {"memory_type": "vms", "unit": "bytes"})

    # File descriptor count (Unix-like systems)
    try:
        num_fds = psutil.Process().num_fds()
        yield Observation(num_fds, {"resource": "file_descriptors", "unit": "count"})
    except AttributeError:
        # Windows doesn't have num_fds
        pass

process_metrics = meter.create_observable_counter(
    name="process_resource_usage",
    description="Process resource usage counters",
    unit="1",
    callbacks=[get_process_metrics]
)

def get_io_metrics(options: CallbackOptions):
    """Callback function to collect I/O metrics"""
    try:
        io_counters = psutil.Process().io_counters()
        yield Observation(io_counters.read_bytes, {"io_type": "read", "unit": "bytes"})
        yield Observation(io_counters.write_bytes, {"io_type": "write", "unit": "bytes"})
        yield Observation(io_counters.read_count, {"io_type": "read", "unit": "operations"})
        yield Observation(io_counters.write_count, {"io_type": "write", "unit": "operations"})
    except AttributeError:
        # Some systems don't support I/O counters
        pass

io_metrics = meter.create_observable_counter(
    name="process_io_usage",
    description="Process I/O usage counters",
    unit="1",
    callbacks=[get_io_metrics]
)

Observable UpDownCounter

Observable UpDownCounter is an asynchronous instrument for additive values that can both increase and decrease, with the current value captured at each collection.

python
import queue

# Global state for demonstration
message_queues = {
    "email": queue.Queue(),
    "sms": queue.Queue(),
    "push": queue.Queue()
}

active_workers = {"email": 0, "sms": 0, "push": 0}

def get_queue_metrics(options: CallbackOptions):
    """Callback function to collect queue metrics"""
    for queue_name, q in message_queues.items():
        # Queue size (can go up and down)
        yield Observation(q.qsize(), {"queue": queue_name, "metric": "size"})

        # Active workers (can go up and down)
        workers = active_workers.get(queue_name, 0)
        yield Observation(workers, {"queue": queue_name, "metric": "active_workers"})

queue_metrics = meter.create_observable_up_down_counter(
    name="message_queue_status",
    description="Message queue status metrics",
    unit="1",
    callbacks=[get_queue_metrics]
)

def get_connection_pool_metrics(options: CallbackOptions):
    """Callback function for connection pool metrics"""
    # Simulate connection pool status
    pools = {
        "database": {"active": 5, "idle": 3, "max": 10},
        "redis": {"active": 2, "idle": 8, "max": 10},
        "elasticsearch": {"active": 1, "idle": 4, "max": 5}
    }

    for pool_name, stats in pools.items():
        yield Observation(stats["active"], {"pool": pool_name, "state": "active"})
        yield Observation(stats["idle"], {"pool": pool_name, "state": "idle"})
        yield Observation(stats["max"], {"pool": pool_name, "state": "max"})

connection_pool_metrics = meter.create_observable_up_down_counter(
    name="connection_pool_status",
    description="Connection pool status",
    unit="1",
    callbacks=[get_connection_pool_metrics]
)

Working with Attributes

Attributes provide contextual information that makes metrics more useful for analysis and filtering.

Adding Attributes to Measurements

python
# Create various counters and histograms
api_requests = meter.create_counter("api_requests_total", description="Total API requests")
request_duration = meter.create_histogram("request_duration_seconds", description="Request duration")

def handle_api_request(method, endpoint, user_type, region):
    """Handle API request with detailed attributes"""
    start_time = time.time()

    try:
        # Simulate request processing
        processing_time = random.uniform(0.01, 0.3)
        time.sleep(processing_time)

        # Record successful request with detailed attributes
        api_requests.add(1, {
            "method": method,
            "endpoint": endpoint,
            "status_code": "200",
            "user_type": user_type,
            "region": region,
            "cache_hit": "false"
        })

        # Record duration
        duration = time.time() - start_time
        request_duration.record(duration, {
            "method": method,
            "endpoint": endpoint,
            "status_code": "200"
        })

    except Exception as e:
        # Record error
        api_requests.add(1, {
            "method": method,
            "endpoint": endpoint,
            "status_code": "500",
            "user_type": user_type,
            "region": region,
            "error_type": type(e).__name__
        })

        duration = time.time() - start_time
        request_duration.record(duration, {
            "method": method,
            "endpoint": endpoint,
            "status_code": "500"
        })

# Example usage
handle_api_request("GET", "/api/users", "premium", "us-east-1")
handle_api_request("POST", "/api/orders", "free", "eu-west-1")

Attribute Best Practices

Use meaningful attributes that provide valuable differentiation without creating excessive cardinality:

python
# Good: Low cardinality attributes
http_requests = meter.create_counter("http_requests_total")

def record_request(method, status_code, endpoint_category):
    """Record request with low-cardinality attributes"""
    http_requests.add(1, {
        "method": method,              # Limited values: GET, POST, PUT, DELETE
        "status_class": f"{status_code//100}xx",  # Grouped: 2xx, 3xx, 4xx, 5xx
        "endpoint_category": endpoint_category     # Grouped: api, static, health
    })

# Avoid: High cardinality attributes
def bad_example(method, status_code, user_id, session_id, timestamp):
    """Example of what NOT to do - high cardinality attributes"""
    # DON'T DO THIS - creates too many unique metric series
    http_requests.add(1, {
        "method": method,
        "status_code": status_code,
        "user_id": user_id,        # Unique per user - high cardinality
        "session_id": session_id,  # Unique per session - high cardinality
        "timestamp": timestamp     # Unique per request - very high cardinality
    })

Recording Measurements

Synchronous Measurements

Measurements on synchronous instruments are recorded inline with application logic:

python
# Create instruments
operation_counter = meter.create_counter("operations_total")
operation_duration = meter.create_histogram("operation_duration_seconds")
error_counter = meter.create_counter("operation_errors_total")

def perform_operation(operation_type, user_id):
    """Perform operation with comprehensive metrics"""
    start_time = time.time()

    try:
        # Increment operation counter
        operation_counter.add(1, {
            "operation": operation_type,
            "status": "started"
        })

        # Simulate operation
        processing_time = random.uniform(0.1, 1.0)
        time.sleep(processing_time)

        # Simulate potential failure
        if random.random() < 0.1:  # 10% failure rate
            raise ValueError("Operation failed")

        # Record successful completion
        operation_counter.add(1, {
            "operation": operation_type,
            "status": "completed"
        })

        return f"Operation {operation_type} completed"

    except Exception as e:
        # Record error
        error_counter.add(1, {
            "operation": operation_type,
            "error_type": type(e).__name__,
            "error_message": str(e)[:50]  # Truncate to avoid high cardinality
        })

        operation_counter.add(1, {
            "operation": operation_type,
            "status": "failed"
        })

        raise

    finally:
        # Always record duration
        duration = time.time() - start_time
        operation_duration.record(duration, {
            "operation": operation_type
        })

# Example usage
try:
    result = perform_operation("user_registration", "user123")
    print(result)
except Exception as e:
    print(f"Operation failed: {e}")

Asynchronous Measurements

Asynchronous instruments use callbacks that are invoked during metric collection:

python
# Global state for demonstration
system_stats = {
    "cpu_usage": 0.0,
    "memory_usage": 0.0,
    "disk_usage": 0.0,
    "network_connections": 0
}

queue_stats = {
    "email": {"size": 0, "processed": 0},
    "sms": {"size": 0, "processed": 0},
    "push": {"size": 0, "processed": 0}
}

def collect_system_metrics(options: CallbackOptions):
    """Callback to collect system metrics"""
    # Update system stats (in real app, this would call actual system APIs)
    system_stats["cpu_usage"] = psutil.cpu_percent()
    system_stats["memory_usage"] = psutil.virtual_memory().percent
    system_stats["disk_usage"] = psutil.disk_usage('/').percent
    system_stats["network_connections"] = len(psutil.net_connections())

    # Yield observations
    for metric_name, value in system_stats.items():
        yield Observation(value, {"metric": metric_name})

def collect_queue_metrics(options: CallbackOptions):
    """Callback to collect queue metrics"""
    for queue_name, stats in queue_stats.items():
        # Queue size (up/down counter)
        yield Observation(stats["size"], {"queue": queue_name, "metric": "size"})

        # Total processed (counter)
        yield Observation(stats["processed"], {"queue": queue_name, "metric": "processed"})

# Create observable instruments
system_gauge = meter.create_observable_gauge(
    name="system_metrics",
    description="System performance metrics",
    unit="1",
    callbacks=[collect_system_metrics]
)

queue_counter = meter.create_observable_up_down_counter(
    name="queue_metrics",
    description="Queue status metrics",
    unit="1",
    callbacks=[collect_queue_metrics]
)

# Simulate queue operations
def simulate_queue_activity():
    """Simulate queue activity to generate metrics"""
    while True:
        for queue_name in queue_stats:
            # Add items to queue
            new_items = random.randint(0, 5)
            queue_stats[queue_name]["size"] += new_items

            # Process items from queue
            processed = min(queue_stats[queue_name]["size"], random.randint(0, 3))
            queue_stats[queue_name]["size"] -= processed
            queue_stats[queue_name]["processed"] += processed

        time.sleep(2)

# In a real application, you'd run this in a separate thread
# threading.Thread(target=simulate_queue_activity, daemon=True).start()

Practical Examples

HTTP Server Metrics

python
import time
import random
from opentelemetry import metrics

class HTTPServerMetrics:
    """Comprehensive HTTP server metrics collection"""

    def __init__(self, meter):
        self.request_counter = meter.create_counter(
            "http_requests_total",
            description="Total HTTP requests",
            unit="1"
        )

        self.request_duration = meter.create_histogram(
            "http_request_duration_seconds",
            description="HTTP request duration",
            unit="s"
        )

        self.active_requests = meter.create_up_down_counter(
            "http_requests_active",
            description="Active HTTP requests",
            unit="1"
        )

        self.response_size = meter.create_histogram(
            "http_response_size_bytes",
            description="HTTP response size",
            unit="by"
        )

        self.error_counter = meter.create_counter(
            "http_errors_total",
            description="Total HTTP errors",
            unit="1"
        )

    def record_request(self, method, route, status_code, duration, response_size):
        """Record metrics for an HTTP request"""
        attributes = {
            "method": method,
            "route": route,
            "status_code": str(status_code)
        }

        # Record request count
        self.request_counter.add(1, attributes)

        # Record duration
        self.request_duration.record(duration, attributes)

        # Record response size
        self.response_size.record(response_size, attributes)

        # Record errors
        if status_code >= 400:
            error_attributes = {
                "method": method,
                "route": route,
                "status_code": str(status_code),
                "error_type": "client_error" if status_code < 500 else "server_error"
            }
            self.error_counter.add(1, error_attributes)

    def start_request(self, method, route):
        """Track active request start"""
        self.active_requests.add(1, {"method": method, "route": route})

    def end_request(self, method, route):
        """Track active request end"""
        self.active_requests.add(-1, {"method": method, "route": route})

# Usage example
meter = metrics.get_meter("http_server", "1.0.0")
server_metrics = HTTPServerMetrics(meter)

def handle_request(method, route):
    """Simulate handling an HTTP request"""
    start_time = time.time()

    # Track active request
    server_metrics.start_request(method, route)

    try:
        # Simulate processing
        processing_time = random.uniform(0.01, 0.5)
        time.sleep(processing_time)

        # Simulate response
        status_code = random.choices([200, 404, 500], weights=[85, 10, 5])[0]
        response_size = random.randint(100, 10000)

        # Record metrics
        duration = time.time() - start_time
        server_metrics.record_request(method, route, status_code, duration, response_size)

        return status_code, response_size

    finally:
        # Always end active request tracking
        server_metrics.end_request(method, route)

# Example usage
handle_request("GET", "/api/users")
handle_request("POST", "/api/orders")
handle_request("GET", "/api/products")

Database Connection Pool Metrics

python
import threading
import time
import random
from opentelemetry.metrics import CallbackOptions, Observation

class DatabasePoolMetrics:
    """Database connection pool metrics"""

    def __init__(self, meter):
        self.query_counter = meter.create_counter(
            "db_queries_total",
            description="Total database queries",
            unit="1"
        )

        self.query_duration = meter.create_histogram(
            "db_query_duration_seconds",
            description="Database query duration",
            unit="s"
        )

        self.connection_pool_gauge = meter.create_observable_gauge(
            "db_connection_pool_status",
            description="Database connection pool status",
            unit="1",
            callbacks=[self._collect_pool_metrics]
        )

        # Simulated connection pool state
        self.pool_stats = {
            "active": 0,
            "idle": 10,
            "max": 20,
            "total_created": 10,
            "total_closed": 0
        }

        self.lock = threading.Lock()

    def _collect_pool_metrics(self, options: CallbackOptions):
        """Collect connection pool metrics"""
        with self.lock:
            yield Observation(self.pool_stats["active"], {"state": "active"})
            yield Observation(self.pool_stats["idle"], {"state": "idle"})
            yield Observation(self.pool_stats["max"], {"state": "max"})
            yield Observation(self.pool_stats["total_created"], {"state": "total_created"})
            yield Observation(self.pool_stats["total_closed"], {"state": "total_closed"})

    def execute_query(self, query_type, table):
        """Execute a database query with metrics"""
        start_time = time.time()

        # Simulate getting connection from pool
        with self.lock:
            if self.pool_stats["idle"] > 0:
                self.pool_stats["idle"] -= 1
                self.pool_stats["active"] += 1
            else:
                # Would normally wait for connection or create new one
                pass

        try:
            # Simulate query execution
            execution_time = random.uniform(0.001, 0.1)
            time.sleep(execution_time)

            # Record metrics
            duration = time.time() - start_time
            attributes = {
                "query_type": query_type,
                "table": table
            }

            self.query_counter.add(1, attributes)
            self.query_duration.record(duration, attributes)

            return f"Query {query_type} on {table} completed"

        finally:
            # Return connection to pool
            with self.lock:
                self.pool_stats["active"] -= 1
                self.pool_stats["idle"] += 1

# Usage example
meter = metrics.get_meter("database", "1.0.0")
db_metrics = DatabasePoolMetrics(meter)

# Simulate database operations
db_metrics.execute_query("SELECT", "users")
db_metrics.execute_query("INSERT", "orders")
db_metrics.execute_query("UPDATE", "products")

Business Metrics

python
import time
from opentelemetry.metrics import CallbackOptions, Observation

class BusinessMetrics:
    """Business-specific metrics collection"""

    def __init__(self, meter):
        self.user_registrations = meter.create_counter(
            "user_registrations_total",
            description="Total user registrations",
            unit="1"
        )

        self.order_value = meter.create_histogram(
            "order_value_usd",
            description="Order value in USD",
            unit="USD"
        )

        self.subscription_status = meter.create_observable_up_down_counter(
            "subscriptions_active",
            description="Active subscriptions by plan",
            unit="1",
            callbacks=[self._collect_subscription_metrics]
        )

        self.revenue_gauge = meter.create_observable_gauge(
            "revenue_metrics",
            description="Revenue metrics",
            unit="USD",
            callbacks=[self._collect_revenue_metrics]
        )

        # Simulated business data
        self.subscription_counts = {
            "basic": 1250,
            "premium": 340,
            "enterprise": 45
        }

        self.revenue_data = {
            "monthly_recurring": 50000,
            "one_time": 15000,
            "total": 65000
        }

    def _collect_subscription_metrics(self, options: CallbackOptions):
        """Collect subscription metrics"""
        for plan, count in self.subscription_counts.items():
            yield Observation(count, {"plan": plan})

    def _collect_revenue_metrics(self, options: CallbackOptions):
        """Collect revenue metrics"""
        for revenue_type, amount in self.revenue_data.items():
            yield Observation(amount, {"type": revenue_type})

    def record_user_registration(self, source, plan):
        """Record a new user registration"""
        self.user_registrations.add(1, {
            "source": source,
            "plan": plan,
            "hour": str(time.localtime().tm_hour)  # Hour bucket for analysis
        })

        # Update subscription count
        self.subscription_counts[plan] += 1

    def record_order(self, value, currency, category):
        """Record an order"""
        # Convert to USD for consistent reporting
        usd_value = self._convert_to_usd(value, currency)

        self.order_value.record(usd_value, {
            "category": category,
            "currency": currency,
            "value_range": self._get_value_range(usd_value)
        })

        # Update revenue
        self.revenue_data["one_time"] += usd_value
        self.revenue_data["total"] += usd_value

    def _convert_to_usd(self, value, currency):
        """Convert currency to USD (simplified)"""
        rates = {"USD": 1.0, "EUR": 1.1, "GBP": 1.3, "JPY": 0.007}
        return value * rates.get(currency, 1.0)

    def _get_value_range(self, value):
        """Categorize order value"""
        if value < 10:
            return "small"
        elif value < 100:
            return "medium"
        elif value < 1000:
            return "large"
        else:
            return "premium"

# Usage example
meter = metrics.get_meter("business", "1.0.0")
business_metrics = BusinessMetrics(meter)

# Record business events
business_metrics.record_user_registration("google_ads", "premium")
business_metrics.record_order(99.99, "USD", "electronics")
business_metrics.record_order(49.99, "EUR", "books")
business_metrics.record_order(199.99, "USD", "clothing")

Configuration and Performance

Metric Reader Configuration

python
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter

# Configure metric reader with custom intervals
exporter = OTLPMetricExporter(
    endpoint="https://api.uptrace.dev:4317",
    headers={"uptrace-dsn": "your_dsn_here"}
)

# Create reader with custom export interval
reader = PeriodicExportingMetricReader(
    exporter=exporter,
    export_interval_millis=60000,  # Export every 60 seconds
    export_timeout_millis=30000    # 30 second timeout
)

# Create meter provider with reader
provider = MeterProvider(metric_readers=[reader])
metrics.set_meter_provider(provider)

# Multiple readers for different destinations
console_reader = PeriodicExportingMetricReader(
    ConsoleMetricExporter(),
    export_interval_millis=10000  # More frequent for debugging
)

provider = MeterProvider(metric_readers=[reader, console_reader])
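
When the process exits, flush any buffered measurements so the final export interval isn't lost. The SDK's MeterProvider exposes force_flush() and shutdown() for this; one way to hook them up is with atexit:

python
import atexit

def shutdown_metrics():
    # Flush pending metrics, then release exporter resources
    provider.force_flush()
    provider.shutdown()

atexit.register(shutdown_metrics)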

Memory Management

For long-running applications, consider memory usage:

python
import gc
import time

import psutil

def monitor_memory_usage():
    """Monitor and manage memory usage"""
    while True:
        # Force garbage collection periodically
        gc.collect()

        # Monitor memory usage
        memory_usage = psutil.Process().memory_info().rss / 1024 / 1024  # MB

        if memory_usage > 500:  # If using more than 500MB
            print(f"Warning: High memory usage: {memory_usage:.2f} MB")

        time.sleep(30)

# Run in background thread
import threading
threading.Thread(target=monitor_memory_usage, daemon=True).start()

Attribute Optimization

Optimize attribute usage to prevent cardinality explosion:

python
# Good: Limited attribute values
http_requests = meter.create_counter("http_requests_total")

def record_request(method, status_code, endpoint):
    """Record HTTP request with optimized attributes"""
    # Use status classes instead of exact codes
    status_class = f"{status_code // 100}xx"

    # Categorize endpoints to reduce cardinality
    endpoint_category = categorize_endpoint(endpoint)

    http_requests.add(1, {
        "method": method,              # ~10 possible values
        "status_class": status_class,  # 5 possible values (2xx, 3xx, 4xx, 5xx)
        "endpoint_category": endpoint_category  # ~5 categories
    })
    # Total cardinality: 10 × 5 × 5 = 250 series

def categorize_endpoint(endpoint):
    """Categorize endpoints to reduce cardinality"""
    if endpoint.startswith("/api/"):
        return "api"
    elif endpoint.startswith("/static/"):
        return "static"
    elif endpoint == "/health":
        return "health"
    elif endpoint.startswith("/admin/"):
        return "admin"
    else:
        return "other"

# Avoid: High cardinality attributes
def bad_example(method, status_code, full_url, user_id, timestamp):
    """Example of what NOT to do"""
    # DON'T DO THIS - creates millions of metric series
    http_requests.add(1, {
        "method": method,
        "status_code": str(status_code),  # 50+ possible values
        "full_url": full_url,             # Thousands of unique URLs
        "user_id": user_id,               # Thousands of users
        "timestamp": str(timestamp)       # Infinite unique values
    })
    # This could create millions of unique metric series!

Environment Variables

Configure metrics behavior using environment variables:

bash
# Metric export settings
export OTEL_METRICS_EXPORTER=otlp
export OTEL_EXPORTER_OTLP_METRICS_ENDPOINT=https://api.uptrace.dev:4317
export OTEL_EXPORTER_OTLP_METRICS_HEADERS="uptrace-dsn=YOUR_DSN"

# Collection interval (milliseconds)
export OTEL_METRIC_EXPORT_INTERVAL=60000

# Resource attributes
export OTEL_RESOURCE_ATTRIBUTES="service.name=my-service,service.version=1.0.0"

Use environment variables in your application:

python
import os
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter

def setup_metrics():
    """Setup metrics using environment variables"""
    # Get configuration from environment
    endpoint = os.getenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT")
    headers_str = os.getenv("OTEL_EXPORTER_OTLP_METRICS_HEADERS", "")
    export_interval = int(os.getenv("OTEL_METRIC_EXPORT_INTERVAL", "60000"))

    # Parse headers
    headers = {}
    if headers_str:
        for header in headers_str.split(","):
            if "=" in header:
                key, value = header.split("=", 1)
                headers[key.strip()] = value.strip()

    # Create exporter
    exporter = OTLPMetricExporter(
        endpoint=endpoint,
        headers=headers
    )

    # Create reader
    reader = PeriodicExportingMetricReader(
        exporter=exporter,
        export_interval_millis=export_interval
    )

    # Create and set meter provider
    provider = MeterProvider(metric_readers=[reader])
    metrics.set_meter_provider(provider)

    return provider

# Usage
if __name__ == "__main__":
    setup_metrics()
    meter = metrics.get_meter("my_app", "1.0.0")

Best Practices

Instrument Naming

Follow OpenTelemetry naming conventions:

python
# Good: Descriptive, hierarchical names
meter.create_counter("http.requests.total")
meter.create_histogram("http.request.duration")
meter.create_observable_gauge("system.memory.usage")

# Avoid: Generic or unclear names
meter.create_counter("requests")
meter.create_histogram("time")
meter.create_observable_gauge("memory")

Unit Specification

Always specify appropriate units:

python
meter.create_histogram("request.duration", unit="s")          # seconds
meter.create_observable_gauge("memory.usage", unit="By")      # bytes
meter.create_counter("requests.total", unit="1")              # dimensionless
meter.create_histogram("file.size", unit="By")                # bytes
meter.create_observable_gauge("temperature", unit="Cel")      # Celsius

Error Handling

Handle metric recording errors gracefully:

python
import logging

logger = logging.getLogger(__name__)

def safe_record_metric(counter, value, attributes):
    """Safely record metric with error handling"""
    try:
        counter.add(value, attributes)
    except Exception as e:
        # Log the error but don't let metrics break your application
        logger.error(f"Failed to record metric: {e}")

# Usage
safe_record_metric(request_counter, 1, {"status": "success"})

Testing Metrics

Create helper functions for testing:

python
import unittest
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader

class MetricsTestCase(unittest.TestCase):
    def setUp(self):
        """Set up test environment"""
        self.reader = InMemoryMetricReader()
        self.provider = MeterProvider(metric_readers=[self.reader])
        self.meter = self.provider.get_meter("test_meter", "1.0.0")

    def get_metrics(self):
        """Get collected metrics"""
        return self.reader.get_metrics_data()

    def test_counter(self):
        """Test counter functionality"""
        counter = self.meter.create_counter("test_counter")
        counter.add(1, {"key": "value"})
        counter.add(2, {"key": "value"})

        metrics = self.get_metrics()
        # Assert metrics were recorded correctly
        self.assertEqual(len(metrics.resource_metrics), 1)

    def tearDown(self):
        """Clean up after test"""
        self.provider.shutdown()
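
To assert on recorded values, drill into the exported structure: MetricsData nests resource_metrics, scope_metrics, and metrics, and each metric's data carries the data points. A sketch of such an assertion, written as another method of the test case above:

python
    def test_counter_value(self):
        """Assert the aggregated counter value"""
        counter = self.meter.create_counter("test_counter")
        counter.add(1, {"key": "value"})
        counter.add(2, {"key": "value"})

        data = self.get_metrics()
        metric = data.resource_metrics[0].scope_metrics[0].metrics[0]
        point = metric.data.data_points[0]
        self.assertEqual(point.value, 3)  # 1 + 2 for the same attribute set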

Performance Monitoring

Monitor the performance impact of metrics collection:

python
import time
import functools

def time_metric_operation(func):
    """Decorator to measure metric operation performance"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        duration = time.perf_counter() - start_time

        # Only log if operation takes too long
        if duration > 0.001:  # 1ms threshold
            print(f"Metric operation {func.__name__} took {duration:.4f}s")

        return result
    return wrapper

# Usage
@time_metric_operation
def record_business_metric():
    business_counter.add(1, {"type": "important"})

OpenTelemetry APM

Uptrace is a DataDog alternative that supports distributed tracing, metrics, and logs. You can use it to monitor applications and troubleshoot issues.

Uptrace comes with an intuitive query builder, rich dashboards, alerting rules with notifications, and integrations for most languages and frameworks.

Uptrace can process billions of spans and metrics on a single server and allows you to monitor your applications at 10x lower cost.

In just a few minutes, you can try Uptrace by visiting the cloud demo (no login required) or running it locally with Docker. The source code is available on GitHub.

What's Next?

Now that you understand the OpenTelemetry Python Metrics API, explore the related OpenTelemetry Python guides.