
This post documents a small project for practicing data pipelines and automation: scheduling with Airflow 3 → testing writes into PostgreSQL / SQLite / MongoDB → then trying Kafka stream processing.
In Airflow's architecture, the core depends on a Metadata Database to store DAG, task, and log state.
The only officially supported backend databases are SQLite (development/testing only), PostgreSQL, and MySQL/MariaDB.
MongoDB won't work: it is not an RDBMS and has no SQL / transaction / schema semantics. The Airflow Metadata DB relies on the SQLAlchemy ORM to generate its schema (create tables, migrations, ...), and MongoDB has no compatible SQLAlchemy dialect.
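For reference, the metadata DB connection has to be a SQLAlchemy URL for one of those backends, roughly in these shapes (placeholder values, not the ones used later in this project):
sqlite:////opt/airflow/airflow.db
postgresql+psycopg2://user:pass@host:5432/airflow
mysql+mysqldb://user:pass@host:3306/airflow
# there is nothing like mongodb://... here, because SQLAlchemy has no MongoDB dialect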
I still wanted to build something with it, so I stood up Airflow with Docker (using PostgreSQL or SQLite as the backend) so the DAGs can write normally. For MongoDB I switched to the cloud-hosted MongoDB Atlas and used it as a data source/target inside the DAG tasks.
PS: for installation on Windows, see this post:
【MongoDB 是什麼?Mongodb 優勢、安裝/指令】
Go to MongoDB Community Server, pick your version, and install from the Terminal. Here I use the Atlas CLI via Homebrew:
brew install mongodb-atlas-cli
atlas setup

A browser page opens automatically; enter the verification code shown in the Terminal.


Back in the Terminal, start the MongoDB shell:
mongosh

Here I ran into a problem: on the free plan the account's cluster quota was already used up. If you have used Atlas before, delete the old cluster first.
After deleting it, Create a new one.

The default tier is not Free, so switch it manually.


Click Connect at the top -> Shell.

Set a username + password.


Follow the instructions shown there in the Terminal:
brew install mongosh
mongosh "mongodb+srv://cluster0.szfovfp.mongodb.net/" --apiVersion 1 --username <db_username>
It will prompt for a password: the one you set for the Database User in Atlas (not your Atlas login password).

List the existing databases
show dbs
Create your own database & collection
use mydb
db.mycollection.insertOne({name: "hello", value: 123})
Query the data just inserted into mycollection
db.mycollection.find()
List the collections
show collections


Open VS Code and test whether the connection to MongoDB Atlas works.
!pip install "pymongo[srv]"
!pip install dnspython
from pymongo import MongoClient
import pymongo
import pandas as pd

Test the connection
url = "mongodb+srv://<username>:<password>@cluster0.szfovfp.mongodb.net/?retryWrites=true&w=majority"
client = MongoClient(url, serverSelectionTimeoutMS=10000)
# test the connection
print(client.admin.command("ping"))  # returns {'ok': 1} on success

Connect to the database created earlier in the Terminal and insert a new document
db = client["mydb"]
collection = db["mycollection"]
# insert a document
collection.insert_one({"name": "Focus", "skill": "machine learning"})
# query all documents
for doc in collection.find():
    print(doc)

Once writes are confirmed, the next step is to set up Airflow 3 and the DAG tasks.
I run Airflow 3 with Docker, together with Kafka, PostgreSQL, and SQLite.
Airflow 3 differs from older versions in architecture: it adds an Internal API so that other components can talk to the Scheduler over HTTP.
Create a project folder in VS Code; I named it airflow_dbs.
The structure looks like this:
airflow_dbs/                      ← project root
├── dags/                         ← all Airflow DAGs live here
│   ├── sqlite_daily_dump.py      ← DAG: daily write into SQLite
│   ├── postgresql_daily_dump.py  ← DAG: daily write into PostgreSQL
│   └── mongodb_daily_dump.py     ← DAG: daily write into MongoDB Atlas
├── .env
├── docker-compose.yml
├── logs/
└── plugins/
PS: getting a Gmail app password
Google has turned off "less secure app access", so you can no longer send mail with your real Gmail login password; instead you need to create an App Password in your Google account.
Security -> turn on 2-Step Verification


Go back to the previous page and search for "App passwords" at the top.


You get a 16-character password like xxxx xxxx xxxx xxxx. Copy it, delete the spaces, and paste it straight into .env.
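Before wiring it into Airflow, a quick smtplib check (my own sketch; fill in your address and the 16-character password) confirms the app password actually authenticates:
import smtplib
import ssl

# log in to Gmail over SSL on port 465 with the app password; raises SMTPAuthenticationError if it is wrong
with smtplib.SMTP_SSL("smtp.gmail.com", 465, context=ssl.create_default_context()) as server:
    server.login("<gmail>@gmail.com", "<app password>")
    print("login ok")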

Generate an encryption key (Fernet key) for Airflow.
Note: don't generate a new key every time you rebuild the containers, otherwise connections encrypted with the old key can no longer be decrypted.
python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
.env
Replace the Gmail address, app password, and Fernet key with your own.
# Inside Docker the processes run as root by default, so files created on the host show up as root-owned
# and you hit Permission denied: your own account can't open or delete them
# On macOS, UID 501 / GID 0 is your local user plus the root group; with this set, Airflow writes files
# as if it were your local account, so the host can read and delete them normally
AIRFLOW_UID=501
AIRFLOW_GID=0
# SQLAlchemy DSN used by Airflow; automatically registered as the connection conn_id=pg_target
PG_TARGET_URI=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
MONGO_ATLAS_URI=mongodb+srv://<username>:<password>@cluster0.szfovfp.mongodb.net/?retryWrites=true&w=majority
# SMTP connection (used by EmailOperator)
# Uses a Gmail App Password; connects on port 465 with SSL, STARTTLS disabled
AIRFLOW_CONN_SMTP_DEFAULT=smtp+ssl://<gmail>%40gmail.com:<app password>@smtp.gmail.com:465?starttls=false
AIRFLOW__CORE__FERNET_KEY=<fernet key>
AIRFLOW__SMTP__SMTP_HOST=smtp.gmail.com
AIRFLOW__SMTP__SMTP_PORT=465
AIRFLOW__SMTP__SMTP_USER=<gmail>
AIRFLOW__SMTP__SMTP_PASSWORD=<app password>
AIRFLOW__SMTP__SMTP_MAIL_FROM=<gmail>
AIRFLOW__SMTP__SMTP_STARTTLS=False
AIRFLOW__SMTP__SMTP_SSL=True
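The AIRFLOW_CONN_PG_TARGET / AIRFLOW_CONN_MONGO_ATLAS entries work because Airflow treats any environment variable named AIRFLOW_CONN_<CONN_ID> as a connection; inside a task you can sanity-check the registration with something like this (a sketch, not part of the DAGs below):
from airflow.hooks.base import BaseHook

# resolves the AIRFLOW_CONN_PG_TARGET environment variable from .env as conn_id "pg_target"
conn = BaseHook.get_connection("pg_target")
print(conn.conn_type, conn.host, conn.port, conn.schema)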
docker-compose.yml
services:
  # Kafka (single-node KRaft)
  kafka:
    image: bitnami/kafka:3.7
    container_name: kafka
    ports: ["29092:29092"]
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      # declare the listener addresses
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,PLAINTEXT_HOST://:29092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrst
    # persist Kafka data
    volumes:
      - kafka_data:/bitnami/kafka
    # restart automatically if it goes down
    restart: unless-stopped
  # Postgres (Airflow metadata backend)
  postgres:
    image: postgres:16-alpine
    container_name: airflow-postgres
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    # I map host port 5433; if nothing else uses 5432 on your machine, 5432:5432 is fine
    ports: ["5433:5432"]
    # persist data
    volumes:
      - pg_data:/var/lib/postgresql/data
    # dependent services wait until the DB is ready
    healthcheck:
      test: ["CMD-SHELL","pg_isready -U airflow -d airflow || exit 1"]
      interval: 5s
      timeout: 3s
      retries: 30
    restart: unless-stopped
  # Airflow DB migrate (one-off)
  # creates/upgrades the Airflow metadata schema in Postgres
  airflow-init:
    image: apache/airflow:3.0.6-python3.12
    container_name: airflow-init
    depends_on:
      postgres:
        condition: service_healthy
    env_file: .env
    environment:
      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
      - AIRFLOW__CORE__LOAD_EXAMPLES=False
      # point at the postgres service above
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
      - AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE=Asia/Taipei
      - AIRFLOW__CORE__SIMPLE_AUTH_MANAGER_ALL_ADMINS=True
      # connection conn_id=pg_target built from .env
      - AIRFLOW_CONN_PG_TARGET=${PG_TARGET_URI}
      - AIRFLOW_CONN_MONGO_ATLAS=${MONGO_ATLAS_URI}
      # mount path the SQLite DAG writes to
      - SQLITE_EXPORT_DIR=/opt/airflow/sqlite
      - _PIP_ADDITIONAL_REQUIREMENTS=pandas sqlalchemy psycopg2-binary apache-airflow-providers-smtp pymongo kafka-python
      # SMTP config is provided via .env
      - AIRFLOW__CORE__FERNET_KEY=${AIRFLOW__CORE__FERNET_KEY}
      - POSTGRES_CONN_ID=pg_target
      - LOGS_DIR=/opt/airflow/logs_test
    user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}"
    # mount the local folders into the container
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
      - ./plugins:/opt/airflow/plugins
      - ./sqlite:/opt/airflow/sqlite
      - ./logs_test:/opt/airflow/logs_test
    # run the Airflow database migration against postgres (and create an admin user)
    command: ["bash","-lc","airflow db migrate && airflow users create --role Admin --username admin --password admin --firstname Admin --lastname User --email admin@example.com || true"]
    restart: "no"
  # Airflow Webserver and Scheduler
  # In Airflow 2.x the Webserver (UI) and the Scheduler were two separate services
  # In Airflow 3 one process can run scheduler + api-server together
  airflow-webserver-scheduler:
    image: apache/airflow:3.0.6-python3.12
    container_name: airflow-webserver-scheduler
    # starts only after Postgres reports healthy
    depends_on:
      postgres:
        condition: service_healthy
      airflow-init:
        condition: service_completed_successfully
    # read the variables from .env
    env_file: .env
    environment:
      # local multi-process executor
      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
      - AIRFLOW__CORE__LOAD_EXAMPLES=False
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
      - AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE=Asia/Taipei
      # every user is an admin
      - AIRFLOW__CORE__SIMPLE_AUTH_MANAGER_ALL_ADMINS=True
      # enable the Internal API; components talk to the scheduler through the gunicorn web service
      - AIRFLOW__CORE__INTERNAL_API_ENABLED=True
      # bind the Internal API to port 8793 on both IPv4 and IPv6
      - GUNICORN_CMD_ARGS=--bind 0.0.0.0:8793 --bind [::]:8793
      - AIRFLOW__API__HOST=0.0.0.0
      - AIRFLOW__API__PORT=8793
      - AIRFLOW__API__WORKERS=2
      - AIRFLOW__API__WORKER_TIMEOUT=60
      - AIRFLOW__CORE__PARALLELISM=1
      # the scheduler fetches at most 1 TaskInstance per DB query
      - AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY=1
      - AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT=120
      - AIRFLOW__DAG_PROCESSOR__DAG_FILE_PROCESSOR_TIMEOUT=120
      - AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT=600
      - SQLITE_EXPORT_DIR=/opt/airflow/sqlite
      - _PIP_ADDITIONAL_REQUIREMENTS=pandas sqlalchemy psycopg2-binary apache-airflow-providers-smtp pymongo kafka-python
      - AIRFLOW_CONN_PG_TARGET=${PG_TARGET_URI}
      - AIRFLOW_CONN_MONGO_ATLAS=${MONGO_ATLAS_URI}
      # SMTP config is provided via .env
      - POSTGRES_CONN_ID=pg_target
      - LOGS_DIR=/opt/airflow/logs_test
    user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}"
    ports: ["8081:8080"]
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
      - ./plugins:/opt/airflow/plugins
      - ./sqlite:/opt/airflow/sqlite
      - ./logs_test:/opt/airflow/logs_test
    # startup command (Airflow 3 uses api-server)
    command: ["bash","-lc","airflow scheduler & airflow api-server --port 8080 --host 0.0.0.0"]
    # Airflow 3 API server exposes /live and /ready endpoints
    healthcheck:
      test: ["CMD", "python", "-c", "import urllib.request,sys;sys.exit(0 if urllib.request.urlopen('http://localhost:8080/live').status==200 else 1)"]
      interval: 10s
      timeout: 5s
      retries: 10
    restart: unless-stopped
  # Airflow DAG Processor (parses DAGs → serialized_dag)
  airflow-dag-processor:
    image: apache/airflow:3.0.6-python3.12
    container_name: airflow-dag-processor
    depends_on:
      postgres:
        condition: service_healthy
      airflow-init:
        condition: service_completed_successfully
    env_file: .env
    environment:
      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
      - AIRFLOW__CORE__SIMPLE_AUTH_MANAGER_ALL_ADMINS=True
      - SQLITE_EXPORT_DIR=/opt/airflow/sqlite
      - _PIP_ADDITIONAL_REQUIREMENTS=pandas sqlalchemy psycopg2-binary apache-airflow-providers-smtp pymongo kafka-python
      - AIRFLOW_CONN_PG_TARGET=${PG_TARGET_URI}
      - AIRFLOW_CONN_MONGO_ATLAS=${MONGO_ATLAS_URI}
      # SMTP config is provided via .env
      - POSTGRES_CONN_ID=pg_target
      - LOGS_DIR=/opt/airflow/logs_test
    user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}"
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
      - ./sqlite:/opt/airflow/sqlite
      - ./logs_test:/opt/airflow/logs_test
    # runs only the dag-processor (parses DAGs, does not execute tasks)
    command: ["bash","-lc","airflow dag-processor"]
    restart: unless-stopped
  # Airflow Triggerer (handles deferrable waits)
  airflow-triggerer:
    image: apache/airflow:3.0.6-python3.12
    container_name: airflow-triggerer
    depends_on:
      postgres:
        condition: service_healthy
      airflow-init:
        condition: service_completed_successfully
    env_file: .env
    environment:
      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
      - AIRFLOW__CORE__SIMPLE_AUTH_MANAGER_ALL_ADMINS=True
      - SQLITE_EXPORT_DIR=/opt/airflow/sqlite
      - _PIP_ADDITIONAL_REQUIREMENTS=pandas sqlalchemy psycopg2-binary apache-airflow-providers-smtp pymongo kafka-python
      - AIRFLOW_CONN_PG_TARGET=${PG_TARGET_URI}
      - AIRFLOW_CONN_MONGO_ATLAS=${MONGO_ATLAS_URI}
      # SMTP config is provided via .env
      - POSTGRES_CONN_ID=pg_target
      - LOGS_DIR=/opt/airflow/logs_test
    user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}"
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
      - ./plugins:/opt/airflow/plugins
      - ./sqlite:/opt/airflow/sqlite
      - ./logs_test:/opt/airflow/logs_test
    # dedicated process that watches deferrable operator triggers
    command: ["bash","-lc","airflow triggerer"]
    restart: unless-stopped
volumes:
  kafka_data:
    driver: local
  pg_data:
    driver: local
Inside the project folder, create another folder named dags and put all the DAGs to run in there. As a test, each DAG writes one dated record per day.
sqlite_daily_dump.py
from __future__ import annotations
import os
import sqlite3
from datetime import datetime
import pytz
from airflow import DAG
from airflow.operators.python import PythonOperator
import smtplib
import ssl
from airflow.utils.trigger_rule import TriggerRule

EXPORT_DIR = os.environ.get("SQLITE_EXPORT_DIR", "/opt/airflow/sqlite")
DB_PATH = os.path.join(EXPORT_DIR, "daily.db")
LOCAL_TZ = os.environ.get("LOCAL_TZ", "Asia/Taipei")

# write the data; SQLite stores naive text, so pin the timezone explicitly
def write_today(**context):
    os.makedirs(EXPORT_DIR, exist_ok=True)
    tz = pytz.timezone(LOCAL_TZ)
    now = datetime.now(tz)
    now_iso = now.isoformat(timespec="seconds")
    ds = context.get("ds") or now.date().isoformat()
    ds_nodash = context.get("ds_nodash") or ds.replace("-", "")
    tbl = f"d{ds_nodash}"
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    cur.execute(f'''
        CREATE TABLE IF NOT EXISTS "{tbl}" (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            today TEXT NOT NULL,
            created_at TEXT NOT NULL
        )
    ''')
    cur.execute(f'INSERT INTO "{tbl}" (today, created_at) VALUES (?, ?)', (ds, now_iso))
    conn.commit()
    conn.close()
    print(f'[INFO] Wrote table "{tbl}" into {DB_PATH}')
    print(f'[DEBUG] LOCAL_TZ={LOCAL_TZ} ds={ds} created_at={now_iso}')

# start date, runs daily
with DAG(
    dag_id="sqlite_daily_dump",
    start_date=datetime(2025, 9, 1),
    schedule="@daily",
    catchup=False,
    tags=["sqlite", "daily"],
) as dag:
    write_task = PythonOperator(
        task_id="write_today_table",
        python_callable=write_today,
    )

    # send mail through Gmail
    def _send_email(subject: str, html: str):
        host = os.getenv("AIRFLOW__SMTP__SMTP_HOST", "smtp.gmail.com")
        port = int(os.getenv("AIRFLOW__SMTP__SMTP_PORT", "465"))
        user = os.getenv("AIRFLOW__SMTP__SMTP_USER")
        password = os.getenv("AIRFLOW__SMTP__SMTP_PASSWORD")
        sender = os.getenv("AIRFLOW__SMTP__SMTP_MAIL_FROM", user)
        to = "catalinakuowork@gmail.com"
        if not user or not password:
            raise RuntimeError("SMTP credentials not set in environment")
        msg = (f"From: {sender}\r\n"
               f"To: {to}\r\n"
               f"Subject: {subject}\r\n"
               f"Content-Type: text/html; charset=utf-8\r\n\r\n"
               f"{html}")
        if port == 465:
            context = ssl.create_default_context()
            with smtplib.SMTP_SSL(host, port, context=context, timeout=30) as server:
                server.login(user, password)
                server.sendmail(sender, [to], msg.encode("utf-8"))
        else:
            with smtplib.SMTP(host, port, timeout=30) as server:
                server.ehlo()
                server.starttls(context=ssl.create_default_context())
                server.ehlo()
                server.login(user, password)
                server.sendmail(sender, [to], msg.encode("utf-8"))

    # notify on success
    def _notify_success(**context):
        ds = context.get("ds")
        ds_nodash = context.get("ds_nodash") or (ds.replace("-", "") if ds else "")
        _send_email(
            subject="[Airflow] SQLite daily dump - success",
            html=f"<p>Table <code>d{ds_nodash}</code> written successfully</p>",
        )

    # notify on failure
    def _notify_failure(**context):
        ds = context.get("ds")
        ds_nodash = context.get("ds_nodash") or (ds.replace("-", "") if ds else "")
        _send_email(
            subject="[Airflow] SQLite daily dump - failed",
            html=f"<p>Table <code>d{ds_nodash}</code> failed to write; please check the logs.</p>",
        )

    notify_success = PythonOperator(
        task_id="notify_on_success",
        python_callable=_notify_success,
        trigger_rule=TriggerRule.ALL_SUCCESS,
    )
    notify_failure = PythonOperator(
        task_id="notify_on_failure",
        python_callable=_notify_failure,
        trigger_rule=TriggerRule.ONE_FAILED,
    )
    write_task >> [notify_success, notify_failure]
Trigger the DAG, then check in a local SQLite client; the table is indeed written:
SELECT *
FROM sqlite_master
WHERE type IN ('table', 'view')
ORDER BY name;
SELECT * FROM "d20250906";

postgres_daily_dump.py
from __future__ import annotations
from datetime import datetime
import os
import smtplib
import ssl
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from psycopg2 import sql  # compose dynamic table names safely
from airflow.utils.trigger_rule import TriggerRule

# write the data; Postgres is timezone-aware and converts to the server timezone automatically
def write_today_pg(**context):
    """
    Create the table dYYYYMMDD in Postgres and insert today's date.
    Requires the Airflow connection conn_id='pg_target'.
    """
    ds = context.get("ds")
    if not ds:
        ds = datetime.utcnow().date().isoformat()
    ds_nodash = context.get("ds_nodash") or ds.replace("-", "")
    tbl = f"d{ds_nodash}"
    hook = PostgresHook(postgres_conn_id="pg_target")
    conn = hook.get_conn()
    conn.autocommit = True
    with conn.cursor() as cur:
        # create the table
        cur.execute(
            sql.SQL("""
                CREATE TABLE IF NOT EXISTS {tbl} (
                    id SERIAL PRIMARY KEY,
                    today DATE NOT NULL,
                    created_at TIMESTAMPTZ NOT NULL
                )
            """).format(tbl=sql.Identifier(tbl))
        )
        # insert the row
        cur.execute(
            sql.SQL("INSERT INTO {tbl} (today, created_at) VALUES (%s, NOW())")
            .format(tbl=sql.Identifier(tbl)),
            [ds],
        )
    conn.close()

# start date, runs daily
with DAG(
    dag_id="postgres_daily_dump",
    start_date=datetime(2025, 9, 1),
    schedule="@daily",
    catchup=False,
    tags=["postgres", "daily"],
) as dag:
    write_task = PythonOperator(
        task_id="write_today_table_pg",
        python_callable=write_today_pg,
    )

    # send mail through Gmail
    def _send_email(subject: str, html: str):
        host = os.getenv("AIRFLOW__SMTP__SMTP_HOST", "smtp.gmail.com")
        port = int(os.getenv("AIRFLOW__SMTP__SMTP_PORT", "465"))
        user = os.getenv("AIRFLOW__SMTP__SMTP_USER")
        password = os.getenv("AIRFLOW__SMTP__SMTP_PASSWORD")
        sender = os.getenv("AIRFLOW__SMTP__SMTP_MAIL_FROM", user)
        to = "catalinakuowork@gmail.com"
        if not user or not password:
            raise RuntimeError("SMTP credentials not set in environment")
        msg = (f"From: {sender}\r\n"
               f"To: {to}\r\n"
               f"Subject: {subject}\r\n"
               f"Content-Type: text/html; charset=utf-8\r\n\r\n"
               f"{html}")
        if port == 465:
            context = ssl.create_default_context()
            with smtplib.SMTP_SSL(host, port, context=context, timeout=30) as server:
                server.login(user, password)
                server.sendmail(sender, [to], msg.encode("utf-8"))
        else:
            with smtplib.SMTP(host, port, timeout=30) as server:
                server.ehlo()
                server.starttls(context=ssl.create_default_context())
                server.ehlo()
                server.login(user, password)
                server.sendmail(sender, [to], msg.encode("utf-8"))

    # notify on success
    def _notify_success(**context):
        ds = context.get("ds")
        ds_nodash = context.get("ds_nodash") or (ds.replace("-", "") if ds else "")
        _send_email(
            subject="[Airflow] Postgres daily dump - success",
            html=f"<p>Table <code>d{ds_nodash}</code> written successfully</p>",
        )

    # notify on failure
    def _notify_failure(**context):
        ds = context.get("ds")
        ds_nodash = context.get("ds_nodash") or (ds.replace("-", "") if ds else "")
        _send_email(
            subject="[Airflow] Postgres daily dump - failed",
            html=f"<p>Table <code>d{ds_nodash}</code> failed to write; please check the logs.</p>",
        )

    notify_success = PythonOperator(
        task_id="notify_on_success",
        python_callable=_notify_success,
        trigger_rule=TriggerRule.ALL_SUCCESS,
    )
    notify_failure = PythonOperator(
        task_id="notify_on_failure",
        python_callable=_notify_failure,
        trigger_rule=TriggerRule.ONE_FAILED,
    )
    write_task >> [notify_success, notify_failure]
Trigger the DAG, then check in the local Postgres database; the data is indeed written:
SELECT current_database();
SELECT table_schema, table_name
FROM information_schema.tables;
SELECT * FROM "d20250906" ORDER BY id DESC LIMIT 10;

mongodb_daily_dump.py
from __future__ import annotations
import os
from datetime import datetime
import pytz
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
import smtplib
import ssl
from airflow.hooks.base import BaseHook

MONGODB_URI_ENV = os.getenv("MONGODB_URI")
MONGODB_DB = os.getenv("MONGODB_DB", "daily_db")
LOCAL_TZ = os.environ.get("LOCAL_TZ", "Asia/Taipei")

def _resolve_mongo_uri() -> str:
    # Prefer the Airflow connection mongo_atlas (provided by AIRFLOW_CONN_MONGO_ATLAS)
    try:
        conn = BaseHook.get_connection("mongo_atlas")
        uri = conn.get_uri()
        # The .env already supplies an srv URI, so get_uri() can be used directly
        if uri:
            return uri
    except Exception:
        pass
    # Fall back to the MONGODB_URI env var or MONGO_ATLAS_URI from .env
    if MONGODB_URI_ENV:
        return MONGODB_URI_ENV
    alt = os.getenv("MONGO_ATLAS_URI")
    if alt:
        return alt
    # Last-resort fallback (default test credentials)
    return "mongodb+srv://test123:test123@cluster0.szfovfp.mongodb.net/?retryWrites=true&w=majority"

# write the data
def write_today_mongo(**context):
    # Import lazily so DAG parsing does not fail if the package is missing
    from pymongo import MongoClient
    tz = pytz.timezone(LOCAL_TZ)
    now = datetime.now(tz)
    now_iso = now.isoformat(timespec="seconds")
    ds = context.get("ds") or now.date().isoformat()
    ds_nodash = context.get("ds_nodash") or ds.replace("-", "")
    coll_name = f"d{ds_nodash}"
    mongo_uri = _resolve_mongo_uri()
    client = MongoClient(mongo_uri, serverSelectionTimeoutMS=10000)
    db = client[MONGODB_DB]
    coll = db[coll_name]
    doc = {
        "today": ds,
        "created_at": now_iso,
    }
    result = coll.insert_one(doc)
    print(f"[INFO] Inserted _id={result.inserted_id} into {MONGODB_DB}.{coll_name}")
    print(f"[DEBUG] LOCAL_TZ={LOCAL_TZ} ds={ds} created_at={now_iso}")

# send mail through Gmail
def _send_email(subject: str, html: str):
    host = os.getenv("AIRFLOW__SMTP__SMTP_HOST", "smtp.gmail.com")
    port = int(os.getenv("AIRFLOW__SMTP__SMTP_PORT", "465"))
    user = os.getenv("AIRFLOW__SMTP__SMTP_USER")
    password = os.getenv("AIRFLOW__SMTP__SMTP_PASSWORD")
    sender = os.getenv("AIRFLOW__SMTP__SMTP_MAIL_FROM", user)
    to = "catalinakuowork@gmail.com"
    if not user or not password:
        raise RuntimeError("SMTP credentials not set in environment")
    msg = (
        f"From: {sender}\r\n"
        f"To: {to}\r\n"
        f"Subject: {subject}\r\n"
        f"Content-Type: text/html; charset=utf-8\r\n\r\n"
        f"{html}"
    )
    if port == 465:
        context = ssl.create_default_context()
        with smtplib.SMTP_SSL(host, port, context=context, timeout=30) as server:
            server.login(user, password)
            server.sendmail(sender, [to], msg.encode("utf-8"))
    else:
        with smtplib.SMTP(host, port, timeout=30) as server:
            server.ehlo()
            server.starttls(context=ssl.create_default_context())
            server.ehlo()
            server.login(user, password)
            server.sendmail(sender, [to], msg.encode("utf-8"))

# start date, runs daily
with DAG(
    dag_id="mongodb_daily_dump",
    start_date=datetime(2025, 9, 1),
    schedule="@daily",
    catchup=False,
    tags=["mongodb", "daily"],
) as dag:
    write_task = PythonOperator(
        task_id="write_today_collection",
        python_callable=write_today_mongo,
    )

    def _notify_success(**context):
        ds = context.get("ds")
        ds_nodash = context.get("ds_nodash") or (ds.replace("-", "") if ds else "")
        _send_email(
            subject="[Airflow] MongoDB daily dump - success",
            html=f"<p>Collection <code>d{ds_nodash}</code> written successfully</p>",
        )

    def _notify_failure(**context):
        ds = context.get("ds")
        ds_nodash = context.get("ds_nodash") or (ds.replace("-", "") if ds else "")
        _send_email(
            subject="[Airflow] MongoDB daily dump - failed",
            html=f"<p>Collection <code>d{ds_nodash}</code> failed to write; please check the logs.</p>",
        )

    notify_success = PythonOperator(
        task_id="notify_on_success",
        python_callable=_notify_success,
        trigger_rule=TriggerRule.ALL_SUCCESS,
    )
    notify_failure = PythonOperator(
        task_id="notify_on_failure",
        python_callable=_notify_failure,
        trigger_rule=TriggerRule.ONE_FAILED,
    )
    write_task >> [notify_success, notify_failure]
Trigger the DAG, then check from mongosh; the data is indeed written:
show dbs
use <db_name>
show collections
db.<collection_name>.find().pretty()

Put all the files above into dags/, cd to the project root in the Terminal, then:
docker compose up airflow-init
docker compose up -d
# confirm the kafka container is running
docker ps

Check the ports and container status
docker ps --format "table {{.Names}}\t{{.Status}}\t{{.Ports}}"
# airflow-webserver-scheduler should show 0.0.0.0:8081->8080/tcp
# airflow-postgres should show 0.0.0.0:5433->5432/tcp

Open the UI at http://localhost:8081 and the DAGs show up.

Trigger a run manually.


Kafka is designed for real-time, high-frequency, continuous data streams.
Integrating Kafka with Airflow makes sense when the data source pushes events in real time and you want the system to be event-driven (data arrives → processing is triggered automatically).
The first step is to create a Topic, a Producer, and a Consumer.
A topic is the bridge between producers and consumers.
One Kafka cluster can hold many topics, and each system subscribes only to the topics it needs, for example:
orders → order messages
payments → payment messages
logs → application logs
Create the topic
docker exec -it kafka kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create --topic test --partitions 1 --replication-factor 1
Confirm it was created
docker exec -it kafka kafka-topics.sh \
--bootstrap-server localhost:9092 --list

Open a new Terminal window and run a Consumer; it stays attached and waits for messages
docker exec -it kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test --from-beginning

Open another new Terminal window and run a Producer. Type a few lines (e.g. Hello World) and press Enter; the Consumer prints the same messages in real time
docker exec -it kafka kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic test

Back in the Consumer window

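The same round trip can be done from the host with kafka-python (a minimal sketch assuming the test topic created above, the localhost:29092 listener from docker-compose.yml, and pip install kafka-python on the host):
from kafka import KafkaProducer, KafkaConsumer

# send one message to the "test" topic through the host listener exposed on localhost:29092
producer = KafkaProducer(bootstrap_servers="localhost:29092")
producer.send("test", b"hello from python")
producer.flush()

# read the topic from the beginning and stop after 5 seconds of inactivity
consumer = KafkaConsumer(
    "test",
    bootstrap_servers="localhost:29092",
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,
)
for msg in consumer:
    print(msg.topic, msg.offset, msg.value)
consumer.close()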
Suppose my website writes a log file every day. I want Kafka to pick up new lines automatically and trigger a write into PostgreSQL, plus one scheduled run per day to make sure everything really landed in the database.
The flow: log folder /opt/airflow/logs_test → the Producer turns new lines into JSON and pushes them to the Kafka topic log_topic → the Consumer pulls them and writes to the Postgres table logs_table.
Here is the sample log file:
./logs_test/20250909.log
2025-09-09 12:00:00 INFO Started
2025-09-09 13:01:00 WARN Slow IO
2025-09-09 14:02:00 ERROR Disk full code=ENOSPC
2025-09-09 15:28:52 INFO user=alice action=login
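Each new line will end up as one JSON message on log_topic; with the producer below, the first line above would look roughly like this (timestamp illustrative):
{
  "filename": "20250909.log",
  "line_number": 1,
  "message": "2025-09-09 12:00:00 INFO Started",
  "ingested_at": "2025-09-09T04:05:00+00:00"
}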

./dags/utils/log_producer.py
Scans the log folder and sends any new lines to the Kafka topic as JSON
#!/usr/bin/env python3
# Unified log producer: scans a directory, sends only new lines to Kafka as JSON.
import json
import os
from datetime import datetime, timezone
from typing import Dict, Iterable, Tuple

DEFAULT_LOG_DIR = os.environ.get("LOG_DIR", "/opt/airflow/logs_test")
DEFAULT_OFFSETS_PATH = os.environ.get("OFFSETS_PATH", os.path.join(DEFAULT_LOG_DIR, ".offsets.json"))
DEFAULT_BOOTSTRAP = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", os.environ.get("BOOTSTRAP_SERVERS", "kafka:9092"))
DEFAULT_TOPIC = os.environ.get("KAFKA_TOPIC", os.environ.get("TOPIC", "log_topic"))

# load the per-file offsets for the log files in the directory
def _load_offsets(offsets_path: str) -> Dict[str, int]:
    try:
        with open(offsets_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        if isinstance(data, dict):
            return {str(k): int(v) for k, v in data.items()}
        return {}
    except FileNotFoundError:
        return {}
    except Exception:
        return {}

# persist the updated offsets
def _save_offsets(offsets_path: str, offsets: Dict[str, int]) -> None:
    os.makedirs(os.path.dirname(offsets_path), exist_ok=True)
    with open(offsets_path, "w", encoding="utf-8") as f:
        json.dump(offsets, f, ensure_ascii=False, indent=2)

essential_skip = {".DS_Store"}

# scan for new lines
def scan_new_lines(log_dir: str, offsets_path: str) -> Tuple[Iterable[dict], Dict[str, int]]:
    offsets = _load_offsets(offsets_path)
    records = []
    if not os.path.isdir(log_dir):
        return records, offsets
    for entry in sorted(os.listdir(log_dir)):
        # Skip hidden files, the offsets file itself, and trivial OS files
        if entry.startswith('.') or entry == os.path.basename(offsets_path) or entry in essential_skip:
            continue
        file_path = os.path.join(log_dir, entry)
        if not os.path.isfile(file_path):
            continue
        prev_count = int(offsets.get(entry, 0))
        try:
            with open(file_path, "r", encoding="utf-8", errors="replace") as f:
                lines = f.readlines()
        except Exception:
            # Skip files we cannot read
            continue
        total = len(lines)
        if total > prev_count:
            for idx, line in enumerate(lines[prev_count:], start=prev_count + 1):
                records.append(
                    {
                        "filename": entry,
                        "line_number": idx,
                        "message": line.rstrip("\n"),
                        "ingested_at": datetime.now(timezone.utc).isoformat(),
                    }
                )
            offsets[entry] = total
    return records, offsets

# send the records to Kafka
def produce_records(bootstrap_servers: str, topic: str, records: Iterable[dict]) -> int:
    from kafka import KafkaProducer
    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode("utf-8"),
        linger_ms=50,
        acks="all",
        retries=3,
    )
    count = 0
    for rec in records:
        producer.send(topic, value=rec)
        count += 1
    producer.flush()
    producer.close()
    return count

def main():
    log_dir = DEFAULT_LOG_DIR
    offsets_path = DEFAULT_OFFSETS_PATH
    bootstrap = DEFAULT_BOOTSTRAP
    topic = DEFAULT_TOPIC
    print(f"Scanning dir={log_dir} offsets={offsets_path} topic={topic} bootstrap={bootstrap}")
    records, updated_offsets = scan_new_lines(log_dir, offsets_path)
    if not records:
        print("No new lines found. Nothing to send.")
        return
    sent = produce_records(bootstrap, topic, records)
    _save_offsets(offsets_path, updated_offsets)
    print(f"Sent {sent} messages. Files tracked: {len(updated_offsets)}")

if __name__ == "__main__":
    main()
./dags/kafka_postgres_daily_dump.py
Airflow DAG: first produce the logs → then consume them and store them in Postgres
from __future__ import annotations
import os
import json
from datetime import datetime
import logging
from pendulum import timezone
from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
import smtplib
import ssl
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Try to import the log-scanning and Kafka-producing helpers from utils/log_producer.py.
# If that fails, add the current directory to sys.path and import again.
try:
    from utils.log_producer import scan_new_lines, produce_records
except ModuleNotFoundError:
    import sys
    sys.path.insert(0, os.path.dirname(__file__))
    from utils.log_producer import scan_new_lines, produce_records

DAG_ID = "kafka_postgres_daily_dump"

# Config
LOG_DIR = os.environ.get("LOGS_DIR", "/opt/airflow/logs_test")
OFFSETS_PATH = os.environ.get("LOGS_OFFSETS_PATH", os.path.join(LOG_DIR, ".offsets.json"))
KAFKA_BOOTSTRAP = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092")
KAFKA_TOPIC = os.environ.get("KAFKA_TOPIC", "log_topic")
POSTGRES_CONN_ID = os.environ.get("POSTGRES_CONN_ID", "postgres")
POSTGRES_TABLE = os.environ.get("POSTGRES_LOGS_TABLE", "logs_table")

default_args = {
    "owner": "airflow",
}

# start date, runs daily
with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2025, 9, 1, tzinfo=timezone("Asia/Taipei")),
    schedule="@daily",
    catchup=False,
    default_args=default_args,
    tags=["kafka", "postgres", "logs"],
):
    # Producer: push any newly found log lines into the Kafka topic
    @task(task_id="produce_logs_to_kafka")
    def produce_logs_to_kafka():
        # scan the watched folder for new lines
        records, updated_offsets = scan_new_lines(LOG_DIR, OFFSETS_PATH)
        # send the new lines to the Kafka topic
        sent = 0
        if records:
            sent = produce_records(KAFKA_BOOTSTRAP, KAFKA_TOPIC, records)
        # persist the offsets so the same lines are not sent again
        if records:
            try:
                os.makedirs(os.path.dirname(OFFSETS_PATH), exist_ok=True)
                with open(OFFSETS_PATH, "w", encoding="utf-8") as f:
                    json.dump(updated_offsets, f, ensure_ascii=False, indent=2)
            except Exception:
                # Fail the task if we cannot persist progress, to avoid duplicates
                logging.exception("[producer] failed to persist offsets to %s", OFFSETS_PATH)
                raise
        logging.info(
            "[producer] dir=%s files_tracked=%s new_records=%s sent=%s",
            LOG_DIR,
            len(updated_offsets),
            len(records),
            sent,
        )
        if records:
            sample = records[:3]
            logging.info("[producer] sample records: %s", json.dumps(sample, ensure_ascii=False))
        return {"sent": sent, "files_tracked": len(updated_offsets)}

    # Consumer: write the messages into PostgreSQL
    @task(task_id="consume_kafka_to_postgres")
    def consume_kafka_to_postgres():
        from kafka import KafkaConsumer
        consumer = KafkaConsumer(
            KAFKA_TOPIC,
            bootstrap_servers=KAFKA_BOOTSTRAP,
            group_id="airflow_logs_consumer",
            enable_auto_commit=True,
            auto_offset_reset="earliest",
            value_deserializer=lambda v: json.loads(v.decode("utf-8")),
            consumer_timeout_ms=5000,  # stop after idle
            max_poll_records=1000,
        )
        # each parsed message is appended to the rows list
        rows = []
        for msg in consumer:
            rec = msg.value
            rows.append(
                (
                    rec.get("filename"),
                    int(rec.get("line_number")) if rec.get("line_number") is not None else None,
                    rec.get("message"),
                    rec.get("ingested_at"),
                    int(msg.offset),
                    int(msg.partition),
                )
            )
        # close the Kafka consumer
        consumer.close()
        logging.info("[consumer] polled messages: %s", len(rows))
        # open a DB connection through the Airflow conn_id
        hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)
        create_sql = f"""
            CREATE TABLE IF NOT EXISTS {POSTGRES_TABLE} (
                filename TEXT,
                line_number INTEGER,
                message TEXT,
                ingested_at TIMESTAMPTZ,
                kafka_offset BIGINT,
                kafka_partition INTEGER
            );
        """
        insert_sql = f"""
            INSERT INTO {POSTGRES_TABLE}
                (filename, line_number, message, ingested_at, kafka_offset, kafka_partition)
            VALUES (%s, %s, %s, %s, %s, %s);
        """
        with hook.get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(create_sql)
                if rows:
                    cur.executemany(insert_sql, rows)
            conn.commit()
        logging.info(
            "[consumer] inserted rows: %s into table=%s (conn_id=%s)",
            len(rows),
            POSTGRES_TABLE,
            POSTGRES_CONN_ID,
        )
        return {"inserted": len(rows)}

    # Gmail
    def _send_email(subject: str, html: str):
        host = os.getenv("AIRFLOW__SMTP__SMTP_HOST", "smtp.gmail.com")
        port = int(os.getenv("AIRFLOW__SMTP__SMTP_PORT", "587"))
        user = os.getenv("AIRFLOW__SMTP__SMTP_USER")
        password = os.getenv("AIRFLOW__SMTP__SMTP_PASSWORD")
        sender = os.getenv("AIRFLOW__SMTP__SMTP_MAIL_FROM", user)
        to = os.getenv("NOTIFY_EMAIL_TO", "catalinakuowork@gmail.com")
        if not user or not password:
            raise RuntimeError("SMTP credentials not set in environment")
        msg = (f"From: {sender}\r\n"
               f"To: {to}\r\n"
               f"Subject: {subject}\r\n"
               f"Content-Type: text/html; charset=utf-8\r\n\r\n"
               f"{html}")
        if port == 465:
            context = ssl.create_default_context()
            with smtplib.SMTP_SSL(host, port, context=context, timeout=30) as server:
                server.login(user, password)
                server.sendmail(sender, [to], msg.encode("utf-8"))
        else:
            with smtplib.SMTP(host, port, timeout=30) as server:
                server.ehlo()
                server.starttls(context=ssl.create_default_context())
                server.ehlo()
                server.login(user, password)
                server.sendmail(sender, [to], msg.encode("utf-8"))

    def _notify_success(**context):
        ti = context["ti"]
        inserted = ti.xcom_pull(task_ids="consume_kafka_to_postgres", key="return_value") or {}
        sent = ti.xcom_pull(task_ids="produce_logs_to_kafka", key="return_value") or {}
        _send_email(
            subject="[Airflow] Kafka→Postgres daily dump - success",
            html=(
                f"<p>Kafka→Postgres run finished.</p>"
                f"<p>Producer sent: <b>{sent.get('sent', 0)}</b> lines, files tracked: {sent.get('files_tracked', 0)}</p>"
                f"<p>Consumer inserted: <b>{inserted.get('inserted', 0)}</b> rows</p>"
            ),
        )

    def _notify_failure(**context):
        dag_run = context.get("dag_run")
        err = "Please check the task logs"
        _send_email(
            subject="[Airflow] Kafka→Postgres daily dump - failed",
            html=(
                f"<p>Run failed: {dag_run.run_id if dag_run else ''}</p>"
                f"<p>{err}</p>"
            ),
        )

    notify_success = PythonOperator(
        task_id="notify_on_success",
        python_callable=_notify_success,
        trigger_rule=TriggerRule.ALL_SUCCESS,
    )
    notify_failure = PythonOperator(
        task_id="notify_on_failure",
        python_callable=_notify_failure,
        trigger_rule=TriggerRule.ONE_FAILED,
    )

    p = produce_logs_to_kafka()
    c = consume_kafka_to_postgres()
    # success: notify after both tasks succeed; failure: notify if either task fails
    p >> c >> notify_success
    [p, c] >> notify_failure
./dags/kafka_postgres_watch.py
Watches the log folder for new content; when something new shows up, it immediately triggers the other DAG (kafka_postgres_daily_dump) to write into PostgreSQL
from __future__ import annotations
import os
import json
from datetime import datetime
import logging
from pendulum import timezone
from airflow import DAG
from airflow.decorators import task
# Airflow 3: use the SDK Variable to avoid direct ORM access inside tasks
from airflow.sdk import Variable
from airflow.providers.standard.operators.python import ShortCircuitOperator
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator

# Reuse the same helper as the daily dump to detect "new lines"
try:
    from utils.log_producer import scan_new_lines
except ModuleNotFoundError:
    import sys
    sys.path.insert(0, os.path.dirname(__file__))
    from utils.log_producer import scan_new_lines

DAG_ID = "kafka_postgres_watch"
LOG_DIR = os.environ.get("LOGS_DIR", "/opt/airflow/logs_test")
# Variable that keeps the previous line-count snapshot, to avoid duplicate triggers
OFFSETS_VAR = os.environ.get("LOGS_OFFSETS_VAR", "kafka_logs_watch_offsets")
# Same offsets path as the daily dump (only read here; never written back to the file)
OFFSETS_PATH = os.environ.get("LOGS_OFFSETS_PATH", os.path.join(LOG_DIR, ".offsets.json"))

# start date, runs every minute
with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2025, 9, 1, tzinfo=timezone("Asia/Taipei")),
    schedule="*/1 * * * *",
    catchup=False,
    tags=["kafka", "watch"],
):
    # watch the folder for new lines, not just new files
    @task(task_id="check_new_lines")
    def check_new_lines():
        # load the previous snapshot (filename -> line count); treat missing as empty
        try:
            prev_snapshot = json.loads(Variable.get(OFFSETS_VAR))
            if not isinstance(prev_snapshot, dict):
                prev_snapshot = {}
        except Exception:
            prev_snapshot = {}
        # scan the current state (do not write .offsets.json here)
        records, updated_offsets = scan_new_lines(LOG_DIR, OFFSETS_PATH)
        # compute the delta against the Variable snapshot, so a stale .offsets.json does not cause duplicate triggers
        delta_by_file = {}
        for fname, total in updated_offsets.items():
            prev = prev_snapshot.get(fname, 0)
            try:
                prev = int(prev)
            except Exception:
                prev = 0
            inc = max(int(total) - prev, 0)
            if inc > 0:
                delta_by_file[fname] = inc
        delta_total = sum(delta_by_file.values())
        # safety net: even if the Variable snapshot already matches the current totals,
        # still trigger while offsets.json lags behind (records were found)
        has_new_lines = (delta_total > 0) or bool(records)
        # update the snapshot in the Airflow Variable so the next run does not re-trigger (non-fatal on failure)
        try:
            Variable.set(OFFSETS_VAR, json.dumps(updated_offsets, ensure_ascii=False, sort_keys=True))
        except Exception:
            pass
        # log what was detected
        try:
            logging.info(
                "[watch] dir=%s files_tracked=%s records_found=%s delta_total=%s",
                LOG_DIR,
                len(updated_offsets),
                len(records),
                delta_total,
            )
            if delta_by_file:
                logging.info("[watch] per-file delta: %s", json.dumps(delta_by_file, ensure_ascii=False))
        except Exception:
            pass
        # return values for the downstream ShortCircuit and TriggerDagRunOperator
        conf = {
            "reason": "new_lines_detected" if has_new_lines else "no_changes",
            "stats": {
                "files": len(updated_offsets),
                "records": len(records),
                "delta_records": delta_total,
            },
        }
        return {"should_trigger": has_new_lines, "conf": conf}

    check = check_new_lines()

    @task(task_id="build_conf")
    def build_conf(payload: dict) -> dict:
        return (payload or {}).get("conf", {})

    conf_payload = build_conf(check)

    # decide whether to continue downstream based on the check result
    gate = ShortCircuitOperator(
        task_id="should_trigger",
        python_callable=lambda ti: (ti.xcom_pull(task_ids="check_new_lines") or {}).get("should_trigger", False),
    )

    trigger = TriggerDagRunOperator(
        task_id="trigger_daily_dump",
        trigger_dag_id="kafka_postgres_daily_dump",
        conf=conf_payload,
        reset_dag_run=False,
        wait_for_completion=False,
        skip_when_already_exists=True,
        fail_when_dag_is_paused=False,
    )

    check >> gate >> trigger
./logs_test/20250909.log
Add one new line to the file.
After manually triggering the watch DAG, the .offsets.json file is updated automatically and the new row is written into PostgreSQL.
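To double-check the Postgres side, a query against the default logs_table looks like this:
SELECT filename, line_number, message, ingested_at
FROM logs_table
ORDER BY kafka_offset DESC
LIMIT 10;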
.offsets.json
