Создать триггер для Data Streams, который вызывает функцию Cloud Functions
Создайте триггер для Data Streams, который будет вызывать функцию Cloud Functions при отправке данных в поток.
Перед началом работы
Для создания триггера вам понадобятся:
-
Функция, которую триггер будет вызывать. Если у вас нет функции:
-
(Опционально) Очередь Dead Letter Queue, куда будут перенаправляться сообщения, которые не смогла обработать функция. Если у вас нет очереди, создайте ее.
-
Сервисные аккаунты с правами:
- на вызов функции;
- на чтение из потока, при отправке данных в который триггер будет запускаться;
- (опционально) на запись в очередь Dead Letter Queue.
Вы можете использовать один и тот же сервисный аккаунт или разные. Если у вас нет сервисного аккаунта, создайте его.
-
Поток, при отправке данных в который триггер будет запускаться. Если у вас нет потока, создайте его.
Создать триггер
Примечание
Триггер начинает работать в течение 5 минут после создания.
-
В консоли управления
перейдите в каталог, в котором хотите создать триггер. -
Выберите сервис Cloud Functions.
-
На панели слева выберите
Триггеры. -
Нажмите кнопку Создать триггер.
-
В блоке Базовые параметры:
- Введите имя и описание триггера.
- В поле Тип выберите Data Streams.
- В поле Запускаемый ресурс выберите Функция.
-
В блоке Настройки Data Streams выберите поток данных и сервисный аккаунт с правами на чтение из потока данных и запись в него.
-
В блоке Настройки группирования сообщений укажите:
- Время ожидания, с. Допустимые значения от 1 до 60 секунд, значение по умолчанию — 1 секунда.
- Размер группы, Б. Допустимые значения от 1 Б до 64 КБ, значение по умолчанию — 1 Б.
Триггер группирует сообщения не дольше указанного времени ожидания и отправляет их в функцию. Суммарный объем данных, которые передаются в функцию, может превышать указанный размер группы, если данные передаются в одном сообщении. Во всех остальных случаях объем данных не превышает размер группы.
-
В блоке Настройки функции выберите функцию и укажите:
- Тег версии функции.
- Сервисный аккаунт, от имени которого будет вызываться функция.
-
(Опционально) В блоке Настройки повторных запросов:
- В поле Интервал укажите время, через которое будет сделан повторный вызов функции, если текущий завершился неуспешно. Допустимые значения — от 10 до 60 секунд, значение по умолчанию — 10 секунд.
- В поле Количество попыток укажите количество повторных вызовов функции, которые будут сделаны, прежде чем триггер отправит сообщение в Dead Letter Queue. Допустимые значения — от 1 до 5, значение по умолчанию — 1.
-
(Опционально) В блоке Настройки Dead Letter Queue выберите очередь Dead Letter Queue и сервисный аккаунт с правами на запись в нее.
-
Нажмите кнопку Создать триггер.
Если у вас еще нет интерфейса командной строки Yandex Cloud, установите и инициализируйте его.
По умолчанию используется каталог, указанный в профиле CLI. Вы можете указать другой каталог с помощью параметра --folder-name
или --folder-id
.
Чтобы создать триггер, который вызывает функцию, выполните команду:
yc serverless trigger create yds \
--name <имя_триггера> \
--database <размещение_базы_данных> \
--stream <имя_потока_данных> \
--batch-size <размер_группы_сообщений> \
--batch-cutoff <максимальное_время_ожидания> \
--stream-service-account-id <идентификатор_сервисного_аккаунта> \
--invoke-function-id <идентификатор_функции> \
--invoke-function-service-account-id <идентификатор_сервисного_аккаунта> \
--retry-attempts <количество_повторных_вызовов> \
--retry-interval <интервал_между_повторными_вызовами> \
--dlq-queue-id <идентификатор_очереди_Dead_Letter_Queue> \
--dlq-service-account-id <идентификатор_сервисного_аккаунта>
Где:
-
--name
— имя триггера. -
--database
— размещение базы данных YDB, к которой привязан поток Data Streams.Чтобы узнать, где размещена база данных, выполните команду
yc ydb database list
. Размещение базы данных указано в столбцеENDPOINT
, в параметреdatabase
, например/ru-central1/b1gia87mba**********/etn7hehf6g*******
. -
--stream
— имя потока данных Data Streams. -
--batch-size
— размер группы сообщений. Необязательный параметр. Допустимые значения от 1 Б до 64 КБ, значение по умолчанию — 1 Б. -
--batch-cutoff
— максимальное время ожидания. Необязательный параметр. Допустимые значения от 1 до 60 секунд, значение по умолчанию — 1 секунда. Триггер группирует сообщения не дольшеbatch-cutoff
и отправляет их в функцию. Суммарный объем данных, которые передаются в функцию, может превышатьbatch-size
, если данные передаются в одном сообщении. Во всех остальных случаях объем данных не превышаетbatch-size
. -
--stream-service-account-id
— идентификатор сервисного аккаунта, у которого есть права на чтение из потока данных и запись в него.
--invoke-function-id
— идентификатор функции.--invoke-function-service-account-id
— идентификатор сервисного аккаунта с правами на вызов функции.--retry-attempts
— количество повторных вызовов, которые будут сделаны, прежде чем триггер отправит сообщение в Dead Letter Queue. Необязательный параметр. Допустимые значения — от 1 до 5, значение по умолчанию — 1.--retry-interval
— время, через которое будет сделан повторный вызов функции, если текущий завершился неуспешно. Необязательный параметр. Допустимые значения — от 10 до 60 секунд, значение по умолчанию — 10 секунд.--dlq-queue-id
— идентификатор очереди Dead Letter Queue. Необязательный параметр.--dlq-service-account-id
— идентификатор сервисного аккаунта с правами на запись в очередь Dead Letter Queue. Необязательный параметр.
Результат:
id: a1smnfisr5**********
folder_id: b1gc1t4cb6**********
created_at: "2022-10-31T10:57:08.234586266Z"
name: data-streams-trigger
rule:
data_stream:
database: /ru-central1/b1gvlrnlei**********/etn3ege6nj**********
stream: yds-stream
service_account_id: aje07l4q4v**********
batch_settings:
size: "1"
cutoff: 1s
invoke_function:
function_id: d4e155orh3**********
function_tag: $latest
service_account_id: aje07l4q4v**********
retry_settings:
retry_attempts: "1"
interval: 10s
dead_letter_queue:
queue_id: yrn:yc:ymq:ru-central1:b1gc1t4cb6**********:queue_dead
service_account_id: aje07l4q4v**********
status: ACTIVE
Terraform
Terraform распространяется под лицензией Business Source License
Подробную информацию о ресурсах провайдера смотрите в документации на сайте Terraform
Если у вас еще нет Terraform, установите его и настройте провайдер Yandex Cloud.
Чтобы создать триггер для Data Streams:
-
Опишите в конфигурационном файле параметры триггера:
resource "yandex_function_trigger" "my_trigger" { name = "<имя_триггера>" function { id = "<идентификатор_функции>" service_account_id = "<идентификатор_сервисного_аккаунта>" retry_attempts = "<количество_повторных_вызовов>" retry_interval = "<интервал_между_повторными_вызовами>" } data_streams { stream_name = "<имя_потока_данных>" database = "<размещение_базы_данных>" service_account_id = "<идентификатор_сервисного_аккаунта>" batch_cutoff = "<максимальное_время_ожидания>" batch_size = "<размер_группы_сообщений>" } dlq { queue_id = "<идентификатор_очереди_Dead_Letter_Queue>" service_account_id = "<идентификатор_сервисного_аккаунта>" } }
Где:
-
name
— имя триггера. Формат имени:- длина — от 3 до 63 символов;
- может содержать строчные буквы латинского алфавита, цифры и дефисы;
- первый символ — буква, последний — не дефис.
-
description
— описание триггера. -
function
— параметры функции:id
— идентификатор функции.service_account_id
— идентификатор сервисного аккаунта с правами на вызов функции.retry_attempts
— количество повторных вызовов, которые будут сделаны, прежде чем триггер отправит сообщение в Dead Letter Queue. Необязательный параметр. Допустимые значения — от 1 до 5, значение по умолчанию — 1.retry_interval
— время, через которое будет сделан повторный вызов функции, если текущий завершился неуспешно. Необязательный параметр. Допустимые значения — от 10 до 60 секунд, значение по умолчанию — 10 секунд.
-
data_streams
— параметры триггера:-
stream_name
— имя потока данных Data Streams. -
database
— размещение базы данных YDB, к которой привязан поток Data Streams.Чтобы узнать, где размещена база данных, выполните команду
yc ydb database list
. Размещение базы данных указано в столбцеENDPOINT
, в параметреdatabase
, например/ru-central1/b1gia87mba**********/etn7hehf6g*******
. -
service_account_id
— сервисный аккаунт с правами на чтение из потока Data Streams и запись в него. -
batch_cutoff
— максимальное время ожидания. Необязательный параметр. Допустимые значения от 1 до 60 секунд, значение по умолчанию — 1 секунда. Триггер группирует сообщения не дольшеbatch-cutoff
и отправляет их в функцию. Суммарный объем данных, которые передаются в функцию, может превышатьbatch-size
, если данные передаются в одном сообщении. Во всех остальных случаях объем данных не превышаетbatch-size
. -
batch_size
— размер группы сообщений. Необязательный параметр. Допустимые значения от 1 Б до 64 КБ, значение по умолчанию — 1 Б.
-
dlq
— параметры очереди сообщений Dead Letter Queue:queue_id
— идентификатор очереди Dead Letter Queue. Необязательный параметр.service_account_id
— идентификатор сервисного аккаунта с правами на запись в очередь Dead Letter Queue. Необязательный параметр.
Более подробную информацию о параметрах ресурса
yandex_function_trigger
см. в документации провайдера . -
-
Создайте ресурсы:
-
В терминале перейдите в папку, где вы отредактировали конфигурационный файл.
-
Проверьте корректность конфигурационного файла с помощью команды:
terraform validate
Если конфигурация является корректной, появится сообщение:
Success! The configuration is valid.
-
Выполните команду:
terraform plan
В терминале будет выведен список ресурсов с параметрами. На этом этапе изменения не будут внесены. Если в конфигурации есть ошибки, Terraform на них укажет.
-
Примените изменения конфигурации:
terraform apply
-
Подтвердите изменения: введите в терминале слово
yes
и нажмите Enter.
Terraform создаст все требуемые ресурсы. Проверить появление ресурсов можно в консоли управления
или с помощью команды CLI:yc serverless trigger list
-
Чтобы создать триггер для Data Streams, воспользуйтесь методом REST API create для ресурса Trigger или вызовом gRPC API TriggerService/Create.
Проверить результат
Проверьте, что триггер работает корректно. Для этого посмотрите логи функции, в них отображается информация о вызовах.