Vladimir Chavkov

Python Data Engineering: Building Production Pipelines with Apache Airflow and dbt


Modern data engineering requires building reliable, scalable pipelines that transform raw data into actionable insights. Python, combined with Apache Airflow for orchestration and dbt for transformations, provides a powerful stack for production data pipelines. This guide covers architecture patterns, best practices, and real-world implementations.

Modern Data Stack

Components

Data Sources → Ingestion → Storage    → Transform → Analytics → BI Tools
     │            │           │            │           │           │
   APIs        Fivetran    S3/GCS         dbt      BigQuery     Looker
   DBs         Airbyte     Snowflake               Snowflake    Tableau
   SaaS        Custom      Redshift                             Metabase

Python’s Role

  1. Orchestration: Apache Airflow for workflow management
  2. Ingestion: Custom extractors, API clients
  3. Transformation: Pandas, Polars for data manipulation
  4. Quality: Great Expectations for data validation
  5. Monitoring: Custom metrics and alerting

Apache Airflow Fundamentals

Installation and Setup

# Install Airflow with extras
pip install "apache-airflow[postgres,celery,redis,amazon,google,snowflake]==2.8.0"
# Initialize the metadata database (`db init` is deprecated since Airflow 2.7)
airflow db migrate
# Create admin user
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@company.com
# Start webserver and scheduler
airflow webserver -p 8080 &
airflow scheduler &

DAG Structure and Best Practices

dags/daily_analytics_pipeline.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.utils.task_group import TaskGroup

# Default arguments
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email': ['data-team@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
}

# DAG definition
with DAG(
    'daily_analytics_pipeline',
    default_args=default_args,
    description='Daily analytics data pipeline',
    schedule='0 2 * * *',  # Run at 2 AM UTC daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=['analytics', 'daily'],
) as dag:

    # Task 1: Extract data from source
    def extract_from_api(**context):
        """Extract data from REST API"""
        import requests
        import pandas as pd
        from airflow.providers.amazon.aws.hooks.s3 import S3Hook

        # Execution date
        execution_date = context['ds']

        # Fetch data from API
        response = requests.get(
            'https://api.company.com/events',
            params={'date': execution_date},
            headers={'Authorization': f'Bearer {Variable.get("api_key")}'},
        )
        response.raise_for_status()

        # Convert to DataFrame
        df = pd.DataFrame(response.json())

        # Data quality checks
        assert len(df) > 0, "No data returned from API"
        assert df['event_id'].is_unique, "Duplicate event IDs found"

        # Save to S3
        s3_hook = S3Hook()
        s3_key = f'raw/events/{execution_date}/data.parquet'
        s3_hook.load_bytes(
            df.to_parquet(),
            key=s3_key,
            bucket_name='company-data-lake',
            replace=True,
        )
        return s3_key

    extract_task = PythonOperator(
        task_id='extract_from_api',
        python_callable=extract_from_api,
    )

    # Task 2: Load to staging
    load_to_staging = S3ToRedshiftOperator(
        task_id='load_to_staging',
        s3_bucket='company-data-lake',
        s3_key='raw/events/{{ ds }}/data.parquet',
        schema='staging',
        table='events',
        copy_options=['FORMAT AS PARQUET'],
        redshift_conn_id='redshift_default',
    )

    # Task 3: Run dbt transformations
    with TaskGroup('dbt_transformations') as dbt_group:
        dbt_run = DbtCloudRunJobOperator(
            task_id='dbt_run',
            job_id=12345,
            check_interval=30,
            timeout=3600,
        )
        dbt_test = DbtCloudRunJobOperator(
            task_id='dbt_test',
            job_id=12346,
            check_interval=30,
            timeout=1800,
        )
        dbt_run >> dbt_test

    # Task 4: Data quality checks
    def validate_data_quality(**context):
        """Validate transformed data with Great Expectations"""
        # Run a checkpoint against the transformed tables;
        # implementation depends on your GE configuration
        ...

    quality_check = PythonOperator(
        task_id='validate_data_quality',
        python_callable=validate_data_quality,
    )

    # Task 5: Update analytics tables
    update_analytics = PostgresOperator(
        task_id='update_analytics',
        postgres_conn_id='analytics_db',
        sql="""
            INSERT INTO analytics.daily_metrics (date, metric_name, value)
            SELECT
                '{{ ds }}'::date,
                'events_processed',
                COUNT(*)
            FROM staging.events
            WHERE event_date = '{{ ds }}'::date;
            REFRESH MATERIALIZED VIEW CONCURRENTLY analytics.user_engagement;
        """,
    )

    # Task dependencies
    extract_task >> load_to_staging >> dbt_group >> quality_check >> update_analytics

Advanced DAG Patterns

1. Dynamic Task Generation

from airflow import DAG
from airflow.operators.python import PythonOperator

def process_table(table_name, **context):
    """Process a single table"""
    print(f"Processing {table_name}")
    # Your processing logic

with DAG('dynamic_tables', ...) as dag:
    tables = ['users', 'orders', 'products', 'events']
    for table in tables:
        task = PythonOperator(
            task_id=f'process_{table}',
            python_callable=process_table,
            op_kwargs={'table_name': table},
        )

2. Branching Based on Data

from airflow.operators.python import BranchPythonOperator

def check_data_volume(**context):
    """Branch based on data volume"""
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    pg_hook = PostgresHook()
    row_count = pg_hook.get_first(
        "SELECT COUNT(*) FROM staging.events WHERE date = %(date)s",
        parameters={'date': context['ds']},
    )[0]
    if row_count > 1000000:
        return 'process_large_batch'
    else:
        return 'process_small_batch'

branching = BranchPythonOperator(
    task_id='check_volume',
    python_callable=check_data_volume,
)
large_batch = PythonOperator(
    task_id='process_large_batch',
    python_callable=process_large,  # your heavy-path callable
)
small_batch = PythonOperator(
    task_id='process_small_batch',
    python_callable=process_small,  # your light-path callable
)
branching >> [large_batch, small_batch]
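The branch choice itself is a pure function, which makes it easy to unit-test outside Airflow. A minimal sketch — the threshold and task ids mirror the example above, and `choose_branch` is an illustrative helper, not an Airflow API:

```python
def choose_branch(row_count: int, threshold: int = 1_000_000) -> str:
    """Mirror of check_data_volume's decision: return the task_id to follow."""
    return 'process_large_batch' if row_count > threshold else 'process_small_batch'
```

Keeping the decision logic separate from the hook call lets you cover both branches with plain assertions in CI.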

3. Cross-DAG Dependencies

from airflow.sensors.external_task import ExternalTaskSensor
wait_for_upstream = ExternalTaskSensor(
    task_id='wait_for_data_ingestion',
    external_dag_id='data_ingestion_dag',
    external_task_id='final_task',
    timeout=3600,
    mode='reschedule',
)

dbt (Data Build Tool)

Project Structure

dbt_project/
├── dbt_project.yml
├── models/
│   ├── staging/
│   │   ├── _staging.yml
│   │   ├── stg_events.sql
│   │   └── stg_users.sql
│   ├── intermediate/
│   │   ├── int_user_sessions.sql
│   │   └── int_event_aggregates.sql
│   └── marts/
│       ├── analytics/
│       │   ├── fct_events.sql
│       │   └── dim_users.sql
│       └── marketing/
│           └── user_attribution.sql
├── macros/
│   └── custom_functions.sql
├── tests/
│   └── data_quality.sql
└── seeds/
    └── country_codes.csv

dbt Model Examples

Staging Layer (1:1 with source):

-- models/staging/stg_events.sql
{{
    config(
        materialized='view',
        tags=['staging']
    )
}}

with source as (
    select * from {{ source('raw', 'events') }}
    where event_date >= current_date - interval '90 days'
),

renamed as (
    select
        event_id,
        user_id,
        event_name,
        event_timestamp,
        parse_json(properties) as properties,
        -- Standardize timestamps
        convert_timezone('UTC', event_timestamp) as event_timestamp_utc,
        -- Extract common properties
        properties:page_url::string as page_url,
        properties:referrer::string as referrer,
        -- Metadata
        _loaded_at
    from source
)

select * from renamed
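The staging transformations (JSON property extraction, UTC normalization) can be mirrored in plain Python for quick local spot-checks of sample rows. A sketch — field names follow the model above, but the `stage_event` helper itself is hypothetical:

```python
import json
from datetime import datetime, timezone

def stage_event(raw: dict) -> dict:
    """Mirror stg_events: parse JSON properties, normalize the timestamp to UTC."""
    props = raw["properties"]
    if isinstance(props, str):          # raw payloads often arrive as JSON strings
        props = json.loads(props)
    ts = datetime.fromisoformat(raw["event_timestamp"])
    return {
        "event_id": raw["event_id"],
        "user_id": raw["user_id"],
        "event_name": raw["event_name"],
        "event_timestamp_utc": ts.astimezone(timezone.utc),
        "page_url": props.get("page_url"),
        "referrer": props.get("referrer"),
    }
```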

Intermediate Layer (business logic):

-- models/intermediate/int_user_sessions.sql
{{
    config(
        materialized='ephemeral'  -- Not materialized, used as CTE
    )
}}

with events as (
    select * from {{ ref('stg_events') }}
),

sessionized as (
    select
        user_id,
        event_timestamp_utc,
        event_name,
        -- Session identification (30-minute timeout)
        sum(case
            when timestampdiff(
                minute,
                lag(event_timestamp_utc) over (partition by user_id order by event_timestamp_utc),
                event_timestamp_utc
            ) > 30 then 1
            else 0
        end) over (partition by user_id order by event_timestamp_utc) as session_number
    from events
),

session_metrics as (
    select
        user_id,
        session_number,
        min(event_timestamp_utc) as session_start,
        max(event_timestamp_utc) as session_end,
        count(*) as event_count,
        count(distinct event_name) as unique_events,
        -- Session duration in seconds
        timestampdiff(second, min(event_timestamp_utc), max(event_timestamp_utc)) as session_duration_seconds
    from sessionized
    group by user_id, session_number
)

select * from session_metrics
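The 30-minute sessionization window translates directly to Python, which is handy for spot-checking the SQL's window-function logic against hand-built cases. A sketch under the same timeout assumption (`sessionize` is an illustrative helper, not part of dbt):

```python
from datetime import datetime, timedelta

def sessionize(timestamps: list, timeout_minutes: int = 30) -> list:
    """Assign a session number to each event of one user, in time order.
    A gap longer than timeout_minutes starts a new session, matching the
    lag()/sum() pattern in int_user_sessions."""
    sessions = []
    current = 1
    prev = None
    for ts in sorted(timestamps):
        if prev is not None and ts - prev > timedelta(minutes=timeout_minutes):
            current += 1
        sessions.append(current)
        prev = ts
    return sessions
```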

Marts Layer (analytics-ready):

-- models/marts/analytics/fct_events.sql
{{
config(
materialized='incremental',
unique_key='event_id',
partition_by={
'field': 'event_date',
'data_type': 'date'
},
cluster_by=['user_id', 'event_name']
)
}}
with events as (
select * from {{ ref('stg_events') }}
{% if is_incremental() %}
-- Only process new data
where event_timestamp_utc > (select max(event_timestamp_utc) from {{ this }})
{% endif %}
),
users as (
select * from {{ ref('dim_users') }}
),
enriched as (
select
e.event_id,
e.event_timestamp_utc,
date(e.event_timestamp_utc) as event_date,
e.user_id,
e.event_name,
-- User attributes
u.user_tier,
u.signup_date,
u.country,
-- Event properties
e.page_url,
e.referrer,
-- Derived metrics
case
when e.event_name in ('purchase', 'upgrade')
then (e.properties:amount)::decimal(10,2)
else 0
end as revenue
from events e
left join users u on e.user_id = u.user_id
)
select * from enriched

dbt Tests and Documentation

models/staging/_staging.yml
version: 2

models:
  - name: stg_events
    description: "Staging table for raw events"
    columns:
      - name: event_id
        description: "Unique event identifier"
        tests:
          - unique
          - not_null
      - name: user_id
        description: "User who triggered the event"
        tests:
          - not_null
          - relationships:
              to: ref('stg_users')
              field: user_id
      - name: event_timestamp_utc
        description: "Event timestamp in UTC"
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= '2020-01-01'"
      - name: event_name
        description: "Type of event"
        tests:
          - accepted_values:
              values: ['page_view', 'click', 'purchase', 'signup']

sources:
  - name: raw
    database: raw_data
    schema: events
    tables:
      - name: events
        description: "Raw event data from application"
        freshness:
          warn_after: {count: 12, period: hour}
          error_after: {count: 24, period: hour}
        loaded_at_field: _loaded_at
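The built-in dbt tests above boil down to simple predicates, and mirroring them in Python is a cheap way to sanity-check sample data before it ever reaches the warehouse. These helper names are illustrative, not a dbt API:

```python
def is_unique(values) -> bool:
    """dbt `unique`: no duplicate values in the column."""
    return len(values) == len(set(values))

def is_not_null(values) -> bool:
    """dbt `not_null`: no missing values."""
    return all(v is not None for v in values)

def has_accepted_values(values, allowed) -> bool:
    """dbt `accepted_values`: every value drawn from the allowed set."""
    allowed = set(allowed)
    return all(v in allowed for v in values)

def has_relationship(values, parent_values) -> bool:
    """dbt `relationships`: every foreign key exists in the parent column."""
    parent = set(parent_values)
    return all(v in parent for v in values)
```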

Custom dbt Macros

-- macros/custom_functions.sql
{% macro cents_to_dollars(column_name, precision=2) %}
    round({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}

{% macro generate_schema_name(custom_schema_name, node) -%}
    {%- set default_schema = target.schema -%}
    {%- if target.name == 'prod' and custom_schema_name is not none -%}
        {{ custom_schema_name | trim }}
    {%- else -%}
        {{ default_schema }}_{{ custom_schema_name | trim }}
    {%- endif -%}
{%- endmacro %}

{% macro days_since(date_column) %}
    datediff(day, {{ date_column }}, current_date)
{% endmacro %}
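For unit-testing the arithmetic these macros encode, the same logic can be expressed in Python; function names mirror the macros but are otherwise hypothetical:

```python
from datetime import date
from typing import Optional

def cents_to_dollars(cents: int, precision: int = 2) -> float:
    """Python twin of the cents_to_dollars macro."""
    return round(cents / 100.0, precision)

def days_since(d: date, today: Optional[date] = None) -> int:
    """Python twin of the days_since macro (datediff in days)."""
    return ((today or date.today()) - d).days
```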

Data Quality with Great Expectations

great_expectations/expectations/events_suite.py
import great_expectations as ge

# Load data
df = ge.read_csv('data/events.csv')

# Define expectations
df.expect_table_row_count_to_be_between(min_value=1000, max_value=100000)
df.expect_column_values_to_not_be_null('event_id')
df.expect_column_values_to_be_unique('event_id')
df.expect_column_values_to_be_in_set(
    'event_name',
    ['page_view', 'click', 'purchase', 'signup']
)
df.expect_column_values_to_be_between(
    'event_timestamp',
    min_value='2024-01-01',
    max_value='2024-12-31'
)

# Save suite
df.save_expectation_suite('events_suite.json')

Airflow Integration:

from great_expectations_provider.operators.great_expectations import (
    GreatExpectationsOperator
)

validate_events = GreatExpectationsOperator(
    task_id='validate_events',
    expectation_suite_name='events_suite',
    batch_kwargs={
        'path': 's3://bucket/data/events.parquet',
        'datasource': 'my_datasource'
    },
    data_context_root_dir='/path/to/great_expectations',
)

Efficient Data Processing with Polars

import polars as pl

# Read large parquet file efficiently
df = pl.read_parquet('data/events.parquet')

# Lazy evaluation (builds query plan)
lazy_df = (
    pl.scan_parquet('data/events.parquet')
    .filter(pl.col('event_date') >= '2024-01-01')
    .group_by(['user_id', 'event_name'])
    .agg([
        pl.count().alias('event_count'),
        pl.col('revenue').sum().alias('total_revenue')
    ])
    .filter(pl.col('event_count') > 5)
    .sort('total_revenue', descending=True)
)

# Execute query (only when needed)
result = lazy_df.collect()

# Write results
result.write_parquet('output/user_metrics.parquet')

Polars vs Pandas Performance:

import time
import pandas as pd
import polars as pl

# Large dataset
n_rows = 10_000_000

# Pandas
start = time.time()
df_pandas = pd.DataFrame({
    'user_id': range(n_rows),
    'value': range(n_rows)
})
result_pandas = df_pandas.groupby('user_id').sum()
print(f"Pandas: {time.time() - start:.2f}s")

# Polars
start = time.time()
df_polars = pl.DataFrame({
    'user_id': range(n_rows),
    'value': range(n_rows)
})
result_polars = df_polars.group_by('user_id').sum()
print(f"Polars: {time.time() - start:.2f}s")

# Polars is often several times faster on aggregations like this,
# though the exact speedup depends on data shape and hardware

Incremental Processing Patterns

1. Watermark-Based

def process_incremental(**context):
    """Process data since last successful run"""
    from airflow.models import Variable

    # Get last watermark
    last_watermark = Variable.get(
        'events_last_processed_timestamp',
        default_var='2024-01-01 00:00:00'
    )

    # Query new data
    query = f"""
        SELECT *
        FROM events
        WHERE event_timestamp > '{last_watermark}'
        ORDER BY event_timestamp
    """

    # Execute the query and process the results into a DataFrame (df)
    # ...

    # Update watermark only after processing succeeds
    new_watermark = max(df['event_timestamp'])
    Variable.set('events_last_processed_timestamp', new_watermark)
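Stripped of the Airflow plumbing, the watermark pattern is a pure function over the event batch and the last watermark, which makes the boundary condition easy to test. An illustrative sketch with string timestamps (`filter_new_events` is a hypothetical helper):

```python
def filter_new_events(events: list, last_watermark: str):
    """Return events strictly after the watermark plus the advanced watermark.
    Mirrors the Variable-based pattern above; state lives with the caller."""
    new = [e for e in events if e["event_timestamp"] > last_watermark]
    new_watermark = max((e["event_timestamp"] for e in new), default=last_watermark)
    return new, new_watermark
```

Note the strict `>` comparison: an event exactly at the watermark was already processed, so re-running with the same watermark is safe.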

2. Date Partition

def process_partition(**context):
    """Process specific date partition"""
    execution_date = context['ds']
    query = f"""
        SELECT *
        FROM events
        WHERE DATE(event_timestamp) = '{execution_date}'
    """
    # Process partition
    # ...

Monitoring and Alerting

from airflow.operators.python import PythonOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

def check_data_quality(**context):
    """Check data quality metrics"""
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    pg_hook = PostgresHook()

    # Check row count
    row_count = pg_hook.get_first(
        "SELECT COUNT(*) FROM staging.events WHERE date = %(date)s",
        parameters={'date': context['ds']}
    )[0]

    # Check for anomalies
    avg_count = pg_hook.get_first(
        "SELECT AVG(count) FROM daily_row_counts WHERE date >= current_date - 7"
    )[0]

    if row_count < avg_count * 0.5:
        raise ValueError(
            f"Row count {row_count} is more than 50% below the 7-day average {avg_count}"
        )
    return row_count

check_quality = PythonOperator(
    task_id='check_quality',
    python_callable=check_data_quality,
)

alert_on_failure = SlackWebhookOperator(
    task_id='alert_failure',
    slack_webhook_conn_id='slack_webhook',
    message='Data pipeline failed: {{ task_instance.task_id }}',
    channel='#data-alerts',
    trigger_rule='one_failed',
)

check_quality >> alert_on_failure
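The anomaly condition that makes `check_data_quality` raise is easy to isolate and unit-test on its own. A sketch — the 0.5 threshold mirrors the 50% rule above, and the function name is illustrative:

```python
def is_row_count_anomalous(row_count: float, trailing_avg: float,
                           threshold: float = 0.5) -> bool:
    """True when today's count falls below threshold * trailing average --
    the condition that triggers the ValueError in check_data_quality."""
    return row_count < trailing_avg * threshold
```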

Production Best Practices

1. Idempotency

def upsert_data(**context):
    """Idempotent data insertion"""
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    pg_hook = PostgresHook()

    # Use MERGE/UPSERT instead of INSERT
    sql = """
        MERGE INTO target t
        USING source s ON t.id = s.id
        WHEN MATCHED THEN
            UPDATE SET value = s.value, updated_at = current_timestamp
        WHEN NOT MATCHED THEN
            INSERT (id, value, created_at) VALUES (s.id, s.value, current_timestamp)
    """
    pg_hook.run(sql)
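The idempotency property is easy to demonstrate locally with SQLite's `INSERT ... ON CONFLICT`, a stand-in for the warehouse `MERGE` above. Table and column names here are illustrative; running the upsert twice leaves the row count unchanged:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE target (id INTEGER PRIMARY KEY, value TEXT, updated_at TEXT)"
)

def upsert(conn, rows):
    """Insert rows; on a key collision, update in place instead of duplicating."""
    conn.executemany(
        """INSERT INTO target (id, value, updated_at)
           VALUES (?, ?, datetime('now'))
           ON CONFLICT(id) DO UPDATE SET
               value = excluded.value,
               updated_at = datetime('now')""",
        rows,
    )

upsert(conn, [(1, "a"), (2, "b")])
upsert(conn, [(1, "a-updated"), (2, "b")])  # re-run: no duplicates
count = conn.execute("SELECT COUNT(*) FROM target").fetchone()[0]
value = conn.execute("SELECT value FROM target WHERE id = 1").fetchone()[0]
```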

2. Connection Pooling

airflow.cfg
[database]
sql_alchemy_pool_enabled = True
sql_alchemy_pool_size = 5
sql_alchemy_max_overflow = 10
sql_alchemy_pool_recycle = 3600

3. Resource Management

from airflow.operators.python import PythonOperator

# Create resource pools via the CLI or UI:
# airflow pools set postgres_pool 5 "Postgres connection pool"

task_with_pool = PythonOperator(
    task_id='heavy_task',
    python_callable=process_data,  # your processing callable
    pool='postgres_pool',   # Limit concurrent tasks
    priority_weight=10,     # Higher priority
)

Production Checklist

  1. Idempotency: use MERGE/UPSERT and deterministic date partitions so reruns are safe
  2. Retries: exponential backoff with a capped retry delay
  3. Data quality: dbt tests, source freshness checks, and Great Expectations suites
  4. Incremental processing: watermarks or date partitions, with catchup disabled where appropriate
  5. Monitoring: row-count anomaly checks with Slack/email alerting on failure
  6. Resources: connection pooling and Airflow pools to cap concurrency

Conclusion

Building production data pipelines requires combining orchestration (Airflow), transformation (dbt), and quality checks (Great Expectations) into a cohesive system. Focus on idempotency, incremental processing, and comprehensive monitoring to build reliable pipelines.

Start with simple DAGs, add dbt for SQL transformations, and gradually incorporate advanced features like branching, cross-DAG dependencies, and automated data quality checks.


Ready to build data pipelines? Our Python Data Engineering training covers Airflow, dbt, data quality, and production best practices with real-world projects. Explore Python training or contact us for data engineering expertise.

