Python Data Engineering: Building Production Pipelines with Apache Airflow and dbt
Modern data engineering requires building reliable, scalable pipelines that transform raw data into actionable insights. Python, combined with Apache Airflow for orchestration and dbt for transformations, provides a powerful stack for production data pipelines. This guide covers architecture patterns, best practices, and real-world implementations.
Modern Data Stack
Components
```
Data Sources → Ingestion → Storage  →  Transform → Analytics → BI Tools
     │            │           │            │           │           │
    APIs       Fivetran     S3/GCS        dbt       BigQuery    Looker
    DBs        Airbyte     Snowflake                Snowflake   Tableau
    SaaS       Custom      Redshift                             Metabase
```

Python's Role
- Orchestration: Apache Airflow for workflow management
- Ingestion: Custom extractors, API clients
- Transformation: Pandas, Polars for data manipulation
- Quality: Great Expectations for data validation
- Monitoring: Custom metrics and alerting
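As a taste of the "Ingestion" role above, here is a minimal sketch of a retrying extractor. The function name, the injected `fetch` callable, and the backoff values are illustrative assumptions, not part of any specific library:

```python
import time

def extract_with_retry(fetch, retries=3, base_delay=1.0):
    """Call fetch() and retry with exponential backoff on failure.

    fetch is any zero-argument callable (e.g. a wrapped requests call).
    """
    for attempt in range(retries + 1):
        try:
            return fetch()
        except Exception:
            if attempt == retries:
                raise  # out of retries; surface the error
            # Exponential backoff: base_delay, 2x, 4x, ...
            time.sleep(base_delay * (2 ** attempt))

# Usage with a flaky fake source (fails twice, then succeeds)
calls = {'n': 0}
def flaky_fetch():
    calls['n'] += 1
    if calls['n'] < 3:
        raise ConnectionError("transient failure")
    return [{'event_id': 1}, {'event_id': 2}]

records = extract_with_retry(flaky_fetch, retries=3, base_delay=0.01)
print(len(records))  # → 2
```

The same shape scales up: swap `flaky_fetch` for a real API client and the retry logic stays untouched and unit-testable.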
Apache Airflow Fundamentals
Installation and Setup
```bash
# Install Airflow with extras
pip install "apache-airflow[postgres,celery,redis,amazon,google,snowflake]==2.8.0"

# Initialize database
airflow db init

# Create admin user
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@company.com

# Start webserver and scheduler
airflow webserver -p 8080 &
airflow scheduler &
```

DAG Structure and Best Practices
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.utils.task_group import TaskGroup

# Default arguments
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email': ['data-team@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
}

# DAG definition
with DAG(
    'daily_analytics_pipeline',
    default_args=default_args,
    description='Daily analytics data pipeline',
    schedule='0 2 * * *',  # Run at 2 AM UTC daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=['analytics', 'daily'],
) as dag:

    # Task 1: Extract data from source
    def extract_from_api(**context):
        """Extract data from a REST API and land it in S3."""
        import requests
        import pandas as pd
        from airflow.providers.amazon.aws.hooks.s3 import S3Hook

        # Execution date (YYYY-MM-DD)
        execution_date = context['ds']

        # Fetch data from API
        response = requests.get(
            'https://api.company.com/events',
            params={'date': execution_date},
            headers={'Authorization': f'Bearer {Variable.get("api_key")}'}
        )
        response.raise_for_status()

        # Convert to DataFrame
        df = pd.DataFrame(response.json())

        # Data quality checks
        assert len(df) > 0, "No data returned from API"
        assert df['event_id'].is_unique, "Duplicate event IDs found"

        # Save to S3
        s3_hook = S3Hook()
        s3_key = f'raw/events/{execution_date}/data.parquet'
        s3_hook.load_bytes(
            df.to_parquet(),  # returns bytes when no path is given
            key=s3_key,
            bucket_name='company-data-lake',
            replace=True
        )

        return s3_key

    extract_task = PythonOperator(
        task_id='extract_from_api',
        python_callable=extract_from_api,
    )

    # Task 2: Load to staging
    load_to_staging = S3ToRedshiftOperator(
        task_id='load_to_staging',
        s3_bucket='company-data-lake',
        s3_key='raw/events/{{ ds }}/data.parquet',
        schema='staging',
        table='events',
        copy_options=['FORMAT AS PARQUET'],
        redshift_conn_id='redshift_default',
    )

    # Task 3: Run dbt transformations
    with TaskGroup('dbt_transformations') as dbt_group:
        dbt_run = DbtCloudRunJobOperator(
            task_id='dbt_run',
            job_id=12345,
            check_interval=30,
            timeout=3600,
        )

        dbt_test = DbtCloudRunJobOperator(
            task_id='dbt_test',
            job_id=12346,
            check_interval=30,
            timeout=1800,
        )

        dbt_run >> dbt_test

    # Task 4: Data quality checks
    def validate_data_quality(**context):
        """Validate transformed data."""
        # Run Great Expectations validation here;
        # implementation depends on your GE configuration.

    quality_check = PythonOperator(
        task_id='validate_data_quality',
        python_callable=validate_data_quality,
    )

    # Task 5: Update analytics tables
    update_analytics = PostgresOperator(
        task_id='update_analytics',
        postgres_conn_id='analytics_db',
        sql="""
            INSERT INTO analytics.daily_metrics (date, metric_name, value)
            SELECT '{{ ds }}'::date, 'events_processed', COUNT(*)
            FROM staging.events
            WHERE event_date = '{{ ds }}'::date;

            REFRESH MATERIALIZED VIEW CONCURRENTLY analytics.user_engagement;
        """,
    )

    # Task dependencies
    extract_task >> load_to_staging >> dbt_group >> quality_check >> update_analytics
```

Note that `Variable` is imported at the top (the extract task uses it), and `provide_context=True` is gone: Airflow 2.x passes the context to Python callables automatically, and the argument was removed.

Advanced DAG Patterns
1. Dynamic Task Generation
```python
from airflow import DAG
from airflow.operators.python import PythonOperator

def process_table(table_name, **context):
    """Process a single table"""
    print(f"Processing {table_name}")
    # Your processing logic

with DAG('dynamic_tables', ...) as dag:
    tables = ['users', 'orders', 'products', 'events']

    for table in tables:
        task = PythonOperator(
            task_id=f'process_{table}',
            python_callable=process_table,
            op_kwargs={'table_name': table},
        )
```

2. Branching Based on Data
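The branching callable in this pattern is ordinary Python that returns the `task_id` of the path to follow. Stripped of the Airflow context, the decision logic is a pure function you can unit-test in isolation (the one-million-row threshold mirrors the example in this section):

```python
def choose_branch(row_count, threshold=1_000_000):
    """Return the task_id of the branch to execute for a given row count."""
    if row_count > threshold:
        return 'process_large_batch'
    return 'process_small_batch'

print(choose_branch(2_500_000))  # → process_large_batch
print(choose_branch(40_000))     # → process_small_batch
```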
```python
from airflow.operators.python import BranchPythonOperator

def check_data_volume(**context):
    """Branch based on data volume"""
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    pg_hook = PostgresHook()
    row_count = pg_hook.get_first(
        "SELECT COUNT(*) FROM staging.events WHERE date = %(date)s",
        parameters={'date': context['ds']}
    )[0]

    if row_count > 1000000:
        return 'process_large_batch'
    else:
        return 'process_small_batch'

branching = BranchPythonOperator(
    task_id='check_volume',
    python_callable=check_data_volume,
)

large_batch = PythonOperator(
    task_id='process_large_batch',
    python_callable=process_large,  # assumes process_large is defined elsewhere
)

small_batch = PythonOperator(
    task_id='process_small_batch',
    python_callable=process_small,  # assumes process_small is defined elsewhere
)

branching >> [large_batch, small_batch]
```

3. Cross-DAG Dependencies
```python
from airflow.sensors.external_task import ExternalTaskSensor

wait_for_upstream = ExternalTaskSensor(
    task_id='wait_for_data_ingestion',
    external_dag_id='data_ingestion_dag',
    external_task_id='final_task',
    timeout=3600,
    mode='reschedule',
)
```

dbt (Data Build Tool)
Project Structure
```
dbt_project/
├── dbt_project.yml
├── models/
│   ├── staging/
│   │   ├── _staging.yml
│   │   ├── stg_events.sql
│   │   └── stg_users.sql
│   ├── intermediate/
│   │   ├── int_user_sessions.sql
│   │   └── int_event_aggregates.sql
│   └── marts/
│       ├── analytics/
│       │   ├── fct_events.sql
│       │   └── dim_users.sql
│       └── marketing/
│           └── user_attribution.sql
├── macros/
│   └── custom_functions.sql
├── tests/
│   └── data_quality.sql
└── seeds/
    └── country_codes.csv
```

dbt Model Examples
Staging Layer (1:1 with source):
```sql
-- models/staging/stg_events.sql
{{
    config(
        materialized='view',
        tags=['staging']
    )
}}

with source as (
    select * from {{ source('raw', 'events') }}
    where event_date >= current_date - interval '90 days'
),

renamed as (
    select
        event_id,
        user_id,
        event_name,
        event_timestamp,
        parse_json(properties) as properties,

        -- Standardize timestamps
        convert_timezone('UTC', event_timestamp) as event_timestamp_utc,

        -- Extract common properties
        properties:page_url::string as page_url,
        properties:referrer::string as referrer,

        -- Metadata
        _loaded_at

    from source
)

select * from renamed
```

Intermediate Layer (business logic):
```sql
-- models/intermediate/int_user_sessions.sql
-- Ephemeral: not materialized, inlined as a CTE wherever it is referenced
{{
    config(
        materialized='ephemeral'
    )
}}

with events as (
    select * from {{ ref('stg_events') }}
),

sessionized as (
    select
        user_id,
        event_timestamp_utc,
        event_name,

        -- Session identification (30-minute timeout): start a new session
        -- whenever the gap since the user's previous event exceeds 30 minutes
        sum(
            case when timestampdiff(
                minute,
                lag(event_timestamp_utc) over (partition by user_id order by event_timestamp_utc),
                event_timestamp_utc
            ) > 30 then 1 else 0 end
        ) over (partition by user_id order by event_timestamp_utc) as session_number

    from events
),

session_metrics as (
    select
        user_id,
        session_number,
        min(event_timestamp_utc) as session_start,
        max(event_timestamp_utc) as session_end,
        count(*) as event_count,
        count(distinct event_name) as unique_events,

        -- Session duration in seconds
        timestampdiff(second, min(event_timestamp_utc), max(event_timestamp_utc)) as session_duration_seconds

    from sessionized
    group by user_id, session_number
)

select * from session_metrics
```

Marts Layer (analytics-ready):
```sql
-- models/marts/analytics/fct_events.sql
{{
    config(
        materialized='incremental',
        unique_key='event_id',
        partition_by={
            'field': 'event_date',
            'data_type': 'date'
        },
        cluster_by=['user_id', 'event_name']
    )
}}

with events as (
    select * from {{ ref('stg_events') }}

    {% if is_incremental() %}
    -- Only process new data
    where event_timestamp_utc > (select max(event_timestamp_utc) from {{ this }})
    {% endif %}
),

users as (
    select * from {{ ref('dim_users') }}
),

enriched as (
    select
        e.event_id,
        e.event_timestamp_utc,
        date(e.event_timestamp_utc) as event_date,
        e.user_id,
        e.event_name,

        -- User attributes
        u.user_tier,
        u.signup_date,
        u.country,

        -- Event properties
        e.page_url,
        e.referrer,

        -- Derived metrics
        case
            when e.event_name in ('purchase', 'upgrade')
            then (e.properties:amount)::decimal(10,2)
            else 0
        end as revenue

    from events e
    left join users u on e.user_id = u.user_id
)

select * from enriched
```

dbt Tests and Documentation
```yaml
version: 2

models:
  - name: stg_events
    description: "Staging table for raw events"
    columns:
      - name: event_id
        description: "Unique event identifier"
        tests:
          - unique
          - not_null

      - name: user_id
        description: "User who triggered the event"
        tests:
          - not_null
          - relationships:
              to: ref('stg_users')
              field: user_id

      - name: event_timestamp_utc
        description: "Event timestamp in UTC"
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= '2020-01-01'"

      - name: event_name
        description: "Type of event"
        tests:
          - accepted_values:
              values: ['page_view', 'click', 'purchase', 'signup']

sources:
  - name: raw
    database: raw_data
    schema: events
    tables:
      - name: events
        description: "Raw event data from application"
        freshness:
          warn_after: {count: 12, period: hour}
          error_after: {count: 24, period: hour}
        loaded_at_field: _loaded_at
```

Custom dbt Macros
```sql
-- macros/custom_functions.sql

{% macro cents_to_dollars(column_name, precision=2) %}
    round({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}

{% macro generate_schema_name(custom_schema_name, node) -%}
    {%- set default_schema = target.schema -%}
    {%- if target.name == 'prod' and custom_schema_name is not none -%}
        {{ custom_schema_name | trim }}
    {%- else -%}
        {{ default_schema }}_{{ custom_schema_name | trim }}
    {%- endif -%}
{%- endmacro %}

{% macro days_since(date_column) %}
    datediff(day, {{ date_column }}, current_date)
{% endmacro %}
```

Data Quality with Great Expectations
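Under the hood, expectations like the ones used in this section are just predicates evaluated over a dataset. A hand-rolled sketch of two common checks (deliberately not Great Expectations' actual API) makes the idea concrete:

```python
def expect_not_null(rows, column):
    """True when no row has a null value in `column`."""
    return all(row.get(column) is not None for row in rows)

def expect_unique(rows, column):
    """True when all values in `column` are distinct."""
    values = [row.get(column) for row in rows]
    return len(values) == len(set(values))

events = [
    {'event_id': 1, 'event_name': 'click'},
    {'event_id': 2, 'event_name': 'purchase'},
]
print(expect_not_null(events, 'event_id'))  # → True
print(expect_unique(events, 'event_id'))    # → True
```

What Great Expectations adds on top of such predicates is declarative suites, batch management, and rich validation reports, as shown below.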
```python
import great_expectations as ge

# Load data
df = ge.read_csv('data/events.csv')

# Define expectations
df.expect_table_row_count_to_be_between(min_value=1000, max_value=100000)
df.expect_column_values_to_not_be_null('event_id')
df.expect_column_values_to_be_unique('event_id')
df.expect_column_values_to_be_in_set(
    'event_name',
    ['page_view', 'click', 'purchase', 'signup']
)
df.expect_column_values_to_be_between(
    'event_timestamp',
    min_value='2024-01-01',
    max_value='2024-12-31'
)

# Save suite
df.save_expectation_suite('events_suite.json')
```

Airflow Integration:
```python
from great_expectations_provider.operators.great_expectations import (
    GreatExpectationsOperator
)

validate_events = GreatExpectationsOperator(
    task_id='validate_events',
    expectation_suite_name='events_suite',
    batch_kwargs={
        'path': 's3://bucket/data/events.parquet',
        'datasource': 'my_datasource'
    },
    data_context_root_dir='/path/to/great_expectations',
)
```

Efficient Data Processing with Polars
```python
import polars as pl

# Read large parquet file efficiently
df = pl.read_parquet('data/events.parquet')

# Lazy evaluation (builds query plan)
lazy_df = (
    pl.scan_parquet('data/events.parquet')
    .filter(pl.col('event_date') >= '2024-01-01')
    .group_by(['user_id', 'event_name'])
    .agg([
        pl.count().alias('event_count'),
        pl.col('revenue').sum().alias('total_revenue')
    ])
    .filter(pl.col('event_count') > 5)
    .sort('total_revenue', descending=True)
)

# Execute query (only when needed)
result = lazy_df.collect()

# Write results
result.write_parquet('output/user_metrics.parquet')
```

Polars vs Pandas Performance:
```python
import time

import pandas as pd
import polars as pl

# Large dataset
n_rows = 10_000_000

# Pandas
start = time.time()
df_pandas = pd.DataFrame({
    'user_id': range(n_rows),
    'value': range(n_rows)
})
result_pandas = df_pandas.groupby('user_id').sum()
print(f"Pandas: {time.time() - start:.2f}s")

# Polars
start = time.time()
df_polars = pl.DataFrame({
    'user_id': range(n_rows),
    'value': range(n_rows)
})
result_polars = df_polars.group_by('user_id').sum()
print(f"Polars: {time.time() - start:.2f}s")
# Typically 5-10x faster on group-by workloads
```

Incremental Processing Patterns
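Both patterns in this section reduce work to new data only. The core of the watermark approach, independent of Airflow, is a filter-and-advance step; the record shape here is an illustrative assumption:

```python
def process_since_watermark(records, watermark):
    """Return records newer than `watermark`, plus the advanced watermark.

    `records` is an iterable of dicts with an 'event_timestamp' key holding
    any comparable type (ISO-8601 strings, datetimes, epoch ints).
    """
    new_records = [r for r in records if r['event_timestamp'] > watermark]
    if not new_records:
        return [], watermark  # nothing new; watermark unchanged
    new_watermark = max(r['event_timestamp'] for r in new_records)
    return new_records, new_watermark

records = [
    {'event_timestamp': '2024-01-01 10:00:00', 'event_id': 1},
    {'event_timestamp': '2024-01-02 09:30:00', 'event_id': 2},
]
batch, wm = process_since_watermark(records, '2024-01-01 12:00:00')
print(len(batch), wm)  # → 1 2024-01-02 09:30:00
```

Only advance the watermark after the batch has been durably processed; otherwise a crash between the two steps silently drops data.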
1. Watermark-Based
```python
def process_incremental(**context):
    """Process data since last successful run"""
    from airflow.models import Variable

    # Get last watermark
    last_watermark = Variable.get(
        'events_last_processed_timestamp',
        default_var='2024-01-01 00:00:00'
    )

    # Query new data
    query = f"""
        SELECT * FROM events
        WHERE event_timestamp > '{last_watermark}'
        ORDER BY event_timestamp
    """

    # Process data into a DataFrame `df`
    # ...

    # Update watermark
    new_watermark = max(df['event_timestamp'])
    Variable.set('events_last_processed_timestamp', new_watermark)
```

2. Date Partition
```python
def process_partition(**context):
    """Process specific date partition"""
    execution_date = context['ds']

    query = f"""
        SELECT * FROM events
        WHERE DATE(event_timestamp) = '{execution_date}'
    """

    # Process partition
    # ...
```

Monitoring and Alerting
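The quality check below compares today's row count against a trailing average. That anomaly test is a one-liner worth isolating so it can be unit-tested without a database; the function name and default ratio are illustrative:

```python
def is_volume_anomaly(row_count, trailing_avg, min_ratio=0.5):
    """Flag a run whose row count falls below min_ratio of the trailing average."""
    return row_count < trailing_avg * min_ratio

print(is_volume_anomaly(400, 1000))  # → True  (60% below average)
print(is_volume_anomaly(900, 1000))  # → False
```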
```python
from airflow.operators.python import PythonOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

def check_data_quality(**context):
    """Check data quality metrics"""
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    pg_hook = PostgresHook()

    # Check row count
    row_count = pg_hook.get_first(
        "SELECT COUNT(*) FROM staging.events WHERE date = %(date)s",
        parameters={'date': context['ds']}
    )[0]

    # Check for anomalies against the trailing 7-day average
    avg_count = pg_hook.get_first(
        "SELECT AVG(count) FROM daily_row_counts WHERE date >= current_date - 7"
    )[0]

    if row_count < avg_count * 0.5:
        raise ValueError(
            f"Row count {row_count} is more than 50% below average {avg_count}"
        )

    return row_count

check_quality = PythonOperator(
    task_id='check_quality',
    python_callable=check_data_quality,
)

alert_on_failure = SlackWebhookOperator(
    task_id='alert_failure',
    slack_webhook_conn_id='slack_webhook',
    message='Data pipeline failed: {{ task_instance.task_id }}',
    channel='#data-alerts',
    trigger_rule='one_failed',
)

check_quality >> alert_on_failure
```

Production Best Practices
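The first practice below, idempotency, means that re-running a task yields the same end state as running it once. A toy in-memory version of MERGE semantics shows why upserts are safe to retry while plain appends are not:

```python
def upsert(target, rows, key='id'):
    """MERGE-like semantics: insert new keys, overwrite existing ones."""
    for row in rows:
        target[row[key]] = row
    return target

table = {}
batch = [{'id': 1, 'value': 'a'}, {'id': 2, 'value': 'b'}]
upsert(table, batch)
upsert(table, batch)  # re-run: same end state, no duplicates
print(len(table))  # → 2
```

An append-only `INSERT` run twice would leave four rows; the upsert leaves two either way, which is exactly the property a retried Airflow task needs.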
1. Idempotency
```python
def upsert_data(**context):
    """Idempotent data insertion"""
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    pg_hook = PostgresHook()

    # Use MERGE/UPSERT instead of plain INSERT
    sql = """
        MERGE INTO target t
        USING source s ON t.id = s.id
        WHEN MATCHED THEN
            UPDATE SET value = s.value, updated_at = current_timestamp
        WHEN NOT MATCHED THEN
            INSERT (id, value, created_at)
            VALUES (s.id, s.value, current_timestamp)
    """

    pg_hook.run(sql)
```

2. Connection Pooling
```ini
# airflow.cfg — since Airflow 2.3 these settings live under [database]
[database]
sql_alchemy_pool_enabled = True
sql_alchemy_pool_size = 5
sql_alchemy_max_overflow = 10
sql_alchemy_pool_recycle = 3600
```

3. Resource Management
```python
from airflow.models import Pool

# Create resource pools via the CLI or UI:
# airflow pools set postgres_pool 5 "Postgres connection pool"

task_with_pool = PythonOperator(
    task_id='heavy_task',
    python_callable=process_data,  # assumes process_data is defined elsewhere
    pool='postgres_pool',   # Limit concurrent tasks
    priority_weight=10,     # Higher priority
)
```

Production Checklist
- Airflow deployed on Kubernetes/Celery executor
- Database backend (Postgres) configured
- Secret management (Vault/AWS Secrets Manager)
- Connection pooling enabled
- Data quality checks implemented
- Idempotent pipeline operations
- Monitoring and alerting configured
- dbt tests covering critical models
- Incremental processing for large datasets
- Proper retry strategies
- Documentation for all pipelines
- Cost monitoring enabled
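For the "proper retry strategies" item, the retry settings in the DAG example earlier (`retries=3`, a 5-minute delay, exponential backoff capped at 30 minutes) imply a concrete delay schedule. A simplified model of that schedule (Airflow's actual backoff also adds randomized jitter) is:

```python
from datetime import timedelta

def retry_delays(retries, retry_delay, max_retry_delay):
    """Simplified exponential-backoff schedule: the delay doubles per
    attempt and is capped at max_retry_delay. Airflow's real implementation
    additionally applies jitter; this models only the doubling."""
    return [min(retry_delay * (2 ** attempt), max_retry_delay)
            for attempt in range(retries)]

delays = retry_delays(
    retries=3,
    retry_delay=timedelta(minutes=5),
    max_retry_delay=timedelta(minutes=30),
)
print([d.total_seconds() / 60 for d in delays])  # → [5.0, 10.0, 20.0]
```

Sanity-checking the schedule like this helps verify that a transient outage of a given length will actually be ridden out before the final retry fires.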
Conclusion
Building production data pipelines requires combining orchestration (Airflow), transformation (dbt), and quality checks (Great Expectations) into a cohesive system. Focus on idempotency, incremental processing, and comprehensive monitoring to build reliable pipelines.
Start with simple DAGs, add dbt for SQL transformations, and gradually incorporate advanced features like branching, cross-DAG dependencies, and automated data quality checks.
Ready to build data pipelines? Our Python Data Engineering training covers Airflow, dbt, data quality, and production best practices with real-world projects. Explore Python training or contact us for data engineering expertise.