Получение логов выполнения DAG
Статья создана
Обновлена 3 декабря 2025 г.
С помощью направленного ациклического графа (DAG) вы можете получать логи выполнения DAG и при необходимости экспортировать их в отдельное хранилище.
Подготовьте DAG-файл и запустите граф
-
Создайте локально файл с именем
export_dag_logs.pyи скопируйте в него скрипт:import os import json from airflow.decorators import dag, task import boto3 def system_logs_bucket_name() -> str: with open('/opt/airflow/airflow.cfg') as f: for line in f: line = line.strip() if not line.startswith('remote_base_log_folder'): continue s3_path = line.split('=')[1].strip() return s3_path.split('//')[1] @dag(schedule=None) def export_dag_logs(): @task def list_logs_bucket(): str_conn = os.getenv('AIRFLOW_CONN_S3_DAG_LOGS') if not str_conn: raise Exception('env var AIRFLOW_CONN_S3_DAG_LOGS not found or empty') conn = json.loads(str_conn) bucket = system_logs_bucket_name() session = boto3.session.Session() s3 = session.client( service_name='s3', endpoint_url=conn['extra']['endpoint_url'], aws_access_key_id=conn['login'], aws_secret_access_key=conn['password'], ) # Here we can do anything with logs, e.g. clone them to custom bucket resp = s3.list_objects_v2(Bucket=bucket) object_keys = [c['Key'] for c in resp['Contents']] print('Log files:\n') print('\n'.join(object_keys)) list_logs_bucket() export_dag_logs()Примечание
Переменная
AIRFLOW_CONN_S3_DAG_LOGSуже задана на воркере, ее не нужно задавать дополнительно. -
Загрузите DAG-файл
export_dag_logs.pyв созданный ранее бакет. В результате одноименный граф появится в веб-интерфейсе Apache Airflow™ автоматически. -
Убедитесь, что в разделе DAGs появился новый граф
export_dag_logs.Загрузка DAG-файла из бакета может занять несколько минут.
-
Чтобы запустить граф, в строке с его именем нажмите кнопку
.
Проверьте результат
Чтобы проверить результат в веб-интерфейсе Apache Airflow™:
Версия Apache Airflow™ ниже 3.0
Версия Apache Airflow™ 3.0 и выше
- В разделе DAGs нажмите на граф
export_dag_logs. - Перейдите в раздел Graph.
- Выберите задание list_logs_bucket.
- Перейдите в раздел Logs.
- Убедитесь, что в логах есть строка
Log files: {content}, гдеcontent— список логов выполнения DAG. Это значит, что запрос выполнен успешно.
- В разделе Dags нажмите на граф
export_dag_logs. - Перейдите в раздел Tasks.
- Выберите задание list_logs_bucket.
- Перейдите в раздел Task Instances.
- Выберите экземпляр задания.
- Откроется раздел Logs.
- Убедитесь, что в логах есть строка
Log files: {content}, гдеcontent— список логов выполнения DAG. Это значит, что запрос выполнен успешно.