# OpenTelemetry Python Metrics API

> Collect Python application metrics with the OpenTelemetry Meter API, configure instruments, and forward data to Uptrace dashboards.

This document shows how to use the OpenTelemetry Python Metrics API to measure application performance.

## Prerequisites

<partial path="otel-prereq-python">



</partial>

If you are not familiar with metrics terminology such as timeseries or additive/synchronous/asynchronous instruments, read the introduction to [OpenTelemetry Metrics](/opentelemetry/metrics) first.

## Getting Started

To get started with metrics, you need to create a meter:

```python
from opentelemetry import metrics

meter = metrics.get_meter("app_or_package_name", "1.0.0")
```

Using the meter, you can create [instruments](/opentelemetry/metrics#instruments) to measure performance. The simplest [Counter](/opentelemetry/metrics#counter) instrument looks like this:

```python
import time

counter = meter.create_counter(
    name="requests_total",
    description="Total number of requests processed",
    unit="1",
)

for i in range(1000):
    counter.add(1, {"status": "success", "method": "GET"})

    if i % 10 == 0:
        # Pause briefly so a periodic reader has time to export (demo only)
        time.sleep(0.1)
```

## Metric Instruments

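OpenTelemetry provides several types of instruments to capture different kinds of measurements. Synchronous instruments (Counter, UpDownCounter, Histogram) are called inline from your code, while asynchronous (observable) instruments report values through callbacks that run at collection time.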

### Counter

[Counter](/opentelemetry/metrics#counter) is a synchronous instrument that measures additive non-decreasing values, representing cumulative totals like the number of requests, errors, or completed tasks.

```python
import time
from opentelemetry import metrics

meter = metrics.get_meter("app_or_package_name", "1.0.0")

http_requests_counter = meter.create_counter(
    name="http_requests_total",
    description="Total number of HTTP requests",
    unit="1"
)

error_counter = meter.create_counter(
    name="http_errors_total",
    description="Total number of HTTP errors",
    unit="1"
)

def handle_request(method, endpoint, status_code):
    # Record successful request
    http_requests_counter.add(1, {
        "method": method,
        "endpoint": endpoint,
        "status_code": str(status_code)
    })

    # Record error if applicable
    if status_code >= 400:
        error_counter.add(1, {
            "method": method,
            "endpoint": endpoint,
            "error_type": "client_error" if status_code < 500 else "server_error"
        })

# Example usage
handle_request("GET", "/api/users", 200)
handle_request("POST", "/api/users", 201)
handle_request("GET", "/api/users/999", 404)
```

### UpDownCounter

[UpDownCounter](/opentelemetry/metrics#updowncounter) is a synchronous instrument that measures additive values that can both increase and decrease, such as the number of active connections or items in a queue.

```python
import random
import time

active_connections = meter.create_up_down_counter(
    name="database_connections_active",
    description="Number of active database connections",
    unit="1"
)

queue_size = meter.create_up_down_counter(
    name="task_queue_size",
    description="Number of items in the task queue",
    unit="1"
)

def simulate_connections():
    """Simulate database connection management"""
    for i in range(20):
        # Connection established
        active_connections.add(1, {"database": "users", "pool": "main"})

        # Simulate work
        time.sleep(0.1)

        # Connection closed
        active_connections.add(-1, {"database": "users", "pool": "main"})

def simulate_queue_operations():
    """Simulate queue operations"""
    while True:
        # Add items to queue
        items_added = random.randint(1, 5)
        queue_size.add(items_added, {"queue": "email", "priority": "high"})

        # Process items from queue
        items_processed = random.randint(1, 3)
        queue_size.add(-items_processed, {"queue": "email", "priority": "high"})

        time.sleep(1)
```
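An UpDownCounter drifts if an increment is not matched by its decrement, for example when the code between the two calls raises. The sketch below pairs both calls in a context manager. `track_in_flight` and `RecordingCounter` are hypothetical names introduced for illustration; the stand-in class only mimics the `add(amount, attributes)` call so the example runs without an SDK.

```python
from contextlib import contextmanager


@contextmanager
def track_in_flight(up_down_counter, attributes=None):
    """Add 1 on entry and -1 on exit, even if the body raises."""
    up_down_counter.add(1, attributes)
    try:
        yield
    finally:
        up_down_counter.add(-1, attributes)


class RecordingCounter:
    """Stand-in for an UpDownCounter that keeps a running total (illustration only)."""

    def __init__(self):
        self.value = 0

    def add(self, amount, attributes=None):
        self.value += amount


demo_counter = RecordingCounter()
with track_in_flight(demo_counter, {"database": "users", "pool": "main"}):
    in_flight_during = demo_counter.value  # one connection is open here
in_flight_after = demo_counter.value  # back to zero after the block
```

In application code you would pass an instrument such as `active_connections` instead of the stand-in.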

### Histogram

[Histogram](/opentelemetry/metrics#histogram) is a synchronous instrument that measures the statistical distribution of values, such as request latencies or response sizes, grouping them into buckets.

```python
import time
import random

request_duration = meter.create_histogram(
    name="http_request_duration_seconds",
    description="HTTP request duration in seconds",
    unit="s"
)

response_size = meter.create_histogram(
    name="http_response_size_bytes",
    description="HTTP response size in bytes",
    unit="By"
)

def handle_http_request(method, endpoint):
    """Handle HTTP request with timing and size measurement"""
    start_time = time.time()

    try:
        # Simulate request processing
        processing_time = random.uniform(0.01, 0.5)
        time.sleep(processing_time)

        # Simulate response
        response_data = "x" * random.randint(100, 5000)

        # Record metrics
        duration = time.time() - start_time
        request_duration.record(duration, {
            "method": method,
            "endpoint": endpoint,
            "status_code": "200"
        })

        response_size.record(len(response_data), {
            "method": method,
            "endpoint": endpoint,
            "content_type": "application/json"
        })

        return response_data

    except Exception as e:
        duration = time.time() - start_time
        request_duration.record(duration, {
            "method": method,
            "endpoint": endpoint,
            "status_code": "500"
        })
        raise

# Example usage
handle_http_request("GET", "/api/users")
handle_http_request("POST", "/api/users")
```
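To see what grouping into buckets means for durations recorded in seconds, the sketch below counts sample values per bucket using only the standard library. It assumes the default explicit bucket boundaries listed in the OpenTelemetry specification and assumes that each bucket includes its upper boundary; `bucket_counts` and the second set of boundaries are made up for illustration and are not part of the SDK.

```python
from bisect import bisect_left

# Default explicit bucket boundaries from the OpenTelemetry specification.
DEFAULT_BOUNDARIES = [0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000]


def bucket_counts(values, boundaries):
    """Count values per bucket; each bucket includes its upper boundary."""
    counts = [0] * (len(boundaries) + 1)
    for value in values:
        counts[bisect_left(boundaries, value)] += 1
    return counts


durations_s = [0.012, 0.08, 0.25, 0.49]  # sample request durations in seconds

# With the defaults, every sub-second duration lands in the same (0, 5] bucket.
default_counts = bucket_counts(durations_s, DEFAULT_BOUNDARIES)

# Hypothetical boundaries chosen for second-scale latencies spread the samples out.
seconds_counts = bucket_counts(durations_s, [0.01, 0.05, 0.1, 0.25, 0.5, 1.0])
```

If your durations are in seconds, it may be worth overriding the boundaries, which the SDK allows through a `View` with `ExplicitBucketHistogramAggregation`.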

### Observable Gauge

[Observable Gauge](/opentelemetry/metrics#gaugeobserver) is an asynchronous instrument that measures non-additive values at a point in time, such as CPU usage, memory consumption, or temperature readings.

```python
import psutil
import time
from opentelemetry.metrics import CallbackOptions, Observation

def get_system_metrics(options: CallbackOptions):
    """Callback function to collect system metrics"""
    # CPU usage
    cpu_usage = psutil.cpu_percent(interval=None)
    yield Observation(cpu_usage, {"resource": "cpu", "unit": "percent"})

    # Memory usage
    memory = psutil.virtual_memory()
    yield Observation(memory.percent, {"resource": "memory", "unit": "percent"})
    yield Observation(memory.used, {"resource": "memory", "unit": "bytes", "type": "used"})
    yield Observation(memory.available, {"resource": "memory", "unit": "bytes", "type": "available"})

    # Disk usage
    disk = psutil.disk_usage('/')
    disk_usage_percent = (disk.used / disk.total) * 100
    yield Observation(disk_usage_percent, {"resource": "disk", "unit": "percent", "mount": "/"})

# Create observable gauge
system_metrics = meter.create_observable_gauge(
    name="system_resource_usage",
    description="System resource utilization",
    unit="1",
    callbacks=[get_system_metrics]
)

def get_application_metrics(options: CallbackOptions):
    """Callback function to collect application-specific metrics"""
    # Current timestamp
    yield Observation(time.time(), {"metric": "last_update", "unit": "timestamp"})

    # Active threads
    import threading
    active_threads = threading.active_count()
    yield Observation(active_threads, {"metric": "active_threads", "unit": "count"})

app_metrics = meter.create_observable_gauge(
    name="application_metrics",
    description="Application-specific metrics",
    unit="1",
    callbacks=[get_application_metrics]
)
```

### Observable Counter

[Observable Counter](/opentelemetry/metrics#counterobserver) is an asynchronous instrument that measures monotonically increasing values, such as total bytes read or CPU time consumed.

```python
import psutil
from opentelemetry.metrics import CallbackOptions, Observation

def get_process_metrics(options: CallbackOptions):
    """Callback function to collect cumulative process CPU time"""
    # CPU time only ever increases, so it fits an observable counter
    cpu_times = psutil.Process().cpu_times()
    yield Observation(cpu_times.user, {"cpu_type": "user"})
    yield Observation(cpu_times.system, {"cpu_type": "system"})

    # Values that can also decrease (resident memory, open file descriptors)
    # are not monotonic; report them with an observable gauge or an
    # observable up-down counter instead

process_metrics = meter.create_observable_counter(
    name="process_cpu_time_seconds",
    description="Cumulative CPU time consumed by the process",
    unit="s",
    callbacks=[get_process_metrics]
)

def get_io_metrics(options: CallbackOptions):
    """Callback function to collect I/O metrics"""
    try:
        io_counters = psutil.Process().io_counters()
        yield Observation(io_counters.read_bytes, {"io_type": "read", "unit": "bytes"})
        yield Observation(io_counters.write_bytes, {"io_type": "write", "unit": "bytes"})
        yield Observation(io_counters.read_count, {"io_type": "read", "unit": "operations"})
        yield Observation(io_counters.write_count, {"io_type": "write", "unit": "operations"})
    except AttributeError:
        # Some systems don't support I/O counters
        pass

io_metrics = meter.create_observable_counter(
    name="process_io_usage",
    description="Process I/O usage counters",
    unit="1",
    callbacks=[get_io_metrics]
)
```

### Observable UpDownCounter

[Observable UpDownCounter](/opentelemetry/metrics#updowncounterobserver) is an asynchronous instrument that reports additive values that can increase or decrease, such as queue sizes or the number of active workers. The values are read at collection time.

```python
import threading
import queue
import time

# Global state for demonstration
message_queues = {
    "email": queue.Queue(),
    "sms": queue.Queue(),
    "push": queue.Queue()
}

active_workers = {"email": 0, "sms": 0, "push": 0}

def get_queue_metrics(options: CallbackOptions):
    """Callback function to collect queue metrics"""
    for queue_name, q in message_queues.items():
        # Queue size (can go up and down)
        yield Observation(q.qsize(), {"queue": queue_name, "metric": "size"})

        # Active workers (can go up and down)
        workers = active_workers.get(queue_name, 0)
        yield Observation(workers, {"queue": queue_name, "metric": "active_workers"})

queue_metrics = meter.create_observable_up_down_counter(
    name="message_queue_status",
    description="Message queue status metrics",
    unit="1",
    callbacks=[get_queue_metrics]
)

def get_connection_pool_metrics(options: CallbackOptions):
    """Callback function for connection pool metrics"""
    # Simulate connection pool status
    pools = {
        "database": {"active": 5, "idle": 3, "max": 10},
        "redis": {"active": 2, "idle": 8, "max": 10},
        "elasticsearch": {"active": 1, "idle": 4, "max": 5}
    }

    for pool_name, stats in pools.items():
        yield Observation(stats["active"], {"pool": pool_name, "state": "active"})
        yield Observation(stats["idle"], {"pool": pool_name, "state": "idle"})
        yield Observation(stats["max"], {"pool": pool_name, "state": "max"})

connection_pool_metrics = meter.create_observable_up_down_counter(
    name="connection_pool_status",
    description="Connection pool status",
    unit="1",
    callbacks=[get_connection_pool_metrics]
)
```

## Working with Attributes

Attributes provide contextual information that makes metrics more useful for analysis and filtering.

### Adding Attributes to Measurements

```python
# Create various counters and histograms
api_requests = meter.create_counter("api_requests_total", description="Total API requests")
request_duration = meter.create_histogram("request_duration_seconds", description="Request duration")

def handle_api_request(method, endpoint, user_type, region):
    """Handle API request with detailed attributes"""
    start_time = time.time()

    try:
        # Simulate request processing
        processing_time = random.uniform(0.01, 0.3)
        time.sleep(processing_time)

        # Record successful request with detailed attributes
        api_requests.add(1, {
            "method": method,
            "endpoint": endpoint,
            "status_code": "200",
            "user_type": user_type,
            "region": region,
            "cache_hit": "false"
        })

        # Record duration
        duration = time.time() - start_time
        request_duration.record(duration, {
            "method": method,
            "endpoint": endpoint,
            "status_code": "200"
        })

    except Exception as e:
        # Record error
        api_requests.add(1, {
            "method": method,
            "endpoint": endpoint,
            "status_code": "500",
            "user_type": user_type,
            "region": region,
            "error_type": type(e).__name__
        })

        duration = time.time() - start_time
        request_duration.record(duration, {
            "method": method,
            "endpoint": endpoint,
            "status_code": "500"
        })

# Example usage
handle_api_request("GET", "/api/users", "premium", "us-east-1")
handle_api_request("POST", "/api/orders", "free", "eu-west-1")
```

### Attribute Best Practices

Use meaningful attributes that provide valuable differentiation without creating excessive cardinality:

```python
# Good: Low cardinality attributes
http_requests = meter.create_counter("http_requests_total")

def record_request(method, status_code, endpoint_category):
    """Record request with low-cardinality attributes"""
    http_requests.add(1, {
        "method": method,              # Limited values: GET, POST, PUT, DELETE
        "status_class": f"{status_code//100}xx",  # Grouped: 2xx, 3xx, 4xx, 5xx
        "endpoint_category": endpoint_category     # Grouped: api, static, health
    })

# Avoid: High cardinality attributes
def bad_example(method, status_code, user_id, session_id, timestamp):
    """Example of what NOT to do - high cardinality attributes"""
    # DON'T DO THIS - creates too many unique metric series
    http_requests.add(1, {
        "method": method,
        "status_code": status_code,
        "user_id": user_id,        # Unique per user - high cardinality
        "session_id": session_id,  # Unique per session - high cardinality
        "timestamp": timestamp     # Unique per request - very high cardinality
    })
```
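Raw request paths are a common source of accidental cardinality, because every numeric ID or UUID in a path produces a new attribute value. One possible approach, sketched below with the standard library only, is to replace identifier-like segments with placeholders before using the path as an attribute. `normalize_route` and its two patterns are assumptions made for illustration; a web framework's own route template is usually a better source when it is available.

```python
import re

_NUMERIC_ID = re.compile(r"^\d+$")
_UUID = re.compile(r"^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$")


def normalize_route(path):
    """Replace identifier-like path segments with placeholders."""
    segments = []
    # Drop the query string, then inspect each path segment
    for segment in path.split("?", 1)[0].split("/"):
        if _NUMERIC_ID.match(segment):
            segments.append("{id}")
        elif _UUID.match(segment):
            segments.append("{uuid}")
        else:
            segments.append(segment)
    return "/".join(segments)


user_route = normalize_route("/api/users/999")
order_route = normalize_route("/api/orders/42?expand=items")
```

Both `/api/users/999` and `/api/users/1000` then map to the same attribute value, so they share one time series.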

## Recording Measurements

### Synchronous Measurements

Synchronous instruments are recorded inline with application logic:

```python
# Create instruments
operation_counter = meter.create_counter("operations_total")
operation_duration = meter.create_histogram("operation_duration_seconds")
error_counter = meter.create_counter("operation_errors_total")

def perform_operation(operation_type, user_id):
    """Perform operation with comprehensive metrics"""
    start_time = time.time()

    try:
        # Increment operation counter
        operation_counter.add(1, {
            "operation": operation_type,
            "status": "started"
        })

        # Simulate operation
        processing_time = random.uniform(0.1, 1.0)
        time.sleep(processing_time)

        # Simulate potential failure
        if random.random() < 0.1:  # 10% failure rate
            raise ValueError("Operation failed")

        # Record successful completion
        operation_counter.add(1, {
            "operation": operation_type,
            "status": "completed"
        })

        return f"Operation {operation_type} completed"

    except Exception as e:
        # Record error. Avoid raw error messages as attributes: they are often
        # unique per failure and inflate cardinality.
        error_counter.add(1, {
            "operation": operation_type,
            "error_type": type(e).__name__
        })

        operation_counter.add(1, {
            "operation": operation_type,
            "status": "failed"
        })

        raise

    finally:
        # Always record duration
        duration = time.time() - start_time
        operation_duration.record(duration, {
            "operation": operation_type
        })

# Example usage
try:
    result = perform_operation("user_registration", "user123")
    print(result)
except Exception as e:
    print(f"Operation failed: {e}")
```
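The try/finally timing pattern above tends to be repeated in every instrumented function. As a sketch, it can be factored into a decorator. `timed` and `RecordingHistogram` are hypothetical helpers, not part of OpenTelemetry; the stand-in class only mimics the `record(amount, attributes)` call so the example runs on its own, and `time.perf_counter()` is used because it is monotonic.

```python
import functools
import time


def timed(histogram, attributes=None):
    """Decorator that records the wrapped function's duration in seconds."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            outcome = "ok"
            try:
                return func(*args, **kwargs)
            except Exception:
                outcome = "error"
                raise
            finally:
                # Runs on success and on failure, so every call is measured
                elapsed = time.perf_counter() - start
                histogram.record(elapsed, {**(attributes or {}), "outcome": outcome})
        return wrapper
    return decorator


class RecordingHistogram:
    """Stand-in for a Histogram that stores recorded samples (illustration only)."""

    def __init__(self):
        self.samples = []

    def record(self, amount, attributes=None):
        self.samples.append((amount, attributes))


demo_histogram = RecordingHistogram()


@timed(demo_histogram, {"operation": "user_registration"})
def register_user():
    return "registered"


result = register_user()
```

With a real instrument, `@timed(operation_duration, {"operation": "user_registration"})` would record into the histogram created earlier.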

### Asynchronous Measurements

Asynchronous instruments use callbacks that are invoked during metric collection:

```python
# Global state for demonstration
system_stats = {
    "cpu_usage": 0.0,
    "memory_usage": 0.0,
    "disk_usage": 0.0,
    "network_connections": 0
}

queue_stats = {
    "email": {"size": 0, "processed": 0},
    "sms": {"size": 0, "processed": 0},
    "push": {"size": 0, "processed": 0}
}

def collect_system_metrics(options: CallbackOptions):
    """Callback to collect system metrics"""
    # Update system stats (in real app, this would call actual system APIs)
    system_stats["cpu_usage"] = psutil.cpu_percent()
    system_stats["memory_usage"] = psutil.virtual_memory().percent
    system_stats["disk_usage"] = psutil.disk_usage('/').percent
    system_stats["network_connections"] = len(psutil.net_connections())

    # Yield observations
    for metric_name, value in system_stats.items():
        yield Observation(value, {"metric": metric_name})

def collect_queue_metrics(options: CallbackOptions):
    """Callback to collect queue metrics"""
    for queue_name, stats in queue_stats.items():
        # Queue size (up/down counter)
        yield Observation(stats["size"], {"queue": queue_name, "metric": "size"})

        # Total processed (counter)
        yield Observation(stats["processed"], {"queue": queue_name, "metric": "processed"})

# Create observable instruments
system_gauge = meter.create_observable_gauge(
    name="system_metrics",
    description="System performance metrics",
    unit="1",
    callbacks=[collect_system_metrics]
)

queue_counter = meter.create_observable_up_down_counter(
    name="queue_metrics",
    description="Queue status metrics",
    unit="1",
    callbacks=[collect_queue_metrics]
)

# Simulate queue operations
def simulate_queue_activity():
    """Simulate queue activity to generate metrics"""
    while True:
        for queue_name in queue_stats:
            # Add items to queue
            new_items = random.randint(0, 5)
            queue_stats[queue_name]["size"] += new_items

            # Process items from queue
            processed = min(queue_stats[queue_name]["size"], random.randint(0, 3))
            queue_stats[queue_name]["size"] -= processed
            queue_stats[queue_name]["processed"] += processed

        time.sleep(2)

# In a real application, you'd run this in a separate thread
# threading.Thread(target=simulate_queue_activity, daemon=True).start()
```
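Callbacks run when the reader collects metrics, and with a periodic reader that happens on a background thread, so state shared with application threads should be protected. The sketch below shows one way to do that with a lock and a snapshot. `QueueStats` is a hypothetical helper written for this example.

```python
import threading


class QueueStats:
    """Thread-safe queue sizes shared between worker threads and a metrics callback."""

    def __init__(self, queue_names):
        self._lock = threading.Lock()
        self._sizes = {name: 0 for name in queue_names}

    def enqueue(self, name, count=1):
        with self._lock:
            self._sizes[name] += count

    def dequeue(self, name, count=1):
        with self._lock:
            self._sizes[name] = max(0, self._sizes[name] - count)

    def snapshot(self):
        """Return a copy so the caller can iterate without holding the lock."""
        with self._lock:
            return dict(self._sizes)


stats = QueueStats(["email", "sms"])
stats.enqueue("email", 3)
stats.dequeue("email")
current_sizes = stats.snapshot()
```

A callback could then iterate over `stats.snapshot().items()` and yield one `Observation(size, {"queue": name})` per queue without blocking the worker threads.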

## Practical Examples

### HTTP Server Metrics

```python
import time
import random
from opentelemetry import metrics

class HTTPServerMetrics:
    """Comprehensive HTTP server metrics collection"""

    def __init__(self, meter):
        self.request_counter = meter.create_counter(
            "http_requests_total",
            description="Total HTTP requests",
            unit="1"
        )

        self.request_duration = meter.create_histogram(
            "http_request_duration_seconds",
            description="HTTP request duration",
            unit="s"
        )

        self.active_requests = meter.create_up_down_counter(
            "http_requests_active",
            description="Active HTTP requests",
            unit="1"
        )

        self.response_size = meter.create_histogram(
            "http_response_size_bytes",
            description="HTTP response size",
            unit="By"
        )

        self.error_counter = meter.create_counter(
            "http_errors_total",
            description="Total HTTP errors",
            unit="1"
        )

    def record_request(self, method, route, status_code, duration, response_size):
        """Record metrics for an HTTP request"""
        attributes = {
            "method": method,
            "route": route,
            "status_code": str(status_code)
        }

        # Record request count
        self.request_counter.add(1, attributes)

        # Record duration
        self.request_duration.record(duration, attributes)

        # Record response size
        self.response_size.record(response_size, attributes)

        # Record errors
        if status_code >= 400:
            error_attributes = {
                "method": method,
                "route": route,
                "status_code": str(status_code),
                "error_type": "client_error" if status_code < 500 else "server_error"
            }
            self.error_counter.add(1, error_attributes)

    def start_request(self, method, route):
        """Track active request start"""
        self.active_requests.add(1, {"method": method, "route": route})

    def end_request(self, method, route):
        """Track active request end"""
        self.active_requests.add(-1, {"method": method, "route": route})

# Usage example
meter = metrics.get_meter("http_server", "1.0.0")
server_metrics = HTTPServerMetrics(meter)

def handle_request(method, route):
    """Simulate handling an HTTP request"""
    start_time = time.time()

    # Track active request
    server_metrics.start_request(method, route)

    try:
        # Simulate processing
        processing_time = random.uniform(0.01, 0.5)
        time.sleep(processing_time)

        # Simulate response
        status_code = random.choices([200, 404, 500], weights=[85, 10, 5])[0]
        response_size = random.randint(100, 10000)

        # Record metrics
        duration = time.time() - start_time
        server_metrics.record_request(method, route, status_code, duration, response_size)

        return status_code, response_size

    finally:
        # Always end active request tracking
        server_metrics.end_request(method, route)

# Example usage
handle_request("GET", "/api/users")
handle_request("POST", "/api/orders")
handle_request("GET", "/api/products")
```
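In a real service these calls usually live in middleware rather than in each handler. The following is a minimal WSGI sketch, assuming a metrics object that exposes the same `start_request`, `record_request`, and `end_request` methods as the class above. `MetricsMiddleware` is a hypothetical name. The sketch buffers the response body to learn its size, which would not suit streaming responses.

```python
import time


class MetricsMiddleware:
    """WSGI middleware that reports each request to an HTTPServerMetrics-like object."""

    def __init__(self, app, server_metrics):
        self.app = app
        self.server_metrics = server_metrics

    def __call__(self, environ, start_response):
        method = environ.get("REQUEST_METHOD", "GET")
        # PATH_INFO is the raw path; map it to a route template in real code
        # to keep attribute cardinality low.
        route = environ.get("PATH_INFO", "/")
        status = {"code": 500}  # assumed if the app fails before responding

        def recording_start_response(status_line, headers, exc_info=None):
            # Capture the numeric status code from a line such as "200 OK"
            status["code"] = int(status_line.split(" ", 1)[0])
            return start_response(status_line, headers, exc_info)

        start = time.perf_counter()
        size = 0
        self.server_metrics.start_request(method, route)
        try:
            # Buffer the body so its size is known
            body = b"".join(self.app(environ, recording_start_response))
            size = len(body)
            return [body]
        finally:
            duration = time.perf_counter() - start
            self.server_metrics.record_request(method, route, status["code"], duration, size)
            self.server_metrics.end_request(method, route)
```

Wrapping an application is then a single line, for example `app = MetricsMiddleware(app, server_metrics)`.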

### Database Connection Pool Metrics

```python
import threading
import time
import random
from opentelemetry import metrics
from opentelemetry.metrics import CallbackOptions, Observation

class DatabasePoolMetrics:
    """Database connection pool metrics"""

    def __init__(self, meter):
        self.query_counter = meter.create_counter(
            "db_queries_total",
            description="Total database queries",
            unit="1"
        )

        self.query_duration = meter.create_histogram(
            "db_query_duration_seconds",
            description="Database query duration",
            unit="s"
        )

        self.connection_pool_gauge = meter.create_observable_gauge(
            "db_connection_pool_status",
            description="Database connection pool status",
            unit="1",
            callbacks=[self._collect_pool_metrics]
        )

        # Simulated connection pool state
        self.pool_stats = {
            "active": 0,
            "idle": 10,
            "max": 20,
            "total_created": 10,
            "total_closed": 0
        }

        self.lock = threading.Lock()

    def _collect_pool_metrics(self, options: CallbackOptions):
        """Collect connection pool metrics"""
        # Copy the stats under the lock, then yield without holding it
        with self.lock:
            stats = dict(self.pool_stats)

        for state, value in stats.items():
            yield Observation(value, {"state": state})

    def execute_query(self, query_type, table):
        """Execute a database query with metrics"""
        start_time = time.time()

        # Simulate getting connection from pool
        acquired = False
        with self.lock:
            if self.pool_stats["idle"] > 0:
                self.pool_stats["idle"] -= 1
                self.pool_stats["active"] += 1
                acquired = True
            # Otherwise a real pool would wait for a connection or create a new one

        try:
            # Simulate query execution
            execution_time = random.uniform(0.001, 0.1)
            time.sleep(execution_time)

            # Record metrics
            duration = time.time() - start_time
            attributes = {
                "query_type": query_type,
                "table": table
            }

            self.query_counter.add(1, attributes)
            self.query_duration.record(duration, attributes)

            return f"Query {query_type} on {table} completed"

        finally:
            # Return the connection only if one was taken from the pool
            if acquired:
                with self.lock:
                    self.pool_stats["active"] -= 1
                    self.pool_stats["idle"] += 1

# Usage example
meter = metrics.get_meter("database", "1.0.0")
db_metrics = DatabasePoolMetrics(meter)

# Simulate database operations
db_metrics.execute_query("SELECT", "users")
db_metrics.execute_query("INSERT", "orders")
db_metrics.execute_query("UPDATE", "products")
```

### Business Metrics

```python
import time
import random
from opentelemetry import metrics
from opentelemetry.metrics import CallbackOptions, Observation

class BusinessMetrics:
    """Business-specific metrics collection"""

    def __init__(self, meter):
        self.user_registrations = meter.create_counter(
            "user_registrations_total",
            description="Total user registrations",
            unit="1"
        )

        self.order_value = meter.create_histogram(
            "order_value_usd",
            description="Order value in USD",
            unit="USD"
        )

        self.subscription_status = meter.create_observable_up_down_counter(
            "subscriptions_active",
            description="Active subscriptions by plan",
            unit="1",
            callbacks=[self._collect_subscription_metrics]
        )

        self.revenue_gauge = meter.create_observable_gauge(
            "revenue_metrics",
            description="Revenue metrics",
            unit="USD",
            callbacks=[self._collect_revenue_metrics]
        )

        # Simulated business data
        self.subscription_counts = {
            "basic": 1250,
            "premium": 340,
            "enterprise": 45
        }

        self.revenue_data = {
            "monthly_recurring": 50000,
            "one_time": 15000,
            "total": 65000
        }

    def _collect_subscription_metrics(self, options: CallbackOptions):
        """Collect subscription metrics"""
        for plan, count in self.subscription_counts.items():
            yield Observation(count, {"plan": plan})

    def _collect_revenue_metrics(self, options: CallbackOptions):
        """Collect revenue metrics"""
        for revenue_type, amount in self.revenue_data.items():
            yield Observation(amount, {"type": revenue_type})

    def record_user_registration(self, source, plan):
        """Record a new user registration"""
        self.user_registrations.add(1, {
            "source": source,
            "plan": plan,
            "hour": str(time.localtime().tm_hour)  # Hour bucket for analysis
        })

        # Update subscription count
        self.subscription_counts[plan] += 1

    def record_order(self, value, currency, category):
        """Record an order"""
        # Convert to USD for consistent reporting
        usd_value = self._convert_to_usd(value, currency)

        self.order_value.record(usd_value, {
            "category": category,
            "currency": currency,
            "value_range": self._get_value_range(usd_value)
        })

        # Update revenue
        self.revenue_data["one_time"] += usd_value
        self.revenue_data["total"] += usd_value

    def _convert_to_usd(self, value, currency):
        """Convert currency to USD (simplified)"""
        rates = {"USD": 1.0, "EUR": 1.1, "GBP": 1.3, "JPY": 0.007}
        return value * rates.get(currency, 1.0)

    def _get_value_range(self, value):
        """Categorize order value"""
        if value < 10:
            return "small"
        elif value < 100:
            return "medium"
        elif value < 1000:
            return "large"
        else:
            return "premium"

# Usage example
meter = metrics.get_meter("business", "1.0.0")
business_metrics = BusinessMetrics(meter)

# Record business events
business_metrics.record_user_registration("google_ads", "premium")
business_metrics.record_order(99.99, "USD", "electronics")
business_metrics.record_order(49.99, "EUR", "books")
business_metrics.record_order(199.99, "USD", "clothing")
```

## Configuration and Performance

### Metric Reader Configuration

```python
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import (
    ConsoleMetricExporter,
    PeriodicExportingMetricReader,
)
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter

# Configure the OTLP exporter
exporter = OTLPMetricExporter(
    endpoint="https://api.uptrace.dev:4317",
    headers={"uptrace-dsn": "your_dsn_here"}
)

# Create reader with custom export interval
reader = PeriodicExportingMetricReader(
    exporter=exporter,
    export_interval_millis=60000,  # Export every 60 seconds
    export_timeout_millis=30000    # 30 second timeout
)

# Optional second reader for another destination
console_reader = PeriodicExportingMetricReader(
    ConsoleMetricExporter(),
    export_interval_millis=10000  # More frequent for debugging
)

# Create one meter provider with all readers and register it once;
# the global meter provider cannot be replaced after it is set
provider = MeterProvider(metric_readers=[reader, console_reader])
metrics.set_meter_provider(provider)
```

### Memory Management

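In long-running applications, watch the memory usage of the process. The SDK keeps aggregation state for each unique attribute combination, so memory tends to grow with cardinality: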

```python
import gc
import threading
import time

import psutil

def monitor_memory_usage():
    """Monitor and manage memory usage"""
    while True:
        # Force garbage collection periodically
        gc.collect()

        # Monitor memory usage
        memory_usage = psutil.Process().memory_info().rss / 1024 / 1024  # MB

        if memory_usage > 500:  # If using more than 500MB
            print(f"Warning: High memory usage: {memory_usage:.2f} MB")

        time.sleep(30)

# Run in background thread
threading.Thread(target=monitor_memory_usage, daemon=True).start()
```

### Attribute Optimization

Optimize attribute usage to prevent cardinality explosion:

```python
# Good: Limited attribute values
http_requests = meter.create_counter("http_requests_total")

def record_request(method, status_code, endpoint):
    """Record HTTP request with optimized attributes"""
    # Use status classes instead of exact codes
    status_class = f"{status_code // 100}xx"

    # Categorize endpoints to reduce cardinality
    endpoint_category = categorize_endpoint(endpoint)

    http_requests.add(1, {
        "method": method,              # ~10 possible values
        "status_class": status_class,  # 5 possible values (1xx, 2xx, 3xx, 4xx, 5xx)
        "endpoint_category": endpoint_category  # ~5 categories
    })
    # Total cardinality: 10 × 5 × 5 = 250 series

def categorize_endpoint(endpoint):
    """Categorize endpoints to reduce cardinality"""
    if endpoint.startswith("/api/"):
        return "api"
    elif endpoint.startswith("/static/"):
        return "static"
    elif endpoint == "/health":
        return "health"
    elif endpoint.startswith("/admin/"):
        return "admin"
    else:
        return "other"

# Avoid: High cardinality attributes
def bad_example(method, status_code, full_url, user_id, timestamp):
    """Example of what NOT to do"""
    # DON'T DO THIS - creates millions of metric series
    http_requests.add(1, {
        "method": method,
        "status_code": str(status_code),  # 50+ possible values
        "full_url": full_url,             # Thousands of unique URLs
        "user_id": user_id,               # Thousands of users
        "timestamp": str(timestamp)       # Infinite unique values
    })
    # This could create millions of unique metric series!
```
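
When the URL path itself is worth keeping as an attribute, a middle ground is to replace the variable segments with placeholders before recording. The following is a minimal sketch, not part of the OpenTelemetry API: `normalize_path` is a hypothetical helper, and it assumes identifiers appear as purely numeric or UUID-shaped path segments:

```python
import re

# Assumed pattern: adjust to the identifier formats your URLs actually use
_UUID = re.compile(
    r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}",
    re.IGNORECASE,
)


def normalize_path(path):
    """Replace numeric and UUID path segments with fixed placeholders."""
    path = path.split("?", 1)[0]  # drop the query string
    segments = []
    for segment in path.split("/"):
        if segment.isdigit():
            segments.append("{id}")
        elif _UUID.fullmatch(segment):
            segments.append("{uuid}")
        else:
            segments.append(segment)
    return "/".join(segments)


print(normalize_path("/users/123/orders/456?page=2"))
# → /users/{id}/orders/{id}
```

The normalized path can then be passed as an attribute value in place of `full_url`, which keeps the number of series proportional to the number of routes rather than the number of requests.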

## Environment Variables

Configure metrics behavior using environment variables:

```bash
# Metric export settings
export OTEL_METRICS_EXPORTER=otlp
export OTEL_EXPORTER_OTLP_METRICS_ENDPOINT=https://api.uptrace.dev:4317
export OTEL_EXPORTER_OTLP_METRICS_HEADERS="uptrace-dsn=YOUR_DSN"

# Collection interval (milliseconds)
export OTEL_METRIC_EXPORT_INTERVAL=60000

# Resource attributes
export OTEL_RESOURCE_ATTRIBUTES="service.name=my-service,service.version=1.0.0"
```

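The OTLP exporter and `PeriodicExportingMetricReader` pick up these variables on their own when the matching constructor arguments are omitted. Read them explicitly, as in the example below, when you want to inspect or override the values in code: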

```python
import os
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter

def setup_metrics():
    """Setup metrics using environment variables"""
    # Get configuration from environment
    endpoint = os.getenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT")
    headers_str = os.getenv("OTEL_EXPORTER_OTLP_METRICS_HEADERS", "")
    export_interval = int(os.getenv("OTEL_METRIC_EXPORT_INTERVAL", "60000"))

    # Parse headers
    headers = {}
    if headers_str:
        for header in headers_str.split(","):
            if "=" in header:
                key, value = header.split("=", 1)
                headers[key.strip()] = value.strip()

    # Create exporter
    exporter = OTLPMetricExporter(
        endpoint=endpoint,
        headers=headers
    )

    # Create reader
    reader = PeriodicExportingMetricReader(
        exporter=exporter,
        export_interval_millis=export_interval
    )

    # Create and set meter provider
    provider = MeterProvider(metric_readers=[reader])
    metrics.set_meter_provider(provider)

    return provider

# Usage
if __name__ == "__main__":
    setup_metrics()
    meter = metrics.get_meter("my_app", "1.0.0")
```
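
The setup function above raises `ValueError` if `OTEL_METRIC_EXPORT_INTERVAL` is not an integer. If falling back to a default is preferable, the parsing can be separated from the SDK calls and validated on its own. This is a sketch under that assumption: `MetricsConfig` and `load_metrics_config` are hypothetical names, and only the standard library is used:

```python
import os
from dataclasses import dataclass, field
from typing import Dict, Mapping, Optional

DEFAULT_EXPORT_INTERVAL_MS = 60000


@dataclass(frozen=True)
class MetricsConfig:
    endpoint: Optional[str]
    headers: Dict[str, str] = field(default_factory=dict)
    export_interval_millis: int = DEFAULT_EXPORT_INTERVAL_MS


def load_metrics_config(env: Mapping[str, str] = os.environ) -> MetricsConfig:
    """Read metrics settings from env, falling back to defaults on bad input."""
    try:
        interval = int(env.get("OTEL_METRIC_EXPORT_INTERVAL", ""))
        if interval <= 0:
            raise ValueError("interval must be positive")
    except ValueError:
        interval = DEFAULT_EXPORT_INTERVAL_MS

    headers = {}
    for pair in env.get("OTEL_EXPORTER_OTLP_METRICS_HEADERS", "").split(","):
        # Split on the first "=" only, so values may contain "=" themselves
        key, sep, value = pair.partition("=")
        if sep and key.strip():
            headers[key.strip()] = value.strip()

    return MetricsConfig(
        endpoint=env.get("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT") or None,
        headers=headers,
        export_interval_millis=interval,
    )


config = load_metrics_config({
    "OTEL_EXPORTER_OTLP_METRICS_HEADERS": "uptrace-dsn=YOUR_DSN",
    "OTEL_METRIC_EXPORT_INTERVAL": "not-a-number",
})
print(config.export_interval_millis)  # → 60000
```

Accepting the environment as a mapping argument keeps the function easy to test without modifying `os.environ`.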

## Best Practices

### Instrument Naming

Follow OpenTelemetry naming conventions:

```python
# Good: Descriptive, hierarchical names
meter.create_counter("http.requests.total")
meter.create_histogram("http.request.duration")
meter.create_observable_gauge("system.memory.usage")

# Avoid: Generic or unclear names
meter.create_counter("requests")
meter.create_histogram("time")
meter.create_observable_gauge("memory")
```
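
Names can also be checked mechanically, for example in a unit test or a lint step. The sketch below assumes the syntax rules in the current OpenTelemetry metrics API specification: the first character is a letter, the remaining characters are alphanumeric or one of `_`, `.`, `-`, `/`, and the name is at most 255 characters long. Older SDK releases may enforce a stricter limit, and `is_valid_instrument_name` is a hypothetical helper, not an SDK function:

```python
import re

# Assumed syntax: a letter followed by up to 254 allowed characters
_INSTRUMENT_NAME = re.compile(r"[A-Za-z][A-Za-z0-9_.\-/]{0,254}")


def is_valid_instrument_name(name):
    """Return True if name matches the assumed instrument name syntax."""
    return isinstance(name, str) and _INSTRUMENT_NAME.fullmatch(name) is not None


for candidate in ("http.request.duration", "1requests", "requests total"):
    print(candidate, is_valid_instrument_name(candidate))
```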

### Unit Specification

Always specify a unit, using the case-sensitive UCUM codes that OpenTelemetry recommends (for example `s`, `By`, `Cel`):

```python
meter.create_histogram("request.duration", unit="s")          # seconds
meter.create_observable_gauge("memory.usage", unit="By")      # bytes
meter.create_counter("requests.total", unit="1")              # dimensionless
meter.create_histogram("file.size", unit="By")                # bytes
meter.create_observable_gauge("temperature", unit="Cel")      # Celsius
```
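
The declared unit only helps if the recorded values actually use it. For durations, `time.perf_counter()` already returns seconds, so a small context manager can keep measurements consistent with `unit="s"`. This is a minimal sketch: `record_duration` is a hypothetical helper, and a list stands in for `histogram.record` so the example runs without the SDK:

```python
import time
from contextlib import contextmanager


@contextmanager
def record_duration(record, attributes=None):
    """Time the wrapped block and pass the elapsed seconds to record()."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # perf_counter() returns seconds, matching unit="s"
        record(time.perf_counter() - start, attributes or {})


# Stand-in for histogram.record(amount, attributes)
recorded = []

with record_duration(lambda value, attrs: recorded.append((value, attrs)), {"route": "/health"}):
    time.sleep(0.01)

print(f"{recorded[0][0]:.4f}s", recorded[0][1])
```

With a histogram created as `meter.create_histogram("request.duration", unit="s")`, pass `histogram.record` as the first argument.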

### Error Handling

Handle metric recording errors gracefully:

```python
import logging

logger = logging.getLogger(__name__)

def safe_record_metric(counter, value, attributes):
    """Safely record metric with error handling"""
    try:
        counter.add(value, attributes)
    except Exception:
        # Log the error with its traceback, but don't let metrics break your application
        logger.exception("Failed to record metric")

# Usage (request_counter is a counter created earlier with meter.create_counter)
safe_record_metric(request_counter, 1, {"status": "success"})
```
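
Attribute values of unsupported types are another possible source of recording problems. OpenTelemetry attribute values are limited to strings, booleans, integers, floats, and homogeneous sequences of those. The sketch below cleans a dictionary before it is passed to an instrument. `sanitize_attributes` is a hypothetical helper, and converting other values to strings is an assumption that may not suit every application:

```python
_PRIMITIVES = (str, bool, int, float)


def _is_homogeneous_sequence(value):
    """True for a list or tuple whose items are primitives of a single type."""
    return (
        isinstance(value, (list, tuple))
        and all(isinstance(item, _PRIMITIVES) for item in value)
        and len({type(item) for item in value}) <= 1
    )


def sanitize_attributes(attributes):
    """Return a copy of attributes containing only supported value types."""
    clean = {}
    for key, value in attributes.items():
        if isinstance(value, _PRIMITIVES):
            clean[str(key)] = value
        elif _is_homogeneous_sequence(value):
            clean[str(key)] = list(value)
        elif value is not None:
            # Assumption: a string form is more useful than dropping the value
            clean[str(key)] = str(value)
        # None values are dropped
    return clean


print(sanitize_attributes({
    "status": 200,
    "user": None,
    "tags": ("a", "b"),
    "error": ValueError("boom"),
}))
# → {'status': 200, 'tags': ['a', 'b'], 'error': 'boom'}
```

Keep in mind that stringifying arbitrary objects can reintroduce high cardinality, so this belongs at the edges of an application rather than on every call.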

### Testing Metrics

Use `InMemoryMetricReader` to collect metrics inside a test without exporting them, and wrap the setup in a reusable test case:

```python
import unittest
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader

class MetricsTestCase(unittest.TestCase):
    def setUp(self):
        """Set up test environment"""
        self.reader = InMemoryMetricReader()
        self.provider = MeterProvider(metric_readers=[self.reader])
        self.meter = self.provider.get_meter("test_meter", "1.0.0")

    def get_metrics(self):
        """Get collected metrics"""
        return self.reader.get_metrics_data()

    def test_counter(self):
        """Test counter functionality"""
        counter = self.meter.create_counter("test_counter")
        counter.add(1, {"key": "value"})
        counter.add(2, {"key": "value"})

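        data = self.get_metrics()
        # Both additions share the same attributes, so they aggregate into one point
        self.assertEqual(len(data.resource_metrics), 1)
        metric = data.resource_metrics[0].scope_metrics[0].metrics[0]
        self.assertEqual(metric.name, "test_counter")
        self.assertEqual(metric.data.data_points[0].value, 3)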

    def tearDown(self):
        """Clean up after test"""
        self.provider.shutdown()
```
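
The object returned by `get_metrics_data()` is nested several levels deep (resource metrics, scope metrics, metrics, data points), which makes assertions verbose. A flattening helper can hide that nesting. This sketch assumes the attribute names of the SDK's metrics data classes and demonstrates the helper on stand-in objects built with `SimpleNamespace`, so it runs without the SDK. `collect_points` is a hypothetical name:

```python
from types import SimpleNamespace


def collect_points(metrics_data):
    """Map each metric name to a list of (attributes, value) tuples."""
    points = {}
    if metrics_data is None:  # nothing has been recorded yet
        return points
    for resource_metrics in metrics_data.resource_metrics:
        for scope_metrics in resource_metrics.scope_metrics:
            for metric in scope_metrics.metrics:
                for point in metric.data.data_points:
                    # Sum and gauge points expose .value; histogram points do not
                    value = getattr(point, "value", None)
                    points.setdefault(metric.name, []).append(
                        (dict(point.attributes), value)
                    )
    return points


# Stand-in data mirroring the nesting of the SDK's metrics data
point = SimpleNamespace(attributes={"key": "value"}, value=3)
metric = SimpleNamespace(name="test_counter", data=SimpleNamespace(data_points=[point]))
scope = SimpleNamespace(metrics=[metric])
fake_data = SimpleNamespace(resource_metrics=[SimpleNamespace(scope_metrics=[scope])])

print(collect_points(fake_data))
# → {'test_counter': [({'key': 'value'}, 3)]}
```

In a real test, pass the result of `self.reader.get_metrics_data()` and assert on the returned dictionary.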

### Performance Monitoring

Monitor the performance impact of metrics collection:

```python
import time
import functools

def time_metric_operation(func):
    """Decorator to measure metric operation performance"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        duration = time.perf_counter() - start_time

        # Only log if operation takes too long
        if duration > 0.001:  # 1ms threshold
            print(f"Metric operation {func.__name__} took {duration:.4f}s")

        return result
    return wrapper

# Usage (business_counter is a counter created earlier with meter.create_counter)
@time_metric_operation
def record_business_metric():
    business_counter.add(1, {"type": "important"})
```
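
A single timed call is noisy, so for a rough estimate of per-call overhead it can help to average over many calls with the standard `timeit` module. The following is a sketch only: `average_call_seconds` is a hypothetical helper, and `fake_add` stands in for a call such as `counter.add(1, {...})` so the example runs without the SDK:

```python
import timeit


def average_call_seconds(operation, calls=10_000):
    """Return the average wall-clock time of one operation() call, in seconds."""
    total = timeit.timeit(operation, number=calls)
    return total / calls


# Stand-in for a metric call such as counter.add(1, {...})
totals = {"count": 0}


def fake_add():
    totals["count"] += 1


avg = average_call_seconds(fake_add, calls=1000)
print(f"average: {avg * 1e6:.2f} microseconds per call")
```

Run the measurement against your real instruments and attribute sets, since the cost depends on how many attributes are attached to each call.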

## What's Next?

Now that you understand the OpenTelemetry Python Metrics API, explore these related topics:

- [Learn about OpenTelemetry Python Resource Detectors](/get/opentelemetry-python/resources)
- [Learn about OpenTelemetry Python Sampling](/get/opentelemetry-python/sampling)
- [OpenTelemetry Python Tracing API](/get/opentelemetry-python/tracing)
