Copyable patterns for SQL, dbt, Airflow, Spark, Delta, and validation

SQL Dedupe

with ranked as (
  select
    t.*,
    row_number() over (
      partition by business_key
      order by updated_at desc
    ) as rn
  from raw_table t
)
select *
from ranked
where rn = 1;
  • Use when the latest record wins.
  • Make tie-breakers explicit: add a second order-by column so rows sharing an updated_at resolve the same way on every run.
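
The same latest-record-wins rule can be sketched in plain Python to make the tie-breaker visible. This is a minimal illustration, not a replacement for the SQL: `ingested_at` is a hypothetical secondary sort key that does not appear in the query above, and the rows are made-up sample data.

```python
from datetime import datetime


def latest_per_key(rows, key="business_key", order=("updated_at", "ingested_at")):
    """Return one row per key; the row with the greatest `order` tuple wins."""
    best = {}
    for row in rows:
        rank = tuple(row[col] for col in order)
        current = best.get(row[key])
        if current is None or rank > tuple(current[col] for col in order):
            best[row[key]] = row
    return list(best.values())


# Hypothetical sample rows; two of them tie on updated_at.
rows = [
    {"business_key": "a", "updated_at": datetime(2026, 1, 1), "ingested_at": 1, "value": "old"},
    {"business_key": "a", "updated_at": datetime(2026, 1, 2), "ingested_at": 2, "value": "tie-1"},
    {"business_key": "a", "updated_at": datetime(2026, 1, 2), "ingested_at": 3, "value": "tie-2"},
    {"business_key": "b", "updated_at": datetime(2026, 1, 1), "ingested_at": 4, "value": "only"},
]

deduped = latest_per_key(rows)
```

Without the second sort key, either of the two tied rows could win; with it, the row with the higher `ingested_at` wins on every run.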

Top N Per Group

with ranked as (
  select
    category,
    item_id,
    metric,
    row_number() over (
      partition by category
      order by metric desc
    ) as rn
  from scores
)
select *
from ranked
where rn <= 3;
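
A rough Python equivalent can help when checking expected output on a small sample. It assumes rows are dicts with the same three columns as the query. Using `item_id` as a tie-breaker is an addition of this sketch, since the SQL above leaves ties on `metric` unordered.

```python
from collections import defaultdict


def top_n_per_group(rows, n=3):
    """Return the n highest-metric rows in each category."""
    groups = defaultdict(list)
    for row in rows:
        groups[row["category"]].append(row)

    result = []
    for category in sorted(groups):
        # Sort by metric descending, then item_id ascending to break ties.
        ranked = sorted(groups[category], key=lambda r: (-r["metric"], r["item_id"]))
        result.extend(ranked[:n])
    return result


# Hypothetical sample data.
scores = [
    {"category": "books", "item_id": 1, "metric": 10},
    {"category": "books", "item_id": 2, "metric": 30},
    {"category": "books", "item_id": 3, "metric": 20},
    {"category": "books", "item_id": 4, "metric": 30},
    {"category": "toys", "item_id": 5, "metric": 5},
]

top_two = top_n_per_group(scores, n=2)
```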

Incremental Extract

select *
from source_table
where updated_at >= :last_watermark
  and updated_at < :next_cutoff;
  • Use half-open windows, [last_watermark, next_cutoff), so consecutive runs neither overlap nor skip rows.
  • Persist the watermark only after the load fully succeeds.
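
The two rules above can be sketched together in Python. This is an illustration under assumptions: the `state` dict stands in for wherever the watermark is really stored, and `run_incremental` and `failing_load` are hypothetical names.

```python
from datetime import datetime, timezone

UTC = timezone.utc


def run_incremental(rows, state, next_cutoff, load):
    """Extract rows in [watermark, next_cutoff) and advance the watermark on success."""
    last = state["watermark"]
    batch = [r for r in rows if last <= r["updated_at"] < next_cutoff]
    load(batch)                       # may raise; the watermark is then left untouched
    state["watermark"] = next_cutoff  # advanced only after the load succeeded
    return batch


rows = [
    {"id": 1, "updated_at": datetime(2026, 1, 1, tzinfo=UTC)},
    {"id": 2, "updated_at": datetime(2026, 1, 2, tzinfo=UTC)},  # exactly on a boundary
    {"id": 3, "updated_at": datetime(2026, 1, 3, tzinfo=UTC)},
]
state = {"watermark": datetime(2026, 1, 1, tzinfo=UTC)}
loaded = []

first = run_incremental(rows, state, datetime(2026, 1, 2, tzinfo=UTC), loaded.extend)


def failing_load(batch):
    raise RuntimeError("simulated load failure")


try:
    run_incremental(rows, state, datetime(2026, 1, 4, tzinfo=UTC), failing_load)
except RuntimeError:
    pass
watermark_after_failure = state["watermark"]

# The retry picks up from the unchanged watermark.
second = run_incremental(rows, state, datetime(2026, 1, 4, tzinfo=UTC), loaded.extend)
```

The row that sits exactly on the boundary is loaded once, by the second window, and the failed run leaves the watermark where it was.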

SQL Merge / Upsert Skeleton

merge into target t
using staging s
  on t.id = s.id
when matched then update set
  value = s.value,
  updated_at = s.updated_at
when not matched then insert (
  id, value, updated_at
) values (
  s.id, s.value, s.updated_at
);
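  • Keep staging deterministic: the same inputs should always produce the same staged rows.
  • Deduplicate the source on the join key first; many engines reject a merge in which several source rows match one target row.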
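
For local experimentation, the pattern can be approximated with Python's built-in `sqlite3` module. SQLite has no MERGE statement, so this sketch swaps in `INSERT ... ON CONFLICT DO UPDATE` as a stand-in, and it assumes a SQLite build recent enough to support upserts and window functions. The table contents are made-up sample data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table target  (id integer primary key, value text, updated_at text);
    create table staging (id integer, value text, updated_at text);
    insert into target  values (1, 'old', '2026-01-01');
    insert into staging values (1, 'stale', '2026-01-02'),
                               (1, 'fresh', '2026-01-03'),
                               (2, 'new',   '2026-01-03');
""")

# Deduplicate staging to the latest row per id, then upsert into target.
UPSERT = """
    with ranked as (
        select s.*, row_number() over (
            partition by id order by updated_at desc
        ) as rn
        from staging s
    )
    insert into target (id, value, updated_at)
    select id, value, updated_at from ranked where rn = 1
    on conflict(id) do update set
        value = excluded.value,
        updated_at = excluded.updated_at
"""


def upsert():
    with conn:  # commits on success, rolls back on error
        conn.execute(UPSERT)
    return conn.execute(
        "select id, value, updated_at from target order by id"
    ).fetchall()


first = upsert()
second = upsert()  # re-running with the same staging changes nothing
```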

Freshness Check

select
  max(event_time) as latest_event_time,
  current_timestamp - max(event_time) as freshness_lag
from fact_events;
  • Compare the lag to the SLA threshold and alert when it is exceeded.
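
The comparison step might look like the following in Python. The two thresholds are illustrative assumptions, not recommended values, and `freshness_status` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone


def freshness_status(latest_event_time, now, warn_after, error_after):
    """Classify the lag between now and the latest event against two thresholds."""
    lag = now - latest_event_time
    if lag > error_after:
        return "error", lag
    if lag > warn_after:
        return "warn", lag
    return "pass", lag


# Fixed timestamps keep the example deterministic.
now = datetime(2026, 1, 1, 12, 0, tzinfo=timezone.utc)
latest = datetime(2026, 1, 1, 9, 0, tzinfo=timezone.utc)

status, lag = freshness_status(
    latest,
    now,
    warn_after=timedelta(hours=2),
    error_after=timedelta(hours=6),
)
```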

Volume Anomaly Check

with daily as (
  select
    load_date,
    count(*) as row_count
  from fact_events
  group by 1
)
select *
from daily
where load_date >= current_date - 7
order by load_date desc;
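
The query above only lists recent daily counts; deciding what counts as an anomaly is a separate step. One possible rule, sketched here under assumptions, flags a day whose count deviates from the mean of all earlier days by more than a relative tolerance. The `tolerance` and `min_history` parameters are hypothetical and would need tuning for real data.

```python
from statistics import mean


def volume_anomalies(daily_counts, tolerance=0.5, min_history=3):
    """Flag days whose row count is far from the mean of the preceding days.

    daily_counts: list of (load_date, row_count) tuples in ascending date order.
    """
    flagged = []
    for i, (load_date, row_count) in enumerate(daily_counts):
        history = [count for _, count in daily_counts[:i]]
        if len(history) < min_history:
            continue  # not enough history to judge this day
        baseline = mean(history)
        if baseline and abs(row_count - baseline) / baseline > tolerance:
            flagged.append((load_date, row_count))
    return flagged


# Hypothetical daily counts with one obvious drop.
daily = [
    ("2026-01-01", 1000),
    ("2026-01-02", 1040),
    ("2026-01-03", 980),
    ("2026-01-04", 1010),
    ("2026-01-05", 120),
    ("2026-01-06", 1005),
]

anomalies = volume_anomalies(daily)
```

A plain mean is pulled down by the anomaly itself on later days; a median or a fixed lookback window may be a better baseline in practice.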

dbt Model Skeleton

{{ config(materialized='incremental') }}

with source as (
  select * from {{ source('app', 'events') }}
  {% if is_incremental() %}
  where updated_at > (
    select max(updated_at) from {{ this }}
  )
  {% endif %}
),
cleaned as (
  select
    user_id,
    event_name,
    updated_at
  from source
)
-- No trailing semicolon: dbt wraps the model's query in its own statement.
select * from cleaned

dbt Tests YAML

models:
  - name: fct_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - relationships:
              to: ref('dim_customers')
              field: customer_id

dbt Source Freshness

sources:
  - name: app
    schema: raw
    tables:
      - name: events
        loaded_at_field: ingested_at
        freshness:
          warn_after: {count: 2, period: hour}
          error_after: {count: 6, period: hour}
  • Freshness turns upstream lateness into an explicit contract.

dbt Exposure YAML

exposures:
  - name: revenue_dashboard
    type: dashboard
    owner:
      name: analytics
      email: data@example.com
    depends_on:
      - ref('fct_revenue_daily')
  • Use exposures to connect transformations to dashboards, apps, and extracts.

dbt Snapshot Skeleton

{% snapshot dim_customer_history %}
{{
  config(
    target_schema='snapshots',
    unique_key='customer_id',
    strategy='timestamp',
    updated_at='updated_at'
  )
}}

select * from {{ source('crm', 'customers') }}

{% endsnapshot %}
  • Use when source rows update in place and history matters.
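
To show what a timestamp-strategy snapshot accumulates, here is a simplified Python model. It is not dbt's implementation: the `valid_from` and `valid_to` names are hypothetical stand-ins for the metadata columns dbt maintains, and deletes are ignored.

```python
def apply_snapshot(history, source_rows, key="customer_id", updated_at="updated_at"):
    """Close the current version and append a new one when updated_at moves forward."""
    current = {r[key]: r for r in history if r["valid_to"] is None}
    for row in source_rows:
        existing = current.get(row[key])
        if existing is not None and row[updated_at] <= existing[updated_at]:
            continue  # unchanged since the last snapshot run
        if existing is not None:
            existing["valid_to"] = row[updated_at]  # close the old version
        new_version = {**row, "valid_from": row[updated_at], "valid_to": None}
        history.append(new_version)
        current[row[key]] = new_version
    return history


history = []
# Hypothetical source states observed on three successive runs.
apply_snapshot(history, [{"customer_id": 1, "tier": "free", "updated_at": "2026-01-01"}])
apply_snapshot(history, [{"customer_id": 1, "tier": "pro", "updated_at": "2026-02-01"}])
apply_snapshot(history, [{"customer_id": 1, "tier": "pro", "updated_at": "2026-02-01"}])

versions = [(r["tier"], r["valid_from"], r["valid_to"]) for r in history]
```

The third run adds nothing because `updated_at` did not advance, which is why the timestamp strategy depends on the source maintaining that column reliably.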

dbt Test Rule of Thumb

  • Generic data tests protect shape: keys, nulls, accepted values, and relationships.
  • Unit tests protect transformation logic and edge-case expectations.
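
The distinction can be illustrated in plain Python. The three checks below loosely mirror what the unique, not_null, and relationships tests assert: each returns the failing values or rows, and an empty result means the check passes. The function names, sample rows, and the `net_amount_cents` transformation are all hypothetical.

```python
from collections import Counter


def unique(rows, column):
    """Return non-null values that appear more than once."""
    counts = Counter(r[column] for r in rows if r[column] is not None)
    return sorted(value for value, n in counts.items() if n > 1)


def not_null(rows, column):
    """Return rows where the column is null."""
    return [r for r in rows if r[column] is None]


def relationships(rows, column, parent_rows, parent_column):
    """Return rows whose non-null foreign key has no matching parent."""
    parents = {p[parent_column] for p in parent_rows}
    return [r for r in rows if r[column] is not None and r[column] not in parents]


def net_amount_cents(gross_cents, discount_pct):
    """A transformation whose logic a unit test would pin down."""
    return gross_cents * (100 - discount_pct) // 100


orders = [
    {"order_id": 1, "customer_id": 10},
    {"order_id": 2, "customer_id": 99},
    {"order_id": 2, "customer_id": 10},
]
customers = [{"customer_id": 10}]

duplicate_ids = unique(orders, "order_id")
null_ids = not_null(orders, "order_id")
orphans = relationships(orders, "customer_id", customers, "customer_id")
```

The data checks run against whatever rows arrive; the unit-style check on `net_amount_cents` runs against fixed inputs with known expected outputs.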

Airflow DAG Skeleton

from airflow import DAG
from airflow.operators.python import PythonOperator
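from pendulum import datetime

# run_extract, run_transform, and run_load are placeholders for your own
# callables; define or import them before the DAG block.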

with DAG(
    dag_id="daily_pipeline",
    start_date=datetime(2026, 1, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
) as dag:
    extract = PythonOperator(
        task_id="extract",
        python_callable=run_extract,
    )

    transform = PythonOperator(
        task_id="transform",
        python_callable=run_transform,
    )

    load = PythonOperator(
        task_id="load",
        python_callable=run_load,
    )

    extract >> transform >> load

Airflow Data Interval

from airflow.decorators import task

@task
def extract(**context):
    start = context["data_interval_start"]
    end = context["data_interval_end"]
    run_date = context["logical_date"]
  • Filter data by interval boundaries, not by when the DAG happened to run.

Asset-Aware Schedule

from airflow import DAG
from airflow.datasets import Dataset  # Airflow 2.4+; Airflow 3 renames Dataset to Asset
from pendulum import datetime

silver_orders = Dataset("s3://warehouse/silver/orders")

with DAG(
    dag_id="publish_gold_orders",
    schedule=[silver_orders],
    start_date=datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    ...
  • Use dataset events when upstream readiness matters more than cron time.

PySpark Batch Skeleton

df = spark.read.parquet(input_path)

result = (
    df.filter("event_time is not null")
      .dropDuplicates(["id"])
      .groupBy("event_date", "category")
      .count()
)

result.write.mode("overwrite").parquet(output_path)
  • Prefer built-in functions over UDFs.

Spark Explain

df.explain("formatted")

spark.sql("""
EXPLAIN FORMATTED
SELECT customer_id, sum(amount)
FROM silver.orders
GROUP BY 1
""").show(truncate=False)
  • Inspect exchanges, joins, filters, and file scans before tuning.

AQE Basics

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
  • AQE re-optimizes the plan at runtime from shuffle statistics, so join strategy and partition count can change while the query runs.

Structured Streaming

stream_df = (
    spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", bootstrap_servers)  # required; your broker list
         .option("subscribe", "events")
         .load()
)

query = (
    stream_df.writeStream
             .format("delta")
             .option("checkpointLocation", checkpoint_path)
             .start(output_path)
)
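  • The checkpoint location stores source offsets and sink progress; treat it as part of the contract, because changing or deleting it makes the stream lose its place.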

Auto Loader

bronze_df = (
    spark.readStream
         .format("cloudFiles")
         .option("cloudFiles.format", "json")
         .option("cloudFiles.schemaLocation", schema_path)
         .load(raw_path)
)

(bronze_df.writeStream
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .toTable("bronze.events"))
  • Good default for cloud-file ingest with schema tracking on Databricks, where the cloudFiles source is available.

Delta Merge

merge into silver.orders as t
using staged_orders as s
on t.order_id = s.order_id
when matched and s.updated_at >= t.updated_at then update set *
when not matched then insert *;
  • Deduplicate the staged source before merging.

Delta Maintenance

OPTIMIZE silver.orders ZORDER BY (customer_id);
VACUUM silver.orders RETAIN 168 HOURS;
DESCRIBE HISTORY silver.orders;
  • OPTIMIZE compacts small files; VACUUM deletes files the table no longer references once they are older than the retention window (168 hours is 7 days).

Redshift Materialized View

create materialized view mv_daily_orders as
select order_date, sum(amount) as revenue
from fct_orders
group by 1;

refresh materialized view mv_daily_orders;
  • Use for repeated aggregates and serving patterns with clear refresh cadence.

Idempotent Load Reminder

load_key = (source_id, business_date)
write rule = overwrite same key or merge deterministically
watermark advance = only after full success
  • Retrying the same run should not create new duplicates.
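
A toy Python model of the rule, contrasting an append-only load with an overwrite keyed on `load_key`. The in-memory list and dict stand in for real tables, and the function names and sample values are hypothetical.

```python
def append_load(table, rows):
    """Naive load: every call appends, so a retry duplicates rows."""
    table.extend(rows)


def idempotent_load(partitions, load_key, rows):
    """Overwrite the slice identified by load_key, so a retry replaces it."""
    partitions[load_key] = list(rows)


batch = [{"order_id": 1}, {"order_id": 2}]
load_key = ("shop_a", "2026-01-01")  # (source_id, business_date)

naive = []
append_load(naive, batch)
append_load(naive, batch)  # simulated retry

partitions = {}
idempotent_load(partitions, load_key, batch)
idempotent_load(partitions, load_key, batch)  # simulated retry

total_rows = sum(len(rows) for rows in partitions.values())
```

After the retry the append-only table holds four rows while the keyed version still holds two.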

Partitioning Reminders

  • Partition by the columns most often used in filters.
  • Avoid over-partitioning tiny datasets.
  • Too many small files will hurt performance.
  • Partition design is part of cost design.

Bad Record Quarantine

if record fails validation:
    write to quarantine_table
    include failure_reason
    include ingestion_timestamp
    do not silently drop
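
The pseudocode above could be realized along these lines in Python. The validation rules, field names (`id`, `amount`), and sample records are assumptions made for illustration; the quarantine "table" is just a list.

```python
from datetime import datetime, timezone


def validate(record):
    """Return a failure reason, or None when the record is valid."""
    if record.get("id") is None:
        return "missing id"
    if not isinstance(record.get("amount"), (int, float)):
        return "amount is not numeric"
    if record["amount"] < 0:
        return "negative amount"
    return None


def split_records(records, now=None):
    """Route each record to the good list or to quarantine; nothing is dropped."""
    now = now or datetime.now(timezone.utc)
    good, quarantine = [], []
    for record in records:
        reason = validate(record)
        if reason is None:
            good.append(record)
        else:
            quarantine.append({
                "record": record,
                "failure_reason": reason,
                "ingestion_timestamp": now,
            })
    return good, quarantine


records = [
    {"id": 1, "amount": 10.0},
    {"id": None, "amount": 5.0},
    {"id": 3, "amount": "n/a"},
    {"id": 4, "amount": -2},
]

good, quarantine = split_records(records, now=datetime(2026, 1, 1, tzinfo=timezone.utc))
```

Every input record ends up in exactly one of the two outputs, so good plus quarantined counts always reconcile with the input count.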

Root Cause Query Pattern

select
  load_date,
  status,
  count(*) as row_count
from staging_table
where load_date >= current_date - 3
group by 1, 2
order by 1 desc, 2;
  • Break failures down by date, status, source, or partition first.

Documentation Template

Table:
Owner:
Grain:
Primary key:
Refresh cadence:
Upstream sources:
Downstream consumers:
SLA:
Quality checks:
Known caveats: