Airflow가 2022년 4월 30일 버전 2.3.0을 릴리즈했습니다.
2.0.0 버전 업데이트 이후 가장 큰 변화라고 하는데, 주요 변화 사항을 간단하게 알아보도록 하겠습니다! 🔥
1. Tree뷰의 변화
가장 크게 눈에 띄는 변화입니다.
기존에는 수행별로 Task의 상태만 확인할 수 있었고, 중간에 구별선이 없어서 해당 상태(네모)가 어느 Task의 상태인지 구별하기가 어려웠습니다.
하지만 이것이 Grid View로 변경되면서 어느 Task가 어떤 상태인지 명확하게 확인할 수 있게 되었습니다.
또한 수행 시간이 제공됨으로 실행이 너무 오래 걸리거나, 너무 빨리 끝나는 상황에 대한 이상현상을 좀 더 빠르게 확인할 수 있게 되었습니다!
추가적으로 저 네모를 클릭하여 확인할 수 있는 상세 화면이 기존에는 창으로 나타나서 뒷 내용을 가렸지만,
현재는 오른쪽에 바로 나타나서 함께 확인할 수 있게 되었습니다! 🤗
2. Dynamic Task Mapping
기존에 DAG 작성자가 필요한 작업 수를 미리 알아야하는 대신 현재 데이터를 기반으로 여러 작업을 생성할 수 있습니다.
간단한 예시를 살펴보겠습니다. 😲
만약 List의 요소에 1씩 더해야하는 add_one Task가 있다고 가정하겠습니다..
그렇다면 이 Task 몇 개 생성해야할까요? 정답은 List 요소의 개수일 것입니다.
그런데 만약 이 List의 크기가 수행될 때마다 가변적이라면??
이러한 문제에 expand 함수를 활용할 수 있습니다.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
with DAG(dag_id="simple_mapping", start_date=datetime(2022, 3, 4)) as dag:
@task
def add_one(x: int):
return x + 1
@task
def sum_it(values):
total = sum(values)
print(f"Total was {total}")
added_values = add_one.expand(x=[1, 2, 3]) # List의 길이가 3이다. 따라서 add_one Task가 3개 생성된다.
sum_it(added_values)
아래를 보시다시피, Grid View와 Graph View에서는 하나의 Task로 나타납니다.
실행된 상세 내용은 Mapped Instances에서 확인할 수 있습니다.
Dynamic Task Mapping는 Tree 뷰 변경과 더불어 2.3.0 릴리즈의 메인이 되는 변경사항입니다.
위의 예제는 아주 간단한 상황임으로 좀 더 상세 내용을 원하시는 분은 아래 URL을 참고해주세요.
https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html#
3. LocalKubernetesExecutor
스케줄러에서 LocalExecutor와 KubernetesExecutor를 혼용하여 사용할 수 있습니다.
각 Executor를 설명은 아래와 같습니다.
- LocalExecutor
단일 장비에 웹 서버와 스케줄러를 같이 가동합니다. - KubernetesExecutor
Task를 스케줄러가 찾고, Executor가 동저그로 워커를 POD 형태로 실행합니다. 그리고 해당 워커에서 태스크를 수행합니다.
Airflow는 Task를 수행할 주체로 Executor를 하나 선택하여 사용할 수 있는데, 이를 혼용해서 사용함으로써 가벼운 작업은 LocalExecutor로 그렇지 않은 작업은 KubernetesExecutor로 수행하여 작업시간을 최적화할 수 있습니다.
※ KubernetesExecutor는 LocalExecutor에 없는 대기 시간 레이어가 추가되어 있어 가벼운 작업에는 LocalExecutor로 수행하는 것이 장점을 가지고 있습니다.
4. Task 재활용
override 함수를 통해서 Task를 재활용할 수 있습니다.
또한 재활용할 시에 매개변수도 함께 전달할 수 있습니다.
@task
def add_task(x, y):
print(f"Task args: x={x}, y={y}")
return x + y
@dag(start_date=datetime(2022, 1, 1))
def mydag():
start = add_task.override(task_id="start")(1, 2)
for i in range(3):
start >> add_task.override(task_id=f"add_start_{i}")(start, i)
위 코드를 확인하면 add_task(x,y)를 override 함수를 통해서 손쉽게 재활용하고, 매개변수도 전달하는 것을 확인할 수 있습니다.
대부분의 엔지니어들이 워크플로우 툴로 Airflow를 활용하고, 커뮤니티가 활성화됨으로써 워크플로우 자동화, UI, 사용성에서 강력하게 발전해나가는 것 같습니다. 👀
이 외에도 기존에 URI 포맷이 아닌 JSON 포맷으로도 DB 연결이 지원되며, "airflow db downgrade"와 "airflow db clean" 등의 커맨드 업데이트를 통해서 DB 마이그레이션에 도움을 주는 업데이트도 진행되었습니다.
# Json 포맷을 통한 DB 연결 예시
airflow connections add 'my_prod_db' \
--conn-json '{
"conn_type": "my-conn-type",
"login": "my-login",
"password": "my-password",
"host": "my-host",
"port": 1234,
"schema": "my-schema",
"extra": {
"param1": "val1",
"param2": "val2"
}
}'
자세한 내용은 아래 URL을 참고해주시기 바랍니다.
긴 글 읽어주셔서 감사합니다. 😀
참고
https://airflow.apache.org/blog/airflow-2.3.0/
https://www.reddit.com/r/dataengineering/comments/ufl9tx/apache_airflow_230_is_out/
'Engineering 💻 > Cloud' 카테고리의 다른 글
[Airflow] Docker를 활용한 Airflow 구축 (1) | 2022.04.18 |
---|---|
[Docker] Docker-Compose에서 같은 이미지로 여러 컨테이너 생성하기 (0) | 2022.03.30 |
[Docker] 컨테이너와 도커 그리고 쿠버네티스 (0) | 2022.01.18 |
터미널 종료 후에도 프로세스 실행 (feat. Selenium 작동시 유의사항) (0) | 2022.01.05 |
EC2에 Selenium 환경 구축하기 (feat. Chrome, Chrome Driver) (0) | 2022.01.05 |