The Power of the CloudWatch API: A Developer's Toolkit for Custom Metrics
The CloudWatch Metrics API gives developers and DevOps engineers programmatic access to AWS monitoring data and the ability to publish custom metrics. This technical guide covers fetching existing metrics, sending custom metrics, and managing alarms through the API.
Looking for a complete reference of available CloudWatch metrics? Check our article AWS CloudWatch Metrics Mastery: The Ultimate Reference List for Every AWS Service for a comprehensive list of metrics for all AWS services.
Technical Prerequisites
Before working with the CloudWatch API, ensure you have:
Required IAM Permissions
At minimum, your IAM role or user needs these permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"cloudwatch:GetMetricData",
"cloudwatch:GetMetricStatistics",
"cloudwatch:ListMetrics",
"cloudwatch:PutMetricData"
],
"Resource": "*"
}
]
}
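If a workload only needs to publish metrics, you can scope `PutMetricData` down to specific namespaces with the `cloudwatch:namespace` condition key. A sketch (the `MyApplication` namespace is an example value):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "cloudwatch:PutMetricData",
      "Resource": "*",
      "Condition": {
        "StringEquals": {
          "cloudwatch:namespace": "MyApplication"
        }
      }
    }
  ]
}
```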
SDK Setup
Install the AWS SDK for your preferred language:
# Python
pip install boto3
# Node.js
npm install aws-sdk
# AWS CLI
pip install awscli
Retrieving CloudWatch Metrics via API
List Available Metrics
The `ListMetrics` API call returns the metrics available in your AWS account. You can filter by namespace, metric name, or dimensions.
Python Example:
import boto3
cloudwatch = boto3.client('cloudwatch')
# List all EC2 metrics
response = cloudwatch.list_metrics(
Namespace='AWS/EC2'
)
# List metrics for a specific instance
response = cloudwatch.list_metrics(
Namespace='AWS/EC2',
Dimensions=[
{
'Name': 'InstanceId',
'Value': 'i-1234567890abcdef0'
},
]
)
# Print all metric names
for metric in response['Metrics']:
print(f"Metric Name: {metric['MetricName']}")
print(f"Dimensions: {metric['Dimensions']}")
print("---")
AWS CLI Example:
# List all EC2 metrics
aws cloudwatch list-metrics --namespace AWS/EC2
# List specific metrics
aws cloudwatch list-metrics --namespace AWS/EC2 --metric-name CPUUtilization
Query Metric Data
To retrieve actual metric data points, use `GetMetricData` (for querying multiple metrics in one request) or `GetMetricStatistics` (for a single metric with statistics).
GetMetricData (Python):
import boto3
from datetime import datetime, timedelta
cloudwatch = boto3.client('cloudwatch')
# Define time range
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=3)
# Query multiple metrics at once
response = cloudwatch.get_metric_data(
MetricDataQueries=[
{
'Id': 'cpu_utilization',
'MetricStat': {
'Metric': {
'Namespace': 'AWS/EC2',
'MetricName': 'CPUUtilization',
'Dimensions': [
{
'Name': 'InstanceId',
'Value': 'i-1234567890abcdef0'
},
]
},
'Period': 300,
'Stat': 'Average'
},
'ReturnData': True
},
{
'Id': 'disk_read_ops',
'MetricStat': {
'Metric': {
'Namespace': 'AWS/EC2',
'MetricName': 'DiskReadOps',
'Dimensions': [
{
'Name': 'InstanceId',
'Value': 'i-1234567890abcdef0'
},
]
},
'Period': 300,
'Stat': 'Sum'
},
'ReturnData': True
}
],
StartTime=start_time,
EndTime=end_time
)
# Process the results
for query_result in response['MetricDataResults']:
print(f"Metric ID: {query_result['Id']}")
print(f"Values: {query_result['Values']}")
print(f"Timestamps: {query_result['Timestamps']}")
print("---")
GetMetricStatistics (Python):
import boto3
from datetime import datetime, timedelta
cloudwatch = boto3.client('cloudwatch')
# Define time range
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=3)
# Query a single metric with various statistics
response = cloudwatch.get_metric_statistics(
Namespace='AWS/EC2',
MetricName='CPUUtilization',
Dimensions=[
{
'Name': 'InstanceId',
'Value': 'i-1234567890abcdef0'
},
],
StartTime=start_time,
EndTime=end_time,
Period=300, # 5-minute intervals
Statistics=['Average', 'Maximum', 'Minimum']
)
# Process the results
for datapoint in response['Datapoints']:
print(f"Timestamp: {datapoint['Timestamp']}")
print(f"Average: {datapoint.get('Average', 'N/A')}%")
print(f"Maximum: {datapoint.get('Maximum', 'N/A')}%")
print(f"Minimum: {datapoint.get('Minimum', 'N/A')}%")
print("---")
Optimizing API Queries
When working with the CloudWatch API, consider these optimization techniques:
- Batch requests using `GetMetricData` instead of multiple `GetMetricStatistics` calls
- Specify appropriate time periods (larger periods for longer time ranges)
- Limit dimensions to only those needed
- Use pagination for large result sets
- Set appropriate statistics (don't request all statistics if you only need one)
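Periods also interact with CloudWatch's retention tiers: older data is only served at coarser resolutions, so queries far in the past must use larger periods. A minimal helper sketch (the function name is ours; the tier boundaries follow CloudWatch's documented retention schedule):

```python
from datetime import datetime, timedelta, timezone

def minimum_period(start_time, now=None):
    """Return the smallest Period (in seconds) CloudWatch can still serve
    for a query window beginning at start_time.

    Retention tiers: sub-minute data is kept 3 hours, 1-minute data
    15 days, 5-minute data 63 days, 1-hour data 455 days (15 months).
    """
    now = now or datetime.now(timezone.utc)
    age = now - start_time
    if age <= timedelta(hours=3):
        return 1      # high-resolution data still available
    if age <= timedelta(days=15):
        return 60
    if age <= timedelta(days=63):
        return 300
    return 3600
```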
Applying Metric Math
CloudWatch Metric Math allows you to perform calculations on metrics to derive new insights:
import boto3
from datetime import datetime, timedelta
cloudwatch = boto3.client('cloudwatch')
# Define time range
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=1)
# Using Metric Math to calculate free memory percentage
response = cloudwatch.get_metric_data(
MetricDataQueries=[
{
'Id': 'total_memory',
'MetricStat': {
'Metric': {
'Namespace': 'CWAgent',
'MetricName': 'mem_total',
'Dimensions': [
{
'Name': 'InstanceId',
'Value': 'i-1234567890abcdef0'
}
]
},
'Period': 300,
'Stat': 'Average'
},
'ReturnData': False
},
{
'Id': 'free_memory',
'MetricStat': {
'Metric': {
'Namespace': 'CWAgent',
'MetricName': 'mem_available',
'Dimensions': [
{
'Name': 'InstanceId',
'Value': 'i-1234567890abcdef0'
}
]
},
'Period': 300,
'Stat': 'Average'
},
'ReturnData': False
},
{
'Id': 'memory_utilization_percent',
'Expression': '100 - (free_memory / total_memory * 100)',
'Label': 'Memory Utilization (%)',
'ReturnData': True
}
],
StartTime=start_time,
EndTime=end_time
)
Common Metric Math Functions
| Function | Description | Example |
|---|---|---|
| `SUM` | Sum of values | `SUM([m1, m2])` |
| `AVG` | Average of values | `AVG([m1, m2])` |
| `MIN` | Minimum value | `MIN([m1, m2])` |
| `MAX` | Maximum value | `MAX([m1, m2])` |
| `METRICS()` | Returns all metrics from the query | `SUM(METRICS())` |
| `RATE` | Rate of change | `RATE(m1)` |
| `DIFF` | Difference between consecutive points | `DIFF(m1)` |
| `PERIOD` | Returns the period of a metric | `m1 / PERIOD(m1)` |
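From the API, each Metric Math function goes into an `Expression` entry in `GetMetricData`'s `MetricDataQueries`, referencing other queries by `Id`. A small illustrative builder (the helper name and ids are our own):

```python
def build_math_query(namespace, metric_name, dimensions, expression_id,
                     expression, period=300, stat='Average'):
    """Assemble a MetricDataQueries list that applies a Metric Math
    expression to a single source metric (given the id 'm1')."""
    return [
        {
            'Id': 'm1',
            'MetricStat': {
                'Metric': {
                    'Namespace': namespace,
                    'MetricName': metric_name,
                    'Dimensions': dimensions,
                },
                'Period': period,
                'Stat': stat,
            },
            'ReturnData': False,  # only return the derived series
        },
        {
            'Id': expression_id,
            'Expression': expression,
            'ReturnData': True,
        },
    ]

# Example: per-second request rate from a Sum-of-requests metric
queries = build_math_query(
    'MyApplication', 'RequestCount',
    [{'Name': 'Service', 'Value': 'API'}],
    'rps', 'm1 / PERIOD(m1)', stat='Sum',
)
```

Here `m1 / PERIOD(m1)` converts a per-period `Sum` into a per-second rate.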
Sending Custom Metrics to CloudWatch
PutMetricData Basics
The `PutMetricData` API allows you to publish custom metrics to CloudWatch.
Python Example:
import boto3
import random
from datetime import datetime
cloudwatch = boto3.client('cloudwatch')
# Send a single metric data point
response = cloudwatch.put_metric_data(
Namespace='MyApplication',
MetricData=[
{
'MetricName': 'RequestLatency',
'Dimensions': [
{
'Name': 'Service',
'Value': 'API'
},
{
'Name': 'Region',
'Value': 'us-east-1'
}
],
'Timestamp': datetime.utcnow(),
'Value': random.uniform(0.1, 1.5),
'Unit': 'Seconds'
}
]
)
Node.js Example:
const AWS = require('aws-sdk')
const cloudwatch = new AWS.CloudWatch({ region: 'us-east-1' })
// Send a single metric data point
const params = {
Namespace: 'MyApplication',
MetricData: [
{
MetricName: 'RequestLatency',
Dimensions: [
{
Name: 'Service',
Value: 'API',
},
{
Name: 'Region',
Value: 'us-east-1',
},
],
Timestamp: new Date(),
Value: Math.random() * 1.4 + 0.1,
Unit: 'Seconds',
},
],
}
cloudwatch.putMetricData(params, function (err, data) {
if (err) console.log(err, err.stack)
else console.log('Metric published successfully')
})
AWS CLI Example:
aws cloudwatch put-metric-data \
--namespace "MyApplication" \
--metric-name "RequestLatency" \
--dimensions "Service=API,Region=us-east-1" \
--value 0.85 \
--unit Seconds
Batch Metrics Publishing
CloudWatch allows you to send up to 1,000 metrics in a single API call:
import boto3
import random
from datetime import datetime
cloudwatch = boto3.client('cloudwatch')
# Prepare a batch of metrics
metric_data = []
for service in ['API', 'Database', 'Authentication']:
metric_data.append({
'MetricName': 'RequestLatency',
'Dimensions': [
{
'Name': 'Service',
'Value': service
}
],
'Timestamp': datetime.utcnow(),
'Value': random.uniform(0.1, 1.5),
'Unit': 'Seconds'
})
metric_data.append({
'MetricName': 'RequestCount',
'Dimensions': [
{
'Name': 'Service',
'Value': service
}
],
'Timestamp': datetime.utcnow(),
'Value': random.randint(10, 100),
'Unit': 'Count'
})
# Submit the batch
response = cloudwatch.put_metric_data(
Namespace='MyApplication',
MetricData=metric_data
)
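If you accumulate more data points than one call allows, split the list before publishing. A minimal sketch (helper names are illustrative; note that `PutMetricData` also caps the request payload size, so very verbose datums may need smaller batches):

```python
def chunk_metric_data(metric_data, batch_size=1000):
    """Split a metric list into batches PutMetricData will accept."""
    return [metric_data[i:i + batch_size]
            for i in range(0, len(metric_data), batch_size)]

def publish_all(cloudwatch, namespace, metric_data):
    """Publish an arbitrarily large metric list in compliant batches."""
    for batch in chunk_metric_data(metric_data):
        cloudwatch.put_metric_data(Namespace=namespace, MetricData=batch)
```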
High-Resolution Metrics
CloudWatch supports high-resolution metrics with 1-second granularity:
import boto3
from datetime import datetime
cloudwatch = boto3.client('cloudwatch')
# Send a high-resolution metric
response = cloudwatch.put_metric_data(
Namespace='MyApplication',
MetricData=[
{
'MetricName': 'TransactionLatency',
'Dimensions': [
{
'Name': 'Service',
'Value': 'Payments'
}
],
'Timestamp': datetime.utcnow(),
'Value': 0.35,
'Unit': 'Seconds',
'StorageResolution': 1 # 1-second resolution
}
]
)
Metric Units
When sending metrics, use the appropriate unit:
| Metric Type | Recommended Units |
|---|---|
| Time | `Seconds`, `Milliseconds`, `Microseconds` |
| Size | `Bytes`, `Kilobytes`, `Megabytes`, `Gigabytes` |
| Rate | `Count/Second`, `Bytes/Second` |
| Percentages | `Percent` |
| Counters | `Count` |
| Utilization | `Percent` |
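When you record many observations per period, you can also aggregate locally and publish a single datum with the `StatisticValues` field (a statistic set of sample count, sum, minimum, and maximum) instead of one `Value` per observation. A sketch (the helper name is ours):

```python
def to_statistic_set(samples):
    """Collapse raw samples into a CloudWatch StatisticSet so a whole
    batch of observations costs one metric datum instead of many."""
    return {
        'SampleCount': len(samples),
        'Sum': sum(samples),
        'Minimum': min(samples),
        'Maximum': max(samples),
    }

# One datum summarizing four latency observations
datum = {
    'MetricName': 'RequestLatency',
    'StatisticValues': to_statistic_set([0.12, 0.34, 0.56, 0.21]),
    'Unit': 'Seconds',
}
```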
Creating and Managing AWS CloudWatch Alarms via API
CloudWatch alarms monitor metrics and trigger actions when thresholds are breached. Here's how to create and manage CloudWatch alarms programmatically via the API.
Creating a Basic Metric Alarm
The `PutMetricAlarm` operation creates or updates an alarm:
Python Example:
import boto3
cloudwatch = boto3.client('cloudwatch')
# Create a basic CPU alarm
response = cloudwatch.put_metric_alarm(
AlarmName='high-cpu-alarm',
AlarmDescription='Alarm when CPU exceeds 70%',
ActionsEnabled=True,
MetricName='CPUUtilization',
Namespace='AWS/EC2',
Statistic='Average',
Dimensions=[
{
'Name': 'InstanceId',
'Value': 'i-1234567890abcdef0'
},
],
Period=300, # 5 minutes
EvaluationPeriods=2, # Number of periods to evaluate
Threshold=70.0,
ComparisonOperator='GreaterThanThreshold',
AlarmActions=[
'arn:aws:sns:us-east-1:123456789012:alarm-notification'
]
)
Node.js Example:
const AWS = require('aws-sdk')
const cloudwatch = new AWS.CloudWatch({ region: 'us-east-1' })
const params = {
AlarmName: 'high-cpu-alarm',
AlarmDescription: 'Alarm when CPU exceeds 70%',
ActionsEnabled: true,
MetricName: 'CPUUtilization',
Namespace: 'AWS/EC2',
Statistic: 'Average',
Dimensions: [
{
Name: 'InstanceId',
Value: 'i-1234567890abcdef0',
},
],
Period: 300,
EvaluationPeriods: 2,
Threshold: 70.0,
ComparisonOperator: 'GreaterThanThreshold',
AlarmActions: ['arn:aws:sns:us-east-1:123456789012:alarm-notification'],
}
cloudwatch.putMetricAlarm(params, function (err, data) {
if (err) console.log(err, err.stack)
else console.log('Alarm created successfully')
})
Getting Alarm States
Retrieve the state of alarms using the `DescribeAlarms` operation:
import boto3
cloudwatch = boto3.client('cloudwatch')
# Get information about all alarms
response = cloudwatch.describe_alarms()
# Print alarm states
for alarm in response['MetricAlarms']:
print(f"Alarm Name: {alarm['AlarmName']}")
print(f"State: {alarm['StateValue']}")
print(f"Reason: {alarm.get('StateReason', 'N/A')}")
print("---")
# Filter alarms by state
alarms_in_alarm_state = cloudwatch.describe_alarms(
StateValue='ALARM'
)
# Filter alarms by name prefix
cpu_alarms = cloudwatch.describe_alarms(
AlarmNamePrefix='cpu-'
)
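The `MetricAlarms` list is plain data, so a fleet-wide summary is easy to compute client-side. An illustrative helper:

```python
from collections import Counter

def alarm_state_summary(metric_alarms):
    """Tally DescribeAlarms results by state value
    (OK / ALARM / INSUFFICIENT_DATA)."""
    return Counter(alarm['StateValue'] for alarm in metric_alarms)

# e.g. summary = alarm_state_summary(response['MetricAlarms'])
```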
Creating Composite Alarms
Composite alarms evaluate multiple conditions using a rule expression:
import boto3
cloudwatch = boto3.client('cloudwatch')
# Create a composite alarm
response = cloudwatch.put_composite_alarm(
AlarmName='composite-system-alarm',
AlarmRule='(ALARM("high-cpu-alarm") OR ALARM("high-memory-alarm")) AND ALARM("instance-reachable-alarm")',
AlarmDescription='Triggers when CPU or memory is high while the instance is reachable',
ActionsEnabled=True,
AlarmActions=[
'arn:aws:sns:us-east-1:123456789012:critical-system-notification'
]
)
Setting Up Anomaly Detection Alarms
CloudWatch can create alarms based on anomaly detection bands:
import boto3
cloudwatch = boto3.client('cloudwatch')
# Create an anomaly detection alarm. Anomaly alarms are defined with the
# Metrics parameter: one query for the metric itself and one
# ANOMALY_DETECTION_BAND expression that ThresholdMetricId points to.
response = cloudwatch.put_metric_alarm(
AlarmName='traffic-anomaly-alarm',
AlarmDescription='Alarm when traffic is outside normal patterns',
ActionsEnabled=True,
EvaluationPeriods=2,
ThresholdMetricId='ad1',
ComparisonOperator='LessThanLowerOrGreaterThanUpperThreshold',
Metrics=[
{
'Id': 'm1',
'MetricStat': {
'Metric': {
'Namespace': 'AWS/ApplicationELB',
'MetricName': 'RequestCount',
'Dimensions': [
{
'Name': 'LoadBalancer',
'Value': 'app/my-load-balancer/1234567890abcdef'
},
]
},
'Period': 300,
'Stat': 'Sum'
},
'ReturnData': True
},
{
'Id': 'ad1',
'Expression': 'ANOMALY_DETECTION_BAND(m1, 2)',
'Label': 'RequestCount (expected)',
'ReturnData': True
}
],
AlarmActions=[
'arn:aws:sns:us-east-1:123456789012:anomaly-notification'
]
)
# Create the anomaly detection model
response = cloudwatch.put_metric_anomaly_detection(
MetricName='RequestCount',
Namespace='AWS/ApplicationELB',
Dimensions=[
{
'Name': 'LoadBalancer',
'Value': 'app/my-load-balancer/1234567890abcdef'
},
],
Stat='Sum',
Configuration={
'ExcludedTimeRanges': [
{
'StartTime': '2023-01-01T00:00:00Z',
'EndTime': '2023-01-02T00:00:00Z'
}
]
}
)
AWS CloudFormation Integration
You can define CloudWatch alarms in Infrastructure as Code using CloudFormation.
AWS::CloudWatch::Alarm Resource
Here's an example CloudFormation template with an `AWS::CloudWatch::Alarm` resource:
Resources:
HighCPUAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
AlarmName: high-cpu-utilization
AlarmDescription: Alarm when CPU exceeds 70%
MetricName: CPUUtilization
Namespace: AWS/EC2
Statistic: Average
Period: 300
EvaluationPeriods: 2
Threshold: 70
ComparisonOperator: GreaterThanThreshold
Dimensions:
- Name: InstanceId
Value: !Ref MyEC2Instance
AlarmActions:
- !Ref SNSNotificationTopic
InsufficientDataActions:
- !Ref SNSNotificationTopic
SNSNotificationTopic:
Type: AWS::SNS::Topic
Properties:
DisplayName: Alarm Notifications
Subscription:
- Protocol: email
Endpoint: admin@example.com
MyEC2Instance:
Type: AWS::EC2::Instance
Properties:
InstanceType: t3.micro
ImageId: ami-0abcdef1234567890
# Other instance properties...
Managing CloudFormation Stacks via API
You can also create and update CloudFormation stacks programmatically:
import boto3
import json
cloudformation = boto3.client('cloudformation')
# Load the template from a file
with open('cloudwatch-alarms-template.yaml', 'r') as file:
template_body = file.read()
# Create a stack with the template
response = cloudformation.create_stack(
StackName='monitoring-alarms-stack',
TemplateBody=template_body,
Parameters=[
{
'ParameterKey': 'Environment',
'ParameterValue': 'Production'
},
],
Capabilities=['CAPABILITY_IAM'],
OnFailure='ROLLBACK'
)
print(f"Stack creation initiated. Stack ID: {response['StackId']}")
Integration with Amazon EventBridge
Creating Rules for Alarm State Changes
You can create EventBridge rules to take automated actions when alarms change state:
import boto3
import json
events = boto3.client('events')
# Create a rule for alarm state changes
response = events.put_rule(
Name='cloudwatch-alarm-state-change',
EventPattern=json.dumps({
"source": ["aws.cloudwatch"],
"detail-type": ["CloudWatch Alarm State Change"],
"detail": {
"state": {
"value": ["ALARM"]
}
}
}),
State='ENABLED',
Description='Trigger when any CloudWatch alarm enters ALARM state'
)
# Add a target to invoke a Lambda function
response = events.put_targets(
Rule='cloudwatch-alarm-state-change',
Targets=[
{
'Id': '1',
'Arn': 'arn:aws:lambda:us-east-1:123456789012:function:handle-alarm'
}
]
)
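On the receiving side, the target Lambda gets the alarm event as a plain dict. A minimal handler sketch; the fields read here follow the documented `CloudWatch Alarm State Change` detail schema, but verify the shape against your own events:

```python
def lambda_handler(event, context):
    """Minimal handler for a 'CloudWatch Alarm State Change' event."""
    detail = event.get('detail', {})
    alarm = detail.get('alarmName', 'unknown')
    state = detail.get('state', {}).get('value', 'unknown')
    reason = detail.get('state', {}).get('reason', '')
    # Real handlers would page someone, open a ticket, remediate, etc.
    print(f"Alarm {alarm} entered {state}: {reason}")
    return {'alarm': alarm, 'state': state}
```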
Creating Rules for Specific Alarms
This example creates a rule that fires only when one specific alarm changes state, which lets you route individual alarms to dedicated targets:
import boto3
import json
events = boto3.client('events')
# Create a rule for metric threshold breach
response = events.put_rule(
Name='cpu-spike-detection',
EventPattern=json.dumps({
"source": ["aws.cloudwatch"],
"detail-type": ["CloudWatch Alarm State Change"],
"resources": ["arn:aws:cloudwatch:us-east-1:123456789012:alarm:high-cpu-alarm"]
}),
State='ENABLED',
Description='Detect CPU spikes via CloudWatch metric alarm'
)
# Add a target to send an SNS notification
response = events.put_targets(
Rule='cpu-spike-detection',
Targets=[
{
'Id': '1',
'Arn': 'arn:aws:sns:us-east-1:123456789012:ops-alerts'
}
]
)
Integration with Auto Scaling
You can use custom metrics to drive Auto Scaling policies:
import boto3
autoscaling = boto3.client('autoscaling')
# Create a scaling policy based on a custom metric
response = autoscaling.put_scaling_policy(
AutoScalingGroupName='my-application-asg',
PolicyName='custom-metric-scale-out',
PolicyType='TargetTrackingScaling',
TargetTrackingConfiguration={
'CustomizedMetricSpecification': {
'MetricName': 'RequestsPerInstance',
'Namespace': 'MyApplication',
'Dimensions': [
{
'Name': 'AutoScalingGroupName',
'Value': 'my-application-asg'
}
],
'Statistic': 'Average'
},
'TargetValue': 1000.0, # Target requests per instance
'DisableScaleIn': False
}
)
Advanced API Examples in Different Languages
Java Example:
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
import com.amazonaws.services.cloudwatch.model.*;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
public class CloudWatchMetricsExample {
public static void main(String[] args) {
AmazonCloudWatch cloudWatch = AmazonCloudWatchClientBuilder.defaultClient();
// Create custom dimensions
Dimension serviceDimension = new Dimension()
.withName("Service")
.withValue("Payment");
// Create a metric data point
MetricDatum datum = new MetricDatum()
.withMetricName("TransactionLatency")
.withUnit(StandardUnit.Milliseconds)
.withValue(42.2)
.withTimestamp(Instant.now())
.withDimensions(serviceDimension);
// Add the metric to the request
PutMetricDataRequest request = new PutMetricDataRequest()
.withNamespace("MyApplication")
.withMetricData(datum);
// Send the request
PutMetricDataResult response = cloudWatch.putMetricData(request);
System.out.println("Successfully published metric data");
// Create an alarm
PutMetricAlarmRequest alarmRequest = new PutMetricAlarmRequest()
.withAlarmName("HighTransactionLatency")
.withAlarmDescription("Alarm when transaction latency exceeds 100ms")
.withActionsEnabled(true)
.withMetricName("TransactionLatency")
.withNamespace("MyApplication")
.withStatistic(Statistic.Average)
.withDimensions(serviceDimension)
.withPeriod(60)
.withEvaluationPeriods(1)
.withThreshold(100.0)
.withComparisonOperator(ComparisonOperator.GreaterThanThreshold);
// Send the alarm creation request
PutMetricAlarmResult alarmResponse = cloudWatch.putMetricAlarm(alarmRequest);
System.out.println("Successfully created alarm");
}
}
Go Example:
package main
import (
"fmt"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch"
)
func main() {
// Create AWS session
sess := session.Must(session.NewSessionWithOptions(session.Options{
SharedConfigState: session.SharedConfigEnable,
}))
// Create CloudWatch client
svc := cloudwatch.New(sess)
// Create metric data
metric := &cloudwatch.MetricDatum{
MetricName: aws.String("TransactionLatency"),
Unit: aws.String("Milliseconds"),
Value: aws.Float64(42.2),
Timestamp: aws.Time(time.Now()),
Dimensions: []*cloudwatch.Dimension{
{
Name: aws.String("Service"),
Value: aws.String("Payment"),
},
},
}
// Create the PutMetricData input
input := &cloudwatch.PutMetricDataInput{
Namespace: aws.String("MyApplication"),
MetricData: []*cloudwatch.MetricDatum{metric},
}
// Send the metric data to CloudWatch
result, err := svc.PutMetricData(input)
if err != nil {
fmt.Println("Error adding metrics:", err)
return
}
fmt.Println("Successfully added metrics:", result)
// Create an alarm
alarmInput := &cloudwatch.PutMetricAlarmInput{
AlarmName: aws.String("HighTransactionLatency"),
AlarmDescription: aws.String("Alarm when transaction latency exceeds 100ms"),
ActionsEnabled: aws.Bool(true),
MetricName: aws.String("TransactionLatency"),
Namespace: aws.String("MyApplication"),
Statistic: aws.String("Average"),
Period: aws.Int64(60),
EvaluationPeriods: aws.Int64(1),
Threshold: aws.Float64(100.0),
ComparisonOperator: aws.String("GreaterThanThreshold"),
Dimensions: []*cloudwatch.Dimension{
{
Name: aws.String("Service"),
Value: aws.String("Payment"),
},
},
}
// Create the alarm
alarmResult, err := svc.PutMetricAlarm(alarmInput)
if err != nil {
fmt.Println("Error creating alarm:", err)
return
}
fmt.Println("Successfully created alarm:", alarmResult)
}
Ruby Example:
require 'aws-sdk-cloudwatch'
# Create CloudWatch client
cloudwatch = Aws::CloudWatch::Client.new(region: 'us-east-1')
# Create metric data
metric_data = {
namespace: 'MyApplication',
metric_data: [
{
metric_name: 'TransactionLatency',
dimensions: [
{
name: 'Service',
value: 'Payment'
}
],
timestamp: Time.now,
value: 42.2,
unit: 'Milliseconds'
}
]
}
# Send the metric data to CloudWatch
begin
cloudwatch.put_metric_data(metric_data)
puts 'Successfully published metric data'
rescue Aws::CloudWatch::Errors::ServiceError => e
puts "Error publishing metric data: #{e}"
end
# Create an alarm
alarm_params = {
alarm_name: 'HighTransactionLatency',
alarm_description: 'Alarm when transaction latency exceeds 100ms',
actions_enabled: true,
metric_name: 'TransactionLatency',
namespace: 'MyApplication',
statistic: 'Average',
dimensions: [
{
name: 'Service',
value: 'Payment'
}
],
period: 60,
evaluation_periods: 1,
threshold: 100.0,
comparison_operator: 'GreaterThanThreshold'
}
# Create the alarm
begin
cloudwatch.put_metric_alarm(alarm_params)
puts 'Successfully created alarm'
rescue Aws::CloudWatch::Errors::ServiceError => e
puts "Error creating alarm: #{e}"
end
Monitoring API Usage and Handling Errors
Tracking API Limits and Usage
Monitor your API usage to avoid throttling:
import boto3
from datetime import datetime, timedelta
cloudwatch = boto3.client('cloudwatch')
# Get API usage metrics
response = cloudwatch.get_metric_data(
MetricDataQueries=[
{
'Id': 'api_calls',
'MetricStat': {
'Metric': {
'Namespace': 'AWS/Usage',
'MetricName': 'CallCount',
'Dimensions': [
{
'Name': 'Service',
'Value': 'CloudWatch'
},
{
'Name': 'Type',
'Value': 'API'
},
{
'Name': 'Resource',
'Value': 'PutMetricData'
}
]
},
'Period': 3600,
'Stat': 'Sum'
},
'ReturnData': True
}
],
StartTime=datetime.utcnow() - timedelta(days=1),
EndTime=datetime.utcnow()
)
print("API call usage:")
for result in response['MetricDataResults']:
print(f"ID: {result['Id']}")
print(f"Values: {result['Values']}")
print(f"Timestamps: {result['Timestamps']}")
Comprehensive Error Handling
A robust error handling approach for CloudWatch API:
import boto3
import time
from botocore.exceptions import ClientError, ParamValidationError
from datetime import datetime
def put_metric_with_comprehensive_error_handling(namespace, metric_data, max_retries=5):
cloudwatch = boto3.client('cloudwatch')
retry = 0
while retry <= max_retries:
try:
response = cloudwatch.put_metric_data(
Namespace=namespace,
MetricData=metric_data
)
return response
except ClientError as e:
error_code = e.response['Error']['Code']
# Handle different error types
if error_code == 'Throttling':
wait_time = 2 ** retry
print(f"Request throttled, retrying in {wait_time} seconds...")
time.sleep(wait_time)
retry += 1
elif error_code == 'InvalidParameterValue':
print(f"Parameter validation failed: {e}")
# Fix specific parameter issues
if 'Namespace' in str(e):
namespace = namespace.replace('/', '.')
retry += 1
continue
else:
# Can't auto-fix, reraise
raise
elif error_code == 'MissingParameter':
print(f"Missing required parameter: {e}")
raise
else:
# Other AWS service errors
print(f"AWS service error: {error_code} - {e}")
raise
except ParamValidationError as e:
# Local parameter validation failed
print(f"Parameter validation failed: {e}")
raise
except Exception as e:
# Unexpected errors
print(f"Unexpected error: {e}")
raise
raise Exception("Maximum retries exceeded")
# Usage example
try:
put_metric_with_comprehensive_error_handling(
'My/Application',
[
{
'MetricName': 'TransactionLatency',
'Dimensions': [
{
'Name': 'Service',
'Value': 'Payment'
}
],
'Timestamp': datetime.utcnow(),
'Value': 42.2,
'Unit': 'Milliseconds'
}
]
)
print("Successfully published metric")
except Exception as e:
print(f"Failed to publish metric: {e}")
Uptrace as an Alternative to CloudWatch
While CloudWatch is AWS's native monitoring solution, it has some limitations:
- Higher costs for custom metrics and detailed monitoring
- 15-month maximum retention period
- Limited analytics and visualization capabilities
- Lack of integrated distributed tracing
Uptrace offers an alternative with:
- More cost-efficient pricing model for high-volume metrics
- Built-in distributed tracing integration
- Longer retention periods
- Advanced dashboard and analytics tools
- OpenTelemetry-based collection
Technical Comparison
| Feature | CloudWatch | Uptrace |
|---|---|---|
| Retention | 15 months max | Configurable, up to years |
| Resolution | 1 second to 1 minute | Up to nanosecond precision |
| Query Language | Metric Math | SQL-based and PromQL |
| Tracing Integration | Requires X-Ray | Built-in |
| Collection Protocol | AWS API | OpenTelemetry |
| Custom Metric Cost | $0.30 per metric/month | Volumetric pricing |
| Visualization | Basic dashboards | Advanced dashboards and heatmaps |
Integrating CloudWatch with Uptrace
If you want to leverage both CloudWatch's tight AWS integration and Uptrace's advanced analytics, you can set up data forwarding using AWS Data Firehose or Prometheus with yet-another-cloudwatch-exporter.
For step-by-step instructions on how to integrate AWS CloudWatch metrics and logs with Uptrace, refer to Uptrace's detailed guide on CloudWatch integrations.
FAQ
1. How do I reduce CloudWatch API costs?
- Batch your metrics: Use batch operations like `PutMetricData` with multiple data points
- Reduce frequency: Consider whether you really need high-resolution metrics
- Limit dimensions: Only use dimensions that are necessary for analysis
- Use metric filters: For log-based metrics, use CloudWatch Logs Metric Filters
- Implement caching: Aggregate metrics locally before sending
2. What's the difference between PutMetricData and PutMetricAlarm?
`PutMetricData` submits data points for a metric, while `PutMetricAlarm` creates or updates an alarm that watches a metric. Use `PutMetricData` to report values, and `PutMetricAlarm` to set up notifications or actions when those values cross thresholds.
3. How do I troubleshoot missing custom metrics?
- Check IAM permissions for CloudWatch
- Verify the namespace and metric name are consistent
- Ensure timestamp values are within the valid range
- Check API call success with proper error handling
- Confirm you're checking the right AWS region
4. Can I delete or modify metrics after publishing?
No, CloudWatch metrics cannot be deleted or modified after they are published. If you publish incorrect data, you can only wait for it to expire under the retention schedule or publish corrected data points going forward.
5. How do I create a dashboard programmatically?
Use the `PutDashboard` API call:
import boto3
import json
cloudwatch = boto3.client('cloudwatch')
dashboard_body = {
"widgets": [
{
"type": "metric",
"x": 0,
"y": 0,
"width": 12,
"height": 6,
"properties": {
"metrics": [
["MyApplication", "TransactionLatency", "Service", "Payment"],
["MyApplication", "RequestCount", "Service", "Payment"]
],
"period": 300,
"stat": "Average",
"region": "us-east-1",
"title": "Payment Service Metrics"
}
}
]
}
response = cloudwatch.put_dashboard(
DashboardName="PaymentServiceDashboard",
DashboardBody=json.dumps(dashboard_body)
)
print(f"Dashboard creation status: {response['DashboardValidationMessages']}")
Conclusion
The CloudWatch Metrics API provides powerful capabilities for monitoring AWS resources and custom applications. By understanding how to effectively query existing metrics, publish custom metrics, and manage alarms, you can build comprehensive monitoring solutions for your infrastructure.
For the most up-to-date information, refer to the official AWS CloudWatch API documentation.