
Triggering dynamic Spark tasks on Databricks with Apache Airflow

Kasper Heyndrickx · Sep 07, 2022 · 3 mins read

Big Data in the Cloud

Databricks is a cloud data platform that can be used with Spark to manage and analyze huge datasets. It has a great user interface and lets data scientists run queries without worrying about infrastructure. Especially with a Platform as a Service (PaaS) offering such as Azure Databricks, anyone can start running queries in a matter of minutes.

At TomTom we’re no strangers to big data, but we also focus heavily on automation. While manually triggering Spark jobs on Databricks is fun, automating this process makes it sustainable. This is where Apache Airflow comes in.

Automated workflow

Apache Airflow is a workflow management platform for data engineering pipelines. A whole mouthful, but it’s essentially a platform that helps you run tasks, in sequence or in parallel, as part of a pipeline. Pipelines and their tasks are defined as Directed Acyclic Graphs (DAGs), written in Python. Running a simple hello-world task would look something like this:

from airflow.operators.python import PythonOperator

def print_hello():
  return 'Hello World'

# dag is the DAG object this task belongs to, defined elsewhere in the DAG file
hello_task = PythonOperator(task_id='hello_world_task',
                            python_callable=print_hello,
                            dag=dag)

Databricks has a REST API, so technically we don’t need anything else: we can call the Databricks endpoint to trigger new Spark jobs with standard Python code. This can get messy, but luckily for us, there’s already a built-in integration between Airflow and Databricks: DatabricksSubmitRunOperator.
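To get a feel for what the operator saves us from, here is a rough sketch of such a raw call against the Databricks Jobs runs/submit endpoint. The workspace URL, token, and cluster id below are placeholders:

import requests

DATABRICKS_HOST = "https://<your-workspace>.azuredatabricks.net"  # placeholder
TOKEN = "<personal-access-token>"                                 # placeholder

payload = {
    "run_name": "hello_world_run",
    "existing_cluster_id": "cluster_id",
    "spark_submit_task": {"parameters": ["--class", "com.example.HelloWorld",
                                         "dbfs:/FileStore/jars/hello-world.jar"]},
}

# Submit a one-time run through the Databricks Jobs API
response = requests.post(
    f"{DATABRICKS_HOST}/api/2.1/jobs/runs/submit",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json=payload,
)
response.raise_for_status()
print(response.json()["run_id"])

Authentication, retries, and polling for the run result all end up as boilerplate; the operator handles that for us.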

Triggering Spark tasks

Similar to the hello-world Python example, this is what running hello-world on Databricks would look like:

from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

submit_run_task = DatabricksSubmitRunOperator(
    task_id = "hello_world_task",
    existing_cluster_id = "cluster_id",
    spark_submit_task = {"parameters": ["--class", "com.example.HelloWorld",
                                        "dbfs:/FileStore/jars/hello-world.jar"]},
)

This assumes that the hello-world Java archive (.jar file) has already been uploaded to DBFS on Databricks. The parameters in the spark_submit_task are hardcoded here, but usually they are not fixed. We can evaluate some values at runtime by using Jinja templating.

Airflow templates

Let’s say the class to be used is passed to us using a pipeline parameter. Using templates, we can pass this directly to our spark_submit_task:

submit_run_task = DatabricksSubmitRunOperator(
    task_id = "hello_world_task",
    existing_cluster_id = "cluster_id",
    spark_submit_task = {"parameters": ["--class", "{{ params.classname }}",
                                        "dbfs:/FileStore/jars/hello-world.jar"]},
)
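For {{ params.classname }} to resolve, the DAG needs a matching parameter. A minimal sketch, using the same Param style as the full example at the end of this post (the default value here is just an illustration):

from airflow.models.param import Param

params = {
    "classname": Param(
        default="com.example.HelloWorld",
        type="string",
        description="Fully qualified class to run"
    )
}

The params dict is then passed to the DAG constructor via params=params, exactly as shown in the final example below.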

This is good for individual parameters, but what if we want to parse those parameters first? Let’s illustrate this with a simple example.

# The pipeline parameter holds both the jar name and the class name:
params.jar_class = "hello-world.jar,com.example.HelloWorld"

def generate_parameters(jar_class):
  jarname   = jar_class.split(",")[0]
  classname = jar_class.split(",")[1]
  return {"parameters": ["--class", classname,
                         "dbfs:/FileStore/jars/" + jarname]}

submit_run_task = DatabricksSubmitRunOperator(
    task_id = "hello_world_task",
    existing_cluster_id = "cluster_id",
    spark_submit_task = generate_parameters("{{ params.jar_class }}")
)

This is a naive approach that unfortunately does not work. The problem is that not all Python code in our DAG is evaluated at runtime. In fact, most of it is evaluated when Airflow parses the DAG file. This means that the generate_parameters function is called with the literal string "{{ params.jar_class }}". Calling .split(",") on that string returns a list with a single element, so the [1] lookup fails with IndexError: list index out of range.
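You can reproduce the problem in a plain Python shell; the unrendered template contains no comma, so the split yields a single element:

# What the function actually receives at parse time:
"{{ params.jar_class }}".split(",")
# ['{{ params.jar_class }}']  -- only one element, so [1] raises IndexError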

Airflow User-defined Macros

We know that templates are evaluated at runtime, so why not move generate_parameters inside the templated code?

spark_submit_task = "{{ generate_parameters(params.jar_class) }}" 

There’s one problem with this though: the generate_parameters function is not known in the context of Jinja templating. Only a limited set of variables is available there. The solution is to let Airflow know in advance that we’re going to use a certain function in a template. This can be done using DAG.user_defined_macros. In practice, this looks as follows:

from airflow import DAG
from airflow.models.param import Param
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

params = {
    "jar_class": Param(
        default="hello-world.jar,com.example.HelloWorld", 
        type="string", 
        description="Comma separated string of jar and class"
    )
}

def generate_parameters(jar_class):
  jarname   = jar_class.split(",")[0]
  classname = jar_class.split(",")[1]
  return {"parameters": ["--class", classname,
                         "dbfs:/FileStore/jars/" + jarname]}

with DAG(
    "hello_world",
    params=params,
    # render the templated value below as a native Python dict instead of a string
    render_template_as_native_obj=True,
    user_defined_macros={"generate_parameters": generate_parameters}
) as dag:

    submit_run_task = DatabricksSubmitRunOperator(
        task_id = "hello_world_task",
        existing_cluster_id = "cluster_id",
        spark_submit_task = "{{ generate_parameters(params.jar_class) }}"
    )
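With the default parameter value, the macro renders into exactly the dictionary the operator needs:

generate_parameters("hello-world.jar,com.example.HelloWorld")
# {'parameters': ['--class', 'com.example.HelloWorld',
#                 'dbfs:/FileStore/jars/hello-world.jar']}

Triggering the DAG with a different jar_class value, for example from the "Trigger DAG w/ config" dialog in the Airflow UI, swaps in a different jar and class without touching the DAG code.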

Once you understand how templating with macros works, it becomes a powerful tool on the path towards more automation.

Written by Kasper Heyndrickx, Software Engineer