Vladimir Chavkov

BigQuery for Data Engineering: Advanced Analytics and Performance Optimization


Google BigQuery is a serverless, highly scalable data warehouse that enables super-fast SQL queries using the processing power of Google’s infrastructure. This comprehensive guide covers advanced BigQuery patterns, performance optimization, cost management, and integration with modern data engineering workflows.

Why BigQuery?

Key Advantages

  1. Serverless Architecture: No infrastructure to manage
  2. Petabyte Scale: Query massive datasets in seconds
  3. Separation of Storage and Compute: Pay for what you use
  4. Real-Time Analytics: Stream data and query immediately
  5. Machine Learning Integration: Built-in ML with BigQuery ML
  6. Standard SQL: Familiar ANSI SQL with extensions

BigQuery vs. Traditional Data Warehouses

| Feature | BigQuery | Traditional DW |
| --- | --- | --- |
| Scaling | Automatic, unlimited | Manual, hardware-limited |
| Pricing | Pay-per-query or flat-rate | Fixed infrastructure cost |
| Maintenance | Fully managed | Requires DBA team |
| Query Speed | Massively parallel | Limited by hardware |
| Setup Time | Minutes | Weeks/months |

Architecture and Data Organization

1. Project, Dataset, and Table Hierarchy

GCP Project (my-analytics-project)
└── Dataset (analytics)
    ├── Table (events)
    ├── Table (users)
    └── View (active_users)

2. Table Types

Standard Tables - Regular tables with data storage:

-- Create partitioned table
CREATE TABLE `analytics.events`
(
  event_id STRING,
  user_id STRING,
  event_name STRING,
  event_timestamp TIMESTAMP,
  properties JSON
)
PARTITION BY DATE(event_timestamp)
CLUSTER BY user_id, event_name
OPTIONS(
  description="User events partitioned by day",
  require_partition_filter=true
);

External Tables - Query data in Cloud Storage without loading:

-- Create external table from Cloud Storage
CREATE EXTERNAL TABLE `analytics.raw_logs`
OPTIONS (
  format = 'JSON',
  uris = ['gs://my-bucket/logs/*.json'],
  max_bad_records = 100
);

Views - Virtual tables from queries:

-- Create view
CREATE VIEW `analytics.active_users` AS
SELECT
  user_id,
  COUNT(*) as event_count,
  MAX(event_timestamp) as last_seen
FROM `analytics.events`
WHERE event_timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
GROUP BY user_id
HAVING event_count >= 5;

Materialized Views - Pre-computed results for performance:

-- Create materialized view (auto-refreshing)
CREATE MATERIALIZED VIEW `analytics.daily_revenue` AS
SELECT
  DATE(order_timestamp) as date,
  SUM(amount) as revenue,
  COUNT(DISTINCT user_id) as unique_customers
FROM `analytics.orders`
GROUP BY date;

3. Partitioning Strategies

Time-based Partitioning (Recommended for event data):

-- Ingestion-time partitioned table
CREATE TABLE `analytics.logs`
(
  log_message STRING,
  severity STRING
)
PARTITION BY _PARTITIONDATE
OPTIONS(
  partition_expiration_days=90,
  require_partition_filter=true
);

-- Column-based partitioning (more flexible)
CREATE TABLE `analytics.orders`
(
  order_id STRING,
  customer_id STRING,
  order_date DATE,
  amount NUMERIC
)
PARTITION BY order_date
CLUSTER BY customer_id;

Integer Range Partitioning:

-- Partition by customer_id ranges
CREATE TABLE `analytics.customer_data`
(
  customer_id INT64,
  name STRING,
  total_spend NUMERIC
)
PARTITION BY RANGE_BUCKET(customer_id, GENERATE_ARRAY(0, 1000000, 10000))
CLUSTER BY name;
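To see what RANGE_BUCKET combined with GENERATE_ARRAY actually computes, here is a pure-Python sketch. The helper names are ours, not BigQuery's, but the bucket arithmetic matches the documented semantics: the bucket index is the number of boundaries less than or equal to the value.

```python
from bisect import bisect_right

def range_bucket(point, boundaries):
    """Mimic BigQuery's RANGE_BUCKET: count of boundaries <= point."""
    return bisect_right(boundaries, point)

def generate_array(start, stop, step):
    """Mimic GENERATE_ARRAY(start, stop, step) -- stop is inclusive."""
    return list(range(start, stop + 1, step))

# Same boundaries as the DDL above: 0, 10000, 20000, ..., 1000000
boundaries = generate_array(0, 1000000, 10000)

print(range_bucket(4567, boundaries))   # 1  (customer_id 0..9999)
print(range_bucket(15000, boundaries))  # 2  (customer_id 10000..19999)
print(range_bucket(-5, boundaries))     # 0  (below the first boundary)
```

Rows that fall below the first boundary or above the last land in catch-all partitions, which is why choosing boundary ranges that cover the real ID space matters.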

4. Clustering for Performance

-- Multi-column clustering
CREATE TABLE `analytics.ecommerce_events`
(
  event_timestamp TIMESTAMP,
  user_id STRING,
  event_type STRING,
  product_id STRING,
  revenue NUMERIC
)
PARTITION BY DATE(event_timestamp)
CLUSTER BY event_type, product_id, user_id
OPTIONS(
  description="Clustered by common filter columns"
);

Clustering Best Practices:

  1. Order clustering columns by how often queries filter on them (most frequent first)
  2. Use at most four clustering columns per table
  3. Prefer high-cardinality columns and combine clustering with partitioning
  4. Re-clustering happens automatically in the background at no extra cost

Advanced SQL Patterns

1. Window Functions

-- Calculate running totals and moving averages
WITH daily_revenue AS (
  SELECT
    DATE(order_timestamp) as date,
    SUM(amount) as revenue
  FROM `analytics.orders`
  WHERE order_timestamp >= '2024-01-01'
  GROUP BY date
)
SELECT
  date,
  revenue,
  -- Running total
  SUM(revenue) OVER (ORDER BY date) as cumulative_revenue,
  -- 7-day moving average
  AVG(revenue) OVER (
    ORDER BY date
    ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
  ) as moving_avg_7d,
  -- Rank by revenue
  RANK() OVER (ORDER BY revenue DESC) as revenue_rank,
  -- Compare to previous day
  revenue - LAG(revenue, 1) OVER (ORDER BY date) as day_over_day_change
FROM daily_revenue
ORDER BY date;
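The window clauses above reduce to simple sequential arithmetic. This pure-Python sketch over a tiny invented three-day sample mirrors the running total (SUM ... OVER), the 7-day moving average (ROWS BETWEEN 6 PRECEDING AND CURRENT ROW), and the LAG-based day-over-day change:

```python
daily_revenue = [  # (date, revenue) -- tiny invented sample
    ("2024-01-01", 100.0),
    ("2024-01-02", 150.0),
    ("2024-01-03", 120.0),
]

rows = []
cumulative = 0.0
for i, (date, revenue) in enumerate(daily_revenue):
    cumulative += revenue                        # SUM(revenue) OVER (ORDER BY date)
    window = [r for _, r in daily_revenue[max(0, i - 6):i + 1]]
    moving_avg_7d = sum(window) / len(window)    # 6 PRECEDING AND CURRENT ROW
    prev = daily_revenue[i - 1][1] if i > 0 else None  # LAG(revenue, 1)
    day_over_day = revenue - prev if prev is not None else None
    rows.append((date, revenue, cumulative, moving_avg_7d, day_over_day))

for row in rows:
    print(row)
```

Note that for the first six days the "7-day" average is an average over however many rows exist so far, exactly as the SQL frame behaves near the start of the partition.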

2. Array and Struct Operations

-- Working with nested and repeated fields
SELECT
  user_id,
  -- Unnest array
  event.name as event_name,
  event.timestamp as event_timestamp,
  -- Access struct fields
  event.properties.page_url as page_url
FROM `analytics.user_sessions`,
  UNNEST(events) as event
WHERE event.name = 'page_view';

-- Create arrays
SELECT
  user_id,
  ARRAY_AGG(product_id ORDER BY timestamp DESC LIMIT 5) as recent_products,
  ARRAY_LENGTH(ARRAY_AGG(DISTINCT product_id)) as unique_products_viewed
FROM `analytics.product_views`
GROUP BY user_id;

3. JSON Handling

-- Parse JSON columns
SELECT
  event_id,
  JSON_VALUE(properties, '$.page_url') as page_url,
  JSON_VALUE(properties, '$.referrer') as referrer,
  CAST(JSON_VALUE(properties, '$.scroll_depth') AS FLOAT64) as scroll_depth,
  -- Extract a JSON fragment rather than a scalar
  JSON_QUERY(properties, '$.user_agent') as user_agent_json
FROM `analytics.events`
WHERE JSON_VALUE(properties, '$.event_category') = 'engagement';
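If it helps to reason about JSON_VALUE semantics, this small Python analog (the sample row is invented) shows the same extraction behavior: scalar values come back as strings, a missing key yields NULL (None here), and numeric fields need an explicit cast:

```python
import json

# Invented event row; "properties" is stored as a JSON string
event = {
    "event_id": "evt_123",
    "properties": json.dumps({
        "page_url": "/home",
        "event_category": "engagement",
        "scroll_depth": "0.75",
    }),
}

props = json.loads(event["properties"])

page_url = props.get("page_url")             # JSON_VALUE(..., '$.page_url')
referrer = props.get("referrer")             # missing key -> None, like SQL NULL
scroll_depth = float(props["scroll_depth"])  # CAST(JSON_VALUE(...) AS FLOAT64)

print(page_url, referrer, scroll_depth)
```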

4. User-Defined Functions (UDFs)

SQL UDF:

-- Create SQL UDF
CREATE FUNCTION `analytics.clean_email`(email STRING)
RETURNS STRING
AS (
  LOWER(TRIM(email))
);

-- Use UDF
SELECT
  user_id,
  analytics.clean_email(email) as normalized_email
FROM `analytics.users`;

JavaScript UDF:

-- Create JavaScript UDF for complex logic
CREATE TEMP FUNCTION parse_user_agent(ua STRING)
RETURNS STRUCT<browser STRING, os STRING, device STRING>
LANGUAGE js
OPTIONS (
  library=["gs://my-bucket/ua-parser.min.js"]
)
AS r"""
  // require() is not available in BigQuery JavaScript UDFs; the file
  // loaded via the library option exposes a global (UAParser here).
  var result = UAParser(ua);
  return {
    browser: result.browser.name,
    os: result.os.name,
    device: result.device.type || 'desktop'
  };
""";

-- Use JavaScript UDF
SELECT
  session_id,
  parse_user_agent(user_agent).browser as browser,
  parse_user_agent(user_agent).device as device
FROM `analytics.sessions`;

Performance Optimization

1. Query Optimization Techniques

Avoid SELECT *:

-- BAD: Scans entire table
SELECT * FROM `analytics.events`;

-- GOOD: Select only needed columns
SELECT event_id, event_name, event_timestamp
FROM `analytics.events`
WHERE DATE(event_timestamp) = CURRENT_DATE();

Use Partitioning Filters:

-- BAD: Full table scan
SELECT COUNT(*)
FROM `analytics.events`
WHERE event_name = 'purchase';

-- GOOD: Partition pruning
SELECT COUNT(*)
FROM `analytics.events`
WHERE DATE(event_timestamp) BETWEEN '2024-01-01' AND '2024-01-31'
  AND event_name = 'purchase';

Denormalize for Performance:

-- Instead of joining repeatedly, denormalize
CREATE TABLE `analytics.enriched_events` AS
SELECT
  e.event_id,
  e.event_name,
  e.event_timestamp,
  u.user_name,
  u.user_email,
  u.user_tier,
  p.product_name,
  p.product_category
FROM `analytics.events` e
LEFT JOIN `analytics.users` u USING(user_id)
LEFT JOIN `analytics.products` p USING(product_id);

Approximate Aggregation:

-- Exact count (slower, more expensive)
SELECT COUNT(DISTINCT user_id) as exact_users
FROM `analytics.events`;

-- Approximate count (much faster, typically within 1-2% of exact)
SELECT APPROX_COUNT_DISTINCT(user_id) as approx_users
FROM `analytics.events`;

2. Query Execution Plan Analysis

-- BigQuery has no EXPLAIN statement; run the query, then inspect the
-- plan in the console's "Execution details" tab, or query the
-- job_stages column of INFORMATION_SCHEMA.JOBS
SELECT
  product_id,
  COUNT(*) as views,
  COUNT(DISTINCT user_id) as unique_viewers
FROM `analytics.product_views`
WHERE DATE(view_timestamp) >= '2024-01-01'
GROUP BY product_id
ORDER BY views DESC
LIMIT 100;

Key Metrics to Monitor:

  1. Bytes processed and bytes billed per query
  2. Slot time consumed (total_slot_ms)
  3. Bytes shuffled between stages (large shuffles often point to join or GROUP BY skew)
  4. Per-stage wait, read, compute, and write times in the execution details

3. Materialized Views for Repeated Queries

-- Expensive query run frequently
CREATE MATERIALIZED VIEW `analytics.hourly_metrics`
PARTITION BY DATE(hour)
CLUSTER BY event_type
AS
SELECT
  TIMESTAMP_TRUNC(event_timestamp, HOUR) as hour,
  event_type,
  COUNT(*) as event_count,
  COUNT(DISTINCT user_id) as unique_users,
  SUM(revenue) as total_revenue
FROM `analytics.events`
GROUP BY hour, event_type;

-- The view refreshes automatically; queries against it are much faster and cheaper
SELECT * FROM `analytics.hourly_metrics`
WHERE DATE(hour) = CURRENT_DATE();

Cost Management

1. Pricing Models

On-Demand Pricing:

  1. Pay per query based on bytes scanned ($5 per TB is assumed throughout this article; the first 1 TB per month is free)
  2. No commitment; cost scales directly with usage

Flat-Rate Pricing:

  1. Reserve dedicated slots (compute capacity) for a fixed monthly fee
  2. Predictable spend for heavy, steady workloads; queries are no longer billed per byte

2. Cost Optimization Strategies

-- Estimate query costs from job history
-- (bytes_processed * $5 / 1 TB, the on-demand rate assumed in this article)
SELECT
  job_id,
  total_bytes_processed / POW(10, 12) as tb_processed,
  (total_bytes_processed / POW(10, 12)) * 5 as estimated_cost_usd
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
ORDER BY total_bytes_processed DESC
LIMIT 20;
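The cost arithmetic in that query is simple enough to keep as a helper in tooling. This sketch assumes the $5/TB on-demand rate used throughout the article; actual rates vary by region and change over time, so treat the constant as an assumption to verify against current pricing:

```python
ON_DEMAND_USD_PER_TB = 5.0  # rate assumed in this article; check current pricing

def estimated_cost_usd(total_bytes_processed, usd_per_tb=ON_DEMAND_USD_PER_TB):
    """Estimate on-demand query cost: (bytes / 1 TB) * rate."""
    tb_processed = total_bytes_processed / 10**12
    return tb_processed * usd_per_tb

# A query scanning 200 GB at $5/TB costs about $1.00
print(round(estimated_cost_usd(200 * 10**9), 2))  # 1.0
```

In practice BigQuery bills on bytes *billed* (with a small per-query minimum), which can differ slightly from bytes processed, so use this as a planning estimate rather than an invoice prediction.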

Cost Reduction Techniques:

  1. Partition Pruning:

-- Cost: $0.05 (scans 1 day)
SELECT COUNT(*)
FROM `analytics.events`
WHERE DATE(event_timestamp) = '2024-01-15';

-- vs. Cost: $1.50 (scans 30 days)
SELECT COUNT(*)
FROM `analytics.events`
WHERE event_timestamp >= '2024-01-15 00:00:00';

  2. Clustering:

-- Cost: $0.10 (clustered column filter)
SELECT *
FROM `analytics.events`
WHERE event_type = 'purchase'
  AND DATE(event_timestamp) = '2024-01-15';

-- vs. Cost: $0.50 (no clustering)
-- Same query on a non-clustered table

  3. Limit Preview Scans:

-- Avoid: LIMIT does not reduce bytes scanned
SELECT * FROM `analytics.events` LIMIT 10;

-- Better: Use the free table preview in the console,
-- or query a specific partition
SELECT * FROM `analytics.events`
WHERE DATE(event_timestamp) = CURRENT_DATE()
LIMIT 10;

3. Quotas and Limits

# Set maximum bytes billed per query to prevent runaway costs;
# the query fails if it would bill more than this many bytes
bq query \
  --maximum_bytes_billed=1000000000 \
  --use_legacy_sql=false \
  'SELECT * FROM analytics.large_table'

Real-Time Data Ingestion

1. Streaming Inserts

Python Example:

from google.cloud import bigquery

client = bigquery.Client()
table_id = "my-project.analytics.events"

rows_to_insert = [
    {
        "event_id": "evt_123",
        "user_id": "user_456",
        "event_name": "page_view",
        "event_timestamp": "2024-01-15T10:30:00Z",
        "properties": {"page": "/home"},
    },
    {
        "event_id": "evt_124",
        "user_id": "user_457",
        "event_name": "click",
        "event_timestamp": "2024-01-15T10:31:00Z",
        "properties": {"element": "buy_button"},
    },
]

errors = client.insert_rows_json(table_id, rows_to_insert)
if errors:
    print(f"Errors: {errors}")
else:
    print("New rows added successfully")

Node.js Example:

const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

async function streamData() {
  const datasetId = 'analytics';
  const tableId = 'events';
  const rows = [
    {
      event_id: 'evt_125',
      user_id: 'user_458',
      event_name: 'purchase',
      event_timestamp: new Date().toISOString(),
      properties: {product_id: 'prod_789', amount: 49.99}
    }
  ];
  await bigquery
    .dataset(datasetId)
    .table(tableId)
    .insert(rows);
  console.log('Inserted rows');
}

streamData().catch(console.error);

2. BigQuery Storage Write API

For high-throughput streaming:

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

# Initialize client
write_client = bigquery_storage_v1.BigQueryWriteClient()

# Create a pending write stream
parent = write_client.table_path("project", "dataset", "table")
write_stream = types.WriteStream()
write_stream.type_ = types.WriteStream.Type.PENDING
write_stream = write_client.create_write_stream(
    parent=parent,
    write_stream=write_stream,
)

# Append rows
request_template = types.AppendRowsRequest()
request_template.write_stream = write_stream.name
# Create proto rows (more efficient than JSON)
# See the Storage Write API documentation for proto definitions

# Finalize and commit the stream
write_client.finalize_write_stream(name=write_stream.name)
write_client.batch_commit_write_streams(
    parent=parent,
    write_streams=[write_stream.name],
)

BigQuery ML

1. Linear Regression Model

-- Create and train model
CREATE OR REPLACE MODEL `analytics.revenue_prediction`
OPTIONS(
  model_type='LINEAR_REG',
  input_label_cols=['revenue']
) AS
SELECT
  EXTRACT(DAYOFWEEK FROM order_date) as day_of_week,
  EXTRACT(HOUR FROM order_timestamp) as hour_of_day,
  customer_segment,
  product_category,
  marketing_channel,
  revenue
FROM `analytics.orders`
WHERE order_date >= '2023-01-01';

-- Evaluate model
SELECT *
FROM ML.EVALUATE(MODEL `analytics.revenue_prediction`);

-- Make predictions on new data (the label column is not needed as input)
SELECT
  predicted_revenue
FROM ML.PREDICT(
  MODEL `analytics.revenue_prediction`,
  (
    SELECT
      EXTRACT(DAYOFWEEK FROM CURRENT_DATE()) as day_of_week,
      12 as hour_of_day,
      'premium' as customer_segment,
      'electronics' as product_category,
      'email' as marketing_channel
  )
);

2. Classification Model

-- Predict customer churn
CREATE OR REPLACE MODEL `analytics.churn_prediction`
OPTIONS(
  model_type='LOGISTIC_REG',
  input_label_cols=['churned']
) AS
SELECT
  -- Features
  account_age_days,
  total_purchases,
  avg_purchase_value,
  days_since_last_purchase,
  customer_support_tickets,
  email_open_rate,
  -- Label
  churned
FROM `analytics.customer_features`
WHERE training_date >= '2023-01-01';

-- Inspect learned weights (ML.FEATURE_IMPORTANCE applies to tree models only)
SELECT *
FROM ML.WEIGHTS(MODEL `analytics.churn_prediction`);

-- Predict churn probability; look the class up by label rather than by
-- array position, since the order of predicted_churned_probs is not guaranteed
SELECT
  customer_id,
  (SELECT p.prob
   FROM UNNEST(predicted_churned_probs) AS p
   WHERE p.label = TRUE) as churn_probability
FROM ML.PREDICT(
  MODEL `analytics.churn_prediction`,
  (SELECT * FROM `analytics.current_customers`)
)
ORDER BY churn_probability DESC;
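Positional indexing into predicted_<label>_probs (for example with OFFSET) is fragile because the class order in that array is not guaranteed; the safe pattern is to look the class up by label. This pure-Python sketch (the probs list and its ordering are invented to make the point) mirrors that lookup:

```python
# ML.PREDICT returns, per row, a list of {label, prob} structs for
# classification models; entry order is not guaranteed, so select by label.
predicted_churned_probs = [
    {"label": False, "prob": 0.82},
    {"label": True, "prob": 0.18},
]

def prob_for_label(probs, label):
    """Select the probability for a specific class label."""
    return next(p["prob"] for p in probs if p["label"] == label)

churn_probability = prob_for_label(predicted_churned_probs, True)
print(churn_probability)  # 0.18
```

If the array order ever changed, a positional lookup would silently return the probability of the wrong class; the label lookup keeps working either way.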

3. Time Series Forecasting

-- Create ARIMA model for forecasting
CREATE OR REPLACE MODEL `analytics.sales_forecast`
OPTIONS(
  model_type='ARIMA_PLUS',
  time_series_timestamp_col='date',
  time_series_data_col='revenue',
  auto_arima=TRUE,
  data_frequency='DAILY'
) AS
SELECT
  date,
  revenue
FROM `analytics.daily_revenue`
WHERE date >= '2022-01-01';

-- Forecast next 30 days
SELECT
  forecast_timestamp,
  forecast_value,
  standard_error,
  confidence_level,
  prediction_interval_lower_bound,
  prediction_interval_upper_bound
FROM ML.FORECAST(
  MODEL `analytics.sales_forecast`,
  STRUCT(30 AS horizon, 0.95 AS confidence_level)
);

Integration with Data Stack

1. dbt (Data Build Tool)

-- models/marts/revenue/daily_revenue.sql
{{
  config(
    materialized='incremental',
    partition_by={
      'field': 'date',
      'data_type': 'date'
    },
    cluster_by=['product_category', 'region']
  )
}}

SELECT
  DATE(order_timestamp) as date,
  product_category,
  region,
  SUM(amount) as revenue,
  COUNT(DISTINCT order_id) as orders,
  COUNT(DISTINCT customer_id) as customers
FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
WHERE DATE(order_timestamp) > (SELECT MAX(date) FROM {{ this }})
{% endif %}
GROUP BY date, product_category, region

2. Apache Airflow

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCheckOperator,
    BigQueryInsertJobOperator,
)
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'bigquery_etl',
    default_args=default_args,
    schedule_interval='0 2 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    # Data quality check
    check_data = BigQueryCheckOperator(
        task_id='check_source_data',
        sql='''
            SELECT COUNT(*) > 0
            FROM `analytics.raw_events`
            WHERE DATE(event_timestamp) = CURRENT_DATE()
        ''',
        use_legacy_sql=False,
    )

    # Transform data
    transform_data = BigQueryInsertJobOperator(
        task_id='transform_events',
        configuration={
            'query': {
                'query': '''
                    INSERT INTO `analytics.processed_events`
                    SELECT
                      event_id,
                      user_id,
                      event_name,
                      event_timestamp,
                      JSON_VALUE(properties, '$.page_url') as page_url
                    FROM `analytics.raw_events`
                    WHERE DATE(event_timestamp) = CURRENT_DATE()
                ''',
                'useLegacySql': False,
            }
        },
    )

    check_data >> transform_data

3. Dataflow Integration

import json
from datetime import datetime

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery

class EnrichEvent(beam.DoFn):
    def process(self, element):
        # Enrich event data
        element['processed_timestamp'] = datetime.now().isoformat()
        element['is_weekend'] = datetime.fromisoformat(
            element['event_timestamp']
        ).weekday() >= 5
        yield element

pipeline_options = PipelineOptions(
    project='my-project',
    runner='DataflowRunner',
    region='us-central1',
    temp_location='gs://my-bucket/temp',
    streaming=True,  # required when reading from Pub/Sub
)

with beam.Pipeline(options=pipeline_options) as pipeline:
    (
        pipeline
        | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(
            topic='projects/my-project/topics/events'
        )
        | 'Parse JSON' >> beam.Map(lambda x: json.loads(x))
        | 'Enrich Events' >> beam.ParDo(EnrichEvent())
        | 'Write to BigQuery' >> WriteToBigQuery(
            'my-project:analytics.events',
            schema='event_id:STRING,user_id:STRING,event_timestamp:TIMESTAMP',
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )
    )

Production Best Practices

1. Data Governance

-- Row-level security
CREATE ROW ACCESS POLICY finance_filter
ON `analytics.orders`
GRANT TO ('group:finance@company.com')
FILTER USING (department = 'finance');

-- Column-level security via a masking view
CREATE OR REPLACE VIEW `analytics.orders_public` AS
SELECT
  order_id,
  product_id,
  amount,
  -- Mask PII
  'REDACTED' as customer_email,
  'REDACTED' as customer_phone
FROM `analytics.orders`;

2. Monitoring and Alerting

-- Create view for monitoring
CREATE VIEW `analytics.monitoring_daily_stats` AS
SELECT
  DATE(creation_time) as date,
  user_email,
  COUNT(*) as query_count,
  SUM(total_bytes_processed) / POW(10, 12) as tb_processed,
  SUM(total_slot_ms) / (1000 * 60) as slot_minutes,
  AVG(TIMESTAMP_DIFF(end_time, start_time, MILLISECOND)) / 1000 as avg_duration_seconds
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
  AND state = 'DONE'
GROUP BY date, user_email;

3. Backup and Disaster Recovery

# Export table to Cloud Storage
bq extract \
  --destination_format=AVRO \
  --compression=SNAPPY \
  analytics.events \
  gs://my-backup-bucket/events/export-*.avro

# Schedule backups with Cloud Composer/Airflow

# Load from backup
bq load \
  --source_format=AVRO \
  analytics.events_restored \
  gs://my-backup-bucket/events/export-*.avro

Conclusion

BigQuery is a powerful platform that combines simplicity with massive scale. By understanding partitioning strategies, optimizing queries, managing costs, and integrating with modern data tools, you can build robust data pipelines that process petabytes of data efficiently.

Focus on schema design, leverage BigQuery’s advanced features like ML and streaming, and always monitor costs and performance. BigQuery’s serverless nature means you can start small and scale to enterprise workloads without infrastructure overhead.


Ready to master BigQuery? Our GCP Data Engineering training covers BigQuery fundamentals, advanced optimization, ML integration, and real-world data pipeline projects. Explore GCP training or contact us for hands-on data engineering education.

