Reading data from a stream in the AWS SDK
Updated at January 30, 2024
Python
You can read data from a stream using the get_shard_iterator and get_records methods. To identify the stream, specify the following parameters:
- Name of the stream, e.g., example-stream.
- ID of the cloud the stream is located in, e.g., b1gi1kuj2dht********.
- ID of the YDB database containing the stream, e.g., cc8028jgtuab********.
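The three parameters are combined into a single path that is passed to the API as StreamName. Here is a minimal sketch, assuming the /ru-central1/<cloud>/<database>/<stream> layout used in the example code on this page. The helper name build_stream_path and its region argument are hypothetical and are not part of the AWS SDK:

```python
def build_stream_path(cloud: str, database: str, stream: str,
                      region: str = "ru-central1") -> str:
    """Join the identifiers into the path passed as StreamName.

    Hypothetical helper: the layout is an assumption taken from the
    example on this page, not an SDK function.
    """
    parts = {"region": region, "cloud": cloud,
             "database": database, "stream": stream}
    for name, value in parts.items():
        # Reject empty values and stray slashes that would corrupt the path.
        if not value or "/" in value:
            raise ValueError(f"invalid {name}: {value!r}")
    return f"/{region}/{cloud}/{database}/{stream}"


if __name__ == "__main__":
    print(build_stream_path("b1gi1kuj2dht********",
                            "cc8028jgtuab********",
                            "example-stream"))
```

Validating the parts up front turns a typo in an ID into an immediate local error rather than a failed API call.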
You also need to configure the AWS SDK and assign the yds.viewer role to the service account.
To read records from a stream with the parameters specified above:
- Create a file named stream_get_records.py and copy the following code into it:

      import itertools
      from pprint import pprint

      import boto3


      def get_records(cloud, database, stream_name):
          # The stream is accessed through the Kinesis client at this endpoint.
          client = boto3.client('kinesis', endpoint_url="https://yds.serverless.yandexcloud.net")
          # The full stream path combines the region, cloud ID, database ID, and stream name.
          stream_path = "/ru-central1/{cloud}/{database}/{stream}".format(
              cloud=cloud, database=database, stream=stream_name)

          describe_stream_result = client.describe_stream(StreamName=stream_path)
          shards = [shard["ShardId"] for shard in describe_stream_result['StreamDescription']['Shards']]

          shard_iterators = {}
          # Poll the shards in round-robin order; this loop never ends on its own.
          for shard_id in itertools.cycle(shards):
              if shard_id not in shard_iterators:
                  # LATEST: read only records written after the iterator is created.
                  shard_iterators[shard_id] = client.get_shard_iterator(
                      StreamName=stream_path,
                      ShardId=shard_id,
                      ShardIteratorType='LATEST')['ShardIterator']

              record_response = client.get_records(ShardIterator=shard_iterators[shard_id])
              for record in record_response.get("Records", []):
                  yield record["Data"]

              # Continue from where this read stopped on the next pass over the shard.
              if "NextShardIterator" in record_response:
                  shard_iterators[shard_id] = record_response["NextShardIterator"]


      if __name__ == '__main__':
          for record in get_records(
                  cloud="b1gi1kuj2dht********",
                  database="cc8028jgtuab********",
                  stream_name="example-stream"):
              pprint(record)
              print("The record has been read successfully")
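- Run the program:

      python3 stream_get_records.py

  The program keeps polling the stream until you stop it. Because it uses the LATEST iterator type, it prints only records written to the stream after it starts.

  Result:

      b'{"user_id":"user1","score":100}'
      The record has been read successfully
      b'{"user_id":"user1","score":100}'
      The record has been read successfully
      ...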
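Each record arrives as raw bytes, and the generator in the example never finishes on its own. The sketch below shows one way to decode the payloads and stop after a fixed number of records. It assumes the payloads are UTF-8 encoded JSON, as in the sample output. The names decode_record and take_decoded are hypothetical helpers, not part of the AWS SDK:

```python
import itertools
import json


def decode_record(data: bytes) -> dict:
    """Decode one payload, assuming it holds UTF-8 encoded JSON."""
    return json.loads(data.decode("utf-8"))


def take_decoded(records, limit):
    """Return at most `limit` decoded payloads from an iterable of bytes.

    islice stops pulling from `records` once `limit` items are taken,
    so this also works with a generator that never ends.
    """
    return [decode_record(data) for data in itertools.islice(records, limit)]


if __name__ == "__main__":
    # Stand-in for the bytes yielded by get_records() in the example.
    sample = [b'{"user_id":"user1","score":100}'] * 3
    for item in take_decoded(sample, limit=2):
        print(item["user_id"], item["score"])
```

In a real run you would pass the get_records(...) generator from stream_get_records.py instead of the sample list. Note that the call blocks until the requested number of records has arrived.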