Собираем статистику по телеграм‑каналу и строим кастомные графики

Вадим Владимиров, разработчик команды Managed Service for Apache Airflow, рассказывает, как собрать статистику и кастомные графики количества подписчиков в Telegram с использованием YDB в режиме serverless и DataLens.

Привет! Меня зовут Вадим, я разработчик и у меня есть свой канал в Telegram. Как и многие Telegram‑админы, я хочу следить за статистикой: оценивать эффективность, когда что‑то делаю для привлечения подписчиков, мониторить прирост аудитории и знать, какие именно пользователи и когда подписались/отписались.

В Telegram из коробки можно посмотреть какие‑то графики. Но, например, количество подписчиков отображается с точностью до дня, что не очень удобно. Сделать оттуда drill down до конкретного действия тоже нельзя.

При этом как администратор канала через API я могу получить гораздо больше нужной информации. В этой статье покажу, как я собрал кастомные графики с использованием доступных в open source инструментов: YDB в режиме serverless и DataLens.

1. Анализируем ситуацию as is

По умолчанию нативный график по количеству подписчиков в Telegram выглядит так:

Видим дату и количество подписчиков. Собственно, это всё

Анализировать в таком режиме те же отписки не получится — их в таком обобщённом графике просто не видно.

Но в первом приближении через API можно дополнительно получать:

  • ID пользователя;

  • время события с точностью до секунды;

  • само событие со своими метаданными (например ссылка, по которой пришёл новый подписчик).

Ещё в первый день активного привлечения подписчиков в свой канал я загорелся идеей написать простой скрипт на Python: он подключался от имени пользователя с админ‑правами, получал все последние события и сохранял их в обычные текстовые файлы в директорию рядом.

События, которые накопились за это время

Правда, выгружалось всё в каком‑то очень странном формате через .stringify() в telethon, который выступал таким «чёрным ящиком».

Пример одного события

Этот формат был не очень удачным и привёл к проблемам в дальнейшем. На тот момент я этого ещё не знал и пошёл развивать идею дальше.

2. Собираем события

Для своей разработки я выбрал Go и использовал библиотеку gotd, как самую популярную и активно поддерживаемую.

Всё должно было работать примерно так: мы раз в минуту запрашиваем новые события и пользователей из них, а дальше просто записываем в базу данных.

func main() {
        // init
        
        go AdminLogUpdater(ctx, time.Minute, channelID)
}

func AdminLogUpdater(ctx context.Context,
    interval time.Duration, channelID int64) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for {
        err := updateAdminLog(ctx, channelID)
        if err != nil {
                // logging
        }
          
        select {
        case <-ticker.C:
                continue
        case <-ctx.Done():
                return
        }
    }
}

func updateAdminLog(ctx context.Context, channelID int64) error {
    resp, err := GetChannelAdminLog(ctx, channelID)
    if err != nil {
        return err
    }

    storage.AddEvents(resp.Events)
    storage.AddUsers(resp.Users)

    return nil
}

Здесь появилась проблема с telethon, который многое делал внутри себя и не показывал этого. Так что кода для gotd пришлось сделать больше.

Первая трудность: авторизация

В Telegram есть два типа аккаунтов.

  1. Боты, которым @BotFather сразу выдает ID и токен. Их можно подставить в URL, и получится полный доступ к Bot API.

  2. Обычные юзеры с гораздо более сложным процессом аутентификации. Для него сначала нужен номер телефона и пароль, а потом второй фактор: нужно получить код либо через уже авторизованное устройство, либо через SMS.

    При этом полученной сессией не выйдет пользоваться сразу на двух устройствах, её заблокируют. Также у неё есть время жизни, после которого она может сама стать неактуальной и придётся перелогиниться.

Несмотря на сложности, в своей разработке я решил использовать обычный аккаунт, потому что это интереснее. Да и какие‑то нужные функции могли быть недоступны от имени бота.

Вторая трудность: знакомство с AccessHash

Чтобы клиенту получить какую‑то информацию об аккаунте или чате, нужно знать AccessHash — он используется как своего рода линкер между двумя пользователями, который никогда не меняется. Его дают, если произошёл коннект: например, в списке последних активностей в канале можно нажать на имя нового подписчика и перейти в его профиль.

К примеру, если аккаунт А успешно подписался на канал X, то нам будет выдан AccessHash = 123. А если аккаунт Б подписался на канал X, то будет сгенерирован AccessHash = 456. И даже если аккаунт Б завладеет AccessHash, это не поможет ему получить неавторизованный доступ к аккаунту А, потому что для каждой пары пользователь‑пользователь хэш будет уникальным.

Механизм необходим для приватности: иначе можно было бы перебрать все ID пользователей, каналы и чаты в цикле, а дальше использовать это, например, для рассылки спама. Поэтому чтобы получить какую‑то информацию о пользователе, нужно получить от него разрешение и узнать хэш.

В telethon эти хэши были спрятаны, хранились в базе SQLite и подставлялись сами. В нашем случае AccessHash будет служить доказательством, что пользователь уже «знаком» с нами и таким образом подтвердил желание взаимодействовать с нашим каналом. Если мы хотим обратиться к пользователю от имени нашего канала через библиотеку, то AccessHash должен быть под рукой.

3. Сохраняем события и пользователей

Для хранения данных я использовал Serverless YDB.

В serverless‑режиме для базы данных не нужен выделенный требующий обслуживания сервер, а тарифицируются только запросы и хранилище данных. С учётом лимитов, где первый миллион запросов в месяц и первый гигабайт данных будут бесплатными, этим могут пользоваться небольшие проекты и совсем без бюджета.

При этом в сервисе есть все необходимые механизмы для безопасного управления доступом и защиты данных — в контексте этой статьи я не буду останавливаться на этом подробно, но рекомендую ознакомиться с документацией по теме.

Размечаем таблицы

Для первоначальной задумки нужно хранить только два типа данных: пользователи и события.

  • Для пользователей будем хранить минимально необходимую информацию: ID, AccessHash, имя (для удобства сопоставления пользовательских действий). При желании можно настроить другой набор данных, главное, — опять же, не забыть позаботиться о правилах приватности и безопасности хранения.

  • Для событий храним ID, время, ID пользователя, тип события и все метаданные в формате JSON.

Запускаем миграции

Пока что golang‑migrate не умеет работать с YDB. Зато если попробовать создать таблицу, которая уже существует, мы не получим никаких ошибок. Поэтому нужно сделать CREATE TABLE операции при каждом запуске.

CREATE TABLE events (
    id          Int64 NOT NULL,
    date        Timestamp,
    user_id     Int64,
    action_type Utf8,
    action      Json,
    PRIMARY KEY (
                id
    )
);

CREATE TABLE users (
    id          Int64 NOT NULL,
    access_hash Int64,
    first_name  Utf8,
    
    PRIMARY KEY (
                id
    )
);

Записываем данные

Чтобы всё работало быстрее, тут мы можем использовать UPSERT (это как INSERT ... ON CONFLICT DO UPDATE). Так, если уже существует запись, например, о пользователе, мы просто её обновим.

UPSERT INTO events (
    id, date, user_id, action_type, action
) VALUES (
  $id, $date, $user_id, $action_type, $action
);

UPSERT INTO users (
    id, access_hash, first_name
) VALUES (
    $id, $access_hash, $first_name
);

Делаем интерфейс Storager с функциями AddEvent и AddUser:

type Storager interface {
    AddEvent(ctx context.Context, event Event) error
    AddUser(ctx context.Context, user User) error
}

После чего запускаем, заходим в консоль и видим наши данные!

Табличка с событиями

Табличка с пользователями

4. Импортируем старые события

Подписчики

От telethon осталась база данных, в которой есть ID и AccessHash пользователей, с которыми он взаимодействовал. Сохраняем их в csv, а дальше получаем остальное из Telegram и добавляем в базу.

f, err := os.Open(filePath)
if err != nil { ... }
defer func() { _ = f.Close() }()

csvReader := csv.NewReader(f)
people, err := csvReader.ReadAll()
if err != nil { ... }

for _, person := range people {
    var userID, userAccessHash int64
    var user User
    
    userID, err = strconv.ParseInt(person[0], 10, 64)
    if err != nil { ... }
    userAccessHash, err = strconv.ParseInt(person[1], 10, 64)
    if err != nil { ... }
    
    // sleep to prevent FLOOD_WAIT error
    time.Sleep(5 * time.Second)
    
    user, err = telegram.GetUser(ctx, userID, userAccessHash)
    if err != nil { ... }
    
    err = ydb.AddUser(ctx, user)
    if err != nil { ... }
}

Получилось 284 строки.

События

Тут уже не надо будет ничего получать из API Telegram, попробуем импортировать через YDB CLI.

Файлы с событиями выглядят так:

ChannelAdminLogEvent(
    id=1234567890,
    date=datetime.datetime(2023, 8, 15, 15, 30, 45, tzinfo=datetime.timezone.utc),
    user_id=1234567890,
    action=ChannelAdminLogEventActionParticipantJoinByInvite(
        ...
    )
)

И чтобы с ними работать, достаточно присвоить к какой‑нибудь переменной без дополнительных преобразований. Python интерпретируемый, поэтому делаем так через eval, собираем в JSON и выводим:

dir_name = "actions"

for action in os.listdir(dir_name):
    v = "".join(open(f"{dir_name}/{action}").readlines())
    exec(f"a = {v}")
    
    action_type = json.loads(a.action.to_json())["_"]
    action_type = action_type[0].lower() + action_type[1:]
    
    action = {
        "id": a.id,
        "date": a.date.isoformat(),
        "user_id": a.user_id,
        "action_type": action_type,
        "action": a.action.to_json(),
    }
    print(json.dumps(action))

Для импорта нам нужен файл, где каждая строка будет соответствовать будущей строке в таблице (массив объектов не подойдёт). Поэтому сохраняем в файл прямо так:

> python3 parse.py > events.json

Неотсортированные события для импорта

Теперь импортируем, используя yc и YDB:

# активируем профиль с нужным облаком
> yc config profile activate my-cloud

# проверяем, что мы видим базу данных
> yc ydb database list --folder-id <folder-id>
+------------+-----------+---------
|     ID     |   NAME    |   ... 
+------------+-----------+---------
|    ....    | channeler |   ...
+------------+-----------+---------

# генерируем iam-токен и сохраняем рядом
> yc iam create-token > my-token

# импортируем через ydb файл events.json в таблицу events
> ydb --iam-token-file my-token \
> -e grpcs://ydb.serverless.yandexcloud.net:2135 \
> -d /ru-central1/... \
> import file json -p myy_events events.json 
Elapsed: 0.342391 sec

Получилось 465 строк.

5. Строим графики

Заходим в DataLens и создаём подключение к нашей базе данных YDB:

Создаём датасет, в котором связываем поля между собой:

Так как события сохранялись не с создания канала, то задаём параметр с количеством подписчиков на начало подсчёта.

Добавляем параметр subscribers_delta, который показывает изменение количества подписчиков после какого‑то события:

CASE [action_type]
    WHEN "channelAdminLogEventActionParticipantJoin" THEN 1
    WHEN "channelAdminLogEventActionParticipantJoinByInvite" THEN 1
    WHEN "channelAdminLogEventActionParticipantJoinByRequest" THEN 1
    WHEN "channelAdminLogEventActionParticipantLeave" THEN -1
    ELSE 0
END

Дальше добавляем параметр subscribers_count, который показывает количество подписчиков на момент какого‑то события:

[initial_subscribers_count] + RSUM(SUM([subscribers_delta]), "asc" ORDER BY [date])

Теперь у нас есть всё, чтобы сделать самый главный чарт — подробный график количества подписчиков!

Например, тут видно дни, когда я активно рекламировал канал. Или отписки после подведения итогов розыгрыша.

Ещё можно сделать табличку, где видно всех подписчиков или не подписчиков:

  • Количество событий = COUNT([event_id])

  • Последнее событие = MAX([date])

  • Подписан или нет = SUM([subscribers_delta])

Таблица с пользователями

6. Что дальше

Теперь я планирую отдельно считать статистику и строить графики по инвайт‑ссылкам.

Также к каналу привязан чат, в котором тоже есть события и комментарии пользователей, которые можно сохранять и анализировать. Например, отправлять их в какой‑нибудь GPT и понимать, какие посты больше интересны подписчикам, а какие только уменьшают их количество.

Напишите нам

Начать пользоваться Yandex Cloud

Тарифы

Узнать цены и рассчитать стоимость

Мероприятия

Календарь событий Yandex Cloud
Собираем статистику по телеграм‑каналу и строим кастомные графики
Войдите, чтобы сохранить пост