Creating a trigger for Data Streams that invokes a Cloud Functions function
Create a trigger for Data Streams to invoke a Cloud Functions function when data is sent to a stream.
Note
The trigger for Data Streams accepts and sends messages in JSON format.
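Since the trigger exchanges JSON, a function can treat the incoming event as an already parsed object. The handler below is a minimal sketch under assumptions: the `handler(event, context)` entry point and the top-level `messages` key are not described on this page, so treat both as hypothetical and check the real payload in your function logs before relying on them.

```python
import json


def handler(event, context):
    """Log the incoming batch and report how many messages it held."""
    # Assumption: the trigger wraps the batch in a top-level "messages" list.
    messages = event.get("messages", []) if isinstance(event, dict) else []

    # Print the raw payload so its actual structure shows up in the function logs.
    print(json.dumps(event, ensure_ascii=False))

    # Return a JSON-serializable response with the number of messages received.
    return {"statusCode": 200, "body": json.dumps({"received": len(messages)})}
```

Logging the whole event first is a cheap way to learn the actual field names before writing any parsing logic.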
Getting started
To create a trigger, you will need:
- A function to be invoked by the trigger. If you do not have a function, create one.
- (Optional) A dead-letter queue where messages that a function could not process will be redirected. If you do not have a queue, create one.
- Service accounts with the following permissions:
  - To invoke a function.
  - To read from the stream that activates the trigger when data is sent there.
  - Optionally, to write to a dead-letter queue.
  You can use the same service account or different ones. If you do not have a service account, create one.
- A stream that will activate the trigger as soon as it receives data. If you do not have a stream, create one.
Creating a trigger
Note
The trigger is initiated within 5 minutes of being created.
- In the management console, select the folder where you want to create a trigger.
- Select Cloud Functions.
- In the left-hand panel, select Triggers.
- Click Create trigger.
- Under Basic settings:
  - Enter a name and description for the trigger.
  - In the Type field, select Data Streams.
  - In the Launched resource field, select Function.
- Under Data Streams settings, select a data stream and a service account with permissions to read data from the stream and write data to it.
- Under Batch message settings, specify:
  - Waiting time, s. The values may range from 1 to 60 seconds. The default value is 1 second.
  - Batch size, B. The values may range from 1 B to 64 KB. The default value is 1 B.
  The trigger groups messages for a period not exceeding the specified waiting time and sends them to a function. The total amount of data transmitted to a function may exceed the specified batch size only if the data is transmitted as a single message. In all other cases, the amount of data does not exceed the batch size.
- Under Function settings, select a function and specify:
  - Function version tag.
  - Service account to invoke the function under.
- Optionally, under Repeat request settings:
  - In the Interval field, specify the time after which the function will be invoked again if the current attempt fails. The values may range from 10 to 60 seconds. The default value is 10 seconds.
  - In the Number of attempts field, specify the number of invocation retries before the trigger sends a message to the dead-letter queue. The values may range from 1 to 5. The default value is 1.
- Optionally, under Dead Letter Queue settings, select the dead-letter queue and the service account with write permissions for this queue.
- Click Create trigger.
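The batching rule from the Batch message settings step can be illustrated with a small simulation. This is only a sketch of the described behavior, not the service's actual algorithm: the `group_messages` function, the `(arrival time, payload)` message shape, and the choice to start the waiting window at the first message of each batch are all assumptions made for illustration.

```python
from typing import List, Tuple

Message = Tuple[float, bytes]  # (arrival time in seconds, payload)


def group_messages(
    messages: List[Message], batch_size: int = 1, cutoff: float = 1.0
) -> List[List[bytes]]:
    """Split messages into batches limited by total bytes and waiting time."""
    batches: List[List[bytes]] = []
    current: List[bytes] = []
    current_bytes = 0
    started_at = 0.0

    for arrived_at, payload in messages:
        too_big = current_bytes + len(payload) > batch_size
        too_late = arrived_at - started_at > cutoff
        # Flush the open batch before it would break either limit.
        if current and (too_big or too_late):
            batches.append(current)
            current, current_bytes = [], 0
        if not current:
            started_at = arrived_at  # the waiting window starts with the first message
        # A message larger than batch_size still goes out, alone in its own batch.
        current.append(payload)
        current_bytes += len(payload)

    if current:
        batches.append(current)
    return batches
```

With the default batch size of 1 B, every non-empty message in this model ends up in a batch of its own, which matches the note that a single message may exceed the batch size.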
If you do not have the Yandex Cloud CLI installed yet, install and initialize it.
By default, the CLI uses the folder specified when creating the profile. To change the default folder, use the yc config set folder-id <folder_ID> command. You can also set a different folder for any specific command using the --folder-name or --folder-id parameter.
To create a trigger that invokes a function, run this command:
yc serverless trigger create yds \
--name <trigger_name> \
--database <database_location> \
--stream <stream_name> \
--batch-size <message_batch_size> \
--batch-cutoff <maximum_timeout> \
--stream-service-account-id <service_account_ID> \
--invoke-function-id <function_ID> \
--invoke-function-service-account-id <service_account_ID> \
--retry-attempts <number_of_retry_attempts> \
--retry-interval <interval_between_retry_attempts> \
--dlq-queue-id <dead-letter_queue_ID> \
--dlq-service-account-id <service_account_ID>
Where:
- --name: Trigger name.
- --database: Location of the YDB DB the Data Streams stream is linked to.
  To find out where the DB is located, run the yc ydb database list command. The DB location is specified in the ENDPOINT column, in the database parameter, e.g., /ru-central1/b1gia87mba**********/etn7hehf6g*******.
- --stream: Name of the Data Streams stream.
- --batch-size: Message batch size. This is an optional parameter. The values may range from 1 B to 64 KB. The default value is 1 B.
- --batch-cutoff: Maximum wait time. This is an optional parameter. The values may range from 1 to 60 seconds. The default value is 1 second. The trigger groups messages for a period not exceeding batch-cutoff and sends them to a function. The total amount of data transmitted to a function may exceed batch-size if the data is transmitted as a single message. In all other cases, the amount of data does not exceed batch-size.
- --stream-service-account-id: ID of the service account with permissions to read from the stream and write to it.
- --invoke-function-id: Function ID.
- --invoke-function-service-account-id: ID of the service account with permissions to call the function.
- --retry-attempts: Number of invocation retries before the trigger sends a message to the dead-letter queue. This is an optional parameter. The values may range from 1 to 5. The default value is 1.
- --retry-interval: Time after which the function will be invoked again if the current attempt fails. This is an optional parameter. The values may range from 10 to 60 seconds. The default value is 10 seconds.
- --dlq-queue-id: Dead-letter queue ID. This is an optional parameter.
- --dlq-service-account-id: ID of the service account with write permissions for the dead-letter queue. This is an optional parameter.
Result:
id: a1smnfisr5**********
folder_id: b1gc1t4cb6**********
created_at: "2022-10-31T10:57:08.234586266Z"
name: data-streams-trigger
rule:
  data_stream:
    database: /ru-central1/b1gvlrnlei**********/etn3ege6nj**********
    stream: yds-stream
    service_account_id: aje07l4q4v**********
    batch_settings:
      size: "1"
      cutoff: 1s
    invoke_function:
      function_id: d4e155orh3**********
      function_tag: $latest
      service_account_id: aje07l4q4v**********
      retry_settings:
        retry_attempts: "1"
        interval: 10s
      dead_letter_queue:
        queue_id: yrn:yc:ymq:ru-central1:b1gc1t4cb6**********:queue_dead
        service_account_id: aje07l4q4v**********
status: ACTIVE
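Before running the command, it may help to check the optional values against the ranges listed in the parameter descriptions and to pull the database location out of an endpoint string. The sketch below does both. The names `check_settings` and `database_from_endpoint` are hypothetical, the 64 KB limit is assumed to mean 64 × 1024 bytes, and the endpoint is assumed to be a URL that carries the location in a `database` query parameter.

```python
from typing import List
from urllib.parse import parse_qs, urlsplit

KB = 1024  # assumption: the 64 KB limit is counted in 1024-byte units

# (low, high) bounds taken from the parameter descriptions on this page
LIMITS = {
    "batch_size_bytes": (1, 64 * KB),
    "batch_cutoff_seconds": (1, 60),
    "retry_attempts": (1, 5),
    "retry_interval_seconds": (10, 60),
}


def check_settings(**settings: int) -> List[str]:
    """Return one problem description per setting that is out of range."""
    problems = []
    for key, value in settings.items():
        if key not in LIMITS:
            problems.append(f"{key}: unknown setting")
            continue
        low, high = LIMITS[key]
        if not low <= value <= high:
            problems.append(f"{key}: {value} is outside {low}..{high}")
    return problems


def database_from_endpoint(endpoint: str) -> str:
    """Extract the `database` query parameter from an endpoint URL."""
    values = parse_qs(urlsplit(endpoint).query).get("database")
    if not values:
        raise ValueError("endpoint has no 'database' parameter")
    return values[0]
```

An empty list from `check_settings` means every value passed in falls within the documented range. The function does not verify that required parameters are present.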
With Terraform
Terraform is distributed under the Business Source License.
For more information about the provider resources, see the Terraform provider documentation.
If you do not have Terraform yet, install it and configure its Yandex Cloud provider.
To create a trigger for Data Streams:
- In the configuration file, describe the trigger parameters:
  resource "yandex_function_trigger" "my_trigger" {
    name = "<trigger_name>"
    function {
      id                 = "<function_ID>"
      service_account_id = "<service_account_ID>"
      retry_attempts     = "<number_of_retry_attempts>"
      retry_interval     = "<time_between_retry_attempts>"
    }
    data_streams {
      stream_name        = "<stream_name>"
      database           = "<database_location>"
      service_account_id = "<service_account_ID>"
      batch_cutoff       = "<maximum_timeout>"
      batch_size         = "<message_group_size>"
    }
    dlq {
      queue_id           = "<dead-letter_queue_ID>"
      service_account_id = "<service_account_ID>"
    }
  }
  Where:
  - name: Trigger name. The name format is as follows:
    - It must be from 2 to 63 characters long.
    - It can only contain lowercase Latin letters, numbers, and hyphens.
    - It must start with a letter and cannot end with a hyphen.
  - description: Trigger description.
  - function: Function parameters:
    - id: Function ID.
    - service_account_id: ID of the service account with permissions to invoke the function.
    - retry_attempts: Number of invocation retries before the trigger moves a message to the dead-letter queue. This is an optional parameter. The values may range from 1 to 5. The default value is 1.
    - retry_interval: Time until another attempt is made to invoke the function if the current attempt fails. This is an optional parameter. The values may range from 10 to 60 seconds. The default value is 10 seconds.
  - data_streams: Trigger parameters:
    - stream_name: Name of the Data Streams stream.
    - database: Location of the YDB DB the Data Streams stream is linked to.
      To find out where the DB is located, run the yc ydb database list command. The DB location is specified in the ENDPOINT column, in the database parameter, e.g., /ru-central1/b1gia87mba**********/etn7hehf6g*******.
    - service_account_id: ID of the service account with permissions to read from and write to the Data Streams stream.
    - batch_cutoff: Maximum wait time. This is an optional parameter. The values may range from 1 to 60 seconds. The default value is 1 second. The trigger groups messages for a period not exceeding batch_cutoff and sends them to a function. The total amount of data transmitted to a function may exceed batch_size if the data is transmitted as a single message. In all other cases, the amount of data does not exceed batch_size.
    - batch_size: Message batch size. This is an optional parameter. The values may range from 1 B to 64 KB. The default value is 1 B.
  - dlq: Dead-letter queue parameters:
    - queue_id: Dead-letter queue ID. This is an optional parameter.
    - service_account_id: ID of the service account with write permissions to the dead-letter queue. This is an optional parameter.
  For more information about the yandex_function_trigger resource parameters, see the relevant provider documentation.
- Create resources:
  - In the terminal, go to the directory where you edited the configuration file.
  - Make sure the configuration file is correct using this command:
    terraform validate
    If the configuration is correct, you will get this message:
    Success! The configuration is valid.
  - Run this command:
    terraform plan
    You will see a detailed list of resources. No changes will be made at this step. Terraform will show any errors found in your configuration.
  - Apply the changes:
    terraform apply
  - Type yes and press Enter to confirm the changes.
  Terraform will create all the required resources. You can check the new resources using the management console or this CLI command:
  yc serverless trigger list
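The name rules given for the `name` parameter of the Terraform resource can be checked locally before you run terraform apply. The helper below is a minimal sketch that encodes those rules as a regular expression. The function name `is_valid_trigger_name` is hypothetical and not part of any Yandex Cloud tool.

```python
import re

# First character: a lowercase letter.
# Middle (0 to 61 characters): lowercase letters, digits, or hyphens.
# Last character: a lowercase letter or digit, so the name cannot end with a hyphen.
# Total length is therefore 2 to 63 characters.
_NAME_RE = re.compile(r"[a-z][a-z0-9-]{0,61}[a-z0-9]")


def is_valid_trigger_name(name: str) -> bool:
    """Check a trigger name against the format rules listed on this page."""
    return _NAME_RE.fullmatch(name) is not None
```

For example, `is_valid_trigger_name("data-streams-trigger")` returns `True`, while a name that ends with a hyphen or starts with a digit is rejected.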
To create a trigger for Data Streams, use the create REST API method for the Trigger resource or the TriggerService/Create gRPC API call.
Checking the result
To make sure the trigger works correctly, view the function logs: they record each invocation of the function.