Using results of completed jobs
Sometimes you need to process the outputs of a job or use them to run another job. To do this, download them with the DataSphere Jobs SDK download_job_files function, or pass them into another job using an Airflow DAG operator.
Downloading via DataSphere CLI
If the total size of the job results exceeds the maximum total size of files you can download via the DataSphere CLI, only files whose combined size fits within this limit will be downloaded.
To download job results into the working directory using the DataSphere CLI, run the following command, substituting the job ID:
datasphere project job download-files --id <job_ID>
The function has optional parameters:
with_logs
: Download logs, false by default.

with_diagnostics
: Download diagnostic files, false by default.

There is also an optional parameter that sets the file download directory; by default, files are downloaded to the working directory.
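The same download can also be scripted in Python with the DataSphere Jobs SDK download_job_files function mentioned above. Below is a minimal sketch using the same job ID placeholder as the CLI command; if you need logs, diagnostics, or a different download directory, pass the corresponding optional parameters, checking their exact names against your SDK version.

from datasphere import SDK

# Initialize the DataSphere Jobs SDK and download the files produced by a
# completed job into the working directory, as the CLI command above does.
sdk = SDK()
sdk.download_job_files('<job_ID>')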
To use the downloaded data in another job, specify the data in the input parameter in the config.yaml file.
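As an illustration, the relevant fragment of the follow-up job's config.yaml could look roughly like the sketch below. The file name, variable name, and command are placeholders, and the exact schema is defined in the job configuration documentation.

# Fragment of config.yaml for the job that consumes the downloaded file (illustrative).
cmd: python3 process.py --data ${INPUT_DATA}
inputs:
  - result.txt: INPUT_DATA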
Downloading via Airflow DAG
You can use a DAG operator to load the output file of one job (result.txt) and provide it as an input file (INPUT_DATA) for another:
from typing import Dict

from airflow.decorators import dag, task
import pendulum

from datasphere import SDK

now = pendulum.now()

@dag(dag_id='output_files_for_other_job', start_date=now, schedule="@daily", catchup=False)
def run():
    @task(task_id='fork_job')
    def fork_job(files_job_id: str, fork_source_job_id: str):
        # Download the output files of the completed job onto the Airflow worker.
        sdk = SDK()
        sdk.download_job_files(files_job_id)
        # Fork the second job, passing the downloaded result.txt as its
        # INPUT_DATA variable, and wait for it to finish.
        job = sdk.fork_job(fork_source_job_id, vars={'INPUT_DATA': 'result.txt'})
        job.wait()

    fork_job('<ID_of_job_to_download>', '<ID_of_job_to_run>')

run()
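Replace <ID_of_job_to_download> with the ID of the completed job whose output files you want to reuse, and <ID_of_job_to_run> with the ID of the job to fork with the downloaded result.txt as its INPUT_DATA input.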