© 2025 Direct Cursus Technology L.L.C.
Yandex Data Streams

Reading data from a stream in the AWS SDK

Written by Yandex Cloud
Updated at January 30, 2024

To read data from a stream, list its shards with describe_stream, get an iterator for each shard with get_shard_iterator, and then read records with get_records. When calling these methods, specify the following parameters:

  • Name of the stream, e.g., example-stream.
  • ID of the cloud the stream is located in, e.g., b1gi1kuj2dht********.
  • ID of the YDB database that hosts the stream, e.g., cc8028jgtuab********.

You also need to configure the AWS SDK and assign the service account the yds.viewer role.
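The three values above are not passed separately: they are joined into a single path that serves as the StreamName argument for every call, in the form /ru-central1/<cloud ID>/<database ID>/<stream name>. As a minimal sketch, the hypothetical helper build_stream_path below does this formatting in isolation, assuming the ru-central1 prefix that the example program hardcodes:

```python
def build_stream_path(cloud_id: str, database_id: str, stream_name: str) -> str:
    """Hypothetical helper: build the full stream name used as StreamName.

    Assumes the ru-central1 prefix, as in the example program.
    """
    return f"/ru-central1/{cloud_id}/{database_id}/{stream_name}"


print(build_stream_path("b1gi1kuj2dht********", "cc8028jgtuab********", "example-stream"))
# prints: /ru-central1/b1gi1kuj2dht********/cc8028jgtuab********/example-stream
```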

To read records from a stream with the parameters specified above:

  1. Create a file named stream_get_records.py and copy the following code into it:

    import itertools
    from pprint import pprint
    
    import boto3
    
    
    def get_records(cloud, database, stream_name):
        # Yandex Data Streams is accessed through the boto3 Kinesis client
        # pointed at the YDS endpoint
        client = boto3.client('kinesis', endpoint_url="https://yds.serverless.yandexcloud.net")
    
        # Full stream name: /ru-central1/<cloud ID>/<database ID>/<stream name>
        stream_path = f"/ru-central1/{cloud}/{database}/{stream_name}"
    
        # Get the IDs of the stream's shards
        describe_stream_result = client.describe_stream(StreamName=stream_path)
        shards = [shard["ShardId"] for shard in describe_stream_result["StreamDescription"]["Shards"]]
    
        # Current iterator for each shard, keyed by shard ID
        shard_iterators = {}
    
        # Poll the shards in round-robin order indefinitely
        for shard_id in itertools.cycle(shards):
            if shard_id not in shard_iterators:
                # LATEST: read only records written after the iterator is created
                shard_iterators[shard_id] = client.get_shard_iterator(
                    StreamName=stream_path,
                    ShardId=shard_id,
                    ShardIteratorType='LATEST')['ShardIterator']
    
            record_response = client.get_records(ShardIterator=shard_iterators[shard_id])
            for record in record_response.get("Records", []):
                # Data holds the raw record payload as bytes
                yield record["Data"]
    
            # Advance to the next position in the shard
            if "NextShardIterator" in record_response:
                shard_iterators[shard_id] = record_response["NextShardIterator"]
    
    
    if __name__ == '__main__':
        for record in get_records(cloud="b1gi1kuj2dht********",
                                  database="cc8028jgtuab********",
                                  stream_name="example-stream"):
            pprint(record)
            print("The record has been read successfully")
    
  2. Run the program:

    python3 stream_get_records.py
    

    Result:

    b'{"user_id":"user1","score":100}'
    The record has been read successfully
    b'{"user_id":"user1","score":100}'
    The record has been read successfully
    ...
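Each value the program prints is the raw Data payload as bytes, which is why the output shows b'...' strings. If the producer wrote UTF-8 encoded JSON, as the sample records suggest, you can decode each payload into a dictionary before processing it. A minimal sketch, using a hypothetical decode_record helper and assuming JSON payloads:

```python
import json


def decode_record(data: bytes) -> dict:
    """Hypothetical helper: decode one record payload, assuming UTF-8 encoded JSON."""
    return json.loads(data.decode("utf-8"))


payload = b'{"user_id":"user1","score":100}'
record = decode_record(payload)
print(record["user_id"], record["score"])  # prints: user1 100
```

In the example program, you could call decode_record(record) in place of pprint(record). Payloads that are not valid UTF-8 JSON raise an exception (UnicodeDecodeError or json.JSONDecodeError), so wrap the call in try/except if the stream may contain other data.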
    
