Using results of completed jobs
Updated at October 24, 2024
Sometimes you need to process job outputs or use them to run another job. To do this, download them using the DataSphere Jobs SDK download_job_files function or provide them as input to another job using an Airflow DAG operator.
Downloading results via the DataSphere CLI
To download job results into the working directory using the DataSphere CLI, run this command, substituting the job ID:
Bash
datasphere project job download-files --id <job_ID>
The command has the following optional parameters (see the example after the list):
with_logs: Download logs. Defaults to false.
with_diagnostics: Download diagnostic files. Defaults to false.
output_dir: Directory to download the files to. Defaults to the working directory.
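For example, to also download the logs and diagnostic files into a separate directory, the call might look like the sketch below. The flag spellings are an assumption based on the parameter names above; check datasphere project job download-files --help for the exact names supported by your CLI version.
Bash
# Flag names below are assumed to mirror the parameters listed above.
datasphere project job download-files --id <job_ID> \
  --with-logs \
  --with-diagnostics \
  --output-dir job_results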
To use the downloaded data in another job, specify it in the input parameter of the config.yaml file.
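As an illustration, an input entry for the downloaded file could look like the sketch below. The job name, command, and script are placeholders, and the exact schema is defined by the DataSphere Jobs configuration reference.
YAML
# Illustrative sketch of a job configuration that consumes the downloaded file.
name: process-results
cmd: python3 process.py --data ${INPUT_DATA}
inputs:
  - result.txt: INPUT_DATA  # output of the previous job, downloaded earlier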
Passing results via an Airflow DAG
You can use a DAG operator to download the output file of one job (result.txt) and provide it as an input file (INPUT_DATA) for another:
Python
from typing import Dict

import pendulum
from airflow.decorators import dag, task

from datasphere import SDK

now = pendulum.now()

@dag(dag_id='output_files_for_other_job', start_date=now, schedule="@daily", catchup=False)
def run():
    @task(task_id='fork_job')
    def fork_job(files_job_id: str, fork_source_job_id: str):
        sdk = SDK()
        # Download the output files of the completed job into the working directory.
        sdk.download_job_files(files_job_id)
        # Fork (rerun) the second job, passing the downloaded file as its INPUT_DATA variable.
        job = sdk.fork_job(fork_source_job_id, vars={'INPUT_DATA': 'result.txt'})
        job.wait()

    fork_job('<ID_of_job_to_download>', '<ID_of_job_to_run>')

run()
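In this DAG, the fork_job task first downloads the files of the completed job whose ID is passed as files_job_id (including result.txt), then forks the second job, substituting the downloaded result.txt for its INPUT_DATA variable, and waits for it to finish.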