Change Data Capture в Yandex Data Transfer: гид по технологии с примерами

Рассказываем, как в теории и на практике работает Change Data Capture и как наш сервис Yandex Data Transfer с поддержкой формата Debezium помогает пользователям решать задачи поставки данных, связанные с CDC.

В современных микросервисных архитектурах регулярно встречаются потребности в кешах, индексах полнотекстового поиска, репликах, а также в реактивном взаимодействии компонентов. Решать все эти задачи по отдельности — тот ещё вызов, но оказывается все эти задачи могут быть решены одним механизмом, и имя ему: Change Data Capture.

Что такое Change Data Capture (CDC)

Change Data Capture (CDC, «захват изменения данных») — это набор шаблонов разработки программного обеспечения, который позволяет организовать реактивную инфраструктуру, упростить микросервисную архитектуру, а также распилить монолит на микросервисы.

Как правило, во всех подобных сценариях речь идёт о том, что у вас есть OLTP-база данных. С помощью CDC вы получаете последовательность событий о добавлении, изменении и удалении строк и обрабатываете их — таким образом база данных преобразуется в event-driven систему.

Типовая схема использования Change Data Capture

Когда полезен СDС

Правильно приготовленный CDC позволяет добиться реакции на событие за время, измеряемое долями секунды. Способность СDC быстро и эффективно перемещать данные небольшими порциями делает его полезным, когда нужно в режиме реального времени реагировать на изменения в базе-источнике. Можно выделить два основных направления, в которых применяется CDC:

  1. это организация реактивного взаимодействия между компонентами инфраструктуры;

  2. реализация различных паттернов построения микросервисной архитектуры.

Рассмотрим оба.

1. Реактивное взаимодействие между компонентами инфраструктуры

В современных архитектурах часто работают не со всей базой данных, а с ее частью, и обычно CDC применяется к конкретным таблицам. Имея последовательность модифицирующих таблицу событий, можно получить асинхронную реплику таблицы. А поскольку события уже отделились от базы-источника в независимом от базы формате, базой-приёмником может выступать что угодно: OLTP/OLAP/Cache/Full-Text Search.

Так можно получить, например, кеш с минимальными задержками без необходимости внесения изменений в бизнес-логику сервисов. Посмотрим, как это работает на конкретных примерах репликации.

Репликация в Data Warehouse (DWH). Применяя поток изменений к DWH, можно получить асинхронную реплику вашей боевой OLTP-базы. Это решает сразу несколько проблем:

  • позволяет аналитикам не влиять на боевую OLTP-базу,

  • решает вопрос передачи данных из транзакционной базы в аналитическую,

  • позволяет аналитикам работать на аналитических базах.

Например, вы можете автоматически получить асинхронную реплику боевого PostgreSQL в ClickHouse®, Elasticsearch/OpenSearch или S3.

Пример с поставкой репликационного потока из MySQL в ClickHouse

Обновление или инвалидация кешей. Применяя поток изменений к кешу, можно получить прямиком из боевой OLTP-базы автоматически обновляемый кеш (опционально агрегированный или трансформированный). В качестве практического примера могу привести проект RedisCDC для Redis. Это распределённая платформа, которая упрощает поставку CDC-потока в Redis.

Отгрузка данных в движок полнотекстового поиска Elasticsearch/OpenSearch/Solr. Отправляя нужную часть потока изменений в движок полнотекстового поиска, можно получить автоматически обновляемый индекс полнотекстового поиска.

Пример поставки репликационного потока из MySQL в ElasticSearch

Репликация в транзакционную БД. Имея поток изменений в базе, можно получить асинхронную логическую реплику, например для dev-стенда разработчиков. А поскольку, как правило, события передаются в логическом виде, можно попутно произвести с ними логические преобразования. Например, колонки с одними чувствительными данными исключить из реплики, а другие чувствительные данные зашифровать.

Если по каким-нибудь причинам вы хотите получить реплику ваших данных в другой БД: например, прод в MySQL, а данные нужны в PostgreSQL, или наоборот, — это тоже можно реализовать при помощи CDC.

Пример поставки из MySQL в PostgreSQL

Полный аудит происходящего в базе. CDC даёт возможность проводить полный аудит происходящего в базе. Сохраняя события об изменениях в базе «как есть» в какое-либо хранилище, вы можете получить аудитные логи всего происходящего с вашей боевой базой. А при обработке потока сообщений, вы можете получить аудит в реальном времени.

Пример со сценарием такого аудита

Отгрузка потока сообщений другим командам. Отгружая события в очередь, можно нотифицировать другие команды о наступивших событиях. Более системно этот подход рассматривается в блоке о микросервисных паттернах, но отгружать коллегам события можно и без всяких паттернов.

Отказоустойчивость. Если у вас кросс-датацентровая очередь, отгружая CDC в очередь, вы можете дублировать читателей в двух разных дата-центрах и обеспечить таким образом гарантию DC-1 (на случай отключения одного дата-центра).

Снижение нагрузки на мастер транзакционной БД. CDC позволяет снизить нагрузку на мастер транзакционной БД или сервис, если речь про базу микросервиса.

Сериализация CDC-потока в очереди позволяет множеству приёмников работать с информацией мастера, нагрузив его лишь один раз. Можно включить все вышеперечисленные приёмники, ещё и умножить их на два: по копии в каждом ДЦ, при этом нагрузка на мастер не изменится — все они будут лишь читать очередь сообщений.

2. Паттерны построения микросервисной архитектуры

СDC — это подход, который позволяет одновременно использовать несколько паттернов микросервисной архитектуры. С точки зрения паттернов проектирования микросервисов, CDC позволяет одновременно осуществлять и реактивную обработку событий одних микросервисов другими, и decoupling (отсоединение) микросервисов, и помогать распиливать монолиты на микросервисы. На тонких нюансах отличия паттернов останавливаться не буду, но для интересующихся — в конце поделюсь списком ссылок по теме.

Паттерн outbox. Паттерн outbox (aka Application events) предписывает объединять изменения стейта с отправкой нотификаций другим сервисам. Это достижимо либо через сохранение сообщений на отправку в соседних таблицах в той же транзакции (см. transactional outbox), либо через последующую отправку сообщений через CDC.

Если складывать CDC-поток в очередь, то как побочный эффект получается event sourcing с domain-событиями на шине в виде Apache Kafka и взаимодействие сервисов через Kafka. Это позволяет отчасти осуществить decoupling микросервисов: сервисы перестают куда-то ходить кого-то оповещать. Те, кому нужно получать события, сами подписываются на события, а сервис работает только с собственной базой. Кроме того, из бесплатных плюшек получаем: отсутствие необходимости в service discovery (механизме обнаружения сервисов), воспроизводимость, а также возможность легко и просто весь поток складывать, например, в Elasticsearch.

Паттерн CQRS. Command and Query Responsibility Segregation (CQRS) — паттерн, в котором приложение условно разделяется на часть, модифицирующую стейт, и на часть, читающую стейт. CDC позволяет направить часть, модифицирующую стейт, через очередь в другую базу, из которой будет производиться чтение.

Паттерн Strangler. Strangler — это паттерн распиливания монолита на микросервисы. Правильно приготовленный CDC полностью прозрачен для унаследованного приложения и не требует никаких изменений в унаследованной модели данных, что сильно облегчает декомпозицию монолита и делает внедрение в монолит максимально неинвазивным.

Способы реализации CDC

В теории существует три способа реализации CDC: временны́е метки на строках, триггеры на таблицах и логическая репликация. На практике на просторах GitHub нашелся лишь один проект (и тот малоизвестный), который реализовывал бы CDC через версии на строках.

В основном используется логическая репликация, реализованная через работу с WAL — Write-Ahead Log. Каждая база данных для обеспечения гарантий ACID содержит Write-Ahead Log — лог изменений, закодированный в бинарном формате: в PostgreSQL это называется wal, в MySQL — binlog, в Oracle — redo-log, в MongoDB — oplog, в MSSQL — transaction log.

По сути такой лог уже содержит то, что нам нужно, остаётся только раскодировать его и отправить в очередь сообщений. Правда, иногда появляются внезапные ограничения. Например, в случае PostgreSQL этот подход не дружит с pg_repack.

На современном рынке CDC-решений значительную часть занимает проект Debezium, основанный на декодировании wal-лога. Так что рассмотрим его подробнее.

Debezium — самая популярная реализация CDC

Debezium сегодня является стандартом де-факто в мире CDC. Он появился в 2015 году силами разработчиков из Red Hat и сейчас поддерживает 9 популярных источников: MySQL, MongoDB, PostgreSQL, Oracle, MSSQL, Db2, Cassandra, Vitess, Google Spanner. Изначально инструмент был только Kafka-коннектором, но в 2020-м появился Debezium Server, который умеет отправлять события во все популярные очереди.

Debezium поддерживает три формата сериализации событий: json (default), avro и protobuf. Сериализаторы модульны и параметризованы, и можно как настроить любой из существующих сериализаторов, так и написать свой. Поддерживаются интеграции с различными Schema registry. Также поддерживаются трансформации, интеграция с мониторингами, есть UI.

А ещё Debezium выдаёт формат данных, единый для всех БД. Таким образом, научившись обрабатывать дебезиумные потоки из MySQL, вы практически тем же кодом можете обрабатывать дебезиумные потоки и из любых других поддерживаемых Debezium баз данных.

В общем, проект уже достаточно зрелый, чтобы быть надёжным решением, но всё ещё быстро развивается, богат на фичи и интеграции, с обилием материалов и документации. Предоставляются удобные Docker-контейнеры с Debezium, которые позволяют ставить эксперименты за считанные минуты, к тому же есть замечательный официальный репозиторий с демо-примерами.

Как Yandex Data Transfer пришёл к CDC

Yandex Data Transfer появился как сервис миграции баз данных в облако. Когда пользователям необходимо заехать в облачную базу данных (managed database), как правило, у них уже есть база данных на железе (on-premise) и нужно мигрировать эти данные в облако.

Data Transfer решает задачу следующим образом: сначала переносится снапшот базы данных, после чего к базе-приёмнику применяются все изменения, произошедшие на базе-источнике с момента взятия снапшота. В итоге в облаке получается асинхронная реплика базы данных, остаётся только отключить пишущую нагрузку на базу-источник, подождать несколько секунд, пока реплика догонит мастера, и включить нагрузку уже на базу в облаке.

Таким образом можно мигрировать в облако с минимальным даунтаймом.

В терминологии сервиса есть две основных сущности:

  • Эндпоинт — это настройки подключения + дополнительные настройки. Эндпоинт может быть либо источником, из которого выгружаются данные, либо приёмником, куда данные загружаются.

  • Трансфер, который соединяет эндпоинт-источник с эндпоинтом-приёмником. Содержит собственные настройки, прежде всего тип трансфера: снапшот, репликация, снапшот+репликация.

После того как трансфер создан, его можно активировать разово, или же можно настроить активацию по расписанию. В случае снапшота таблицы переливаются, и затем трансфер деактивируется сам. В случае репликации запускается бесконечный процесс переноса новых данных из источника в приёмник.

Со временем помимо задачи миграции сервис начал использоваться для переливания данных между разными БД, например, из транзакционной в аналитическую, а также между очередями сообщений и между базами данных и очередями. Если свести все поддерживаемые варианты трансфера в одну матрицу, видно, что Data Transfer сразу умеет работать и со снапшотами, и с репликационными потоками данных, горизонтально масштабируясь и шардируясь.

Поскольку Yandex Data Transfer с момента создания получал репликационный поток на уровне логической репликации, естественным шагом было научиться отдавать это пользователям. Именно это превращало Data Transfer в CDC-решение, работающее через обработку wal-лога. Реализовывать отгрузку событий логической репликации в очередь мы начали по пользовательским запросам, и было принято решение порождать события в формате Debezium. Так можно было бы в настроенном пайплайне подменить один CDC-продукт другим, и пайплайн остался бы рабочим.

На данный момент в Data Transfer сериализатор реализован для PostgreSQL и MySQL-источников, а недавно появилась поддержка YDB-источника. Теперь вы можете в несколько кликов настроить поставку CDC из ваших таблиц YDB в Apache Kafka и в YDS в формате Debezium. Также вскоре после поддержки YDB появилась и поддержка Schema Registry, что на порядок уменьшает объёмы передаваемых данных. В планах получить ещё больше интеграций с различными опенсорс-сервисами.

Таким образом, для MySQL и PostgreSQL получился drop-in replacement для Debezium. Из крупных отличий: Data Transfer умеет переживать переезд мастера PostgreSQL (ситуация, когда реплика становится мастером) при включенном плагине pg_tm_aux, а также сервис умеет переносить user-defined types в PostgreSQL.

Сервис уже несколько лет успешно эксплуатируется внутри Яндекса в боевых условиях с солидными объёмами данных. Data Transfer стал универсальным сервисом переноса данных из любого источника в любой приёмник (с возможностями ETL-процессинга): как снапшотов, так и потоков данных — и пользователи помимо применения сервиса по прямому назначению придумали массу неочевидных способов использования. CDC является лишь одним способом применения сервиса Data Transfer из множества.

Вдобавок к тому, что сервис изначально горизонтально масштабируемый, мы научились параллелиться везде, где это возможно, и сейчас во внутренней инфраструктуре бежит почти две тысячи потоков данных, перенося гигабайты в секунду.

Реальные кейсы использования CDC через Yandex Data Transfer

Расскажу про популярные пользовательские кейсы.

  1. Самый распространённый вариант — использование CDC для создания реплик транзакционных баз (PostgreSQL, MySQL, YDB) в аналитических хранилищах (ClickHouse, YTsaurus). Сохранение событий в очередь выгружает данные из транзакционной БД один раз, а аналитических баз-приёмников как правило по две — по одной в каждом дата-центре, чтобы гарантировать DC-1.
    Проблема
    Продакшн-процессы бегут в транзакционных базах данных, а бизнесу интересно получать аналитические отчёты, которые удобно делать через аналитические БД. Поскольку HTAP-базы — ещё экзотика, нужно каким-то образом перекладывать данные из транзакционной базы в аналитическую.
    Было
    Обычно все команды переливают данные из транзакционной базы в аналитическую через собственные ad-hoc скрипты или in-house решения снапшотами, в лучшем случае query-based CDC. Как правило, это не оформлено в продукт и неудобно для использования.
    Стало
    CDC позволяет удобным образом, например, через UI, организовать реплику транзакционной базы в аналитическую с отставанием реплики в районе единиц секунд.

  2. Наши коллеги из Яндекс Маркета организовали схему, похожую на описанную, но с любопытными отличиями.
    Проблема
    Нужно было выгружать продакшн-данные из PostgreSQL в YTsaurus для аналитики, желательно с минимальным отставанием.
    Было
    Когда-то давно использовался самописный сервис на java, который раз в сутки выгружал снапшот. Потом был написан ещё ряд скриптов для выгрузки, а после этого использовался ещё один внутренний сервис, который уже умел производить query-based CDC. Но заводить новую поставку данных было сложно.
    Стало
    Ребята организовали поставку инкрементальных таблиц в кросс-датацентровую очередь с регулярным запуском — это один трансфер, а два других трансфера с типом «репликация» — из очереди в YTsaurus. Два трансфера нужны для обеспечения гарантии DC-1 — каждый из этих двух трансферов поставляет данные в инсталляцию YTsaurus в отдельном дата-центре.
    Сама очередь тут даёт дополнительный эффект: в транзакционную базу осуществляется один поход, а данные оказываются в двух аналитических базах.
    В итоге у команды Маркета три трансфера: один сам запускается по расписанию и выгружает новые строчки, два бегут постоянно в режиме репликации, выгружая данные из очереди в аналитические хранилища.

  3. Коллеги из Яндекс Недвижимости начали использовать CDC, чтобы обновлять поисковые индексы Elasticsearch, потом применили CDC для реактивного взаимодействия компонентов, и в конце концов — CDC стал неотъемлемой частью проекта.
    Проблема
    При изменении данных в MySQL нужно было обновлять индексы Elasticsearch.
    Было
    До CDC пришлось бы реализовывать transactional outbox pattern. Но поскольку с появлением задачи в Yandex Data Transfer уже был CDC — сначала реализовали отгрузку событий в очередь и сделали скрипт, обрабатывающий события из очереди.
    Стало
    Data Transfer позволил организовать процесс обновления поисковых индексов, после чего коллеги, оценив удобство организации реактивной инфраструктуры, нашли множество применений CDC. Теперь они с помощью CDC: отправляют пуши и нотификации, проксируют данные во внешнюю CRM, планируют задачи, реактивно реагируют на промокоды, и ещё много чего.

  4. Коллеги из Яндекс Метрики превратили MySQL и PostgreSQL при помощи CDC-событий в потоковую базу данных, работающую поверх YTsaurus. Созданный инструмент назвали «инкрементальные материализованные представления».
    Проблема
    В Метрике есть MySQL с настройками, в который множество сервисов ходит забирать эти настройки.
    Было
    Запускать все сервисы в MySQL — не вариант, поскольку это создавало серьезную нагрузку.
    Сначала были организованы кеши — кеширующие сервисы периодически ходили за настройками в MySQL, и все остальные сервисы ходили в кеширующие сервисы.
    Но со временем кеширующие сервисы стали потреблять много ресурсов и начали заметно отставать до пары десятков минут. А сервисы, ходящие за настройками, начали рестартоваться слишком долго, поскольку долго инициализировали свои кеши.
    Стало
    Была разработана потоковая база данных, потоковый аналог связки airflow+dbt, где DAG’и пересчитывают производные данные только для изменившихся строчек, реактивно реагируя на CDC-события. В итоге кеши стали отставать в среднем на 5 секунд вместо десятков минут до этого. Сервисам удалось избавиться от локальных кешей, что сэкономило много RAM, и сервисы стали рестартиться быстро. Надеюсь когда-нибудь коллеги расскажут об этом решении подробнее.

Подведём итоги

В статье мы рассмотрели Change Data Capture со всех сторон: теория, сценарии применения, практика и реальные пользовательские истории. Чтобы узнать о CDC больше, смотрите вебинар, читайте статью Мартина Клеппмана, экспериментируйте с Debezium и Yandex Data Transfer. Отмечу, что Yandex Data Transfer — бесплатный сервис в Yandex Cloud. Приходите пользоваться, оставляйте фидбэк и фичареквесты.

Напишите нам

Начать пользоваться Yandex Cloud

Тарифы

Узнать цены и рассчитать стоимость

Мероприятия

Календарь событий Yandex Cloud
Change Data Capture в Yandex Data Transfer: гид по технологии с примерами
Войдите, чтобы сохранить пост