Работа с топиками Apache Kafka® с помощью PySpark-заданий в Yandex Data Processing
Кластеры Yandex Data Processing поддерживают интеграцию с кластерами Managed Service for Apache Kafka®. Вы можете записывать сообщения в топики Apache Kafka® и читать сообщения из топиков с помощью PySpark-заданий. При чтении поддерживается пакетная обработка (batch processing) и потоковая обработка (stream processing).
Чтобы настроить интеграцию между кластерами Managed Service for Apache Kafka® и Yandex Data Processing:
Если созданные ресурсы вам больше не нужны, удалите их.
Подготовьте инфраструктуру
-
Создайте облачную сеть
dataproc-network
без подсетей. -
Создайте подсеть
dataproc-subnet-b
в зоне доступностиru-central1-b
. -
Настройте NAT-шлюз для подсети
dataproc-subnet-b
. -
Создайте группу безопасности
dataproc-security-group
в сетиdataproc-network
. -
Создайте сервисный аккаунт
dataproc-sa
с ролями:storage.viewer
;storage.uploader
;dataproc.agent
;dataproc.user
.
-
Создайте бакет
dataproc-bucket
. -
Предоставьте сервисному аккаунту
dataproc-sa
разрешениеFULL_CONTROL
на бакетdataproc-bucket
. -
Создайте кластер Yandex Data Processing с параметрами:
-
Имя кластера —
dataproc-cluster
. -
Версия —
2.1
. -
Сервисы:
HDFS
LIVY
SPARK
TEZ
YARN
-
Сервисный аккаунт —
dataproc-sa
. -
Зона доступности —
ru-central1-b
. -
Имя бакета —
dataproc-bucket
. -
Сеть —
dataproc-network
. -
Группы безопасности —
dataproc-security-group
. -
Подкластеры — мастер, один подкластер
Data
и один подкластерCompute
.
-
-
Создайте кластер Managed Service for Apache Kafka® с параметрами:
- Имя кластера —
dataproc-kafka
. - Окружение —
PRODUCTION
. - Версия —
3.5
. - Зона доступности —
ru-central1-b
. - Сеть —
dataproc-network
. - Группы безопасности —
dataproc-security-group
. - Подсеть —
dataproc-subnet-b
.
- Имя кластера —
-
Создайте топик Apache Kafka® с параметрами:
- Имя —
dataproc-kafka-topic
. - Количество разделов —
1
. - Фактор репликации —
1
.
- Имя —
-
Создайте пользователя Apache Kafka® с параметрами:
- Имя —
user1
. - Пароль —
password1
. - Топики, на которые выдаются разрешения пользователю —
*
(все топики). - Разрешения на топики —
ACCESS_ROLE_CONSUMER
,ACCESS_ROLE_PRODUCER
иACCESS_ROLE_ADMIN
.
- Имя —
-
Если у вас еще нет Terraform, установите его.
-
Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.
-
Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его
. -
Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.
-
Скачайте в ту же рабочую директорию файл конфигурации kafka-and-data-proc.tf
.В этом файле описаны:
- сеть;
- NAT-шлюз и таблица маршрутизации, необходимые для работы Yandex Data Processing;
- подсеть;
- группа безопасности, необходимая для кластеров Yandex Data Processing и Managed Service for Apache Kafka®;
- сервисный аккаунт, необходимый для работы кластера Yandex Data Processing;
- бакет Yandex Object Storage;
- статический ключ доступа, необходимый для выдачи сервисному аккаунту нужных разрешений на бакет;
- кластер Yandex Data Processing;
- кластер Managed Service for Apache Kafka®;
- пользователь Apache Kafka®;
- топик Apache Kafka®.
-
Укажите в файле
kafka-and-data-proc.tf
:folder_id
— идентификатор облачного каталога, такой же, как в настройках провайдера.dp_ssh_key
— абсолютный путь к публичному ключу для кластера Yandex Data Processing. Подробнее см. в разделе SSH-подключение к хосту Yandex Data Processing.
-
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Создайте необходимую инфраструктуру:
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления
. -
Создайте задания PySpark
-
На локальном компьютере сохраните следующие скрипты:
kafka-write.py
Скрипт для записи сообщений в топик Apache Kafka®:
#!/usr/bin/env python3 from pyspark.sql import SparkSession, Row from pyspark.sql.functions import to_json, col, struct def main(): spark = SparkSession.builder.appName("dataproc-kafka-write-app").getOrCreate() df = spark.createDataFrame([ Row(msg="Test message #1 from dataproc-cluster"), Row(msg="Test message #2 from dataproc-cluster") ]) df = df.select(to_json(struct([col(c).alias(c) for c in df.columns])).alias('value')) df.write.format("kafka") \ .option("kafka.bootstrap.servers", "<FQDN_хоста>:9091") \ .option("topic", "dataproc-kafka-topic") \ .option("kafka.security.protocol", "SASL_SSL") \ .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \ .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " "username=user1 " "password=password1 " ";") \ .save() if __name__ == "__main__": main()
kafka-read-batch.py
Скрипт для чтения из топика и для пакетной обработки:
#!/usr/bin/env python3 from pyspark.sql import SparkSession, Row from pyspark.sql.functions import to_json, col, struct def main(): spark = SparkSession.builder.appName("dataproc-kafka-read-batch-app").getOrCreate() df = spark.read.format("kafka") \ .option("kafka.bootstrap.servers", "<FQDN_хоста>:9091") \ .option("subscribe", "dataproc-kafka-topic") \ .option("kafka.security.protocol", "SASL_SSL") \ .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \ .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " "username=user1 " "password=password1 " ";") \ .option("startingOffsets", "earliest") \ .load() \ .selectExpr("CAST(value AS STRING)") \ .where(col("value").isNotNull()) df.write.format("text").save("s3a://dataproc-bucket/kafka-read-batch-output") if __name__ == "__main__": main()
kafka-read-stream.py
Скрипт для чтения из топика и для потоковой обработки:
#!/usr/bin/env python3 from pyspark.sql import SparkSession, Row from pyspark.sql.functions import to_json, col, struct def main(): spark = SparkSession.builder.appName("dataproc-kafka-read-stream-app").getOrCreate() query = spark.readStream.format("kafka")\ .option("kafka.bootstrap.servers", "<FQDN_хоста>:9091") \ .option("subscribe", "dataproc-kafka-topic") \ .option("kafka.security.protocol", "SASL_SSL") \ .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \ .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " "username=user1 " "password=password1 " ";") \ .option("startingOffsets", "earliest")\ .load()\ .selectExpr("CAST(value AS STRING)")\ .where(col("value").isNotNull())\ .writeStream\ .trigger(once=True)\ .queryName("received_messages")\ .format("memory")\ .start() query.awaitTermination() df = spark.sql("select value from received_messages") df.write.format("text").save("s3a://dataproc-bucket/kafka-read-stream-output") if __name__ == "__main__": main()
-
Получите FQDN хоста Apache Kafka® и укажите FQDN в каждом скрипте.
-
Загрузите в корень бакета подготовленные скрипты.
-
Создайте задание PySpark для записи сообщения в топик Apache Kafka®. В поле Main python файл укажите путь до скрипта
s3a://dataproc-bucket/kafka-write.py
. -
Дождитесь, когда статус задания изменится на
Done
. -
Убедитесь, что данные в топик были успешно записаны. Для этого создайте новое задание PySpark для чтения из топика и для пакетной обработки данных. В поле Main python файл укажите путь до скрипта
s3a://dataproc-bucket/kafka-read-batch.py
. -
Дождитесь, когда статус нового задания изменится на
Done
. -
Скачайте из бакета файл с прочитанными данными:
part-00000
{"msg":"Test message #1 from dataproc-cluster"} {"msg":"Test message #2 from dataproc-cluster"}
Файл хранится в новой папке
kafka-read-batch-output
в бакете. -
Прочитайте сообщения из топика при потоковой обработке. Для этого создайте еще одно задание PySpark. В поле Main python файл укажите путь до скрипта
s3a://dataproc-bucket/kafka-read-stream.py
. -
Дождитесь, когда статус нового задания изменится на
Done
. -
Скачайте из бакета файлы с прочитанными данными:
part-00000
{"msg":"Test message #1 from dataproc-cluster"}
part-00001
{"msg":"Test message #2 from dataproc-cluster"}
Файлы хранятся в новой папке
kafka-read-stream-output
в бакете.
Примечание
Вы можете просматривать логи выполнения заданий и искать в них информацию с помощью сервиса Yandex Cloud Logging. Подробнее см. в разделе Работа с логами.
Удалите созданные ресурсы
Некоторые ресурсы платные. Удалите ресурсы, которые вы больше не будете использовать, чтобы не платить за них:
Удалите:
-
Удалите объекты из бакетов.
-
В терминале перейдите в директорию с планом инфраструктуры.
-
Удалите конфигурационный файл
kafka-and-data-proc.tf
. -
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Подтвердите изменение ресурсов.
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
Все ресурсы, которые были описаны в конфигурационном файле
kafka-and-data-proc.tf
, будут удалены. -