arundhaj

all that is technology

Getting started with AWS Kinesis using Python

 

Amazon Kinesis is a fully managed stream hosted on AWS. It is used to collect and process large streams of data in real time. Along with Kinesis Analytics, Kinesis Firehose, AWS Lambda, AWS S3, AWS EMR you can build a robust distributed application to power your real-time monitoring dashboards, do massive scale batch analytics, etc.

First create a Kinesis stream using the following aws-cli command

> aws kinesis create-stream --stream-name python-stream --shard-count 1

The following code, say kinesis_producer.py will put records to the stream continuosly every 5 seconds

import boto3
import json
from datetime import datetime
import calendar
import random
import time

my_stream_name = 'python-stream'

kinesis_client = boto3.client('kinesis', region_name='us-east-1')

def put_to_stream(thing_id, property_value, property_timestamp):
    payload = {
                'prop': str(property_value),
                'timestamp': str(property_timestamp),
                'thing_id': thing_id
              }

    print payload

    put_response = kinesis_client.put_record(
                        StreamName=my_stream_name,
                        Data=json.dumps(payload),
                        PartitionKey=thing_id)

while True:
    property_value = random.randint(40, 120)
    property_timestamp = calendar.timegm(datetime.utcnow().timetuple())
    thing_id = 'aa-bb'

    put_to_stream(thing_id, property_value, property_timestamp)

    # wait for 5 second
    time.sleep(5)

Start consuming with kinesis_consumer.py as shown below. The following consumer will start consuming the data as the producer puts to the stream.

import boto3
import json
from datetime import datetime
import time

my_stream_name = 'python-stream'

kinesis_client = boto3.client('kinesis', region_name='us-east-1')

response = kinesis_client.describe_stream(StreamName=my_stream_name)

my_shard_id = response['StreamDescription']['Shards'][0]['ShardId']

shard_iterator = kinesis_client.get_shard_iterator(StreamName=my_stream_name,
                                                      ShardId=my_shard_id,
                                                      ShardIteratorType='LATEST')

my_shard_iterator = shard_iterator['ShardIterator']

record_response = kinesis_client.get_records(ShardIterator=my_shard_iterator,
                                              Limit=2)

while 'NextShardIterator' in record_response:
    record_response = kinesis_client.get_records(ShardIterator=record_response['NextShardIterator'],
                                                  Limit=2)

    print record_response

    # wait for 5 seconds
    time.sleep(5)

Assuming you have the credentials appropriately configured.

Hope this helps!

Comments