Синхронизация данных из топиков Apache Kafka® в бакет Object Storage без использования интернета
Примечание
В регионе Казахстан доступна только зона доступности kz1-a
.
Примечание
Функциональность сервисных подключений (VPC Private Endpoints) в Yandex Virtual Private Cloud находится на стадии Preview. Чтобы получить доступ, обратитесь к вашему аккаунт-менеджеру.
Вы можете синхронизировать данные из топиков Apache Kafka® в бакет Yandex Object Storage без использования интернета с помощью сервисного подключения в пользовательской сети, где располагается кластер Managed Service for Apache Kafka®. Для этого:
- Отправьте данные в топик.
- Убедитесь в недоступности бакета из внешней сети.
- Проверьте наличие данных в бакете.
Если созданные ресурсы вам больше не нужны, удалите их.
Необходимые платные ресурсы
В стоимость поддержки описываемого решения входят:
- Плата за бакет Object Storage: хранение данных и выполнение операций с ними (см. тарифы Object Storage).
- Плата за кластер Managed Service for Apache Kafka®: использование выделенных хостам вычислительных ресурсов и дискового пространства (см. тарифы Managed Service for Apache Kafka®).
- Плата за использование публичных IP-адресов для хостов кластера (см. тарифы Virtual Private Cloud).
Перед началом работы
-
Подготовьте инфраструктуру:
ВручнуюTerraform-
Создайте сеть с именем
my-private-network
. При создании выключите опцию Создать подсети. -
Создайте подсеть в любой зоне доступности.
-
Создайте сервисное подключение к Object Storage в сети
my-private-network
и запишите его идентификатор. -
Создайте сервисный аккаунт с именем
storage-pe-admin
и рольюstorage.admin
. Кластер Managed Service for Apache Kafka® будет использовать его для доступа к бакету. -
Создайте статический ключ доступа для сервисного аккаунта
storage-pe-admin
. -
Создайте бакет Yandex Object Storage и задайте для него политику доступа:
- Результат —
Разрешить
. - Действие —
Все действия
. - Ресурс —
<имя_бакета>
и<имя_бакета>/*
. - Условие — выберите из списка ключ
yc:private-endpoint-id
и укажите для его значения идентификатор созданного сервисного подключения.
- Результат —
-
Создайте кластер Managed Service for Apache Kafka® любой подходящей конфигурации со следующими настройками:
- Один хост-брокер.
- Публичный доступ к хостам кластера.
- Сеть
my-private-network
.
-
Если вы используете группы безопасности в кластере, убедитесь, что они настроены правильно и допускают подключение к нему.
-
Создайте в кластере топик с именем
my-private-topic
. -
Создайте в кластере пользователя с именем
mkf-user
и правом доступаACCESS_ROLE_PRODUCER
к созданному топику. -
Создайте в кластере коннектор со следующими настройками:
- В блоке Дополнительные свойства укажите свойства коннектора:
key.converter
:org.apache.kafka.connect.storage.StringConverter
value.converter
:org.apache.kafka.connect.converters.ByteArrayConverter
format.output.fields.value.encoding
:none
- Выберите тип коннектора S3 Sink.
- В поле Топики укажите
my-private-topic
. - В блоке Подключение к S3 укажите параметры:
- Имя бакета — созданный ранее бакет.
- Эндпоинт —
storage.pe.yandexcloud.net
. - Идентификатор ключа доступа, Секретный ключ — идентификатор и секретный ключ созданного ранее статического ключа доступа.
- В блоке Дополнительные свойства укажите свойства коннектора:
-
Создайте ВМ с публичным IP-адресом в созданной сети
my-private-network
для подключения к бакету.
-
Если у вас еще нет Terraform, установите его.
-
Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.
-
Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его
. -
Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.
-
Скачайте в ту же рабочую директорию файл конфигурации kafka-objstorage-sync-private-network.tf
.В этом файле описаны:
- сеть;
- подсеть;
- сервисное подключение;
- группа безопасности, необходимая для подключения к кластеру;
- сервисный аккаунт, который будет использоваться для создания бакета и доступа к нему;
- бакет Object Storage;
- кластер Managed Service for Apache Kafka®;
- топик Apache Kafka®;
- пользователь Apache Kafka®;
- коннектор Apache Kafka®;
- ВМ для чтения данных из бакета.
-
Укажите в файле
kafka-objstorage-sync-private-network.tf
:tf_account_name
— имя сервисного аккаунта, такое же, как в настройках провайдера.bucket_name
— имя бакета в соответствии с правилами именования.mkf_version
— версия Apache Kafka®;mkf_user_password
— пароль пользователя Apache Kafka®.vm_image_id
— идентификатор публичного образа ВМ.vm_username
иvm_ssh_key
– логин и абсолютный путь к публичному ключу, которые будут использоваться для доступа к ВМ.
-
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Создайте необходимую инфраструктуру:
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления
. -
-
-
Убедитесь, что в сервисе Yandex Cloud DNS появилась запись
*.storage.pe.yandexcloud.net
в сервисной зоне.
созданной сети. -
Установите утилиту kafkacat
для записи данных в топик Apache Kafka®.sudo apt update && sudo apt install --yes kafkacat
Убедитесь, что можете с ее помощью подключиться к созданному ранее кластеру Managed Service for Apache Kafka® через SSL.
Отправьте данные в топик
-
Отправьте тестовые данные в топик
my-private-topic
с помощью утилитыkafkacat
:for i in $(echo {1..50}) do echo "test message_"$i | kafkacat -P \ -b <FQDN_хоста-брокера>:9091 \ -t my-private-topic \ -k key \ -X security.protocol=SASL_SSL \ -X sasl.username="<имя_пользователя_в_кластере-источнике>" \ -X sasl.password="<пароль_пользователя_в_кластере-источнике>" \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z done
-
Убедитесь в репликации данных в бакет. Для этого откройте статистику загрузок бакета и проверьте наличие запросов на графике Modify Requests.
Убедитесь в недоступности бакета из внешней сети
-
Если у вас еще нет интерфейса командной строки AWS CLI, установите и сконфигурируйте его.
-
Выполните команду для получения списка объектов бакета по адресу в публичной сети:
aws s3 ls s3://<имя_бакета> \ --endpoint-url=https://storage.yandexcloud.net \ --recursive
Результат:
An error occurred (AccessDenied) when calling the ListObjectsV2 operation: Access Denied
-
Выполните команду для получения списка объектов бакета по адресу в приватной сети:
aws s3 ls s3://<имя_бакета> \ --endpoint-url=https://storage.pe.yandexcloud.net \ --recursive
Результат:
Could not connect to the endpoint URL: "https://storage.pe.yandexcloud.net/<имя_бакета>?list-type=2&prefix=&encoding-type=url"
Проверьте наличие данных в бакете по сервисному подключению
Подключитесь по SSH к созданной ранее ВМ и выполните следующие действия:
-
Установите и сконфигурируйте интерфейс командной строки AWS CLI.
-
Выполните команду для получения списка объектов бакета по адресу в приватной сети:
aws s3 ls s3://<имя_бакета> \ --endpoint-url=https://storage.pe.yandexcloud.net \ --recursive
Результат:
2025-08-01 14:38:23 20 my-private-topic-1-0 2025-08-01 14:38:23 587 my-private-topic-1-1 2025-08-01 14:38:24 440 my-private-topic-1-29
Такой результат означает, что данные из топика Apache Kafka® успешно синхронизированы через сервисное подключение.
Удалите созданные ресурсы
Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:
- Удалите кластер Managed Service for Apache Kafka®.
- Удалите бакет Object Storage. Перед удалением бакета удалите все объекты из него.
- Удалите виртуальную машину
Предварительно удалите все объекты из созданного ранее бакета.
-
В терминале перейдите в директорию с планом инфраструктуры.
Важно
Убедитесь, что в директории нет Terraform-манифестов с ресурсами, которые вы хотите сохранить. Terraform удаляет все ресурсы, которые были созданы с помощью манифестов в текущей директории.
-
Удалите ресурсы:
-
Выполните команду:
terraform destroy
-
Подтвердите удаление ресурсов и дождитесь завершения операции.
Все ресурсы, которые были описаны в Terraform-манифестах, будут удалены.
-