• 從 SQL 仔到資料工程師:打造第一個 ETL/ELT 工作流

資料處理流程

截圖 2025-08-22 14.34.32
截圖 2025-08-22 14.35.00
會sql=會資料工程嗎?當初的我想得太簡單了

SQL 側重在資料查詢與分析,而資料工程則處理資料規模過大或需要即時處理的挑戰

當我們收集到資料後,往往需要借助排程與流程管理工具(如 Airflow)、資料管線平台(如 Kafka、Hadoop)、分散式處理工具(如 Spark、Flink),確保資料能被高效收集、轉換與儲存。最終,這些資料會進入分析資料庫,並透過 BI 工具(如 Tableau、Looker Studio)提供可視化與決策支援


Databaase vs Data Warehouse vs Data Lake

Database:活的資料庫,重即時交易
Data Lake:大水庫,先存全部原始資料,重探索 & ML
Data Warehouse:乾淨整理好的歷史資料,重報表決策

通常的流程為 : Database → Data Lake → Data Warehouse → BI/ML

Feature Database Data Lake Data Warehouse
Purpose (用途) 即時交易 (例如:POS、App 記錄) 原始資料儲存,支援多種應用 歷史資料分析 (報表、OLAP)
Data Structure (資料型態) 結構化 (表格) 結構化 + 半結構化 + 非結構化 (表格、JSON、圖片、影片) 結構化 (乾淨的表格)
Speed (速度) 小查詢很快 (高 TPS) 不一定,要看處理方式 (批次/即時) 為分析查詢最佳化 (大查詢快)
Use Case (應用) 作業系統 (例如 ATM、購物網站交易) 進階分析、機器學習、資料科學 商業智慧 (BI 報表、決策)
Scalability (擴展性) 有限 (單機/垂直擴充) 高度可擴展 (S3、HDFS 幾乎無上限) 中等 (雲端水平擴展有限)

ETL vs ELT

ETL

  • Extract:資料多樣
    table (結構化資料)
    image (非結構化)
    log file (半結構化)
  • Transform:在進入倉儲前就先處理,清理、標準化、定義 schema
  • Load:將乾淨資料存進 Data Warehouse,例如:BigQuery、Snowflake、Redshift
  • 用途:商業智慧 (BI)、報表、決策支持 (OLAP 分析)

ELT

  • Extract:來源多樣
    table (結構化資料)
    image (非結構化)
    log file (半結構化)
  • Load:原始資料直接存進 Data Lake,例如:S3、HDFS
  • Transform:需要時再轉換 Schema-on-read,依應用再做清理
  • 用途:多團隊共用,機器學習 (ML)、資料科學、探索性分析

工作流介紹 (以GCP為例)

假設今天網站資料會使用 GCP BigQuery,有可能的做法為:

方案 Data Sources Data Processing (流程管理工具) Data Processing (管線平台) Data Processing (分散式處理工具) Data Storage 說明
A 批次分析 BigQuery Airflow / Cloud Composer     BigQuery (整理後) 流程:BigQuery 先存網站流量 → Airflow / Composer 定時觸發 SQL 或 ETL job → 整理後再存回 BigQuery
適用:每日或每小時的批次報表
優缺點:邏輯簡單、成本低,但無法即時
B 分散式處理 BigQuery → GCS Export     Spark / Cloud Dataproc BigQuery / CloudSQL 流程:BigQuery 匯出大批量資料到 GCS → Spark / Cloud Dataproc 做清洗、轉換、ML 前處理 → 再寫回 BigQuery / CloudSQL
適用:TB 級以上資料,複雜轉換或 ML pipeline
優缺點:彈性大、功能強,但需維運叢集
C 即時流處理 網站事件 (JSON) 流式處理是一直跑(24/7)
不需要 Airflow
Kafka / Pub/Sub Flink / Spark Streaming / Cloud Dataflow BigQuery / Elasticsearch 流程:網站事件(JSON)直接寫進 Kafka / Pub/Sub → Flink / Spark Streaming / Cloud Dataflow 即時處理(sessionization、filter、aggregation)→ 寫入 BigQuery / Elasticsearch
適用:秒級即時分析(流量監控、詐欺偵測)
優缺點:即時性佳,但需處理系統穩定性與高吞吐量
D Lambda 架構 網站事件 + BigQuery Export Spark 批次部分需要 Airflow 觸發(如每天 1AM 跑一次)
Flink 流部分則是(24/7),不靠 Airflow
Kafka / Pub/Sub Flink (即時) + Spark / Cloud Dataproc (批次) BigQuery (RT+Historical) 流程:Flink 處理即時事件 → Spark / Cloud Dataproc 批次處理歷史完整數據 → 最後合併查詢
適用:需要同時兼顧「完整歷史」+「最新即時」的場景
優缺點:即時與歷史兼顧,但需維護兩條管線,複雜度高
E 雲端無伺服器 網站流量 (GCP) Dataflow 可以直接由 PubSub 事件觸發,完全不需要 Airflow Pub/Sub Cloud Dataflow BigQuery 流程:網站流量直接進 Pub/Sub → Dataflow 即時或批次處理 → 寫入 BigQuery
適用:不想維運基礎設施的團隊
優缺點:完全託管,依事件量付費,開發快但可能有雲端鎖定風險

PS 補充

功能 Cloud Composer (類似 Apache Airflow) Cloud Dataflow (類似 Apache Beam)
定位 Orchestration(排程、監控) Processing(資料處理)
是否處理資料 ❌ 不處理資料 ✅ 負責轉換/聚合
使用場景 每日跑 ETL、依賴管理 即時流式處理、批次運算
特點 / 工具 Cloud Dataproc (類似 Apache Spark/Hadoop) Cloud Dataflow (類似 Apache Beam, Serverless) Flink 自建 (自己維運)
定位 雲端批次處理 雲端即時/批次全託管 超低延遲流處理
適合場景 TB 級大批量 ETL、ML 前處理 Pub/Sub → BigQuery 即時分析、批次流並存 毫秒級事件處理、CEP、IoT
優點 Spark 生態完整、功能強 免維運、即時+批次都行、按量付費 延遲最低、彈性最大
缺點 要管叢集、不是即時 綁 Google 生態、抽象高 維運麻煩、學習曲線高

範例 : json file → Kafka → Flink SQL → GCS 檔案 → BigQuery → windows 分析後存入 postgresql

如果做 windows 分析

要「即時」(tumbling/hopping/session)
→ 放在 Flink SQL 做,邊吃 Kafka 邊算,結果直接寫 GCS/BigQuery

要「批次/回溯/重算」window 分析(移動平均、留存、回補缺漏)
→ 放在 BigQuery SQL 做,用排程(DTS / Composer)每日或每小時跑


假設有 JSON 檔代表使用者點擊網站的事件,每天都會有一份
event_20250822.json
event_20250823.json
event_20250824.json
⋯⋯

{"ip":"1.2.3.4","url":"/home","referrer":"google","host":"example.com","event_time":"2025-08-20T12:34:56Z"}

> DAG1 : 用 Kafka 當成「即時事件資料管道」

每天自動抓取 json 檔案 → 建 Kafka topic,用 console producer 送 json file 進去

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="send_json_to_kafka_daily",
    start_date=datetime(2025, 8, 1),
    schedule_interval="0 0 * * *",  # 每天凌晨 0 點
    catchup=False,
) as dag:

    send_to_kafka = BashOperator(
        task_id="send_json_to_kafka",
        bash_command=(
            "kafka-console-producer.sh "
            "--bootstrap-server localhost:9092 "
            "--topic web-events < /data/event_{{ ds_nodash }}.json"
        )
    )

用 Flink SQL 接 Kafka(前面設好的 web-events) 做資料處理 → 傳到 GCS
job.sql

-- 來源:Kafka,讓 Flink 能「持續讀取 Kafka 裡的事件」
CREATE TABLE web_events (
  ip STRING,
  url STRING,
  referrer STRING,
  host STRING,
  event_time STRING,
  ts AS TO_TIMESTAMP_LTZ(CAST(DATE_FORMAT(event_time, 'yyyy-MM-dd''T''HH:mm:ss''Z') AS STRING), 0),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'web-events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'flink-web-consumer',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

-- Parquet 作為輸出表(儲存在 GCS)
CREATE TABLE events_parquet (
  ip STRING, host STRING, url STRING, referrer STRING, ts TIMESTAMP_LTZ(3)
) PARTITIONED BY (`dt`)
  WITH (
    'connector' = 'filesystem',
    'path' = 'gs://my-web-events-bucket/web-events/raw/',
    'format' = 'parquet',
    'sink.partition-commit.policy.kind' = 'success-file',
    'sink.partition-commit.trigger' = 'partition-time',
    'sink.partition-commit.delay' = '1 min',
    'partition.time-extractor.timestamp-pattern' = '$dt',
    'partition.time-extractor.class' = 'org.apache.flink.table.partition.TimeExtractor',
    'partition.time-extractor.timestamp-formatter' = 'yyyyMMdd'
);


-- 寫入Flink 一旦啟動就會一直從 Kafka 拉資料,會在叢集上變成「24/7 的 job」
-- 加上 partition 欄位 dt(用事件時間轉字串)
INSERT INTO events_parquet
SELECT
  ip, host, url, referrer, ts,
  DATE_FORMAT(ts, 'yyyyMMdd') AS dt
FROM web_events;

執行 job.sql

# 開叢集
./bin/start-cluster.sh
# 打開 SQL Client # -f 會一次把整份檔案送去執行
./bin/sql-client.sh -f /path/to/job.sql

check sql

# 開叢集
./bin/start-cluster.sh
# 打開 SQL Client
./bin/sql-client.sh

# check Kafka 是否有真的寫入
SELECT * FROM web_events;

> DAG2 : 每小時自動載入 GCS → BigQuery

from datetime import datetime
from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

with DAG(
    dag_id="gcs_parquet_to_bq_hourly",
    start_date=datetime(2025, 8, 1),
    schedule_interval="0 * * * *",  # 每小時
    catchup=False,
    tags=["gcs_to_bq"],
) as dag:

    load = GCSToBigQueryOperator(
        task_id="load_parquet",
        bucket="my-web-events-bucket",
        source_objects=["web-events/raw/{{ ds_nodash }}/*.parquet"]        destination_project_dataset_table="my_analytics.web_events",
        source_format="PARQUET",
        write_disposition="WRITE_APPEND",  # 或選 WRITE_TRUNCATE(每日重寫)
        autodetect=True,
        gcp_conn_id="google_cloud_default",  # 確保 Airflow UI 有這個 connection
    )

從 BigQuery 每天做 Window 分析(session 分析),結果寫入 OLTP 資料庫,供讀取分析

先建 PostgreSQL 表格(只做一次)

CREATE TABLE daily_sessions (
  ip TEXT,
  host TEXT,
  event_date DATE,
  total_events INT,
  first_event TIMESTAMP,
  last_event TIMESTAMP,
  session_duration_seconds INT
);

SQL 檔,或放在 Scheduled Query
daily_sessions_today.sql

# 計算每個 IP + Host 的每日瀏覽行為與停留時間

CREATE OR REPLACE TABLE my_analytics.daily_sessions AS
SELECT
  ip,
  host,
  DATE(ts) AS event_date,
  COUNT(*) AS total_events,
  MIN(ts) AS first_event,
  MAX(ts) AS last_event,
  TIMESTAMP_DIFF(MAX(ts), MIN(ts), SECOND) AS session_duration_seconds
FROM
  my_analytics.web_events
GROUP BY ip, host, DATE(ts);

> DAG3 : 每天從 BigQuery 抽資料 → 寫入 PostgreSQL

from airflow import DAG
from airflow.providers.google.cloud.transfers.bigquery_to_postgres import BigQueryToPostgresOperator
from datetime import datetime
from airflow.operators.python import PythonOperator
import os

with DAG(
    dag_id="bq_to_postgres_daily",
    start_date=datetime(2025, 8, 1),
    schedule_interval="0 2 * * *",  # 每天凌晨 2 點
    catchup=False,
) as dag:

    # 
    bq_to_pg = BigQueryToPostgresOperator(
        task_id="bq_to_pg_daily_sessions",
        sql="SELECT * FROM `my_analytics.daily_sessions` WHERE event_date = CURRENT_DATE()",  
        postgres_table="daily_sessions",
        replace=False,
        truncate=False,
        write_disposition="WRITE_APPEND",
        gcp_conn_id="google_cloud_default",
        postgres_conn_id="my_postgres",
    )

Catalina
Catalina

Hi, I’m Catalina!
原本在西語市場做開發業務,2023 年正式轉職資料領域。
目前努力補齊計算機組織、微積分、線性代數與機率論,忙碌中做點筆記提醒自己 🤲

文章: 43

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *