

[Airflow] A quick look at the major changes in the 2.3.0 release

DeepFlame 2022. 5. 3. 23:16

Airflow released version 2.3.0 on April 30, 2022.

It is described as the biggest change since the 2.0.0 release, so let's take a quick look at the main changes! 🔥

 

1. Changes to the Tree view


This is the most noticeable change.

Previously, you could only see each Task's status per run, and since there were no dividing lines between rows, it was hard to tell which Task a given status (square) belonged to.

Tree View (version 2.2.4)

 

With the switch to the Grid View, however, you can now clearly see which Task is in which state.

Run durations are also shown, so anomalies such as runs that take far too long or finish suspiciously fast can be spotted more quickly!

Grid View (version 2.3.0)

 

In addition, the detail panel you open by clicking one of those squares used to appear as a pop-up window that covered the content behind it,

but now it opens directly on the right, so you can look at both at the same time! 🤗

 

 

2. Dynamic Task Mapping


Previously, the DAG author had to know in advance how many tasks were needed. Now tasks can be created at run time based on the current data.

Let's look at a simple example. 😲

Suppose there is an add_one Task that adds 1 to each element of a List.

How many of these Tasks should be created? The answer is the number of elements in the List.

But what if the size of this List changes from run to run?

 

The expand method can be used to solve this kind of problem.

from datetime import datetime

from airflow import DAG
from airflow.decorators import task


with DAG(dag_id="simple_mapping", start_date=datetime(2022, 3, 4)) as dag:

    @task
    def add_one(x: int):
        return x + 1

    @task
    def sum_it(values):
        total = sum(values)
        print(f"Total was {total}")

    added_values = add_one.expand(x=[1, 2, 3])  # The list has 3 elements, so 3 add_one task instances are created.
    sum_it(added_values)

 

์•„๋ž˜๋ฅผ ๋ณด์‹œ๋‹ค์‹œํ”ผ, Grid View์™€ Graph View์—์„œ๋Š” ํ•˜๋‚˜์˜ Task๋กœ ๋‚˜ํƒ€๋‚ฉ๋‹ˆ๋‹ค.

์‹คํ–‰๋œ ์ƒ์„ธ ๋‚ด์šฉ์€ Mapped Instances์—์„œ ํ™•์ธํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
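To make the run-time fan-out concrete, here is a plain-Python simulation of what happens with `add_one.expand(x=...)` in the example above. It does not use Airflow at all: `expand_sim` is a hypothetical helper, and its `(map_index, result)` pairs only loosely mimic the rows listed under Mapped Instances. The point it illustrates is that the number of instances is decided by the input at run time, not when the DAG is written.

```python
from typing import Callable, List, Tuple


def expand_sim(fn: Callable[[int], int], values: List[int]) -> List[Tuple[int, int]]:
    """Hypothetical stand-in for Airflow's .expand(): one 'mapped instance' per element.

    Returns (map_index, result) pairs, loosely mirroring the Map Index column
    shown under Mapped Instances. This is NOT Airflow's implementation.
    """
    return [(map_index, fn(x)) for map_index, x in enumerate(values)]


def add_one(x: int) -> int:
    return x + 1


def sum_it(values: List[int]) -> int:
    total = sum(values)
    print(f"Total was {total}")
    return total


# Two "runs" whose input lists have different lengths.
for run_input in ([1, 2, 3], [10, 20]):
    instances = expand_sim(add_one, run_input)
    print(f"{len(instances)} mapped instances: {instances}")
    sum_it([result for _, result in instances])
```

The first run produces 3 instances totalling 9, the second only 2 instances totalling 32, without any change to the "DAG" code.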

 

Together with the Tree view change, Dynamic Task Mapping is one of the headline changes of the 2.3.0 release.

The example above covers only a very simple case; for more detail, please refer to the URL below.

https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html#

 

 

3. LocalKubernetesExecutor


The scheduler can now use LocalExecutor and KubernetesExecutor side by side.

Each Executor works as follows:

  • LocalExecutor
    Runs tasks as local processes on the same machine as the scheduler, so the web server, scheduler, and tasks can all live on a single host.
  • KubernetesExecutor
    The scheduler picks up a Task, the Executor dynamically launches a worker as a Pod, and the task then runs inside that worker.

 

Airflow normally runs Tasks with a single Executor chosen in its configuration. By combining the two, you can run lightweight tasks on LocalExecutor and heavier ones on KubernetesExecutor, which can shorten overall execution time.

※ KubernetesExecutor adds a layer of latency that LocalExecutor does not have (each task first needs a Pod to be started), so lightweight tasks benefit from running on LocalExecutor.
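As far as I understand the 2.3.0 implementation, LocalKubernetesExecutor decides per task based on its `queue`: tasks whose queue matches the `kubernetes_queue` setting in the `[local_kubernetes_executor]` config section (assumed default: `kubernetes`) go to KubernetesExecutor, and everything else runs on LocalExecutor. The sketch below only simulates that routing rule in plain Python; `TaskSpec` and `route` are hypothetical names, not Airflow classes. In a real deployment you would set `executor = LocalKubernetesExecutor` under `[core]` and pass `queue="kubernetes"` to the heavy operators.

```python
from dataclasses import dataclass

# Assumed default of [local_kubernetes_executor] kubernetes_queue in airflow.cfg
KUBERNETES_QUEUE = "kubernetes"
# Assumed default of [operators] default_queue
DEFAULT_QUEUE = "default"


@dataclass
class TaskSpec:
    """Hypothetical, minimal description of a task: just its id and queue."""
    task_id: str
    queue: str = DEFAULT_QUEUE


def route(task: TaskSpec, kubernetes_queue: str = KUBERNETES_QUEUE) -> str:
    """Simulate the routing rule: queue match -> KubernetesExecutor, else LocalExecutor."""
    return "KubernetesExecutor" if task.queue == kubernetes_queue else "LocalExecutor"


tasks = [
    TaskSpec("light_cleanup"),                        # quick task: avoid Pod startup latency
    TaskSpec("train_model", queue=KUBERNETES_QUEUE),  # heavy task: isolated Pod
]
for t in tasks:
    print(f"{t.task_id} -> {route(t)}")
```

Keeping the decision in one per-task field means the DAG author, not the scheduler, chooses which tasks pay the Pod startup cost.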

 

 

4. Reusing Tasks


Decorated Tasks can be reused through the override method.

override() changes task-level settings such as task_id, and you still pass the function's own arguments when you call the result.

 

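from datetime import datetime

from airflow.decorators import dag, task


@task
def add_task(x, y):
    print(f"Task args: x={x}, y={y}")
    return x + y


@dag(start_date=datetime(2022, 1, 1))
def mydag():
    # Reuse add_task under a different task_id each time
    start = add_task.override(task_id="start")(1, 2)
    for i in range(3):
        # Passing start's output as an argument already creates the dependency; ">>" just makes it explicit
        start >> add_task.override(task_id=f"add_start_{i}")(start, i)


# Calling the @dag-decorated function creates the DAG so the scheduler can find it
mydag_instance = mydag()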

 

In the code above, add_task(x, y) is reused four times through override(), each time under a different task_id, while its x and y arguments are passed in as usual.
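The design idea behind override() can be illustrated with a heavily simplified, hypothetical model in plain Python: override() returns a modified copy, so one function can be turned into several tasks with different task_ids while the original stays untouched. `TaskFactory` and the local `task` decorator below are invented for illustration; real Airflow returns an XComArg when the task is called, rather than running the function immediately as this sketch does.

```python
from dataclasses import dataclass, replace
from typing import Any, Callable, Tuple


@dataclass(frozen=True)
class TaskFactory:
    """Hypothetical, heavily simplified stand-in for a @task-decorated function."""
    fn: Callable[..., Any]
    task_id: str

    def override(self, **kwargs: Any) -> "TaskFactory":
        # Return a modified copy; the original factory is left unchanged.
        return replace(self, **kwargs)

    def __call__(self, *args: Any) -> Tuple[str, Any]:
        # Real Airflow would create a task and return an XComArg here;
        # this sketch just runs fn immediately and tags the result with task_id.
        return self.task_id, self.fn(*args)


def task(fn: Callable[..., Any]) -> TaskFactory:
    """Hypothetical decorator: task_id defaults to the function name."""
    return TaskFactory(fn=fn, task_id=fn.__name__)


@task
def add_task(x, y):
    return x + y


start_id, start = add_task.override(task_id="start")(1, 2)
added = [add_task.override(task_id=f"add_start_{i}")(start, i) for i in range(3)]
print(start_id, start)   # start 3
print(added)             # [('add_start_0', 3), ('add_start_1', 4), ('add_start_2', 5)]
print(add_task.task_id)  # add_task (unchanged)
```

Returning a copy instead of mutating in place is what makes it safe to reuse the same decorated function in a loop or across DAG files.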

 

 


 

Many engineers use Airflow as their workflow tool, and thanks to its active community it keeps improving in workflow automation, UI, and usability. 👀

Beyond these changes, Airflow connections (for example, database connections) can now be defined in JSON format in addition to the existing URI format. New CLI commands such as "airflow db downgrade" (roll the metadata database schema back to an earlier version) and "airflow db clean" (purge old records from the metadata tables) also make the metadata database easier to manage and migrate.

# Example: adding a connection using the JSON format
airflow connections add 'my_prod_db' \
    --conn-json '{
        "conn_type": "my-conn-type",
        "login": "my-login",
        "password": "my-password",
        "host": "my-host",
        "port": 1234,
        "schema": "my-schema",
        "extra": {
            "param1": "val1",
            "param2": "val2"
        }
    }'
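If the connection details come from code (for example a config file or a secret store), hand-quoting JSON inside a shell string is error-prone. A minimal Python sketch, assuming you build the CLI call yourself: serialize the same placeholder fields with `json.dumps` and pass the result as a separate argument to `--conn-json`.

```python
import json
import shlex

# Same placeholder connection fields as the shell example above.
conn = {
    "conn_type": "my-conn-type",
    "login": "my-login",
    "password": "my-password",
    "host": "my-host",
    "port": 1234,
    "schema": "my-schema",
    "extra": {"param1": "val1", "param2": "val2"},
}

# json.dumps always produces valid JSON (double quotes, numbers unquoted).
payload = json.dumps(conn)

# Build the CLI call as an argument list; shlex.join quotes it for display in a shell.
cmd = ["airflow", "connections", "add", "my_prod_db", "--conn-json", payload]
print(shlex.join(cmd))
```

To actually run it you could pass `cmd` to `subprocess.run(cmd, check=True)`; since the arguments are passed as a list, no shell quoting is involved at all.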

 

 

์ž์„ธํ•œ ๋‚ด์šฉ์€ ์•„๋ž˜ URL์„ ์ฐธ๊ณ ํ•ด์ฃผ์‹œ๊ธฐ ๋ฐ”๋ž๋‹ˆ๋‹ค.
๊ธด ๊ธ€ ์ฝ์–ด์ฃผ์…”์„œ ๊ฐ์‚ฌํ•ฉ๋‹ˆ๋‹ค. ๐Ÿ˜€

 

References
https://airflow.apache.org/blog/airflow-2.3.0/
https://www.reddit.com/r/dataengineering/comments/ufl9tx/apache_airflow_230_is_out/