Создать триггер для Data Streams, который вызывает контейнер Serverless Containers
Создайте триггер для Data Streams, который будет вызывать контейнер Serverless Containers при отправке данных в поток.
Примечание
Триггер для Data Streams принимает и отправляет сообщения только в формате JSON
Перед началом работы
Для создания триггера вам понадобятся:
-
Контейнер, который триггер будет вызывать. Если у вас нет контейнера:
-
(Опционально) Очередь Dead Letter Queue, куда будут перенаправляться сообщения, которые не смог обработать контейнер. Если у вас нет очереди, создайте ее.
-
Сервисные аккаунты с правами:
- на вызов контейнера;
- на чтение из потока, при отправке данных в который триггер будет запускаться;
- (опционально) на запись в очередь Dead Letter Queue.
Вы можете использовать один и тот же сервисный аккаунт или разные. Если у вас нет сервисного аккаунта, создайте его.
-
Поток, при отправке данных в который триггер будет запускаться. Если у вас нет потока, создайте его.
Создать триггер
Примечание
Триггер начинает работать в течение 5 минут после создания.
-
В консоли управления
перейдите в каталог, в котором хотите создать триггер. -
Откройте сервис Serverless Containers.
-
На панели слева выберите
Триггеры. -
Нажмите кнопку Создать триггер.
-
В блоке Базовые параметры:
- Введите имя и описание триггера.
- В поле Тип выберите
Data Streams
. - В поле Запускаемый ресурс выберите
Контейнер
.
-
В блоке Настройки Data Streams выберите поток данных и сервисный аккаунт с правами на чтение из потока данных и запись в него.
-
В блоке Настройки группирования сообщений укажите:
- Время ожидания, с. Допустимые значения от 1 до 60 секунд, значение по умолчанию — 1 секунда.
- Размер группы, Б. Допустимые значения от 1 Б до 64 КБ, значение по умолчанию — 1 Б.
Триггер группирует сообщения не дольше указанного времени ожидания и отправляет их в контейнер. Суммарный объем данных, которые передаются в контейнер, может превышать указанный размер группы, если данные передаются в одном сообщении. Во всех остальных случаях объем данных не превышает размер группы.
-
В блоке Настройки контейнера выберите его и сервисный аккаунт, от имени которого он будет вызываться.
-
(Опционально) В блоке Настройки повторных запросов:
- В поле Интервал укажите время, через которое будет сделан повторный вызов контейнера, если текущий завершился неуспешно. Допустимые значения — от 10 до 60 секунд, значение по умолчанию — 10 секунд.
- В поле Количество попыток укажите количество повторных вызовов контейнера, которые будут сделаны, прежде чем триггер отправит сообщение в Dead Letter Queue. Допустимые значения — от 1 до 5, значение по умолчанию — 1.
-
(Опционально) В блоке Настройки Dead Letter Queue выберите очередь Dead Letter Queue и сервисный аккаунт с правами на запись в нее.
-
Нажмите кнопку Создать триггер.
Если у вас еще нет интерфейса командной строки Yandex Cloud, установите и инициализируйте его.
По умолчанию используется каталог, указанный в профиле CLI. Вы можете указать другой каталог с помощью параметра --folder-name
или --folder-id
.
Чтобы создать триггер, который вызывает контейнер, выполните команду:
yc serverless trigger create yds \
--name <имя_триггера> \
--database <размещение_базы_данных> \
--stream <имя_потока_данных> \
--batch-size <размер_группы_сообщений> \
--batch-cutoff <максимальное_время_ожидания> \
--stream-service-account-id <идентификатор_сервисного_аккаунта> \
--invoke-container-id <идентификатор_контейнера> \
--invoke-container-service-account-id <идентификатор_сервисного_аккаунта> \
--retry-attempts <количество_повторных_вызовов> \
--retry-interval <интервал_между_повторными_вызовами> \
--dlq-queue-id <идентификатор_очереди_Dead_Letter_Queue> \
--dlq-service-account-id <идентификатор_сервисного_аккаунта>
Где:
-
--name
— имя триггера. -
--database
— размещение базы данных YDB, к которой привязан поток Data Streams.Чтобы узнать, где размещена база данных, выполните команду
yc ydb database list
. Размещение базы данных указано в столбцеENDPOINT
, в параметреdatabase
, например/ru-central1/b1gia87mbah2********/etn7hehf6gh3********
. -
--stream
— имя потока данных. -
--batch-size
— размер группы сообщений. Необязательный параметр. Допустимые значения от 1 Б до 64 КБ, значение по умолчанию — 1 Б. -
--batch-cutoff
— максимальное время ожидания. Необязательный параметр. Допустимые значения от 1 до 60 секунд, значение по умолчанию — 1 секунда. Триггер группирует сообщения не дольшеbatch-cutoff
и отправляет их в контейнер. Суммарный объем данных, которые передаются в контейнер, может превышатьbatch-size
, если данные передаются в одном сообщении. Во всех остальных случаях объем данных не превышаетbatch-size
. -
--stream-service-account-id
— идентификатор сервисного аккаунта, у которого есть права на чтение из потока данных и запись в него.
--invoke-container-id
— идентификатор контейнера.--invoke-container-service-account-id
— идентификатор сервисного аккаунта с правами на вызов контейнера.--retry-attempts
— количество повторных вызовов, которые будут сделаны, прежде чем триггер отправит сообщение в Dead Letter Queue. Необязательный параметр. Допустимые значения — от 1 до 5, значение по умолчанию — 1.--retry-interval
— время, через которое будет сделан повторный вызов контейнера, если текущий завершился неуспешно. Необязательный параметр. Допустимые значения — от 10 до 60 секунд, значение по умолчанию — 10 секунд.--dlq-queue-id
— идентификатор очереди Dead Letter Queue. Необязательный параметр.--dlq-service-account-id
— идентификатор сервисного аккаунта с правами на запись в очередь Dead Letter Queue. Необязательный параметр.
Результат:
id: a1s5msktijh2********
folder_id: b1gmit33hgh2********
created_at: "2022-10-24T14:07:04.693126923Z"
name: data-streams-trigger
rule:
data_stream:
database: /ru-central1/b1gia87mbah2********/etn7hehh2********
stream: streams-name
service_account_id: ajep8qm0kh2********
batch_settings:
size: "1"
cutoff: 1s
invoke_container:
container_id: bba5jb38o8h2********
service_account_id: aje03adgd2h2********
retry_settings:
retry_attempts: "1"
interval: 10s
dead_letter_queue:
queue-id: yrn:yc:ymq:ru-central1:b1gmit33ngh2********:dlq
service-account-id: aje3lebfemh2********
status: ACTIVE
Terraform
Terraform распространяется под лицензией Business Source License
Подробную информацию о ресурсах провайдера смотрите в документации на сайте Terraform
Если у вас еще нет Terraform, установите его и настройте провайдер Yandex Cloud.
Чтобы создать триггер для Data Streams:
-
Опишите в конфигурационном файле параметры триггера:
resource "yandex_function_trigger" "my_trigger" { name = "<имя_триггера>" container { id = "<идентификатор_контейнера>" service_account_id = "<идентификатор_сервисного_аккаунта>" retry_attempts = "<количество_повторных_вызовов>" retry_interval = "<интервал_между_повторными_вызовами>" } data_streams { stream_name = "<имя_потока_данных>" database = "<размещение_базы_данных>" service_account_id = "<идентификатор_сервисного_аккаунта>" batch_cutoff = "<максимальное_время_ожидания>" batch_size = "<размер_группы_сообщений>" } dlq { queue_id = "<идентификатор_очереди_Dead_Letter_Queue>" service_account_id = "<идентификатор_сервисного_аккаунта>" } }
Где:
-
name
— имя триггера. Формат имени:- длина — от 2 до 63 символов;
- может содержать строчные буквы латинского алфавита, цифры и дефисы;
- первый символ — буква, последний — не дефис.
-
container
— параметры контейнера:id
— идентификатор контейнера.service_account_id
— идентификатор сервисного аккаунта с правами на вызов контейнера.
retry_attempts
— количество повторных вызовов, которые будут сделаны, прежде чем триггер отправит сообщение в Dead Letter Queue. Необязательный параметр. Допустимые значения — от 1 до 5, значение по умолчанию — 1.retry_interval
— время, через которое будет сделан повторный вызов контейнера, если текущий завершился неуспешно. Необязательный параметр. Допустимые значения — от 10 до 60 секунд, значение по умолчанию — 10 секунд.
-
data_streams
— параметры триггера:-
stream_name
— имя потока данных. -
database
— размещение базы данных YDB, к которой привязан поток Data Streams.Чтобы узнать, где размещена база данных, выполните команду
yc ydb database list
. Размещение базы данных указано в столбцеENDPOINT
, в параметреdatabase
, например/ru-central1/b1gia87mba**********/etn7hehf6g*******
. -
service_account_id
— идентификатор сервисного аккаунта, у которого есть права на чтение из потока данных и запись в него. -
batch_cutoff
— максимальное время ожидания. Необязательный параметр. Допустимые значения от 1 до 60 секунд, значение по умолчанию — 1 секунда. Триггер группирует сообщения не дольшеbatch_cutoff
и отправляет их в контейнер. Число сообщений при этом не превышаетbatch_size
. -
batch_size
— размер группы сообщений. Необязательный параметр. Допустимые значения от 1 Б до 64 КБ, значение по умолчанию — 1 Б.
-
dlq
— параметры очереди сообщений Dead Letter Queue:queue_id
— идентификатор очереди Dead Letter Queue. Необязательный параметр.service_account_id
— идентификатор сервисного аккаунта с правами на запись в очередь Dead Letter Queue. Необязательный параметр.
Более подробную информацию о параметрах ресурса
yandex_function_trigger
см. в документации провайдера . -
-
Создайте ресурсы:
-
В терминале перейдите в папку, где вы отредактировали конфигурационный файл.
-
Проверьте корректность конфигурационного файла с помощью команды:
terraform validate
Если конфигурация является корректной, появится сообщение:
Success! The configuration is valid.
-
Выполните команду:
terraform plan
В терминале будет выведен список ресурсов с параметрами. На этом этапе изменения не будут внесены. Если в конфигурации есть ошибки, Terraform на них укажет.
-
Примените изменения конфигурации:
terraform apply
-
Подтвердите изменения: введите в терминале слово
yes
и нажмите Enter.
Terraform создаст все требуемые ресурсы. Проверить появление ресурсов можно в консоли управления
или с помощью команды CLI:yc serverless trigger list
-
Чтобы создать триггер для Data Streams, воспользуйтесь методом REST API create для ресурса Trigger или вызовом gRPC API TriggerService/Create.
Проверить результат
Проверьте, что триггер работает корректно. Для этого посмотрите логи контейнера, в них отображается информация о вызовах.