Process AWS Kinesis Firehose data with Python

2023-09-11

Pipelining streamed data events directly into Amazon S3 via the AWS Kinesis Firehose service is convenient and efficient, and we can use Python with boto3 to consume the data directly from S3. This gives us durable storage of the raw events and straightforward programmatic access to them.

Mostly, we are dealing with JSON-formatted event logs. But there is one tiny stone in the shoe with logs delivered by AWS Kinesis Firehose: by default, consecutive log entries are written back to back, with no newline between them.
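For example, the raw body of one Firehose-delivered S3 object might look like this (the record contents here are made up for illustration):

# Hypothetical raw body of an S3 object written by Firehose:
# two independent JSON records, back to back, with no newline between them.
b'{"event": "login", "user_id": 1}{"event": "logout", "user_id": 2}'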

The common way to deal with newline-separated JSON entries looks like this:

import json

# fd is a file-like object wrapping an S3 object's body
for line in fd.readlines():
    data = json.loads(line)

Without newlines as separators, this code only produces malformed-JSON errors. After some research, we found a solution that is easy to adopt in Python and requires no configuration changes in AWS: another Python JSON parser, ijson. ijson consumes JSON from a byte stream, so it not only parses JSON in a memory-efficient way, it can also handle log entries that have no separators between them.
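A minimal sketch of both behaviors, using an in-memory stream in place of an S3 body (same hypothetical payload as above):

import io
import json

import ijson

payload = b'{"event": "login", "user_id": 1}{"event": "logout", "user_id": 2}'

# json.loads rejects everything after the first object.
try:
    json.loads(payload)
except json.JSONDecodeError as exc:
    print(exc)  # Extra data: line 1 column 33 (char 32)

# ijson parses the same byte stream and yields each object in turn.
for data in ijson.items(io.BytesIO(payload), '', multiple_values=True):
    print(data)  # {'event': 'login', 'user_id': 1}, then the logout entry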

The following code snippet shows how we generally handle JSON logs collected by AWS Kinesis Firehose in an S3 bucket.

import boto3
import ijson

s3 = boto3.resource('s3')
bucket = s3.Bucket('example-bucket-name')

for obj in bucket.objects.all():
    resp = s3.meta.client.get_object(Bucket=bucket.name, Key=obj.key)
    # resp['Body'] is a file-like streaming body that ijson reads incrementally
    for data in ijson.items(resp['Body'], '', multiple_values=True):
        ...  # process each decoded log entry here

Note that the multiple_values=True parameter is what allows ijson to keep parsing independent JSON objects after the first one completes, rather than expecting a single document.
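For contrast, with the default multiple_values=False, ijson treats anything after the first top-level value as an error, roughly like this (same hypothetical payload):

import io

import ijson

payload = b'{"event": "login", "user_id": 1}{"event": "logout", "user_id": 2}'

try:
    for data in ijson.items(io.BytesIO(payload), ''):
        print(data)
except ijson.JSONError as exc:
    # The exact message varies by ijson backend, e.g. "Additional data"
    print(exc)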