Create a Dynamic Workflow in Apache Airflow

Problem

For a long time I searched for a way to properly create a workflow where the tasks depend on dynamic values, based on a list of tables contained in a text file.

Context explanation through a graphical example

A schematic overview of the DAG’s structure.

                |---> Task B.1 --|
                |---> Task B.2 --|
 Task A --------|---> Task B.3 --|--------> Task C
                |       ....     |
                |---> Task B.N --|

The problem is to import tables from an IBM DB2 database into HDFS / Hive automatically, using Sqoop, a powerful tool designed for efficiently transferring bulk data from a relational database to HDFS, orchestrated by Airflow, an open-source tool for orchestrating complex computational workflows and data processing pipelines.

The imported data are then aggregated daily using Spark jobs, but I’ll only talk about the import part, where I schedule the Sqoop jobs that dynamically import data into HDFS.

How to solve it?

The names of the tables to be imported are contained in a text file, which is read and turned into a list of elements using Pandas before being returned through XComs. The returned value determines how many tasks the DAG will have to process in parallel, and also provides the table name used in the Bash command that imports each table.
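For illustration, the table file could simply contain one table name per line; the names below are purely hypothetical:

MOVIES
ACTORS
RATINGS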


Let’s write some code to understand how it works!

The best way to understand is to try it yourself!

Create and Configure the DAG

Start by importing the required Python libraries. Define a new Airflow DAG (e.g. a daily DAG) and add some default arguments, without forgetting to set provide_context to True. We also have to define the Sqoop command parameters that we are going to use in the BashOperator, the Airflow operator designed to launch bash commands.

#!/usr/bin/python3
# -*- coding: utf-8 -*-
 
import logging
import pandas as pd
import airflow
from airflow import DAG
from airflow.utils import dates as date
from datetime import timedelta, datetime
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.decorators import apply_defaults
 
""" define DAG arguments which can be override on a per-task basis during operator initialization """
default_args = {
  'owner': 'Tom',
  'depends_on_past': False,
  'start_date': datetime(2018, 4, 15),
  'email': ['tom-kun@gmail.com'],
  'email_on_failure': True,
  'email_on_retry': False,
  'retries': 1,
  'retry_delay': timedelta(minutes=5),
  'provide_context': True  # provide_context is required when we use Airflow's XComs concept to push and pull function results into another task.
}
 
"""
Define a daily DAG using the default arguments by changing the "schedule_interval" arguments the "dag_daily" should be launched every day to process bunch of tables from db2 to HDFS
"""
dag_daily = DAG(
  'daily',
  default_args=default_args,
  description='Importing daily data from DB2 IBM database into HDFS as parquet files',
  schedule_interval='@once'
)
 
""" Parameters sent to the BashOperator command """
import_parameters = {
  'db_host': 'slave2.mycluster',
  'db_port': '5047',
  'db_name': 'imdb',
  'db_schema': 'DEV',
  'username': 'Tom',
  'mappers': 3,
  'target_dir': '/user/airflow/',
  'compression_type': 'snappy'
}

Values that we want to use inside our dynamic workflow

We now have the environment to start implementing the dynamic workflow. Let’s start by creating a Python function which reads the names of the tables that we want to import into HDFS.

def get_tables(table_file="/tmp/daily", **kwargs):
    logging.info("######## Starting get_tables() function ########")
    logging.info("######## Load the table file into a new Pandas DataFrame ########")
    df_tables = pd.read_csv(table_file, names=["TABLES"])
    df_tables["TABLES"] = df_tables["TABLES"].str.strip()
    lst_tables_sqoop = df_tables["TABLES"].tolist()
    return lst_tables_sqoop
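Because get_tables() is later wrapped in a PythonOperator (push_tables_list, defined below), Airflow automatically pushes its return value to XCom. As a minimal sketch, a hypothetical downstream callable (print_tables is not part of the original workflow) could pull that list back like this:

def print_tables(**kwargs):
    """ Hypothetical example only: pull the list pushed by the 'get_tables' task from XCom. """
    ti = kwargs["ti"]  # provide_context=True makes the task instance available in kwargs
    tables = ti.xcom_pull(task_ids="get_tables")
    logging.info("Tables to import: %s", tables)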

Generate the bash jobs dynamically through a Python function

All the tables that we want to import are now available in the Airflow context, so we can write the function which generates the tasks dynamically using the result of our previous function.

def sqoop_commands(table, **kwargs):
    """
    Return a BashOperator built from the list previously returned, using the table name when importing data from the RDBMS into HDFS through Sqoop.
    """
    # Example of how the list pushed by get_tables() could be pulled back from XCom inside a template (kept for reference, not used below).
    lst_tables_mapping = "{{ task_instance.xcom_pull(task_ids='get_tables') }}"
    return BashOperator(
        task_id="sqoop_import_{}_table".format(table),
        bash_command="sqoop import -Dmapreduce.job.user.classpath.first=true " +
                     "-Dhadoop.security.credential.provider.path=jceks://hdfs/user/airflow/credentials.jceks " +
                     "-Dorg.apache.sqoop.splitter.allow_text_splitter=true " +
                     "--connect 'jdbc:db2://{{ params.db_host }}:{{ params.db_port }}/{{ params.db_name }}:currentSchema={{ params.db_schema }};' " +
                     "--username {{ params.username }} " +
                     "--password-alias db2.password " +
                     "-m {{ params.mappers }} " +
                     "--outdir /tmp/java " +
                     "--driver com.ibm.db2.jcc.DB2Driver " +
                     "--delete-target-dir " +
                     "--target-dir {{ params.target_dir }}{{ params.db_name }}/{{ params.db_schema }}/" + table +
                     " --table " + table +
                     " --as-parquetfile " +
                     "--compression-codec {{ params.compression_type }}",
        params=import_parameters,
        dag=dag_daily)
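To make the templating more concrete, here is roughly what the bash_command above renders to for a hypothetical table named MOVIES, using the import_parameters defined earlier:

sqoop import -Dmapreduce.job.user.classpath.first=true \
  -Dhadoop.security.credential.provider.path=jceks://hdfs/user/airflow/credentials.jceks \
  -Dorg.apache.sqoop.splitter.allow_text_splitter=true \
  --connect 'jdbc:db2://slave2.mycluster:5047/imdb:currentSchema=DEV;' \
  --username Tom --password-alias db2.password \
  -m 3 --outdir /tmp/java --driver com.ibm.db2.jcc.DB2Driver \
  --delete-target-dir --target-dir /user/airflow/imdb/DEV/MOVIES \
  --table MOVIES --as-parquetfile --compression-codec snappy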

Create the tasks and the workflow

Get_tables() function called through a PythonOperator

The two functions are created. The second one returns a specific Airflow task (a BashOperator), while the first one is simply there to push the list of tables.
We now need to create the first task of our workflow by calling the get_tables() function through a PythonOperator.

# PythonOperator to get the list of the tables.
push_tables_list = PythonOperator(
        task_id="get_tables",
        python_callable=get_tables,
        dag=dag_daily)

# DummyOperator to check if it works!
complete = DummyOperator(
        task_id="complete",
        dag=dag_daily)

Generate the dynamic workflow using iteration on the PythonOperator results

We can combine them and create our dynamic workflow by simply iterating over the list of table names. Note that the loop calls get_tables() directly, so the list is evaluated when Airflow parses the DAG file and the number of parallel Sqoop tasks is fixed at parse time.

for table in get_tables():
  push_tables_list >> sqoop_commands(table) >> complete

Workflow’s overview

It works like a charm!
If you need more information, do not hesitate to contact me at thomas.bazzucchi@data-essential.com
