Welcome! 🙋‍♂️ View more

Engineering 💻/Cloud

[Airflow] 2.3.0 릴리즈 주요 변화 사항을 간단하게 알아보자

DeepFlame 2022. 5. 3. 23:16

Airflow가 2022년 4월 30일 버전 2.3.0을 릴리즈했습니다.

2.0.0 버전 업데이트 이후 가장 큰 변화라고 하는데, 주요 변화 사항을 간단하게 알아보도록 하겠습니다! 🔥

 

1. Tree뷰의 변화


가장 크게 눈에 띄는 변화입니다.

기존에는 수행별로 Task의 상태만 확인할 수 있었고, 중간에 구별선이 없어서 해당 상태(네모)가 어느 Task의 상태인지 구별하기가 어려웠습니다.

Tree View (2.2.4 버전)

 

하지만 이것이 Grid View로 변경되면서 어느 Task가 어떤 상태인지 명확하게 확인할 수 있게 되었습니다. 

또한 수행 시간이 제공됨으로 실행이 너무 오래 걸리거나, 너무 빨리 끝나는 상황에 대한 이상현상을 좀 더 빠르게 확인할 수 있게 되었습니다!

Grid View (2.3.0 버전)

 

추가적으로 저 네모를 클릭하여 확인할 수 있는 상세 화면이 기존에는 창으로 나타나서 뒷 내용을 가렸지만,

현재는 오른쪽에 바로 나타나서 함께 확인할 수 있게 되었습니다! 🤗

 

 

2. Dynamic Task Mapping


기존에 DAG 작성자가 필요한 작업 수를 미리 알아야하는 대신 현재 데이터를 기반으로 여러 작업을 생성할 수 있습니다.

간단한 예시를 살펴보겠습니다. 😲

만약 List의 요소에 1씩 더해야하는 add_one Task가 있다고 가정하겠습니다..

그렇다면 이 Task 몇 개 생성해야할까요? 정답은 List 요소의 개수일 것입니다.

그런데 만약 이 List의 크기가 수행될 때마다 가변적이라면?? 

 

이러한 문제에 expand 함수를 활용할 수 있습니다.

from datetime import datetime

from airflow import DAG
from airflow.decorators import task


with DAG(dag_id="simple_mapping", start_date=datetime(2022, 3, 4)) as dag:

    @task
    def add_one(x: int):
        return x + 1

    @task
    def sum_it(values):
        total = sum(values)
        print(f"Total was {total}")

    added_values = add_one.expand(x=[1, 2, 3]) # List의 길이가 3이다. 따라서 add_one Task가 3개 생성된다.
    sum_it(added_values)

 

아래를 보시다시피, Grid View와 Graph View에서는 하나의 Task로 나타납니다.

실행된 상세 내용은 Mapped Instances에서 확인할 수 있습니다.

 

Dynamic Task Mapping는 Tree 뷰 변경과 더불어 2.3.0 릴리즈의 메인이 되는 변경사항입니다. 

위의 예제는 아주 간단한 상황임으로 좀 더 상세 내용을 원하시는 분은 아래 URL을 참고해주세요.

https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html#

 

 

3. LocalKubernetesExecutor


스케줄러에서 LocalExecutor와 KubernetesExecutor를 혼용하여 사용할 수 있습니다. 

각 Executor를 설명은 아래와 같습니다.

  • LocalExecutor
    단일 장비에 웹 서버와 스케줄러를 같이 가동합니다.
  • KubernetesExecutor
    Task를 스케줄러가 찾고, Executor가 동저그로 워커를 POD 형태로 실행합니다. 그리고 해당 워커에서 태스크를 수행합니다.

 

Airflow는 Task를 수행할 주체로 Executor를 하나 선택하여 사용할 수 있는데, 이를 혼용해서 사용함으로써 가벼운 작업은 LocalExecutor로 그렇지 않은 작업은 KubernetesExecutor로 수행하여 작업시간을 최적화할 수 있습니다.

※ KubernetesExecutor는 LocalExecutor에 없는 대기 시간 레이어가 추가되어 있어 가벼운 작업에는 LocalExecutor로 수행하는 것이 장점을 가지고 있습니다.

 

 

4. Task 재활용


override 함수를 통해서 Task를 재활용할 수 있습니다.

또한 재활용할 시에 매개변수도 함께 전달할 수 있습니다.

 

@task
def add_task(x, y):
    print(f"Task args: x={x}, y={y}")
    return x + y


@dag(start_date=datetime(2022, 1, 1))
def mydag():
    start = add_task.override(task_id="start")(1, 2)
    for i in range(3):
        start >> add_task.override(task_id=f"add_start_{i}")(start, i)

 

위 코드를 확인하면 add_task(x,y)를 override 함수를 통해서 손쉽게 재활용하고, 매개변수도 전달하는 것을 확인할 수 있습니다.

 

 


 

대부분의 엔지니어들이 워크플로우 툴로 Airflow를 활용하고, 커뮤니티가 활성화됨으로써 워크플로우 자동화, UI, 사용성에서 강력하게 발전해나가는 것 같습니다. 👀

이 외에도 기존에 URI 포맷이 아닌 JSON 포맷으로도 DB 연결이 지원되며, "airflow db downgrade"와 "airflow db clean" 등의 커맨드 업데이트를 통해서 DB 마이그레이션에 도움을 주는 업데이트도 진행되었습니다. 

# Json 포맷을 통한 DB 연결 예시
airflow connections add 'my_prod_db' \
    --conn-json '{
        "conn_type": "my-conn-type",
        "login": "my-login",
        "password": "my-password",
        "host": "my-host",
        "port": 1234,
        "schema": "my-schema",
        "extra": {
            "param1": "val1",
            "param2": "val2"
        }
    }'

 

 

자세한 내용은 아래 URL을 참고해주시기 바랍니다.
긴 글 읽어주셔서 감사합니다. 😀

 

참고
https://airflow.apache.org/blog/airflow-2.3.0/
https://www.reddit.com/r/dataengineering/comments/ufl9tx/apache_airflow_230_is_out/