• Apache Airflow 工作流

什麼是 Airflow? 

Airflow,可以做到任務的排程、執行和監控

  • 定義的 DAG 調度和執行 ETL 任務
  • 定義任務之間的依賴性,確保在執行任務時的順序和條件
  • 監控任務的執行狀態、錯誤信息
  • 整合各種數據庫、資料湖、API 和其他數據源,成為一個通用的工作流自動化平台

在 Apache Airflow 中,有幾個重要的元素一起協同工作,形成一個完整的工作流自動化系統 :
Web Server(提供用戶UI界面,能夠查看 DAG(有向無環圖)、觸發任務執行、查看運行記錄等)

-> Operator(具體的任務,代表了要執行的單元,可能是action operators、transfer data、wait for a condition…)

-> Scheduler

-> Metastore(通常指的是Airflow的元數據庫,SQLite、MySQL、PostgreSQL)

-> Triggerer(Celery 或其他分佈式任務佇列系統) (Kubernetes(簡稱 K8s)自動化部署、擴展和操作應用程序容器)

-> Executor (定義任務的執行方式)


安裝 Airflow (ft. docker)

先安裝docker
到官網下載 docker installer

設一個新資料夾,在終端機cd過去,輸入指令

 cd /Users/catalinakuo/Downloads/macbook_gpu/airflow_docker
echo -e "AIRFLOW_IMAGE_NAME=apache/airflow:2.8.1\nAIRFLOW_UID=50000" > .env
mkdir -p dags logs plugins config

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.8.1/docker-compose.yaml'

PS Vscode要先 (Cmd + Shift + P)
Shell Command: Install ‘code’ command in PATH

截圖 2025-05-25 11.16.55

這時候進vscode查看
截圖 2025-05-25 11.17.35

確認一下【.env】

AIRFLOW_IMAGE_NAME=apache/airflow:2.8.1
AIRFLOW_UID=50000

截圖 2025-05-25 09.57.50

初使化AIRFLOW+啟動

docker compose up airflow-init
docker compose up -d

截圖 2025-05-25 11.19.35

打開安裝好的應用程式
截圖 2025-05-25 12.45.06

螢幕擷取畫面 2023-11-25 234904

在chrome輸入localhost:8080,打開ui介面,帳號密碼都是airflow

截圖 2025-05-25 11.44.32

螢幕擷取畫面 2023-11-25 003927

PS 如果無法成功,可能port號被佔據,在終端機清除docker volume 並重新啟動

docker volume prune -f
docker-compose down --volumes --remove-orphans
lsof -i :5432
sudo kill -9 <port_number>
docker-compose up -d

截圖 2025-05-25 11.43.55

點DAG進去後,上方可以查看

- Grid (網格): 顯示各個任務的狀態和相關資訊

- Graph (圖表): 架構圖,顯示任務之間的依賴關係

- Calendar (行事曆): 以日曆形式顯示工作流程的執行計劃

- Task Duration (任務持續時間): 顯示每個任務執行所需的時間

- Task Tries (任務嘗試次數): 顯示任務執行失敗後的重試次數

- Landing Times (著陸時間): 顯示任務成功完成的時間

- Gantt (甘特圖): 顯示工作流程的執行時間表,會更直觀地了解任務之間的時間關係

- Details (詳細資訊): 查看工作流程執行的詳細資訊,日誌、輸入參數等

- Code (程式碼): 查看工作流程的程式碼

- Audit Log (審計日誌): 有關工作流程執行的審計日誌,可以用在追踪和分析系統的使用和行為

螢幕擷取畫面 2023-11-25 121153


什麼是 DAG (Directed Acyclic Graph 有向無環圖) ?

DAG 是一種數學結構,其中節點(任務Task)之間有方向性的連接,不存在循環

在 Airflow 中,指工作流程中的任務按照指定的順序執行,不會形成迴圈

任務 (Task): 每個任務代表工作流程中的一個單元,例如執行程式、複製數據、發送郵件…

清理數據、執行程式,只要是不同操作,分不同的task,不要寫在一起! 會比較好管理

依賴關係 (Dependency): 一個任務可能依賴於其他一個或多個任務的完成

執行順序 (Execution Order): DAG 保證任務按照它們的依賴順序來執行,類似於if…,elif…,else、if not…

(網路圖)
R


撰寫DAG

進到dags資料夾下
建立一個檔案
截圖 2025-05-25 11.51.02

設置DAG

# id 唯一性 # catchup 默認true,會“追趕”執行之前未執行的任務
with DAG('user_processing', start_date=datetime(2023/11/25),
         schedule_interval='@daily', catchup=False) as dag:

設置TASK – 連接postgresSQL

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime

# 定義 DAG
with DAG(
    dag_id='user_processing',                            # DAG 的唯一識別 ID
    start_date=datetime(2023, 11, 25),                   # 正確格式:datetime(year, month, day)
    schedule_interval='@daily',                          # 每天執行一次
    catchup=False,                                       # 不補跑過去的未執行任務
    tags=['example'],                                    # 可加可不加,用來分類
) as dag:

    # 定義一個 PostgresOperator 來建立資料表
    create_table = PostgresOperator(
        task_id='create_user_table',                     # 任務 ID(每個 DAG 內唯一)
        postgres_conn_id='postgres',                     # Airflow 中的 connection ID
        sql="""
            CREATE TABLE IF NOT EXISTS users (
                firstname TEXT NOT NULL,
                lastname TEXT NOT NULL,
                country TEXT NOT NULL,
                username TEXT NOT NULL,
                password TEXT NOT NULL,
                email TEXT NOT NULL
            );
        """,
    )

    insert_data = PostgresOperator(
    task_id='insert_fake_user',
    postgres_conn_id='postgres',
    sql="""
        INSERT INTO users (firstname, lastname, country, username, password, email)
        VALUES ('Tony', 'Stark', 'USA', 'ironman', 'jarvis123', 'tony@stark.com');
    """
)

create_table >> insert_data

進入Admin connections 設一個連接

截圖 2025-05-25 11.58.45


執行DAG

可以終端機輸入確定一下狀態 啟動

docker ps | grep scheduler
docker-compose ps

截圖 2025-05-25 12.03.33

bash 互動模式
可以像在 Linux 終端機一樣操作該容器內部環境

docker exec -it materials-airflow-scheduler-1 /bin/bash

會跳到airflow
螢幕擷取畫面 2023-11-25 134839

可以看到有哪些主要指令可以使用

airflow -h

截圖 2025-05-25 12.05.34

可以確認DAG是否已被載入

docker exec -it airflow_docker-airflow-scheduler-1 bash

#會進入一個容器內的 Linux 環境,像這樣
root@984cb96ecc62:/opt/airflow#

airflow dags list

截圖 2025-05-25 12.10.40

UI介面也會出現
截圖 2025-05-25 12.11.05

可以確認DAG中指令

airflow tasks test user_processing create_user_table 2023-11-25

截圖 2025-05-25 12.13.28

terminal進到docker postgresql,查看是否建立成功

docker exec -it airflow_docker-postgres-1 bash

psql -U airflow -d airflow

SELECT * FROM users;

截圖 2025-05-25 12.24.34

也可以下載 DBeaver (支援多種資料庫的圖形界面)查看

yaml檔要先新增port號才會連接成功

ports:
 - "5432:5432"
# 停掉所有服務
docker-compose down

# 重新啟動並套用新設定
docker-compose up -d

截圖 2025-05-25 12.30.28

截圖 2025-05-25 12.42.04

截圖 2025-05-25 12.42.49
截圖 2025-05-25 12.43.10


DAG 設置文件輸入/輸出

yaml檔要新增/tmp:/tmp才會連接成功

- /tmp:/tmp

截圖 2025-05-25 15.25.37

輸入,每天自動更新文件

# producer.py
from airflow import DAG, Dataset
from airflow.decorators import task
from datetime import datetime

# 定義兩個 Dataset
my_file = Dataset("/tmp/my_file.txt")
her_file = Dataset("/tmp/her_file.txt")

# Producer DAG
with DAG(
    dag_id="producer",
    schedule="@daily",
    start_date=datetime(2023, 11, 26),
    catchup=False,
):
    # 輸出 my_file.txt
    @task(outlets=[my_file])
    def update_dataset():
        with open(my_file.uri, "a+") as f:
            f.write("my_file updated\n")

    # 輸出 her_file.txt
    @task(outlets=[her_file])
    def update_dataset_2():
        with open(her_file.uri, "a+") as f:
            f.write("her_file updated\n")

    update_dataset() >> update_dataset_2()

輸出,偵測到文件更新,自動觸發DAG,去讀取Dataset內容

# consumer.py
from airflow import DAG, Dataset
from airflow.decorators import task
from datetime import datetime

my_file = Dataset("/tmp/my_file.txt")
her_file = Dataset("/tmp/her_file.txt")

with DAG(
    dag_id="consumer",
    schedule=[my_file, her_file],  # 多 Dataset trigger 條件
    start_date=datetime(2023, 11, 26),
    catchup=False
):
    @task
    def read_dataset():
        print("=== Reading my_file.txt ===")
        with open(my_file.uri, "r") as f:
            print(f.read())
        print("=== Reading her_file.txt ===")
        with open(her_file.uri, "r") as f:
            print(f.read())

    read_dataset()
# 停掉所有服務
docker-compose down

# 重新啟動並套用新設定
docker-compose up -d

如果還沒觸發,點進ui 介面的 producer,右上角trigger即可

從ui介面就會看到關係圖了
截圖 2025-05-25 15.52.47

多檔案
當任一 Dataset 被更新,就會觸發 DAG

輸入

from airflow import DAG, Dataset
from airflow.decorators import task

from datetime import datetime

my_file = Dataset("/tmp/my_file.txt")
her_file = Dataset("/tmp/her_file.txt")

with DAG(
    dag_id="producer",
    schedule="@daily",
    start_date=datetime(2023, 11  ,26),
    catchup=False
):

    @task(outlets=[my_file])
    def update_dataset():
        with open(my_file.uri, "a+") as f:
            f.write("producer update")


    @task(outlets=[her_file])
    def update_dataset_2():
        with open(her_file.uri, "a+") as f:
            f.write("producer update")


    update_dataset() >> update_dataset_2()

輸出

from airflow import DAG, Dataset
from airflow.decorators import task

from datetime import datetime

my_file = Dataset("/tmp/my_file.txt")
her_file = Dataset("/tmp/her_file.txt")

with DAG(
    dag_id="consumer",
    schedule=[my_file, her_file],
    start_date=datetime(2023, 11, 26),
    catchup=False
):

    @task
    def read_dataset():
        with open(my_file.uri, "r") as f:
            print(f.read())


    read_dataset()
# 停掉所有服務
docker-compose down

# 重新啟動並套用新設定
docker-compose up -d

如果還沒觸發,點進ui 介面的 producer,右上角trigger即可
截圖 2025-05-25 17.59.46

從ui介面就會看到關係圖了
截圖 2025-05-25 17.52.47

PS 如範例,只要 my_file 成功執行,會直接觸發 consumer.py,her_file.txt 不管有沒有成功都不會影響


DAG 設置時間

前面設置過的時間start_date=datetime(2023/11/25)
schedule_interval=’@daily’

參考 官網 Cron Presets
參考 Apache Airflow(四) start_date & schedule_interval

  • start_date
  • schedule_interval
  • end_date

螢幕擷取畫面 2023-11-26 002318


寄email通知

設一個測試的dag

# test_email.py

from airflow import DAG
from airflow.operators.email import EmailOperator
from datetime import datetime
from pendulum import timezone

with DAG(
    dag_id="test_email",
    start_date=datetime(2023, 11, 26, tzinfo=timezone("Asia/Taipei")), 
    schedule="0 21 * * *",  # 每天晚上 9 點
    catchup=False
) as dag:

    send_email = EmailOperator(
        task_id="test_email_task",
        to="我的email@gmail.com",
        subject="Airflow Email Test",
        html_content="<p>This is a test from Airflow SMTP</p>"
         trigger_rule=TriggerRule.ALL_DONE # 不管 read_dataset 成功/失敗 都會跑
    )

截圖 2025-05-25 18.43.21

yaml檔要加入email設定

# airflow-common-env 下加入
# ✉️ Email SMTP settings
    AIRFLOW__EMAIL__EMAIL_BACKEND: airflow.utils.email.send_email_smtp
    AIRFLOW__SMTP__SMTP_HOST: smtp.gmail.com
    AIRFLOW__SMTP__SMTP_STARTTLS: True
    AIRFLOW__SMTP__SMTP_SSL: False
    AIRFLOW__SMTP__SMTP_USER: 我的email@gmail.com
    AIRFLOW__SMTP__SMTP_PASSWORD: 申請到的應用程式密碼
    AIRFLOW__SMTP__SMTP_PORT: 587
    AIRFLOW__SMTP__SMTP_MAIL_FROM: 我的email@gmail.com

1748170857880

PS 申請應用程式密碼,以gmail為例
進到 google後台,安全性認證 -> 兩步驟驗證 (gmail要有開才能申請)

截圖 2025-05-25 19.10.02

滑到頁面最下面的 應用程式密碼 -> 輸入自己的gmail -> 會出現一組16位數的密碼 xxxx xxxx xxxx xxxx -> 貼到yaml檔即可
1748171523793

# 停掉所有服務
docker-compose down

# 重新啟動並套用新設定
docker-compose up -d

設置成功就會收到email了
S__73900034

Catalina
Catalina

Hi, I’m Catalina!
原本在西語市場做開發業務,2023 年正式轉職資料領域。
目前努力補齊計算機組織、微積分、線性代數與機率論,忙碌中做點筆記提醒自己 🤲

文章: 43

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *