Getting DAG execution logs
Using a directed acyclic graph (DAG), you can retrieve DAG execution logs and, if required, export them to separate storage.
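The script in the next section finds the system log bucket by parsing the remote_base_log_folder setting in /opt/airflow/airflow.cfg on the worker. For orientation, the relevant fragment of that file typically looks something like this (illustrative values; the actual bucket and path are set by the service):

```ini
[logging]
remote_logging = True
remote_base_log_folder = s3://<log-bucket-name>
```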
Prepare the DAG file and run the graph
- Create a local file named export_dag_logs.py and paste the following script into it:

  ```python
  import os
  import json

  from airflow.decorators import dag, task
  import boto3


  def system_logs_bucket_name() -> str:
      # Read the bucket name from the remote_base_log_folder setting in airflow.cfg
      with open('/opt/airflow/airflow.cfg') as f:
          for line in f:
              line = line.strip()
              if not line.startswith('remote_base_log_folder'):
                  continue
              s3_path = line.split('=')[1].strip()
              return s3_path.split('//')[1]


  @dag(schedule=None)
  def export_dag_logs():
      @task
      def list_logs_bucket():
          # The connection details are provided through an environment variable
          str_conn = os.getenv('AIRFLOW_CONN_S3_DAG_LOGS')
          if not str_conn:
              raise Exception('env var AIRFLOW_CONN_S3_DAG_LOGS not found or empty')
          conn = json.loads(str_conn)
          bucket = system_logs_bucket_name()
          session = boto3.session.Session()
          s3 = session.client(
              service_name='s3',
              endpoint_url=conn['extra']['endpoint_url'],
              aws_access_key_id=conn['login'],
              aws_secret_access_key=conn['password'],
          )
          # Here you can do anything with the logs, e.g., copy them
          # to a custom bucket (see the sketch after this list)
          resp = s3.list_objects_v2(Bucket=bucket)
          object_keys = [c['Key'] for c in resp['Contents']]
          print('Log files:\n')
          print('\n'.join(object_keys))

      list_logs_bucket()


  export_dag_logs()
  ```

  Note: The AIRFLOW_CONN_S3_DAG_LOGS variable is already set on the worker and does not require any additional configuration.
- Upload the export_dag_logs.py DAG file to the bucket you created earlier. This automatically creates a graph with the same name in the Apache Airflow™ web interface.
- Open the Apache Airflow™ web interface.
- Make sure a new graph named export_dag_logs has appeared in the DAGs section. It may take a few minutes to load a DAG file from the bucket.
- To run the graph, click the run icon in the line with its name.
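The comment in the list_logs_bucket task marks the place where you can process the log files instead of just listing them. As an illustration of exporting them to separate storage, here is a minimal, hedged sketch that copies every listed object into a bucket of your own; the destination bucket name my-log-archive is a placeholder, and s3 is the same boto3 client the DAG builds:

```python
def copy_logs_to_custom_bucket(s3, source_bucket: str, dest_bucket: str) -> None:
    # `s3` is the boto3 client built in the DAG above; `dest_bucket`
    # (e.g., 'my-log-archive') is a placeholder for a bucket you own.
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=source_bucket):
        for obj in page.get('Contents', []):
            s3.copy_object(
                Bucket=dest_bucket,
                Key=obj['Key'],
                CopySource={'Bucket': source_bucket, 'Key': obj['Key']},
            )
```

You would call it from inside list_logs_bucket, e.g., copy_logs_to_custom_bucket(s3, bucket, 'my-log-archive'). A paginator is used because list_objects_v2 returns at most 1,000 keys per response.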
Check the result
To check the result in the Apache Airflow™ web interface:
- In the DAGs section, click the export_dag_logs graph.
- Go to the Graph section.
- Select list_logs_bucket.
- Go to Logs.
- Make sure the logs contain the Log files: {content} line, where content is the list of DAG execution logs. This means the query was successful.
You can also reach the same logs through the task instance list:
- In the DAGs section, click the export_dag_logs graph.
- Go to Tasks.
- Select list_logs_bucket.
- Go to Task Instances.
- Select the task instance. The Logs section will open.
- Make sure the logs contain the Log files: {content} line, where content is the list of DAG execution logs. This means the query was successful.
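If you prefer a programmatic check, the same log can be fetched over the Apache Airflow™ stable REST API, provided your deployment exposes it and allows basic authentication. A rough sketch, with placeholder host, credentials, and try number:

```python
import requests

AIRFLOW_URL = 'https://<airflow-host>/api/v1'  # placeholder host
AUTH = ('<user>', '<password>')                # placeholder credentials

# Find the latest run of the export_dag_logs DAG
runs = requests.get(
    f'{AIRFLOW_URL}/dags/export_dag_logs/dagRuns',
    params={'order_by': '-execution_date', 'limit': 1},
    auth=AUTH,
).json()
run_id = runs['dag_runs'][0]['dag_run_id']

# Fetch the plain-text log of the first try of the list_logs_bucket task
log = requests.get(
    f'{AIRFLOW_URL}/dags/export_dag_logs/dagRuns/{run_id}'
    '/taskInstances/list_logs_bucket/logs/1',
    headers={'Accept': 'text/plain'},
    auth=AUTH,
)
print(log.text)  # should contain the 'Log files:' line
```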