Logstash
Note
You can create a trigger that will launch a function in Cloud Functions or a container in Serverless Containers when data is sent to the stream. Read more about triggers for Data Streams.
-
Download and install Logstash.
-
Install the plugin to support the AWS Kinesis Data Streams protocol. This protocol will be used for streaming data.
sudo /usr/share/logstash/bin/logstash-plugin install logstash-output-kinesis
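To confirm that the installation worked, you can look for the plugin in the output of `logstash-plugin list`. The snippet below is a minimal sketch, assuming the default `/usr/share/logstash` install path used in this guide; `has_kinesis_plugin` is a hypothetical helper name, not part of Logstash.

```shell
#!/bin/sh
# Hypothetical helper: succeeds if the plugin name appears
# as a whole line in the text read from stdin.
has_kinesis_plugin() {
    grep -qx 'logstash-output-kinesis'
}

# Assumption: Logstash is installed under /usr/share/logstash.
plugin_tool=/usr/share/logstash/bin/logstash-plugin

if [ -x "$plugin_tool" ]; then
    # "logstash-plugin list" prints one installed plugin name per line.
    if "$plugin_tool" list | has_kinesis_plugin; then
        echo "logstash-output-kinesis is installed"
    else
        echo "logstash-output-kinesis is missing" >&2
    fi
fi
```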
Note
The plugin uses the Amazon Kinesis Producer Library. It requires the Java Development Kit (JDK) to work. Download and install it for your platform. At startup, make sure that you are using JDK version 1.8.235 or higher.
-
In the management console, select the folder with the stream.
-
Select Data Streams.
-
Select the data stream.
-
Click Connect and go to the Logstash tab.
-
Copy the configuration file example and paste it into the /usr/share/logstash/bin/mypipeline.conf file.
Sample configuration file:
input {
  http {
    port => 8888
  }
}
output {
  stdout { codec => rubydebug }
  kinesis {
    stream_name => "/ru-central1/aoegtvhtp8ob********/cc8004q4lbo6********/test"
    region => "ru-central-1"
    verify_certificate => false
    codec => json_lines
    randomized_partition_key => true
    access_key => "<access_key_ID>"
    secret_key => "<secret_key>"
    metrics_level => "none"
    endpoint => "https://yds.serverless.yandexcloud.net"
  }
}
Where:
<access_key_ID>: Static access key ID
<secret_key>: Secret part of the static access key
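Before starting delivery, it may help to check that both placeholders were replaced and that Logstash can parse the file. The following is a minimal sketch, assuming the config path from this step; `has_no_placeholders` is a hypothetical helper, while `--config.test_and_exit` is the Logstash flag that validates the configuration and exits without starting the pipeline.

```shell
#!/bin/sh
# Hypothetical helper: succeeds if the file no longer contains
# the <access_key_ID> or <secret_key> placeholders.
has_no_placeholders() {
    ! grep -Eq '<(access_key_ID|secret_key)>' "$1"
}

# Assumption: the config was saved to the path used in this guide.
conf=/usr/share/logstash/bin/mypipeline.conf

if [ -f "$conf" ]; then
    if has_no_placeholders "$conf"; then
        # Parse the configuration and exit without running the pipeline.
        sudo /usr/share/logstash/bin/logstash -f "$conf" --config.test_and_exit
    else
        echo "Replace the key placeholders in $conf first" >&2
    fi
fi
```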
-
Start data delivery:
sudo /usr/share/logstash/bin/logstash -f /usr/share/logstash/bin/mypipeline.conf
-
Send test data to Logstash:
curl \
  --request PUT 'http://127.0.0.1:8888/kinesis' \
  --header "content-type: application/json" \
  --data '{"user_id":"user1", "score": 100}'
If the setup is successful, the Logstash console shows a message confirming that the data was received and sent to Data Streams over the AWS Kinesis Data Streams protocol:
{
    "@version" => "1",
    "headers" => {
        "request_path" => "/kinesis",
        "http_version" => "HTTP/1.1",
        "content_type" => "application/json",
        "http_host" => "127.0.0.1:8888",
        "http_accept" => "*/*",
        "request_method" => "PUT",
        "content_length" => "18",
        "http_user_agent" => "curl/7.68.0"
    },
    "host" => "127.0.0.1",
    "json" => "message"
}
Stage 1 Triggers: { stream: '/ru-central1/aoeu1kuk2dht********/cc8029jgtuab********/logstash_stream', manual: 0, count: 0, size: 0, matches: 0, timed: 0, UserRecords: 0, KinesisRecords: 0 }
Stage 2 Triggers: { stream: '/ru-central1/aoeu1kuk2dht********/cc8029jgtuab********/logstash_stream', manual: 0, count: 0, size: 0, matches: 0, timed: 1, KinesisRecords: 1, PutRecords: 1 }
(test) Average Processing Time: 723 ms
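To send more than one test record, the curl request from the previous step can be wrapped in a small loop. This is a sketch under stated assumptions: `make_event` and `send_event` are hypothetical helper names, the URL matches the `http` input on port 8888 from the sample configuration, and the `user_id` value is assumed not to need JSON escaping.

```shell
#!/bin/sh
# Assumption: the Logstash http input from the sample config listens here.
LOGSTASH_URL='http://127.0.0.1:8888/kinesis'

# Hypothetical helper: build the same JSON body as the curl example.
# No JSON escaping is done, so keep user_id to plain characters.
make_event() {
    printf '{"user_id":"%s", "score": %d}' "$1" "$2"
}

# Hypothetical helper: PUT one event; fails on HTTP errors or timeouts.
send_event() {
    curl --silent --show-error --fail --max-time 5 \
        --request PUT "$LOGSTASH_URL" \
        --header "content-type: application/json" \
        --data "$(make_event "$1" "$2")" >/dev/null
}

for score in 100 200 300; do
    if send_event user1 "$score"; then
        echo "sent score=$score"
    else
        echo "failed to send score=$score (is Logstash running?)" >&2
    fi
done
```

Each successful request should produce one more event in the Logstash console, in the same format as the output shown above.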