One Big Thing: Bridging the (Analytics) Culinary Gap (Part 2)
Why did the Airflow DAG break up with the Kubernetes Pod?
…
Because they had too many "scheduling conflicts"! 💔
GCP Cloud Composer & Airflow: Your Sous Chef for Orchestration
Welcome back to our data kitchen! In Part 1 of this series, we donned our chef hats and explored Dataform, our trusty cookbook for crafting delicious data transformations. Now, it's time to introduce the next essential member of our culinary team: Composer/Airflow, our masterful sous chef.
Remember, we're on a mission to build a robust ETL pipeline that serves up a feast of data-driven insights. Just like a well-coordinated dinner service, our pipeline needs to ensure the right data arrives at the right place, right on time.
In this post, we'll delve into the art of orchestration with Composer/Airflow. Think of it as the conductor of our data symphony, ensuring each ingredient is prepared and combined in perfect harmony: each dish in our ETL pipeline comes together in the right order, at the perfect temperature, and with impeccable timing.
So grab your aprons and let's continue our culinary adventure! By the end of this post, you'll be well-versed in the art of orchestration and ready to tackle the next course in our ETL feast.
Why Composer/Airflow is the Perfect Sous Chef
Imagine a busy kitchen with multiple dishes being prepared simultaneously. Without a skilled sous chef, chaos would erupt! Composer/Airflow brings order and efficiency to our data kitchen, ensuring a smooth and seamless workflow.
Here's how Composer/Airflow excels in its role:
Masterful Scheduling: Just like a sous chef schedules the preparation of each dish, Composer/Airflow schedules the execution of our Dataform recipes (and any other required task in our pipeline). Need to refresh your data daily? Hourly? No problem! Composer/Airflow ensures your data dishes are served fresh and on time.
Keen Monitoring: A good sous chef keeps a close eye on the cooking process, ensuring everything is proceeding as planned. Composer/Airflow monitors our ETL pipeline, alerting us if any dish is burning or if an ingredient is missing. This proactive monitoring prevents culinary disasters and ensures a smooth dining experience.
Efficient Task Management: Just as a sous chef delegates tasks to the kitchen staff, Composer/Airflow delegates tasks to different workers in our data kitchen. This ensures efficient resource utilization and prevents bottlenecks in our data pipeline. It also ensures that dependent tasks don't start until the kitchen is ready; the cake doesn't go in the oven until all the ingredients are added (see the tiny sketch just after this list).
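To make those last two points concrete, here's a tiny, illustrative Airflow DAG. The DAG name, task names, and echo commands are made up for this example, but the pattern is exactly what Composer/Airflow schedules for you: a daily run in which the bake task waits for both mixing tasks to finish.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="daily_cake",
    schedule_interval="@daily",  # serve this data dish fresh every day
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    mix_dry = BashOperator(task_id="mix_dry_ingredients", bash_command="echo 'flour + sugar'")
    mix_wet = BashOperator(task_id="mix_wet_ingredients", bash_command="echo 'eggs + butter'")
    bake = BashOperator(task_id="bake_cake", bash_command="echo 'into the oven'")

    # The cake doesn't go in the oven until all the ingredients are added
    [mix_dry, mix_wet] >> bake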
Composer/Airflow in Action: A Recipe for Orchestration
Let's see how Composer/Airflow orchestrates our data preparation. Imagine we need to prepare a three-course data meal:
Appetizer: Extract fresh sales data from our raw data source.
Main Course: Transform the data using our Dataform recipe, adding calculations and cleaning up inconsistencies.
Dessert: Load the transformed data into a beautifully presented dashboard for our stakeholders to enjoy.
We’ve already defined some transformation steps in Dataform, as described in our previous blog post. Composer/Airflow ensures that each course (transformation or processing step) is prepared in the correct order: the appetizer served first, followed by the main course, and finally the delectable dessert. A generic Dataform DAG is structured as:
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    # Compile the Dataform repository at the chosen git commitish,
    # then invoke the compiled workflow to run the transformations.
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={"git_commitish": GIT_COMMITISH},
    )
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id="create_workflow_invocation",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"},
    )
    chain(create_compilation_result, create_workflow_invocation)
However, since a DAG may include the invocation of several (or many) Dataform tasks, bundling these steps into an Airflow task group makes for much cleaner, more dynamic code! Time for a musical interlude!
Dynamic DAGs: A Symphony of Automation
Imagine having a single conductor who can effortlessly orchestrate an entire orchestra. That's the power of dynamic DAGs in Airflow. Instead of creating individual DAGs for each Dataform job, you can use a single script to generate them automatically.
The dataform_dags.py Script: Your DAG Maestro
The dataform_dags.py script (you can name it whatever you’d like) acts as your DAG maestro, dynamically creating DAGs for each Dataform repository defined in a configuration file. This eliminates the need to write repetitive boilerplate code for each DAG, saving you time and effort.
How It Works
Configuration: You define your Dataform repositories and their associated settings in a settings.py file. This includes information like the repository name, project ID, schedule interval, and any required parameters.
Dynamic Generation: The dataform_dags.py script reads the settings.py file and generates a DAG for each repository. Each DAG is configured with the settings specified in the configuration file.
Execution: Airflow automatically detects and schedules these dynamically generated DAGs, ensuring your Dataform jobs run seamlessly according to your defined settings.
Example:
First, create a custom Dataform task group that bundles the required Dataform tasks, e.g. in dags/dataform/dataform_task_group.py:
from datetime import timedelta
import logging

from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import (
    DataformWorkflowInvocationStateSensor,
)
from airflow.utils.task_group import TaskGroup
from google.cloud.dataform_v1beta1 import WorkflowInvocation
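Those imports hint at everything the real helper juggles. A minimal sketch of get_dataform_task_group, continuing the same file and covering only the compile-invoke-wait path (the group_id argument and the defaults are illustrative assumptions, and the schema, tags, and impersonation handling described in the configuration below is omitted), might look like this:

def get_dataform_task_group(
    group_id: str,
    project_id: str,
    region: str,
    repository_id: str,
    git_commitish: str = "main",
    **kwargs,  # the real helper accepts more options (schema, tags, impersonation_chain, ...)
) -> TaskGroup:
    """Bundle the Dataform compile + invoke + wait steps into one reusable TaskGroup."""
    with TaskGroup(group_id=group_id) as dataform_group:
        # Compile the repository at the requested git commitish
        compile_result = DataformCreateCompilationResultOperator(
            task_id="create_compilation_result",
            project_id=project_id,
            region=region,
            repository_id=repository_id,
            compilation_result={"git_commitish": git_commitish},
        )

        # Kick off a workflow invocation from that compilation result
        invoke_workflow = DataformCreateWorkflowInvocationOperator(
            task_id="create_workflow_invocation",
            project_id=project_id,
            region=region,
            repository_id=repository_id,
            workflow_invocation={
                "compilation_result": "{{ task_instance.xcom_pull('"
                + group_id
                + ".create_compilation_result')['name'] }}"
            },
        )

        # Poll until the invocation succeeds (or fails)
        wait_for_invocation = DataformWorkflowInvocationStateSensor(
            task_id="is_workflow_invocation_done",
            project_id=project_id,
            region=region,
            repository_id=repository_id,
            workflow_invocation_id="{{ task_instance.xcom_pull('"
            + group_id
            + ".create_workflow_invocation')['name'].split('/')[-1] }}",
            expected_statuses={WorkflowInvocation.State.SUCCEEDED},
            failure_statuses={WorkflowInvocation.State.FAILED},
        )

        compile_result >> invoke_workflow >> wait_for_invocation

    return dataform_group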
Here's an example of how you might define a Dataform repository in the settings.py file. Place this file somewhere under the dags folder in your Airflow repository, e.g. dags/dataform/settings.py:
from datetime import datetime

config = {
    # "{dag id}": {
    #     Required Params
    #     "start_date": datetime(XXXX, X, XX),
    #     "schedule_interval": "@daily",
    #     "repository_id": "Name of Dataform repository",
    #     "project_id": "GCP project where Dataform repo lives",
    #     "schema": "Destination BigQuery dataset",
    #     "impersonation_chain": "GCP Service Account for the Dataform tasks to impersonate",
    #
    #     Optional Params
    #     "database": GCP project that Dataform writes to; tables will exclusively be referenced and written in this project
    #     "code_config_vars": Dataform code config variables dictionary; default is None
    #     "git_commitish": "Dataform branch that should be run; default is main",
    #     "tags": ["Dataform tags broken out into individual Airflow tasks"],
    #     "tags_sequential": True/False flag indicating whether tag tasks should run dependent on each other; default is False
    #
    #     Dataform WorkflowInvocation Params
    #     "expected_statuses": {Dataform WorkflowInvocation response that will mark the Airflow task as success},
    #         default is WorkflowInvocation.State.SUCCEEDED
    #     "failure_statuses": {Dataform WorkflowInvocation response that will mark the Airflow task as failure},
    #         default is WorkflowInvocation.State.FAILED
    #
    #     For more information on these parameters, see the get_dataform_task_group
    #     function in dags/dataform/dataform_task_group.py
    # },
    "sales_data_dag": {
        "start_date": datetime(2024, 1, 20),
        "schedule_interval": "@daily",
        "repository_id": "sales_data_repo",
        "project_id": "your-gcp-project-id",
        "schema": "your_dataset",
        "impersonation_chain": "your-service-account",
        # ... other settings ...
    },
    # ... any other DAG configs ...
}
The dataform_dags.py script would then use this configuration to generate a DAG named sales_data_dag (and any other DAG defined in settings.py) that executes the Dataform jobs in the sales_data_repo repository:
from datetime import timedelta
from airflow import DAG
from dataform.dataform_task_group import get_dataform_task_group
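The rest of the script is just a loop over that config dict. Here's a minimal sketch of what it might look like, building on the imports above; the dataform.settings import path, the region default, and the default_args values are assumptions for illustration, and get_dataform_task_group follows the hypothetical signature sketched earlier:

from dataform.settings import config  # the dict defined in dags/dataform/settings.py

default_args = {
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Generate one DAG per entry in the config dict. Assigning each DAG to
# globals() is what lets the Airflow scheduler discover it.
for dag_id, params in config.items():
    with DAG(
        dag_id=dag_id,
        start_date=params["start_date"],
        schedule_interval=params.get("schedule_interval", "@daily"),
        catchup=False,
        default_args=default_args,
    ) as dag:
        get_dataform_task_group(
            group_id="dataform",
            project_id=params["project_id"],
            region=params.get("region", "us-central1"),
            repository_id=params["repository_id"],
            git_commitish=params.get("git_commitish", "main"),
        )

    globals()[dag_id] = dag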
This approach brings several benefits:
Reduced Code Duplication: Avoid writing repetitive code for each Dataform DAG.
Centralized Configuration: Manage all your Dataform DAG settings in one place.
Improved Maintainability: Easily update and modify your Dataform DAGs by simply changing the configuration file.
Increased Scalability: Effortlessly add new Dataform repositories and their corresponding DAGs without writing any new code.
By leveraging the power of dynamic DAGs, you can streamline your Dataform workflow and ensure your data transformations are always running smoothly.
With these snippets in place, Airflow generates a DAG for each repository and executes our Dataform recipes in the correct sequence.
With Composer/Airflow as our sous chef, our data kitchen runs like a well-oiled machine, delivering perfectly prepared data dishes to our eager stakeholders.
🍬 Sweet & Sour Candy (this week’s good, bad, or weird of the tech world)
🤢 False Positive - explores the phenomenon of "de-banking" across Europe, where automated systems and algorithms incorrectly flag individuals and organizations as financial risks, leading to their accounts being blocked. This podcast interviews victims of these "false positives" and industry insiders to uncover why these errors occur and the devastating consequences they can have.
😀 Robots and labor in nursing homes - When planned appropriately, our robot overlords may actually help us perform service tasks like better humans.
🍫 One last bite
“But I know, somehow, that only when it is dark enough can you see the stars.”― Martin Luther King, Jr.
Thank you for reading. This post is public, so feel free to share it.
Originally posted at: https://funsizedatabytes.substack.com/p/bridging-the-culinary-gap-part-2