
Airflow๊ฐ 2022๋ 4์ 30์ผ ๋ฒ์ 2.3.0์ ๋ฆด๋ฆฌ์ฆํ์ต๋๋ค.
2.0.0 ๋ฒ์ ์ ๋ฐ์ดํธ ์ดํ ๊ฐ์ฅ ํฐ ๋ณํ๋ผ๊ณ ํ๋๋ฐ, ์ฃผ์ ๋ณํ ์ฌํญ์ ๊ฐ๋จํ๊ฒ ์์๋ณด๋๋ก ํ๊ฒ ์ต๋๋ค! ๐ฅ
1. Tree๋ทฐ์ ๋ณํ
๊ฐ์ฅ ํฌ๊ฒ ๋์ ๋๋ ๋ณํ์ ๋๋ค.
๊ธฐ์กด์๋ ์ํ๋ณ๋ก Task์ ์ํ๋ง ํ์ธํ ์ ์์๊ณ , ์ค๊ฐ์ ๊ตฌ๋ณ์ ์ด ์์ด์ ํด๋น ์ํ(๋ค๋ชจ)๊ฐ ์ด๋ Task์ ์ํ์ธ์ง ๊ตฌ๋ณํ๊ธฐ๊ฐ ์ด๋ ค์ ์ต๋๋ค.

ํ์ง๋ง ์ด๊ฒ์ด Grid View๋ก ๋ณ๊ฒฝ๋๋ฉด์ ์ด๋ Task๊ฐ ์ด๋ค ์ํ์ธ์ง ๋ช ํํ๊ฒ ํ์ธํ ์ ์๊ฒ ๋์์ต๋๋ค.
๋ํ ์ํ ์๊ฐ์ด ์ ๊ณต๋จ์ผ๋ก ์คํ์ด ๋๋ฌด ์ค๋ ๊ฑธ๋ฆฌ๊ฑฐ๋, ๋๋ฌด ๋นจ๋ฆฌ ๋๋๋ ์ํฉ์ ๋ํ ์ด์ํ์์ ์ข ๋ ๋น ๋ฅด๊ฒ ํ์ธํ ์ ์๊ฒ ๋์์ต๋๋ค!

์ถ๊ฐ์ ์ผ๋ก ์ ๋ค๋ชจ๋ฅผ ํด๋ฆญํ์ฌ ํ์ธํ ์ ์๋ ์์ธ ํ๋ฉด์ด ๊ธฐ์กด์๋ ์ฐฝ์ผ๋ก ๋ํ๋์ ๋ท ๋ด์ฉ์ ๊ฐ๋ ธ์ง๋ง,
ํ์ฌ๋ ์ค๋ฅธ์ชฝ์ ๋ฐ๋ก ๋ํ๋์ ํจ๊ป ํ์ธํ ์ ์๊ฒ ๋์์ต๋๋ค! ๐ค
2. Dynamic Task Mapping
๊ธฐ์กด์ DAG ์์ฑ์๊ฐ ํ์ํ ์์ ์๋ฅผ ๋ฏธ๋ฆฌ ์์์ผํ๋ ๋์ ํ์ฌ ๋ฐ์ดํฐ๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ์ฌ๋ฌ ์์ ์ ์์ฑํ ์ ์์ต๋๋ค.
๊ฐ๋จํ ์์๋ฅผ ์ดํด๋ณด๊ฒ ์ต๋๋ค. ๐ฒ
๋ง์ฝ List์ ์์์ 1์ฉ ๋ํด์ผํ๋ add_one Task๊ฐ ์๋ค๊ณ ๊ฐ์ ํ๊ฒ ์ต๋๋ค..
๊ทธ๋ ๋ค๋ฉด ์ด Task ๋ช ๊ฐ ์์ฑํด์ผํ ๊น์? ์ ๋ต์ List ์์์ ๊ฐ์์ผ ๊ฒ์ ๋๋ค.
๊ทธ๋ฐ๋ฐ ๋ง์ฝ ์ด List์ ํฌ๊ธฐ๊ฐ ์ํ๋ ๋๋ง๋ค ๊ฐ๋ณ์ ์ด๋ผ๋ฉด??
์ด๋ฌํ ๋ฌธ์ ์ expand ํจ์๋ฅผ ํ์ฉํ ์ ์์ต๋๋ค.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
with DAG(dag_id="simple_mapping", start_date=datetime(2022, 3, 4)) as dag:
@task
def add_one(x: int):
return x + 1
@task
def sum_it(values):
total = sum(values)
print(f"Total was {total}")
added_values = add_one.expand(x=[1, 2, 3]) # List์ ๊ธธ์ด๊ฐ 3์ด๋ค. ๋ฐ๋ผ์ add_one Task๊ฐ 3๊ฐ ์์ฑ๋๋ค.
sum_it(added_values)
์๋๋ฅผ ๋ณด์๋ค์ํผ, Grid View์ Graph View์์๋ ํ๋์ Task๋ก ๋ํ๋ฉ๋๋ค.
์คํ๋ ์์ธ ๋ด์ฉ์ Mapped Instances์์ ํ์ธํ ์ ์์ต๋๋ค.


Dynamic Task Mapping๋ Tree ๋ทฐ ๋ณ๊ฒฝ๊ณผ ๋๋ถ์ด 2.3.0 ๋ฆด๋ฆฌ์ฆ์ ๋ฉ์ธ์ด ๋๋ ๋ณ๊ฒฝ์ฌํญ์ ๋๋ค.
์์ ์์ ๋ ์์ฃผ ๊ฐ๋จํ ์ํฉ์์ผ๋ก ์ข ๋ ์์ธ ๋ด์ฉ์ ์ํ์๋ ๋ถ์ ์๋ URL์ ์ฐธ๊ณ ํด์ฃผ์ธ์.
https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html#
3. LocalKubernetesExecutor
์ค์ผ์ค๋ฌ์์ LocalExecutor์ KubernetesExecutor๋ฅผ ํผ์ฉํ์ฌ ์ฌ์ฉํ ์ ์์ต๋๋ค.
๊ฐ Executor๋ฅผ ์ค๋ช ์ ์๋์ ๊ฐ์ต๋๋ค.
- LocalExecutor
๋จ์ผ ์ฅ๋น์ ์น ์๋ฒ์ ์ค์ผ์ค๋ฌ๋ฅผ ๊ฐ์ด ๊ฐ๋ํฉ๋๋ค. - KubernetesExecutor
Task๋ฅผ ์ค์ผ์ค๋ฌ๊ฐ ์ฐพ๊ณ , Executor๊ฐ ๋์ ๊ทธ๋ก ์์ปค๋ฅผ POD ํํ๋ก ์คํํฉ๋๋ค. ๊ทธ๋ฆฌ๊ณ ํด๋น ์์ปค์์ ํ์คํฌ๋ฅผ ์ํํฉ๋๋ค.
Airflow๋ Task๋ฅผ ์ํํ ์ฃผ์ฒด๋ก Executor๋ฅผ ํ๋ ์ ํํ์ฌ ์ฌ์ฉํ ์ ์๋๋ฐ, ์ด๋ฅผ ํผ์ฉํด์ ์ฌ์ฉํจ์ผ๋ก์จ ๊ฐ๋ฒผ์ด ์์ ์ LocalExecutor๋ก ๊ทธ๋ ์ง ์์ ์์ ์ KubernetesExecutor๋ก ์ํํ์ฌ ์์ ์๊ฐ์ ์ต์ ํํ ์ ์์ต๋๋ค.
โป KubernetesExecutor๋ LocalExecutor์ ์๋ ๋๊ธฐ ์๊ฐ ๋ ์ด์ด๊ฐ ์ถ๊ฐ๋์ด ์์ด ๊ฐ๋ฒผ์ด ์์ ์๋ LocalExecutor๋ก ์ํํ๋ ๊ฒ์ด ์ฅ์ ์ ๊ฐ์ง๊ณ ์์ต๋๋ค.
4. Task ์ฌํ์ฉ
override ํจ์๋ฅผ ํตํด์ Task๋ฅผ ์ฌํ์ฉํ ์ ์์ต๋๋ค.
๋ํ ์ฌํ์ฉํ ์์ ๋งค๊ฐ๋ณ์๋ ํจ๊ป ์ ๋ฌํ ์ ์์ต๋๋ค.
@task
def add_task(x, y):
print(f"Task args: x={x}, y={y}")
return x + y
@dag(start_date=datetime(2022, 1, 1))
def mydag():
start = add_task.override(task_id="start")(1, 2)
for i in range(3):
start >> add_task.override(task_id=f"add_start_{i}")(start, i)
์ ์ฝ๋๋ฅผ ํ์ธํ๋ฉด add_task(x,y)๋ฅผ override ํจ์๋ฅผ ํตํด์ ์์ฝ๊ฒ ์ฌํ์ฉํ๊ณ , ๋งค๊ฐ๋ณ์๋ ์ ๋ฌํ๋ ๊ฒ์ ํ์ธํ ์ ์์ต๋๋ค.
๋๋ถ๋ถ์ ์์ง๋์ด๋ค์ด ์ํฌํ๋ก์ฐ ํด๋ก Airflow๋ฅผ ํ์ฉํ๊ณ , ์ปค๋ฎค๋ํฐ๊ฐ ํ์ฑํ๋จ์ผ๋ก์จ ์ํฌํ๋ก์ฐ ์๋ํ, UI, ์ฌ์ฉ์ฑ์์ ๊ฐ๋ ฅํ๊ฒ ๋ฐ์ ํด๋๊ฐ๋ ๊ฒ ๊ฐ์ต๋๋ค. ๐
์ด ์ธ์๋ ๊ธฐ์กด์ URI ํฌ๋งท์ด ์๋ JSON ํฌ๋งท์ผ๋ก๋ DB ์ฐ๊ฒฐ์ด ์ง์๋๋ฉฐ, "airflow db downgrade"์ "airflow db clean" ๋ฑ์ ์ปค๋งจ๋ ์ ๋ฐ์ดํธ๋ฅผ ํตํด์ DB ๋ง์ด๊ทธ๋ ์ด์ ์ ๋์์ ์ฃผ๋ ์ ๋ฐ์ดํธ๋ ์งํ๋์์ต๋๋ค.
# Json ํฌ๋งท์ ํตํ DB ์ฐ๊ฒฐ ์์
airflow connections add 'my_prod_db' \
--conn-json '{
"conn_type": "my-conn-type",
"login": "my-login",
"password": "my-password",
"host": "my-host",
"port": 1234,
"schema": "my-schema",
"extra": {
"param1": "val1",
"param2": "val2"
}
}'
์์ธํ ๋ด์ฉ์ ์๋ URL์ ์ฐธ๊ณ ํด์ฃผ์๊ธฐ ๋ฐ๋๋๋ค.
๊ธด ๊ธ ์ฝ์ด์ฃผ์
์ ๊ฐ์ฌํฉ๋๋ค. ๐
์ฐธ๊ณ
https://airflow.apache.org/blog/airflow-2.3.0/
https://www.reddit.com/r/dataengineering/comments/ufl9tx/apache_airflow_230_is_out/
'Engineering ๐ป > MLOps' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
| ํ์ง MLOps Engineer์ ์ฟ ๋ฒ๋คํฐ์ค(Kubernetes) ๊ฐ๋จํ ๊ณ ์ฐฐ (0) | 2025.01.11 |
|---|---|
| [MLOps] Windows ํ๊ฒฝ์์ kubeflow ์ค์นํ๊ธฐ (0) | 2023.03.27 |
| [Airflow] Docker๋ฅผ ํ์ฉํ Airflow ๊ตฌ์ถ (1) | 2022.04.18 |
| [Docker] Docker-Compose์์ ๊ฐ์ ์ด๋ฏธ์ง๋ก ์ฌ๋ฌ ์ปจํ ์ด๋ ์์ฑํ๊ธฐ (0) | 2022.03.30 |
| [Docker] ์ปจํ ์ด๋์ ๋์ปค ๊ทธ๋ฆฌ๊ณ ์ฟ ๋ฒ๋คํฐ์ค (0) | 2022.01.18 |