Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex Data Streams
    • Все инструкции
    • Управление потоками данных
      • Подготовка окружения
      • Создание потока данных
      • Отправка данных в поток
      • Чтение данных из потока
      • Удаление потока данных
  • Управление доступом
  • Правила тарификации
  • Вопросы и ответы
  1. Пошаговые инструкции
  2. Работа с AWS SDK
  3. Чтение данных из потока

Чтение данных из потока в AWS SDK

Статья создана
Yandex Cloud
Обновлена 28 декабря 2023 г.
Python

Для чтения записей из потока данных используется пара методов: get_shard_iterator и get_record/get_records. При вызове этого метода необходимо указать следующие параметры:

  • Имя потока данных, например example-stream.
  • Идентификатор облака, в котором находится поток, например b1gi1kuj2dht********.
  • Идентификатор базы данных YDB с потоком, например cc8028jgtuab********.

Вам также потребуется настроить AWS SDK и назначить сервисному аккаунту роль yds.viewer.

Для чтения записей из потока с параметрами, указанными выше:

  1. Создайте файл stream_get_records.py и скопируйте в него следующий код:

    import boto3
    from pprint import pprint
    import itertools
    
    def get_records(cloud, database, stream_name):
        client = boto3.client('kinesis', endpoint_url="https://yds.serverless.yandexcloud.net")
    
        StreamName = "/ru-central1/{cloud}/{database}/{stream}".format(cloud=cloud,
                                                                     database=database,
                                                                     stream=stream_name)
    
    
        describe_stream_result = client.describe_stream(StreamName=StreamName)
        shard_iterators = {}
    
        shards = [shard["ShardId"] for shard in describe_stream_result['StreamDescription']['Shards']]
    
        for shard_id in itertools.cycle(shards):
            if shard_id not in shard_iterators:
                shard_iterators[shard_id] = client.get_shard_iterator(StreamName=StreamName,
                                                                     ShardId=shard_id,
                                                                     ShardIteratorType='LATEST')['ShardIterator']
               
            record_response = client.get_records(ShardIterator=shard_iterators[shard_id])
            if "Records" in record_response:
                for record in [record for record in record_response["Records"]]:
                    yield record["Data"]
    
            if "NextShardIterator" in record_response:
                shard_iterators[shard_id] = record_response["NextShardIterator"]
    
    
    if __name__ == '__main__':
        for record in get_records(cloud="b1gi1kuj2dht********",
                                  database="cc8028jgtuab********",
                                  stream_name="example-stream"):
            pprint(record)    
            print("The record has been read successfully")
    
  2. Запустите программу:

    python3 stream_get_records.py
    

    Результат:

    The record has been read successfully
    b'{"user_id":"user1","score":100}'
    The record has been read successfully
    b'{"user_id":"user1","score":100}'
    ...
    

Была ли статья полезна?

Предыдущая
Отправка данных в поток
Следующая
Удаление потока данных
Проект Яндекса
© 2025 ООО «Яндекс.Облако»