Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex Data Processing
  • Начало работы
    • Все руководства
      • Обмен данными с Managed Service for ClickHouse®
      • Импорт данных из кластера Managed Service for MySQL® с помощью Sqoop
      • Импорт данных из кластера Managed Service for PostgreSQL с помощью Sqoop
      • Интеграция с сервисом DataSphere
      • Работа с топиками Apache Kafka® с помощью PySpark-заданий
      • Автоматизация работы с помощью Managed Service for Apache Airflow™
  • Управление доступом
  • Правила тарификации
  • Справочник Terraform
  • Метрики Monitoring
  • Аудитные логи Audit Trails
  • Публичные материалы
  • Вопросы и ответы
  • Обучающие курсы

В этой статье:

  • Необходимые платные ресурсы
  • Подготовьте инфраструктуру
  • Создайте задания PySpark
  • Удалите созданные ресурсы
  1. Практические руководства
  2. Интеграция Yandex Data Processing с другими сервисами
  3. Работа с топиками Apache Kafka® с помощью PySpark-заданий

Работа с топиками Apache Kafka® с помощью PySpark-заданий в Yandex Data Processing

Статья создана
Yandex Cloud
Обновлена 8 апреля 2025 г.
  • Необходимые платные ресурсы
  • Подготовьте инфраструктуру
  • Создайте задания PySpark
  • Удалите созданные ресурсы

Кластеры Yandex Data Processing поддерживают интеграцию с кластерами Managed Service for Apache Kafka®. Вы можете записывать сообщения в топики Apache Kafka® и читать сообщения из топиков с помощью PySpark-заданий. При чтении поддерживается пакетная обработка (batch processing) и потоковая обработка (stream processing).

Чтобы настроить интеграцию между кластерами Managed Service for Apache Kafka® и Yandex Data Processing:

  1. Подготовьте инфраструктуру.
  2. Создайте задания PySpark.

Если созданные ресурсы вам больше не нужны, удалите их.

Необходимые платные ресурсыНеобходимые платные ресурсы

В стоимость поддержки описываемого решения входят:

  • Плата за кластер Managed Service for Apache Kafka®: использование вычислительных ресурсов, выделенных хостам (в том числе хостам ZooKeeper), и дискового пространства (см. тарифы Apache Kafka®).
  • Плата за кластер Yandex Data Processing (см. тарифы Yandex Data Processing).
  • Плата за NAT-шлюз (см. тарифы Virtual Private Cloud).
  • Плата за бакет Object Storage: хранение данных и выполнение операций с ними (см. тарифы Object Storage).

Подготовьте инфраструктуруПодготовьте инфраструктуру

Вручную
Terraform
  1. Создайте облачную сеть dataproc-network без подсетей.

  2. Создайте подсеть dataproc-subnet-b в зоне доступности ru-central1-b.

  3. Настройте NAT-шлюз для подсети dataproc-subnet-b.

  4. Создайте группу безопасности dataproc-security-group в сети dataproc-network.

  5. Настройте группу безопасности.

  6. Создайте сервисный аккаунт dataproc-sa с ролями:

    • storage.viewer;
    • storage.uploader;
    • dataproc.agent;
    • dataproc.user.
  7. Создайте бакет dataproc-bucket.

  8. Предоставьте сервисному аккаунту dataproc-sa разрешение FULL_CONTROL на бакет dataproc-bucket.

  9. Создайте кластер Yandex Data Processing с параметрами:

    • Имя кластера — dataproc-cluster.

    • Окружение — PRODUCTION.

    • Версия — 2.1.

    • Сервисы:

      • HDFS
      • LIVY
      • SPARK
      • TEZ
      • YARN
    • Сервисный аккаунт — dataproc-sa.

    • Зона доступности — ru-central1-b.

    • Имя бакета — dataproc-bucket.

    • Сеть — dataproc-network.

    • Группы безопасности — dataproc-security-group.

    • Подкластеры — мастер, один подкластер Data и один подкластер Compute.

  10. Создайте кластер Managed Service for Apache Kafka® с параметрами:

    • Имя кластера — dataproc-kafka.
    • Окружение — PRODUCTION.
    • Версия — 3.5.
    • Зона доступности — ru-central1-b.
    • Сеть — dataproc-network.
    • Группы безопасности — dataproc-security-group.
    • Подсеть — dataproc-subnet-b.
  11. Создайте топик Apache Kafka® с параметрами:

    • Имя — dataproc-kafka-topic.
    • Количество разделов — 1.
    • Фактор репликации — 1.
  12. Создайте пользователя Apache Kafka® с параметрами:

    • Имя — user1.
    • Пароль — password1.
    • Топики, на которые выдаются разрешения пользователю — * (все топики).
    • Разрешения на топики — ACCESS_ROLE_CONSUMER, ACCESS_ROLE_PRODUCER и ACCESS_ROLE_ADMIN.
  1. Если у вас еще нет Terraform, установите его.

  2. Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.

  3. Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его.

  4. Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.

  5. Скачайте в ту же рабочую директорию файл конфигурации kafka-and-data-proc.tf.

    В этом файле описаны:

    • сеть;
    • NAT-шлюз и таблица маршрутизации, необходимые для работы Yandex Data Processing;
    • подсеть;
    • группа безопасности, необходимая для кластеров Yandex Data Processing и Managed Service for Apache Kafka®;
    • сервисный аккаунт, необходимый для работы кластера Yandex Data Processing;
    • сервисный аккаунт для управления бакетом Yandex Object Storage;
    • бакет Yandex Object Storage;
    • статический ключ доступа, необходимый для выдачи сервисному аккаунту нужных разрешений на бакет;
    • кластер Yandex Data Processing;
    • кластер Managed Service for Apache Kafka®;
    • пользователь Apache Kafka®;
    • топик Apache Kafka®.
  6. Укажите в файле kafka-and-data-proc.tf:

    • folder_id — идентификатор облачного каталога, такой же, как в настройках провайдера.
    • dp_ssh_key — абсолютный путь к публичному ключу для кластера Yandex Data Processing. Подробнее см. в разделе SSH-подключение к хосту Yandex Data Processing.
  7. Проверьте корректность файлов конфигурации Terraform с помощью команды:

    terraform validate
    

    Если в файлах конфигурации есть ошибки, Terraform на них укажет.

  8. Создайте необходимую инфраструктуру:

    1. Выполните команду для просмотра планируемых изменений:

      terraform plan
      

      Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

    2. Если вас устраивают планируемые изменения, внесите их:

      1. Выполните команду:

        terraform apply
        
      2. Подтвердите изменение ресурсов.

      3. Дождитесь завершения операции.

    В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления.

Создайте задания PySparkСоздайте задания PySpark

  1. На локальном компьютере сохраните следующие скрипты:

    kafka-write.py

    Скрипт для записи сообщений в топик Apache Kafka®:

    #!/usr/bin/env python3
    
    from pyspark.sql import SparkSession, Row
    from pyspark.sql.functions import to_json, col, struct
    
    def main():
       spark = SparkSession.builder.appName("dataproc-kafka-write-app").getOrCreate()
    
       df = spark.createDataFrame([
          Row(msg="Test message #1 from dataproc-cluster"),
          Row(msg="Test message #2 from dataproc-cluster")
       ])
       df = df.select(to_json(struct([col(c).alias(c) for c in df.columns])).alias('value'))
       df.write.format("kafka") \
          .option("kafka.bootstrap.servers", "<FQDN_хоста>:9091") \
          .option("topic", "dataproc-kafka-topic") \
          .option("kafka.security.protocol", "SASL_SSL") \
          .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
          .option("kafka.sasl.jaas.config",
                  "org.apache.kafka.common.security.scram.ScramLoginModule required "
                  "username=user1 "
                  "password=password1 "
                  ";") \
          .save()
    
    if __name__ == "__main__":
       main()
    
    kafka-read-batch.py

    Скрипт для чтения из топика и для пакетной обработки:

    #!/usr/bin/env python3
    
    from pyspark.sql import SparkSession, Row
    from pyspark.sql.functions import to_json, col, struct
    
    def main():
       spark = SparkSession.builder.appName("dataproc-kafka-read-batch-app").getOrCreate()
    
       df = spark.read.format("kafka") \
          .option("kafka.bootstrap.servers", "<FQDN_хоста>:9091") \
          .option("subscribe", "dataproc-kafka-topic") \
          .option("kafka.security.protocol", "SASL_SSL") \
          .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
          .option("kafka.sasl.jaas.config",
                  "org.apache.kafka.common.security.scram.ScramLoginModule required "
                  "username=user1 "
                  "password=password1 "
                  ";") \
          .option("startingOffsets", "earliest") \
          .load() \
          .selectExpr("CAST(value AS STRING)") \
          .where(col("value").isNotNull())
    
       df.write.format("text").save("s3a://dataproc-bucket/kafka-read-batch-output")
    
    if __name__ == "__main__":
       main()
    
    kafka-read-stream.py

    Скрипт для чтения из топика и для потоковой обработки:

    #!/usr/bin/env python3
    
    from pyspark.sql import SparkSession, Row
    from pyspark.sql.functions import to_json, col, struct
    
    def main():
       spark = SparkSession.builder.appName("dataproc-kafka-read-stream-app").getOrCreate()
    
       query = spark.readStream.format("kafka")\
          .option("kafka.bootstrap.servers", "<FQDN_хоста>:9091") \
          .option("subscribe", "dataproc-kafka-topic") \
          .option("kafka.security.protocol", "SASL_SSL") \
          .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
          .option("kafka.sasl.jaas.config",
                  "org.apache.kafka.common.security.scram.ScramLoginModule required "
                  "username=user1 "
                  "password=password1 "
                  ";") \
          .option("startingOffsets", "earliest")\
          .load()\
          .selectExpr("CAST(value AS STRING)")\
          .where(col("value").isNotNull())\
          .writeStream\
          .trigger(once=True)\
          .queryName("received_messages")\
          .format("memory")\
          .start()
    
       query.awaitTermination()
    
       df = spark.sql("select value from received_messages")
    
       df.write.format("text").save("s3a://dataproc-bucket/kafka-read-stream-output")
    
    if __name__ == "__main__":
       main()
    
  2. Получите FQDN хоста Apache Kafka® и укажите FQDN в каждом скрипте.

  3. Загрузите в корень бакета подготовленные скрипты.

  4. Создайте задание PySpark для записи сообщения в топик Apache Kafka®. В поле Main python файл укажите путь до скрипта s3a://dataproc-bucket/kafka-write.py.

  5. Дождитесь, когда статус задания изменится на Done.

  6. Убедитесь, что данные в топик были успешно записаны. Для этого создайте новое задание PySpark для чтения из топика и для пакетной обработки данных. В поле Main python файл укажите путь до скрипта s3a://dataproc-bucket/kafka-read-batch.py.

  7. Дождитесь, когда статус нового задания изменится на Done.

  8. Скачайте из бакета файл с прочитанными данными:

    part-00000
    {"msg":"Test message #1 from dataproc-cluster"}
    {"msg":"Test message #2 from dataproc-cluster"}
    

    Файл хранится в новой папке kafka-read-batch-output в бакете.

  9. Прочитайте сообщения из топика при потоковой обработке. Для этого создайте еще одно задание PySpark. В поле Main python файл укажите путь до скрипта s3a://dataproc-bucket/kafka-read-stream.py.

  10. Дождитесь, когда статус нового задания изменится на Done.

  11. Скачайте из бакета файлы с прочитанными данными:

    part-00000
    {"msg":"Test message #1 from dataproc-cluster"}
    
    part-00001
    {"msg":"Test message #2 from dataproc-cluster"}
    

    Файлы хранятся в новой папке kafka-read-stream-output в бакете.

Примечание

Вы можете просматривать логи выполнения заданий и искать в них информацию с помощью сервиса Yandex Cloud Logging. Подробнее см. в разделе Работа с логами.

Удалите созданные ресурсыУдалите созданные ресурсы

Некоторые ресурсы платные. Удалите ресурсы, которые вы больше не будете использовать, чтобы не платить за них:

  1. Удалите объекты из бакета.

  2. Остальные ресурсы удалите в зависимости от способа их создания:

    Вручную
    Terraform
    1. Кластер Yandex Data Processing.
    2. Кластер Managed Service for Apache Kafka®.
    3. Бакет.
    4. Группу безопасности.
    5. Подсеть.
    6. Таблицу маршрутизации.
    7. NAT-шлюз.
    8. Сеть.
    9. Сервисный аккаунт.
    1. В терминале перейдите в директорию с планом инфраструктуры.

      Важно

      Убедитесь, что в директории нет Terraform-манифестов с ресурсами, которые вы хотите сохранить. Terraform удаляет все ресурсы, которые были созданы с помощью манифестов в текущей директории.

    2. Удалите ресурсы:

      1. Выполните команду:

        terraform destroy
        
      2. Подтвердите удаление ресурсов и дождитесь завершения операции.

      Все ресурсы, которые были описаны в Terraform-манифестах, будут удалены.

Была ли статья полезна?

Предыдущая
Интеграция с сервисом DataSphere
Следующая
Автоматизация работы с помощью Managed Service for Apache Airflow™
Проект Яндекса
© 2025 ООО «Яндекс.Облако»