Processing Real-Time Weather Data with AWS Kinesis and Lambda: Storing in S3 and Monitoring with CloudWatch
In this blog we will dive into how to use Amazon Kinesis to collect real-time weather information from an IoT app or devices, send it to an S3 bucket, and monitor the pipeline with CloudWatch. We are processing events as close to real time as possible: the data travels through Kinesis, which triggers an AWS Lambda function that checks and processes each record as it arrives.
AWS KINESIS
If you have used Apache Kafka, Strimzi, or Debezium with Kafka, AWS Kinesis is AWS's managed alternative: it ingests and processes data records from streams in real time, from thousands of sensors, without data loss. It provides accelerated intake of data feeds such as logs, metrics, video, and website clickstreams.
AWS LAMBDA
Lambda is AWS's serverless compute service: it lets developers and engineers execute uploaded code only when needed and scales automatically from a few requests per day. It can be triggered by various AWS services, such as Kinesis, to process the data as it arrives.
To begin, we will lay down the steps required for this project.
Pre-requisites
STEP 1
We will create an IAM role with the necessary permissions.
Log in to your AWS Management Console, navigate to the IAM console, and click on Roles in the sidebar. In the Roles console, click Create role, choose AWS service as the trusted entity, and choose Lambda as the use case.
Then enter the role name; I will use “data-s3-lambda-kinesis”. Attach the specific permissions below, then create the role (a CLI equivalent is sketched after the list).
AmazonKinesisFullAccess
CloudWatchLogsFullAccess
AmazonS3FullAccess
AWSLambdaBasicExecutionRole
AWSXRayFullAccess
AWSLambda_FullAccess
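If you prefer the AWS CLI, here is a minimal sketch of the same step. It assumes the CLI is already configured and uses an example trust-policy file name (lambda-trust.json); the remaining managed policies from the list above are attached the same way.

aws iam create-role \
  --role-name data-s3-lambda-kinesis \
  --assume-role-policy-document file://lambda-trust.json
# lambda-trust.json allows Lambda to assume the role:
# {"Version": "2012-10-17", "Statement": [{"Effect": "Allow",
#   "Principal": {"Service": "lambda.amazonaws.com"}, "Action": "sts:AssumeRole"}]}

aws iam attach-role-policy --role-name data-s3-lambda-kinesis \
  --policy-arn arn:aws:iam::aws:policy/AmazonKinesisFullAccess
aws iam attach-role-policy --role-name data-s3-lambda-kinesis \
  --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess
# ...repeat attach-role-policy for the other managed policies in the list above.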
STEP 2
Create the S3 bucket. The bucket will be used to store the data in JSON format once the Lambda function receives the trigger.
Navigate to the S3 console and click on Create bucket.
Add the name “data-kinesis-s3-logs” (bucket names are globally unique, so yours may need to differ).
Leave all other configurations as default and click Create bucket. (A CLI equivalent is sketched below.)
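The same bucket can be created from the CLI; this quick sketch assumes your region is us-east-1 (in any other region, add --create-bucket-configuration LocationConstraint=<your-region>):

aws s3api create-bucket --bucket data-kinesis-s3-logs --region us-east-1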
STEP 3
We will create the Kinesis data stream that collects the weather data for real-time processing. I am naming it “kinesis-data-streaming” (the name the Lambda trigger and producer script reference later) and provisioning 2 shards, so the capacity mode is “Provisioned”; each shard ingests up to 1 MiB per second or 1,000 records per second and emits up to 2 MiB per second. The data retention period is 1 day. Once done, click Create data stream. (A CLI equivalent is sketched below.)
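For reference, a minimal CLI equivalent, assuming provisioned capacity with 2 shards:

aws kinesis create-stream --stream-name kinesis-data-streaming --shard-count 2
aws kinesis describe-stream-summary --stream-name kinesis-data-streaming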
STEP 4
Create the Lambda function
Navigate to the Lambda console and click on Create function,
then choose Author from scratch.
Name the function “datalogs-s3-kinesis”.
The runtime is Python 3.9 (the latest version at the time of this blog), and the architecture chosen is x86_64.
Choose an execution role by selecting the previously created role “data-s3-lambda-kinesis”,
then click Create function. (A rough CLI equivalent is sketched below.)
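For reference, a rough CLI equivalent, assuming the handler code is zipped as function.zip with the handler in lambda_function.py, and <account-id> replaced with your own account ID:

aws lambda create-function \
  --function-name datalogs-s3-kinesis \
  --runtime python3.9 \
  --architectures x86_64 \
  --handler lambda_function.lambda_handler \
  --role arn:aws:iam::<account-id>:role/data-s3-lambda-kinesis \
  --zip-file fileb://function.zip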
Add a Trigger
Click on “Add trigger”.
Select “Kinesis” from the list of available triggers.
For the Kinesis stream, select “kinesis-data-streaming”.
Set the batch size to 2 and leave the other configurations as default.
Click “Add” to save the trigger. (A CLI equivalent is sketched below.)
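Under the hood this creates an event source mapping, which can also be set up from the CLI (again with <account-id> replaced by your own):

aws lambda create-event-source-mapping \
  --function-name datalogs-s3-kinesis \
  --event-source-arn arn:aws:kinesis:us-east-1:<account-id>:stream/kinesis-data-streaming \
  --batch-size 2 \
  --starting-position LATEST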
Update the Lambda Function Code:
In the Lambda console, click on “Code” to access the code source.
Replace the default Lambda function code with the Python script below.
Click “Deploy” to save the updated code.
import json
import base64
import boto3
import logging
import time

logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3_client = boto3.client('s3')

# List to store records
all_records = []

def write_to_s3(records):
    try:
        # Specify your S3 bucket and key for the single file
        key = 'weather-data/all_records.json'

        # Store all records in S3
        s3_client.put_object(
            Bucket='data-kinesis-s3-logs',
            Key=key,
            Body=json.dumps(records)
        )

        # Log success
        logger.info(f"Successfully wrote {len(records)} records to S3. Key: {key}")
    except Exception as e:
        # Log error
        logger.error(f"Error writing to S3: {e}")

def lambda_handler(event, context):
    global all_records
    try:
        for record in event['Records']:
            try:
                # Process the data from the Kinesis record
                kinesis_data = record['kinesis']['data']

                # Decode base64-encoded data
                decoded_data = base64.b64decode(kinesis_data).decode('utf-8')

                # Log the decoded data
                logger.info(f"Decoded Kinesis Data: {decoded_data}")

                # Parse JSON data
                data = json.loads(decoded_data)

                # Example: Extract relevant data
                city = data.get('city', 'UnknownCity')
                timestamp = data.get('timestamp', int(time.time()))
                temperature = data.get('temperature', 0.0)
                humidity = data.get('humidity', 0.0)
                wind_speed = data.get('wind_speed', 0.0)

                # Format the data
                record_data = {
                    'city': city,
                    'timestamp': timestamp,
                    'temperature': temperature,
                    'humidity': humidity,
                    'wind_speed': wind_speed
                }

                # Append the record to the list
                all_records.append(record_data)
            except json.JSONDecodeError as e:
                logger.error(f"Error decoding JSON: {e}")
                continue  # Skip to the next record if there's an error

        # Store all records in S3
        write_to_s3(all_records)

        return {
            'statusCode': 200,
            'body': json.dumps('All records processed and stored in S3')
        }
    except Exception as e:
        logger.error(f"An error occurred: {e}")
        return {
            'statusCode': 500,
            'body': json.dumps('Internal Server Error')
        }
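Before wiring up the real producer, you can sanity-check the handler with a Lambda test event. The handler only reads record['kinesis']['data'], which holds a base64-encoded JSON payload, so a minimal event is enough. The one-liner below (an illustration, not part of the original walkthrough) prints a base64 string you can paste into the data field:

python3 -c "import base64, json; print(base64.b64encode(json.dumps({'city': 'Lagos', 'temperature': 30.5, 'humidity': 70.2, 'wind_speed': 12.3, 'timestamp': 1700000000}).encode()).decode())"

A test event of the form {"Records": [{"kinesis": {"data": "<paste the base64 string here>"}}]} should then log the decoded payload and write it to the S3 bucket.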
Now we create our own IoT device by running a Python script that randomly generates weather data. The script can run on a server or on your local machine.
First, install and configure the AWS CLI so the script can stream to your Kinesis stream:
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install
aws --version
Configure your AWS credentials:
aws configure
You will be prompted to enter:
AWS Access Key ID
AWS Secret Access Key
Default region name
Default output format (e.g., json, text, or table)
Then export the access key and secret access key as environment variables:
export AWS_ACCESS_KEY_ID="insert_your_access_key_id_here"
export AWS_SECRET_ACCESS_KEY="insert_your_secret_access_key_here"
Next, install Python and Boto3, the AWS SDK for Python:
sudo yum install python3 python3-pip -y
pip3 install boto3
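An optional quick check that the SDK installed correctly:

python3 -c "import boto3; print(boto3.__version__)"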
Then save the following Python script (I will call it lambda-weather.py) and run it:
import boto3
import os
import json
import time
import random

# Retrieve AWS credentials from environment variables
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

# Check if credentials are loaded
if not AWS_ACCESS_KEY_ID or not AWS_SECRET_ACCESS_KEY:
    raise Exception("AWS credentials are not set in environment variables.")

# Initialize the Kinesis client
region_name = 'us-east-1'
kinesis_client = boto3.client(
    'kinesis',
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    region_name=region_name
)

def put_weather_data(city, temperature, humidity, wind_speed):
    try:
        data = {
            "city": city,
            "temperature": temperature,
            "humidity": humidity,
            "wind_speed": wind_speed,
            "timestamp": int(time.time())
        }
        response = kinesis_client.put_record(
            StreamName='kinesis-data-streaming',
            Data=json.dumps(data),
            PartitionKey=city  # Partition by city so records spread across the shards
        )
        print(f"Weather information. SequenceNumber: {response['SequenceNumber']}, ShardId: {response['ShardId']}, Data: {data}")
    except Exception as e:
        print(f"Error: {e}")

if __name__ == "__main__":
    cities = ["Lagos", "Abuja", "Accra", "Houston", "New York", "Ondo", "Texas", "London"]
    for _ in range(8):
        city = random.choice(cities)
        temperature = round(random.uniform(-10, 40), 2)
        humidity = round(random.uniform(40, 80), 2)
        wind_speed = round(random.uniform(0, 100), 2)
        put_weather_data(city, temperature, humidity, wind_speed)
        time.sleep(2)  # Simulating periodic data transmission
python3 lambda-weather.py
This produces weather data at two-second intervals and prints the ShardId and sequence number of every message. The ShardId can be used to trace the data within the stream on the Kinesis console, confirming that the data stream was successful. (A CLI check is sketched below.)
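You can also read a few records straight from the stream with the CLI (a quick sketch; the iterator value comes from the first command's output, and the Data field in the response is base64-encoded):

aws kinesis get-shard-iterator \
  --stream-name kinesis-data-streaming \
  --shard-id shardId-000000000000 \
  --shard-iterator-type TRIM_HORIZON
aws kinesis get-records --shard-iterator <iterator-from-previous-command>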
Monitoring and Validation
Check CloudWatch Logs: we confirm the Lambda function's execution and success in AWS CloudWatch. (A CLI sketch follows.)
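To follow the logs from the terminal instead of the console (assuming the default log group name Lambda creates for the function):

aws logs tail /aws/lambda/datalogs-s3-kinesis --follow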
Verify the Data in S3: we check the specified S3 bucket “data-kinesis-s3-logs” to ensure the data is being stored correctly.
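From the CLI, the object written by the Lambda function can be listed and printed (a quick check, using the key from the function code):

aws s3 ls s3://data-kinesis-s3-logs/weather-data/
aws s3 cp s3://data-kinesis-s3-logs/weather-data/all_records.json -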
This workflow integrates several AWS services: Lambda, Kinesis, CloudWatch, and S3. Kinesis handled the real-time data streaming efficiently, Lambda (a serverless resource) handled the data processing, and we stored the data in an S3 bucket for future use. The captured data was monitored using CloudWatch, and the IoT device was simulated with a Python script that generates and sends weather data.
Thanks for the read.
source code link: https://github.com/A-LPHARM/data-engineering/tree/main/kinesis-lambda-s3