MLOps Community

Bridging the Culinary Gap - Part 2

# Analytics
# DAG
# LLM

It's time for the Sous Chef!

February 24, 2025
Jessica Michelle Rudd, PhD, MPH

One Big Thing: Bridging the (Analytics) Culinary Gap (Part 2)

Why did the Airflow DAG break up with the Kubernetes Pod?

Because they had too many "scheduling conflicts"! 💔


GCP Cloud Composer & Airflow: Your Sous Chef for Orchestration

Welcome back to our data kitchen! In Part 1 of this series, we donned our chef hats and explored Dataform, our trusty cookbook for crafting delicious data transformations. Now, it's time to introduce the next essential member of our culinary team: Composer/Airflow, our masterful sous chef.

Remember, we're on a mission to build a robust ETL pipeline that serves up a feast of data-driven insights. Just like a well-coordinated dinner service, our pipeline needs to ensure the right data arrives at the right place, right on time.

In this post, we'll delve into the art of orchestration with Composer/Airflow. Think of it as the conductor of our data symphony, ensuring each ingredient is prepared and combined in perfect harmony. It ensures that each dish in our ETL pipeline is prepared in the right order, at the perfect temperature, and with impeccable timing.

So grab your aprons and let's continue our culinary adventure! By the end of this post, you'll be well-versed in the art of orchestration and ready to tackle the next course in our ETL feast.


Why Composer/Airflow is the Perfect Sous Chef

Imagine a busy kitchen with multiple dishes being prepared simultaneously. Without a skilled sous chef, chaos would erupt! Composer/Airflow brings order and efficiency to our data kitchen, ensuring a smooth and seamless workflow.

Here's how Composer/Airflow excels in its role:

  1. Masterful Scheduling: Just like a sous chef schedules the preparation of each dish, Composer/Airflow schedules the execution of our Dataform recipes (and any other required task in our pipeline). Need to refresh your data daily? Hourly? No problem! Composer/Airflow ensures your data dishes are served fresh and on time.
  2. Keen Monitoring: A good sous chef keeps a close eye on the cooking process, ensuring everything is proceeding as planned. Composer/Airflow monitors our ETL pipeline, alerting us if any dish is burning or if an ingredient is missing. This proactive monitoring prevents culinary disasters and ensures a smooth dining experience.
  3. Efficient Task Management: Just as a sous chef delegates tasks to the kitchen staff, Composer/Airflow delegates tasks to different workers in our data kitchen. This ensures efficient resource utilization and prevents bottlenecks in our data pipeline. It also ensures that dependent tasks do not start until the kitchen is ready; the cake doesn’t go in the oven until all the ingredients are added.
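
To make the scheduling point concrete, here is a minimal sketch of what "daily" and "hourly" mean in cron terms. The preset-to-cron mapping matches Airflow's built-in presets; the `next_boundary` helper is hypothetical and only illustrates the idea, since Airflow computes run times internally.

```python
from datetime import datetime, timedelta

# Airflow's built-in schedule presets and the cron expressions they stand for.
PRESET_TO_CRON = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
}


def next_boundary(now: datetime, preset: str) -> datetime:
    """Return the next top-of-hour or midnight strictly after `now`.

    Hypothetical helper for illustration only.
    """
    if preset == "@hourly":
        return now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    if preset == "@daily":
        return now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
    raise ValueError(f"unsupported preset: {preset}")


now = datetime(2025, 2, 24, 9, 30)
print(next_boundary(now, "@hourly"))  # 2025-02-24 10:00:00
print(next_boundary(now, "@daily"))   # 2025-02-25 00:00:00
```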

Composer/Airflow in Action: A Recipe for Orchestration

Let's see how Composer/Airflow orchestrates our data preparation. Imagine we need to prepare a three-course data meal:

  1. Appetizer: Extract fresh sales data from our raw data source.
  2. Main Course: Transform the data using our Dataform recipe, adding calculations and cleaning up inconsistencies.
  3. Dessert: Load the transformed data into a beautifully presented dashboard for our stakeholders to enjoy.
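
The ordering of these three courses can be sketched with nothing but the standard library. This is not how Airflow is implemented; it only shows the idea of resolving task dependencies into an execution order. The task names are illustrative assumptions.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks that must finish before it starts.
# Task names are illustrative, not taken from a real pipeline.
dependencies = {
    "extract_sales_data": set(),
    "transform_with_dataform": {"extract_sales_data"},
    "load_dashboard": {"transform_with_dataform"},
}

# static_order() yields tasks so that every task follows its prerequisites.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
# ['extract_sales_data', 'transform_with_dataform', 'load_dashboard']
```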

We’ve already defined some transformation steps in Dataform, as in our previous blog post. Composer/Airflow ensures that each course (transformation or processing step) is prepared in the correct order, with the appetizer served first, followed by the main course, and finally, the delectable dessert. A generic Dataform DAG is structured as:

from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import (
    DataformWorkflowInvocationStateSensor,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval="@once",  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=["dataform"],
) as dag:

    # Compile the Dataform repository at the given branch, tag, or commit.
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

    # Start the workflow without blocking; the sensor below waits for it.
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id="create_workflow_invocation",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        asynchronous=True,
        workflow_invocation={
            # Resource name of the compilation result created by the previous task.
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
        },
    )

    # Poll the invocation until it reaches one of the expected states.
    is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
        task_id="is_workflow_invocation_done",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
        expected_statuses={WorkflowInvocation.State.SUCCEEDED},
    )

    create_compilation_result >> create_workflow_invocation >> is_workflow_invocation_done
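
The sensor's templated expression above keeps only the last path segment of the invocation's resource name. A minimal sketch of that step in plain Python follows; the resource name is a made-up example, and its project, region, repository, and ID values are assumptions.

```python
def workflow_invocation_id(resource_name: str) -> str:
    """Return the trailing ID segment of a Dataform resource name."""
    return resource_name.split("/")[-1]


# Hypothetical resource name; project, region, repository, and ID are made up.
name = (
    "projects/my-project/locations/us-central1/"
    "repositories/sales_data_repo/workflowInvocations/1234-abcd"
)
print(workflow_invocation_id(name))  # 1234-abcd
```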

However, since a DAG may invoke several (or many) Dataform tasks, bundling these steps into an Airflow task group makes for much cleaner and more dynamic code! Time for a musical interlude!


Dynamic DAGs: A Symphony of Automation

Imagine having a single conductor who can effortlessly orchestrate an entire orchestra. That's the power of dynamic DAGs in Airflow. Instead of creating individual DAGs for each Dataform job, you can use a single script to generate them automatically.


The dataform_dags.py Script: Your DAG Maestro

The dataform_dags.py script (you can name it whatever you’d like) acts as your DAG maestro, dynamically creating a DAG for each Dataform repository defined in a configuration file. This eliminates the need to write repetitive boilerplate code for each DAG, saving you time and effort.


How It Works

  1. Configuration: You define your Dataform repositories and their associated settings in a settings.py file. This includes information like the repository name, project ID, schedule interval, and any required parameters.
  2. Dynamic Generation: The dataform_dags.py script reads the settings.py file and generates a DAG for each repository. Each DAG is configured with the settings specified in the configuration file.
  3. Execution: Airflow automatically detects and schedules these dynamically generated DAGs, ensuring your Dataform jobs run seamlessly according to your defined settings.
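
The three steps above can be sketched without Airflow installed. In this sketch `FakeDag` is a stand-in for `airflow.DAG`, and the config entries are hypothetical; only the pattern (read settings, build one object per entry, bind it to a module-level name) carries over.

```python
from dataclasses import dataclass


@dataclass
class FakeDag:
    """Stand-in for airflow.DAG so the sketch runs without Airflow."""
    dag_id: str
    schedule: str
    repository_id: str


# Hypothetical settings; in the real setup these live in settings.py.
config = {
    "sales_data_dag": {"schedule": "@daily", "repository_id": "sales_data_repo"},
    "marketing_dag": {"schedule": "@hourly", "repository_id": "marketing_repo"},
}


def create_dag(dag_id: str, params: dict) -> FakeDag:
    return FakeDag(dag_id, params["schedule"], params["repository_id"])


# Build one DAG object per config entry.
generated = {dag_id: create_dag(dag_id, params) for dag_id, params in config.items()}

# Airflow discovers DAG objects bound to module-level names, which is why
# dynamic generators commonly publish them through globals().
globals().update(generated)

print(sorted(generated))  # ['marketing_dag', 'sales_data_dag']
```

Adding a third repository then means adding one more entry to `config`, with no change to the generator code.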


Example:

First, create a custom Dataform task group that bundles the required Dataform tasks, e.g. in dags/dataform/dataform_task_group.py:

from datetime import timedelta
import logging
from typing import List, Optional, Set

from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import (
    DataformWorkflowInvocationStateSensor,
)
from airflow.utils.task_group import TaskGroup
from google.cloud.dataform_v1beta1 import WorkflowInvocation

logger = logging.root.getChild("dags.operators.dataform")

DEFAULT_GIT_COMMITISH = "main"
DEFAULT_GCP_REGION = "us-central1"


# Data passing helper functions
def get_compilation_result_name(compilation_output: dict) -> str:
    """Assists in pulling the correct unique value for a Dataform operator
    result name. This name will then be used to create new Dataform operators or
    can be fed into Dataform client methods. Using the .output property here
    seems to be a bit cleaner when figuring out task IDs inside of task groups
    like when using xcom_pull.

    Args:
        compilation_output (dict):
            An XCom return value of a Dataform operator.

    Returns:
        str: The "name" value of the compilation_output object.
    """
    logger.info(f"Dataform output is of type: {type(compilation_output)}")
    return compilation_output.get("name")


def get_workflow_invocation_id(invocation_output: dict, **kwargs) -> str:
    """Assists in pulling the correct unique value for a Dataform workflow
    invocation. Using the .output property here seems to be a bit cleaner when
    figuring out task IDs inside of task groups like when using xcom_pull.

    Args:
        invocation_output (dict):
            An XCom return value of a DataformCreateWorkflowInvocationOperator.

    Returns:
        str: The ending section of the "name" value of the invocation_output
            object that specifies the unique ID of a workflow invocation.
    """
    logger.info(f"Invocation output is: {invocation_output}")
    invocation_name = invocation_output.get("name")
    return invocation_name.split("/")[-1]


# We want to be able to pass in templated variables from airflow/composer,
# e.g. {{ ts }}, but this standard Dataform operator does not allow for templated
# variables. We're creating the wrapper class here to set the compilation_result
# config to allow templated parameters
class __DataformCreateCompilationResultOperator(
    DataformCreateCompilationResultOperator
):
    template_fields = DataformCreateCompilationResultOperator.template_fields + (
        "compilation_result",
    )


# Task Group definition
def get_dataform_task_group(
    task_group_id: str,
    repository: str,
    project: str,
    database: str,
    schema: str,
    code_config_vars: Optional[dict] = None,
    git_commitish: str = DEFAULT_GIT_COMMITISH,
    region: str = DEFAULT_GCP_REGION,
    impersonation_chain: Optional[str] = None,
    tags: Optional[List[str]] = None,
    service_account: Optional[str] = None,
    expected_statuses: Optional[Set[WorkflowInvocation.State]] = None,
    failure_statuses: Optional[Set[WorkflowInvocation.State]] = None,
) -> TaskGroup:
    """Returns the necessary Dataform operators set up with values in
    the constructor and sequenced as an Airflow Task Group. There are two
    helper operators that simplify extracting variables from XComs as the Jinja
    templating can get a little tricky in a task group.

    The tasks and their operators are as follows:

    1. create_compilation_result: DataformCreateCompilationResultOperator
        Compiles the sqlx code in a Dataform repository.

    2. compilation_result_name: PythonOperator
        Extracts the unique compilation result name for use in the next step of
        invoking a Dataform workflow.

    3. create_workflow_invocation: DataformCreateWorkflowInvocationOperator
        Invokes a Dataform workflow, i.e. runs the data model in the Dataform
        repository.

    4. workflow_invocation_id: PythonOperator
        Extracts only the unique identifier of the previous Dataform workflow
        invocation. This is required by the following workflow invocation
        sensor. Inspecting the log for this task will show the URL to the
        workflow invocation in the GCP Console.

    5. is_workflow_invocation_done: DataformWorkflowInvocationStateSensor
        Sensor that holds up the DAG until the Dataform workflow invocation is
        truly finished.

    More information on the Dataform operators can be found here:
    https://airflow.apache.org/docs/apache-airflow-providers-google/stable/operators/cloud/dataform.html

    Args:
        task_group_id (str):
            Airflow task group ID; this value will be prefixed to each task
            within the group.

        repository (str):
            Name of the target Dataform code repository.

        project (str):
            GCP project the Dataform repository resides in.

        database (str):
            BigQuery project that the Dataform results are written to.

        schema (str):
            BigQuery dataset that the Dataform code compiles to.

        code_config_vars (Optional[dict]):
            Dictionary of variable name keys and values for injecting into
            compiled Dataform code, e.g. the current date.

        git_commitish (str):
            Branch of the Dataform repo to pull from. Defaults to `main`.

        region (str):
            GCP region of the Dataform repository. Defaults to `us-central1`.

        impersonation_chain (Optional[str]):
            The impersonation user for permissions if this is required. Defaults
            to None.

        tags (Optional[List[str]]):
            List of Dataform tags to send in to the workflow invocation.
            Defaults to an empty list.

        service_account (Optional[str]):
            The service account to execute the Dataform code with. Defaults to
            None (and the execution will use the default Dataform SA).

        expected_statuses (Optional[Set[WorkflowInvocation.State]]):
            Set of status codes that the sensor will interpret as
            a reason to mark the is_workflow_invocation_done task as a success.
            Possible values can be found here:
            https://cloud.google.com/dataform/reference/rest/v1beta1/projects.locations.repositories.workflowInvocations#state

        failure_statuses (Optional[Set[WorkflowInvocation.State]]):
            Set of status codes that the sensor will interpret as
            a reason to mark the is_workflow_invocation_done task as a failure.
            Possible values can be found here:
            https://cloud.google.com/dataform/reference/rest/v1beta1/projects.locations.repositories.workflowInvocations#state

    Returns:
        TaskGroup: Comprised of the Dataform operators necessary to
            run a normal Dataform job in Airflow.
    """
    # Set default values for non-required parameters if none were given
    if tags is None:
        tags = list()
    if expected_statuses is None:
        expected_statuses = {WorkflowInvocation.State.SUCCEEDED}
    if failure_statuses is None:
        failure_statuses = {
            WorkflowInvocation.State.FAILED,
            WorkflowInvocation.State.CANCELLED,
            WorkflowInvocation.State.CANCELING,
        }

    # Build the task group
    with TaskGroup(group_id=task_group_id) as dataform_group:

        create_compilation_result = __DataformCreateCompilationResultOperator(
            retries=4,
            retry_delay=timedelta(seconds=120),
            task_id="create_compilation_result",
            project_id=project,
            region=region,
            repository_id=repository,
            impersonation_chain=impersonation_chain,
            compilation_result={
                "git_commitish": git_commitish,
                "code_compilation_config": {
                    "default_database": database,
                    "default_schema": schema,
                    "vars": code_config_vars,
                },
            },
        )

        # Airflow 2 passes the task context to callables that accept **kwargs,
        # so the legacy provide_context flag is not needed.
        compilation_result_name = PythonOperator(
            task_id="compilation_result_name",
            python_callable=get_compilation_result_name,
            op_args=[create_compilation_result.output],
        )

        create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
            task_id="create_workflow_invocation",
            project_id=project,
            region=region,
            repository_id=repository,
            impersonation_chain=impersonation_chain,
            asynchronous=True,
            workflow_invocation={
                "compilation_result": str(compilation_result_name.output),
                "invocation_config": {
                    "included_tags": tags,
                    "service_account": service_account,
                },
            },
        )

        workflow_invocation_id = PythonOperator(
            task_id="workflow_invocation_id",
            python_callable=get_workflow_invocation_id,
            op_args=[create_workflow_invocation.output],
        )

        is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
            task_id="is_workflow_invocation_done",
            project_id=project,
            region=region,
            repository_id=repository,
            impersonation_chain=impersonation_chain,
            workflow_invocation_id=str(workflow_invocation_id.output),
            expected_statuses=expected_statuses,
            failure_statuses=failure_statuses,
            retries=3,
        )

        # Marker task that only succeeds once the whole chain has finished.
        task_group_succeed = EmptyOperator(task_id="task_group_succeed")

        (
            create_compilation_result
            >> compilation_result_name
            >> create_workflow_invocation
            >> workflow_invocation_id
            >> is_workflow_invocation_done
            >> task_group_succeed
        )

    return dataform_group
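
The wrapper class near the top of this file works by extending `template_fields`, the tuple of attribute names Airflow renders before a task runs. The sketch below imitates that mechanism with stand-in classes and a toy `render` function. None of this is Airflow's actual code, and the field names and values are assumptions, but it shows why a field that is missing from the tuple keeps its literal `{{ ts }}` text.

```python
def render(value, context):
    """Tiny stand-in for Jinja rendering: substitutes {{ key }} markers."""
    if isinstance(value, str):
        for key, replacement in context.items():
            value = value.replace("{{ " + key + " }}", replacement)
        return value
    if isinstance(value, dict):
        return {k: render(v, context) for k, v in value.items()}
    return value


class BaseOp:
    """Stand-in operator: only fields named in template_fields are rendered."""
    template_fields = ("project_id",)

    def __init__(self, project_id, compilation_result):
        self.project_id = project_id
        self.compilation_result = compilation_result

    def render_template_fields(self, context):
        for field in self.template_fields:
            setattr(self, field, render(getattr(self, field), context))


class TemplatedOp(BaseOp):
    # Same pattern as the wrapper class: parent tuple plus one extra field.
    template_fields = BaseOp.template_fields + ("compilation_result",)


context = {"ts": "2025-02-24T00:00:00+00:00"}
payload = {"code_compilation_config": {"vars": {"run_ts": "{{ ts }}"}}}

plain = BaseOp("my-project", payload)
templated = TemplatedOp("my-project", payload)
plain.render_template_fields(context)
templated.render_template_fields(context)

print(plain.compilation_result["code_compilation_config"]["vars"]["run_ts"])
# {{ ts }}
print(templated.compilation_result["code_compilation_config"]["vars"]["run_ts"])
# 2025-02-24T00:00:00+00:00
```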

Next, define your Dataform repositories in a settings.py file placed somewhere under the dags folder of your Airflow repository, e.g. dags/dataform/settings.py:

from datetime import datetime

config = {
    # "{dag id}": {
    #     Required Params
    #     "start_date": datetime(XXXX, X, XX),
    #     "schedule": "@daily",
    #     "repository_id": "Name of Dataform repository",
    #     "project_id": "GCP project where Dataform repo lives",
    #     "schema": "Destination BigQuery dataset",
    #     "impersonation_chain": "GCP Service Account for the
    #     Optional Params
    #     "database": GCP project that Dataform writes to. Tables will exclusively be referenced and written in this project
    #     "code_config_vars": Dataform code config variables dictionary, default is None
    #     "git_commitish": "Dataform branch that should be run. Default is main",
    #     "tags": ["Dataform tags broken out into individual airflow tasks"],
    #     "tags_sequential": T/F flag to indicate if tag tasks should run dependent on each other. Default false
    #     Dataform WorkflowInvocation Params
    #     "expected_statuses": {Dataform WorkflowInvocation response that will mark the Airflow task as success},
    #         default is WorkflowInvocation.State.SUCCEEDED
    #
    #     "failure_statuses": {Dataform WorkflowInvocation response that will mark the Airflow task as failure},
    #         default is WorkflowInvocation.State.FAILED
    #
    #     For more information on these parameters, see the get_dataform_task_group function in dataform/dataform_task_group.py
    # },
    "sales_data_dag": {
        "start_date": datetime(2024, 1, 20),
        "schedule": "@daily",
        "repository_id": "sales_data_repo",
        "project_id": "your-gcp-project-id",
        "schema": "your_dataset",
        "impersonation_chain": "your-service-account",
        # ... other settings ...
    },
    # ... other DAG definitions ...
}
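
Because every DAG is driven by this dictionary, a missing required key only surfaces when the DAG file is parsed. A small check like the sketch below could catch that earlier. The `missing_keys` helper is hypothetical, and the required key set is taken from the "Required Params" comment in the template above.

```python
from datetime import datetime

# Keys listed under "Required Params" in the settings template.
REQUIRED_KEYS = {
    "start_date",
    "schedule",
    "repository_id",
    "project_id",
    "schema",
    "impersonation_chain",
}


def missing_keys(config: dict) -> dict:
    """Map each DAG ID to the sorted list of required keys it lacks."""
    problems = {}
    for dag_id, params in config.items():
        missing = sorted(REQUIRED_KEYS - params.keys())
        if missing:
            problems[dag_id] = missing
    return problems


# Hypothetical config: one complete entry and one incomplete entry.
example_config = {
    "sales_data_dag": {
        "start_date": datetime(2024, 1, 20),
        "schedule": "@daily",
        "repository_id": "sales_data_repo",
        "project_id": "your-gcp-project-id",
        "schema": "your_dataset",
        "impersonation_chain": "your-service-account",
    },
    "incomplete_dag": {"schedule": "@daily", "repository_id": "other_repo"},
}

print(missing_keys(example_config))
# {'incomplete_dag': ['impersonation_chain', 'project_id', 'schema', 'start_date']}
```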

The dataform_dags.py script then uses this configuration to generate a DAG named sales_data_dag (and any other DAG defined in settings.py) that executes the Dataform jobs in the sales_data_repo repository:

import logging
from datetime import timedelta

from airflow import DAG

from dataform.dataform_task_group import get_dataform_task_group
from dataform.settings import config


def on_operator_failure(context):
    """Placeholder failure callback; swap in your own alerting logic."""
    logging.getLogger(__name__).error("Task failed: %s", context.get("task_instance"))


def create_default_args(start_date, defaults=None):
    d = {
        "owner": "sales-team",
        "start_date": start_date,
        "depends_on_past": False,
        "email": ["sales@sales_team.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 10,
        "retry_delay": timedelta(seconds=10),
        "retry_exponential_backoff": True,
        "google_conn_id": "your-google-conn-id",
        "on_failure_callback": on_operator_failure,
    }
    # Per-DAG overrides win over the shared defaults above.
    if defaults:
        d.update(defaults)
    return d


def create_dag(dag_id, params):
    default_args = create_default_args(
        params.get("start_date"),
        {
            "retries": 2,
            "retry_delay": timedelta(seconds=30),
            "retry_exponential_backoff": False,
            "google_project_id": params.get("project_id"),
        },
    )
    with DAG(
        dag_id,
        default_args=default_args,
        schedule=params.get("schedule"),
        catchup=False,
        tags=[dag_id, "dataform"] + params.get("dag_tags", []),
    ) as dag:
        if hasattr(dag, "doc_md"):
            dag.doc_md = __doc__

        previous_task = None

        # One task group per Dataform tag, or a single untagged group.
        for tag in params.get("tags", [None]):
            dataform_task = get_dataform_task_group(
                task_group_id=f"{dag_id}-{tag}" if tag else f"{dag_id}",
                project=params["project_id"],
                repository=params["repository_id"],
                database=params.get("database", "default-database"),
                schema=params["schema"],
                code_config_vars=params.get("code_config_vars", None),
                git_commitish=params.get("git_commitish", "main"),
                impersonation_chain=params["impersonation_chain"],
                tags=[tag] if tag else None,
                service_account=params.get("service_account"),
                expected_statuses=params.get("expected_statuses"),
                failure_statuses=params.get("failure_statuses"),
            )
            # With tags_sequential set, each tag's group waits for the
            # previous tag's group; otherwise the groups run independently.
            if params.get("tags_sequential", False) and previous_task is not None:
                previous_task >> dataform_task
            previous_task = dataform_task

    return dag


# Bind each generated DAG to a module-level name so Airflow can discover it.
for dataform_dag_id, dataform_params in config.items():
    globals()[dataform_dag_id] = create_dag(dataform_dag_id, dataform_params)
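
To see what the tags_sequential flag changes, the sketch below reproduces just the chaining decision in plain Python and returns the dependency edges it would create between per-tag task groups. The `plan_edges` helper and the tag names are hypothetical; they are not part of the script above.

```python
from typing import List, Optional, Tuple


def plan_edges(tags: List[Optional[str]], sequential: bool) -> List[Tuple[str, str]]:
    """Return (upstream, downstream) pairs between per-tag task groups."""
    edges = []
    previous = None
    for tag in tags:
        current = tag or "untagged"
        # Only link to the previous group when sequential runs are requested.
        if sequential and previous is not None:
            edges.append((previous, current))
        previous = current
    return edges


print(plan_edges(["staging", "marts", "reports"], sequential=True))
# [('staging', 'marts'), ('marts', 'reports')]
print(plan_edges(["staging", "marts", "reports"], sequential=False))
# []
```

With a single tag, or with no tags at all, the function returns no edges either way, which mirrors a DAG containing one standalone task group.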


Benefits of Dynamic DAGs

  1. Reduced Code Duplication: Avoid writing repetitive code for each Dataform DAG.
  2. Centralized Configuration: Manage all your Dataform DAG settings in one place.
  3. Improved Maintainability: Easily update and modify your Dataform DAGs by simply changing the configuration file.
  4. Increased Scalability: Effortlessly add new Dataform repositories and their corresponding DAGs without writing any new code.

By leveraging the power of dynamic DAGs, you can streamline your Dataform workflow and ensure your data transformations are always running smoothly.

Taken together, these snippets define Airflow DAGs that execute our Dataform recipes in the correct sequence.

With Composer/Airflow as our sous chef, our data kitchen runs like a well-oiled machine, delivering perfectly prepared data dishes to our eager stakeholders.


Helpful Resources

  1. Quickstart: Run Apache Airflow DAG in Cloud Composer
  2. Schedule executions with Cloud Composer

🍬 Sweet & Sour Candy (this week’s good, bad, or weird of the tech world)

🤢 False Positive - explores the phenomenon of "de-banking" across Europe, where automated systems and algorithms incorrectly flag individuals and organizations as financial risks, leading to their accounts being blocked. This podcast interviews victims of these "false positives" and industry insiders to uncover why these errors occur and the devastating consequences they can have.

😀 Robots and labor in nursing homes - Our robot overlords may actually help us perform service tasks like better humans, when planned appropriately.

🍫 One last bite

“But I know, somehow, that only when it is dark enough can you see the stars.”― Martin Luther King, Jr.




Originally posted at: https://funsizedatabytes.substack.com/p/bridging-the-culinary-gap-part-2
