
How we use Airflow with Composer

alberto crespi

At lastminute.com, we use Apache Airflow, managed through Google Cloud Composer, to orchestrate complex workflows. This article explores why we chose Composer as our Airflow distribution, how we use it to schedule a wide range of tasks, and how we organize files and resources to ensure scalability and maintainability. Additionally, we delve into our CI/CD practices, highlighting how we test workflows and manage costs, offering insights for teams seeking efficient and reliable data orchestration at scale in a big tech company.


Over the years, our Data & Analytics department at lastminute.com has worked with a diverse set of tools to schedule our code:

  • The king of schedulers: crontab
  • Vendor-specific scheduling platforms
  • Azkaban
  • Rundeck
  • Google Cloud Scheduler
  • API calls

and more…

What we truly needed was a unified scheduler to manage everything.

We were searching for a tool that could offer us scalability, integration capabilities, extensibility, and the support of a thriving community. These qualities were crucial to ensure the platform could handle our growing workload, seamlessly integrate with our existing tools, adapt to evolving needs, and be supported by a robust ecosystem for long-term reliability. That’s when we found Apache Airflow.

We won’t bore you with how it works (if you’re curious, check out Marc Lamberti’s YouTube channel for an excellent introduction). Instead, we want to share how we use Airflow and the types of activities we’ve implemented with it.

In our data teams, working with data often means scheduling various types of code executions. We primarily manage four types of ETL/ELT:

  • SQL scripts
  • Third-party applications in JAR format
  • Custom code written in Python, Java, etc.
  • Containerized processes executed via Docker

Beyond executing ETL processes, we needed workflows to handle supplementary tasks essential to the ETL process, such as:

  • Sensors to verify data readiness
  • Data ingestion from external systems (e.g., S3, SFTP, etc.)
  • Data unit tests
  • API calls to internal systems
  • Notifications and alerts via Slack or email

These were our prerequisites. And, guess what: we achieved all of this with a single tool: Airflow.

Which Airflow Distribution We Chose

Once we decided to adopt Airflow, our first major task was selecting the right distribution or vendor solution. The main options available were:

  • Google Cloud Composer: Airflow fully managed and integrated into Google Cloud.
  • Astronomer: A leading managed Airflow provider and official maintainer of the open-source project.
  • AWS Managed Workflows for Apache Airflow (MWAA): Airflow integrated into the AWS ecosystem.
  • Self-hosted Airflow: Managed internally by developers using virtual machines or Docker.
  • Other solutions: While there are additional providers, we focused on the ones listed above, prioritizing AWS and Google Cloud due to their robust offerings.

We quickly ruled out the self-hosted option. Managing Airflow internally makes sense only if you need heavy backend customization or immediate access to the latest Airflow releases without waiting for vendor updates. However, this approach requires a good in-house team capable of maintaining the complex backend infrastructure of Airflow. For us, the cost-benefit ratio was not advantageous.

Given our department’s existing adoption of Google Cloud (BigQuery) and the company’s use of Google Workspace, Composer became an obvious choice. This decision allowed us to:

  • Leverage pre-existing network permissions and integrations with minimal adjustments.
  • Ensure secure access through Google Workspace’s integrated authentication.
  • Incorporate Airflow into our broader Google Cloud environment (BigQuery or other Google Cloud resources).

What We Schedule on Airflow

How We Execute Our SQL-ETL

In this section, we will explore the types of operators we use in Airflow.

In our company, we are currently using Vertica as our main data warehouse, as well as BigQuery. To ensure we cover all bases, we also have some legacy systems running on MySQL.

As previously mentioned, to execute the code of the numerous teams in the Data & Analytics department, we immediately started using Airflow’s native operators:

  • For Vertica, we used the VerticaOperator.
  • For BigQuery, we leveraged the BigQueryInsertJobOperator.

This was straightforward, requiring only minor customizations of the source code, such as adding parameters to operators for tagging queries or adjusting default configurations to better suit our workflow.
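To give a flavour, a minimal task definition for each might look like the sketch below; the connection IDs, table names, and SQL are placeholders rather than our actual configuration:

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.vertica.operators.vertica import VerticaOperator

# Vertica: run a SQL file stored under the team's data folder (path is a placeholder)
vertica_etl = VerticaOperator(
    task_id='finance_load_orders',
    vertica_conn_id='finance_vertica',
    sql='finance/sql_scripts/load_orders.sql',
)

# BigQuery: submit a query job; the heavy lifting stays in the warehouse
bigquery_etl = BigQueryInsertJobOperator(
    task_id='finance_load_sessions',
    gcp_conn_id='finance_bigquery',
    configuration={
        'query': {
            'query': 'CREATE OR REPLACE TABLE reporting.sessions AS '
                     'SELECT session_id, user_id, started_at FROM staging.sessions',
            'useLegacySql': False,
        }
    },
)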

Executing Third-Party Deliverables

The major customization we implemented was creating a custom operator that allowed us to execute the code of an open-source visual ETL platform. The operator essentially unzips a provided archive and runs a .sh file inside it, passing the necessary parameters to execute a jar.

One point of attention: the .sh file executes a Java JAR, meaning that our Python-based Airflow operator uses Python’s subprocess.Popen to run Java code. This approach was chosen for its simplicity and flexibility, as it allowed us to integrate existing Java-based ETL processes without rewriting them, while maintaining a high level of reliability. For purists, this might seem unorthodox, but in practice it proved to be a highly robust solution. While we may lose some milliseconds in execution speed, all our ETL processes delegate computation to the data warehouse, making this approach highly effective.
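A heavily simplified sketch of such an operator could look like this (argument names and paths are illustrative, not our production code):

import os
import subprocess
import zipfile

from airflow.models.baseoperator import BaseOperator


class ThirdPartyJarOperator(BaseOperator):
    """Unzips a third-party archive and runs the .sh launcher it contains."""

    def __init__(self, archive_path, work_dir, script_name, script_args=None, **kwargs):
        super().__init__(**kwargs)
        self.archive_path = archive_path
        self.work_dir = work_dir
        self.script_name = script_name
        self.script_args = script_args or []

    def execute(self, context):
        # Unpack the delivered archive into a working directory
        with zipfile.ZipFile(self.archive_path) as archive:
            archive.extractall(self.work_dir)

        # Run the shell launcher, which in turn starts the Java JAR
        script = os.path.join(self.work_dir, self.script_name)
        process = subprocess.Popen(
            ['bash', script, *self.script_args],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
        )
        for line in process.stdout:
            self.log.info(line.rstrip())
        if process.wait() != 0:
            raise RuntimeError(f'{self.script_name} exited with code {process.returncode}')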

Additionally, we execute Docker images using the KubernetesPodOperator and custom Python code via the PythonOperator.

How We Execute Our Supplementary Code (Not-ETL)

Data Ingestion from External Systems

This involves a combination of custom solutions and native operators, such as the SFTPOperator for handling SFTP transfers.

Data Unit Tests

We developed a small internal library that works similarly to how dbt handles testing (if you’re interested, here you can find how dbt manages tests). This library helps us validate:

  • Duplicates
  • Not null constraints
  • Accepted values
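Our library’s API is internal, but a rough equivalent of the duplicate check using Airflow’s stock SQLCheckOperator (a sketch with placeholder table and connection names) would be:

from airflow.providers.common.sql.operators.sql import SQLCheckOperator

# Fails the task if any order_id appears more than once in the target table
duplicates_check = SQLCheckOperator(
    task_id='finance_orders_no_duplicates',
    conn_id='finance_vertica',
    sql="""
        SELECT COUNT(*) = 0
        FROM (
            SELECT order_id
            FROM finance.orders
            GROUP BY order_id
            HAVING COUNT(*) > 1
        ) duplicated_keys
    """,
)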

API Calls to Internal Systems

For API interactions, we use a mix of custom Python code and the HttpOperator to handle requests.

Notifications and Alerts via Slack

For Slack notifications, we wrote custom Python code to send messages to three distinct channels for every single team:

  • _success
  • _retry
  • _failure

For retries and failures, we set the webhook callbacks in the DAG’s default_args:

default_args = {
   'on_failure_callback': generic_dag_failure(environment=environment,
                                              team_name=team_name),
   'on_retry_callback': generic_task_warning(environment=environment,
                                             team_name=team_name),
   ....
}

For success notifications, we apply a webhook only to the final task of the DAG, such as:

end = BashOperator(
   task_id='task_id_name',
   bash_command='echo "closer"',
   on_success_callback=generic_dag_success(environment=environment,
                                           team_name=team_name),
)

As a company policy, alarms are sent only in the production environment by default. The Slack channel name is automatically constructed based on the department name, team name, and alert type, such as success, retry, or failure.
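Putting these pieces together, a minimal sketch of such a callback factory could look like this (the webhook resolution helper is a hypothetical name, not our actual utility):

import requests


def generic_dag_failure(environment, team_name):
    """Returns an on_failure_callback that posts to the team's _failure channel."""

    def _callback(context):
        # Company policy: only alert in the production environment
        if environment != 'production':
            return
        task_instance = context['task_instance']
        message = (
            f":red_circle: DAG {task_instance.dag_id} failed "
            f"on task {task_instance.task_id} ({context['ds']})"
        )
        # Hypothetical helper that resolves the per-team webhook URL
        webhook_url = get_slack_webhook(team_name, alert_type='failure')
        requests.post(webhook_url, json={'text': message}, timeout=10)

    return _callback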

Below is an example of a notification we receive for a failure:

Failure message

This functionality has been one of the department’s most appreciated features, making email notifications somewhat obsolete and enabling immediate visibility of alerts on smartphones, especially for retries and failures.

Sensors to Verify Data Readiness

For this purpose, we use both Airflow’s native SQL sensors and customized ones like the SqlSensorWithNotifications. This custom sensor allows us to receive notifications about delayed resources from the list of sources the ETL flow is waiting for. Below is how we use the custom sensor:

sensor_task = SqlSensorWithNotifications(
    poke_interval=30,
    timeout=3600 * 14,
    task_id='task_id',
    conn_id='conn_id',
    team_name=team_name,
    tag="tag_name",
    notifications_every_n_seconds=3600 * 2,
    sql_for_notifications="core/sql_sensors/sensor.sql",
)

Essentially, what we want is for a notification to be sent to the retry channel every notifications_every_n_seconds, indicating which sources have not yet arrived. Below is an example of the Slack notification we receive.
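As a rough sketch of the idea, built on top of Airflow’s stock SqlSensor in poke mode (the Slack helper, hook access, and parameter wiring are simplifications, not our production code):

import time

from airflow.hooks.base import BaseHook
from airflow.providers.common.sql.sensors.sql import SqlSensor


class SqlSensorWithNotifications(SqlSensor):
    """SqlSensor that periodically reports which sources are still missing."""

    def __init__(self, team_name, tag, notifications_every_n_seconds,
                 sql_for_notifications, **kwargs):
        super().__init__(**kwargs)
        self.team_name = team_name
        self.tag = tag
        self.notifications_every_n_seconds = notifications_every_n_seconds
        self.sql_for_notifications = sql_for_notifications
        # In poke mode the sensor instance stays alive between pokes, so this timestamp persists
        self._last_notification = time.monotonic()

    def poke(self, context):
        ready = super().poke(context)
        elapsed = time.monotonic() - self._last_notification
        if not ready and elapsed >= self.notifications_every_n_seconds:
            # List the sources that have not arrived yet and post them to the retry channel
            hook = BaseHook.get_connection(self.conn_id).get_hook()
            missing_sources = hook.get_records(self.sql_for_notifications)
            notify_retry_channel(self.team_name, self.tag, missing_sources)  # hypothetical Slack helper
            self._last_notification = time.monotonic()
        return ready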

Notifications on Slack

While this approach may not be entirely democratic, it is practical: for a nightly flow with ten or more sources, an engineer can quickly identify the delayed source and address it.

Data on Slack

More and more companies are becoming Slack-centric, and another highly appreciated operator has been one that posts a JPEG image to a Slack channel, mainly for aggregated tables, typically macro-aggregates built for management purposes. Here too, frontend purists might frown upon this approach, but for certain high-level aggregated data sent via Slack, it can spark quick discussions among the people in the channel or those tagged.

To create this functionality, we essentially take a snapshot of the frontend (which could even be a Google Sheets worksheet, the equivalent of Excel) and send the image via Slack; nothing particularly complex.
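For the upload itself, something along the lines of the slack_sdk client is enough (a sketch; the token handling, channel ID, and file path are placeholders):

import os

from slack_sdk import WebClient

# Post the rendered JPEG snapshot to the management channel
client = WebClient(token=os.environ['SLACK_BOT_TOKEN'])
client.files_upload_v2(
    channel='C0123456789',           # placeholder channel ID
    file='/tmp/daily_kpis.jpeg',     # snapshot produced earlier in the task
    initial_comment='Daily KPI summary',
)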

How We Organize Files Within Data and Resources

Our department is made up of about ten teams, and each team has a GitLab repository with predefined folders. Below is the folder structure of a typical GitLab repo:

dags/ -> all the DAGs
dags/resources -> Python module resources for everyone
dags/resources/team_name -> Python module resources for every team
data/team_name/sql_branch -> dag_name/sqlcode
data/team_name/sql_scripts -> dag_name/sqlcode
data/team_name/sql_sensor -> dag_name/sqlcode
data/team_name/third_parties_zip/ -> third-party files
data/team_name/tests -> dag_name/sqlcode

Here’s what goes where:

  • All DAGs are placed in the dags folder. Teams are required to prefix DAG names with their team name; otherwise, the CI/CD pipeline will not copy the DAG. This governance trick ensures order and directs issues to the appropriate stakeholders. The DAG ID must also match the file name to avoid errors, which is achieved with the following syntax: dag_id=os.path.basename(__file__).replace(".pyc", "").replace(".py", ""),
    • Resources for every team: Python code shared across all teams lives in a separate repository, as described later, and is stored in dags/resources.
    • Resources for a single team: custom Python code used only by an individual team is stored under the dags/resources/team_name folder, named after the team.
  • Data folder: a subfolder is created for every team of the department (for example, data/finance)
    • sql_branch: all the SQL used for sql_branch
    • sql_scripts: all the SQL used for ETL
    • sql_sensors: all the SQL used by sensors
    • third_parties_zip: all the zip archives used by a third-party tool to perform ETL
    • tests: a folder used to test the syntax of the DAGs

If the CI/CD succeeds, we copy all the mentioned folders into their respective locations in the Composer-provided bucket.

Resources for Everyone

We maintain a separate repository (the “resources for every team” mentioned above) containing resources shared by all teams. This repository is managed via merge requests and treated like an open-source project, with the architecture team as maintainers.

This shared repository includes custom operators and Python files used by all teams (e.g., Slack webhook utilities). Each merge request triggers the CI/CD pipeline to deploy the code into the Composer bucket under the dags/resources folder.

Notably, we’ve observed significant enthusiasm among data engineers to contribute to this repository, fostering a shared development culture within lastminute.com Data & Analytics department.

To avoid overloading the scheduler, it is important to add resources/ to the .airflowignore file so that files under resources are excluded from DAG parsing.
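In its simplest form, the .airflowignore placed in the dags bucket just contains that pattern:

resources/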

Access and Permissions

Every department member is granted Composer User and Viewer roles in Google Cloud IAM, allowing them to view everything and access only the Composer UI.

Within Composer’s UI, under the “list roles” section, we created team-specific roles with the following controls for every team:

can read on Connections, can edit on Connections, can create on Connections,
can read on DAG Runs, can read on Task Instances, can edit on Task Instances,
can create on DAG Runs, can read on Audit Logs, can read on ImportError,
can read on Providers, can read on XComs, can read on DAG Code,
can read on Configurations, can read on Plugins, can get on MenuApi,
can edit on DAG Runs, menu access on DAG Runs, menu access on Browse,
can read on Jobs, menu access on Jobs, menu access on Audit Logs,
menu access on Variables, menu access on Admin, can create on Task Instances,
menu access on Task Instances, can read on Task Reschedules,
menu access on Task Reschedules, menu access on Configurations,
menu access on Connections, can read on SLA Misses, menu access on SLA Misses,
menu access on Plugins, can create on XComs, can delete on XComs,
menu access on XComs, menu access on DAG Dependencies, menu access on Documentation,
menu access on Docs, can read on DAG Dependencies, can read on Task Logs,
can read on Website, can delete on Task Instances, can read on Variables,
menu access on Composer Menu, menu access on DAGs in Cloud Console,
menu access on DAGs in Cloud Storage, menu access on Environment Monitoring,
menu access on Environment Logs, menu access on Composer Documentation

Auto-Assigning Permissions in DAGs

Permissions for DAGs are auto-assigned within the DAG itself using the following syntax:

dag = DAG(
    dag_id=os.path.basename(__file__).replace(".pyc", "").replace(".py", ""),
    default_args=default_args,
    schedule_interval=SCHEDULE_INTERVAL,
    start_date=START_DATE,
    dagrun_timeout=timedelta(hours=23),
    tags=['finance'],
    access_control={'finance': {'can_read', 'can_edit'}},
)

The access_control line ensures that the finance team automatically has read and edit permissions for their DAG in the UI.

Finally, we use the following command in the CI/CD pipeline to update permissions in real time:

gcloud composer environments run composer-prod --location=europe-west1 --project project_name sync-perm

This ensures that permissions are updated promptly, avoiding delays.

Our CI/CD: What We Test

When a push to main/master occurs, the GitLab CI/CD pipeline is triggered.

Using Flake8, we ensure there are no:

  • syntax errors (E999)
  • modules imported but unused (F401)
  • undefined names (F821)
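In the pipeline this boils down to a single flake8 invocation along these lines (the path is illustrative):

flake8 --select=E999,F401,F821 dags/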

After that, we upload the diff of the objects changed by the commits to the Composer storage bucket.

The DAGs and the other files mentioned above are first deployed to a test environment, where we check for import errors and validate DAG syntax using Google Composer’s SDK commands. Here is the command we use:

gcloud composer environments run composer-test --project project_name --location europe-west1 dags list-import-errors -- --subdir /home/airflow/gcs/data/${TEAM_NAME}/test -o plain

If the task succeeds, the CI/CD copies all the mentioned folders into their respective locations in the Composer-provided bucket.

Making Sure Every Team Only Uses Their Own Connections

The CI/CD pipeline verifies that the conn_id in a DAG belongs to the respective team. For example, all DAGs for the finance team start with finance, and their connections are similarly prefixed. While complex, this mechanism is highly effective. To our understanding, there is no native way in Airflow to prevent a team from using another team’s connections, so this is how we solved that data governance issue.
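A simplified version of this check is just a scan of the DAG files for conn_id values and a comparison against the team prefix (a sketch, not our actual pipeline script):

import re
import sys
from pathlib import Path


def check_connections(team_name, dags_dir='dags'):
    """Fails the pipeline if a team's DAG references a connection outside its namespace."""
    pattern = re.compile(r"conn_id\s*=\s*['\"]([\w-]+)['\"]")
    violations = []
    for dag_file in Path(dags_dir).glob(f'{team_name}_*.py'):
        for conn_id in pattern.findall(dag_file.read_text()):
            if not conn_id.startswith(team_name):
                violations.append(f'{dag_file.name}: {conn_id}')
    if violations:
        print('Connections outside the team namespace:', *violations, sep='\n  ')
        sys.exit(1)


if __name__ == '__main__':
    check_connections(sys.argv[1])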

Git Flow Strategy

For some teams, we already have an additional check in production: at the merge request stage, we verify that the DAG has run successfully in the test environment after the timestamp of the last commit. This check has already saved several unnecessary production commits by requiring teams to run a successful test before proceeding. We are working to make this mandatory for all teams.

Which Settings We Use with Composer

Currently, we are using composer-2.5.5-airflow-2.6.3, running approximately 600 DAGs with varying schedules: every 5 minutes, hourly, daily, or weekly, resulting in a total of around 25,000 to 30,000 tasks per day.

Environment Size

We use an environment of size Medium in Composer (you can choose between Small, Medium, and Large) and, at the moment, we do not face any performance issues.

Schedulers

We are using 2 schedulers with 5 vCPUs and 5 GB of RAM each (these two values must currently be equal in Composer) and 2 GB of storage per scheduler. It is recommended to always leave 20-30% of RAM and CPU free for the scheduler and to reserve one CPU for each of the parsing_processes. Since implementing two schedulers a few years ago, we have observed significant improvements in parsing times and performance, as well as high availability.

Workers

In Composer, the workers lack an intelligent autoscaling system. In the past, we encountered problems with tasks consuming too much RAM, resulting in kills due to memory exhaustion. This issue occurs mainly with operators executing Docker images. Most tasks have very low resource usage, while a few require significant amounts of RAM and CPU.

After analysing our workload, we decided to configure each worker with 4 CPUs and 20GB of RAM. These values were derived based on our specific load, but every company should determine appropriate values based on their applications. A new worker is started when there are more than 15 tasks running, as we have set worker_concurrency=15.

There are scenarios where we are up and running with 2 or 3 nodes, but resources are under-utilised because tasks delegate computation to the data warehouse. Conversely, in some cases, a single task may use all the available resources. With the settings mentioned above, we have found a satisfactory compromise.

Other important parameters we have configured include:

  • dag_file_processor_timeout=180 (how long a DagFileProcessor, which processes a DAG file, can run before timing out).
  • dagbag_import_timeout=120 (how long the DagBag can take to import DAG objects before timing out, in seconds; this must be lower than the value set for dag_file_processor_timeout).
  • dags_are_paused_at_creation=True (really important in production).
  • max_active_runs_per_dag=1 (to prevent having two instances running simultaneously).
  • max_active_tasks_per_dag=6 (to prevent a single DAG from saturating the available tasks for the entire department).
  • dag_dir_list_interval=120 (how often the DAGs directory is scanned for new files).
  • min_file_process_interval=120 (how often each DAG file is parsed).
  • parsing_processes=3 (how many processes the scheduler can run in parallel to parse DAGs. Astronomer recommends setting a value that is twice your available vCPUs. Each process takes a CPU, and we leave 2 CPUs free for the scheduler).
  • instance_name and navbar_color are essential if you use more than one environment.
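On Composer, these parameters are applied as Airflow configuration overrides rather than by editing airflow.cfg directly; with gcloud, a subset of them would be set roughly like this (the environment and project names are placeholders):

gcloud composer environments update composer-prod --location=europe-west1 --project project_name --update-airflow-configs=core-dags_are_paused_at_creation=True,core-max_active_runs_per_dag=1,core-max_active_tasks_per_dag=6,scheduler-parsing_processes=3,scheduler-min_file_process_interval=120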

Snapshots

We execute snapshots every morning at 7:00 AM. The process takes about 45 minutes, but we know that any error, such as an unintended destroy operation on the cluster using Terraform (we manage Composer with this tool), can be resolved within 2 hours by re-running the Terraform creation process for Composer and reloading the snapshot.

Costs

Below are the daily costs for the configuration described above:

  • Cloud Composer (entire backend on Kubernetes): between 40€ and 45€ per day
  • Cloud Storage: around 30€ per day. This cost is not primarily influenced by the GBs stored daily but by the Total read/list/get request count and Total write request count. Currently, we have approximately 70/s list object requests and 30/s get object requests. Since these are Class A Operations on Cloud Storage, these costs arise.
  • Cloud Logging: between 5€ and 7€ per day
  • Cloud Monitoring: 1.5€ per day
  • Networking: 2€ per day (this obviously depends on the company)
  • Secret Manager: 0.3€ per day (we use it extensively to save service accounts or any password where we don’t use Airflow connections)

Total daily cost: approximately 80€ per day

Conclusion

We are very happy using Composer/Airflow, and we will soon migrate to Composer 3 on GCP. In this article, I’ve focused on all the customisation we did in Airflow, rather than on everything that already works well out of the box. I hope this article has helped you compare your work with what we do at lastminute.com.


About alberto crespi

Data Architect

Alberto is a Data Architect 🏛️ at lastminute.com, having joined the Data and Analytics Team in 2020. He is focused on Data-Driven approach 📊, AI🤖 and new technology in the modern data stack.


Read next

React Universe 2024

React Universe 2024

fabrizio_duroni
fabrizio duroni
sam_campisi
sam campisi

Let's dive into the talks from React Universe 2024 that stood out to us the most and share the key insights we gained. From innovative debugging tools to cross-platform development strategies, we’ll walk you through what we found valuable and how it’s shaping our approach to React and React Native development. [...]

Tech Radar As a Collaboration Tool

Tech Radar As a Collaboration Tool

rabbani_kajamohideen
rabbani kajamohideen

A tech radar is a visual and strategic tool used by organizations to assess and communicate the status and future direction of various technologies, frameworks, tools, and platforms. [...]