Welcome! 🙋‍♂️ View more

Engineering 💻/Cloud

[Airflow] Docker를 활용한 Airflow 구축

DeepFlame 2022. 4. 18. 20:00

필자는 개인 프로젝트를 진행하며, Airflow를 통한 워크플로우 자동화를 구축했다. 

https://github.com/DeepFlame-JR/Stock_and_Forum

 

GitHub - DeepFlame-JR/Stock_and_Forum: 주식 가격과 네이버 종목토론방의 내용을 시간별로 수집

주식 가격과 네이버 종목토론방의 내용을 시간별로 수집. Contribute to DeepFlame-JR/Stock_and_Forum development by creating an account on GitHub.

github.com

 

어느 날, 모든 작업을 수행한 Task가 계속 Running 상태로 남아있었다.
구글링을 해보니, 한 작업에서 많은 리소스를 사용하게 되면 위와 같이 되는 버그가 있는 것을 확인했다.

한 Task에서 꽤 많은 작업을 했기 때문에..... 개선이 필요하다고 생각되었다.... 😅

 

그래서 한 가지 작업을 나누는 작업을 먼저 진행했다.

1. Task 작업 분산


Airflow에서는 다양한 Operator가 있다. 이것을 통해 단일 작업을 수행할 수 있다.
그 종류가 다양한데, 아래 링크에서 확인가능하다.

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/index.html

 

airflow.operators — Airflow Documentation

 

airflow.apache.org

 

이중 가장 대표적인 BashOperator와 PythonOperator를 살펴보자.

  1. BashOperator: Bash Shell 명령어를 실행한다.
  2. PythonOperator: 정의된 Python 함수를 실행한다.

 

기존에는 BashOperator를 통해서 작업을 실행했다.

forum = BashOperator(task_id='get_forum',
                     bash_command='python3 %s/data/forum.py' % src_folder,
                     dag=dag)

 

하지만 같은 작업을 여러 개의 작업으로 나누는 과정이 필요했고, 이를 위해 매개변수를 전달하는 PythonOperator로 수정했다.

op_kwags를 통해서 매개변수를 전달할 수 있다. (start, end, port)

def execute_forum(start, end, port, **kwargs):
    from data import forum
    forum.main_get_forum(start, end, port)

forum_tasks = {}
for i, f in enumerate(["f1", "f2", "f3", "f4", "f5"]):
    task = PythonOperator(
        task_id=f"forum_{i+1}",
        python_callable=execute_forum,
        op_kwargs={"start":i*11, "end":min(50,(i+1)*11), "port":4444+i},
        dag=dag,
    )
    forum_tasks[f] = task

forum_tasks["f1"] >> forum_tasks["f2"] >> forum_tasks["f3"] >> forum_tasks["f4"] >> forum_tasks["f5"]

 

완성된 DAG는 아래와 같다. 이렇게 task를 여러 개로 나누는 작업을 완료했다.

 

🤔 그런데... 해당 작업은 비동기로 처리해도 되는 작업이기 때문에 이렇게 작업을 배치하면 상당히 비효율적이다. (총 1시간 정도의 시간이 소요되었다.)

따라서 작업을 병렬 처리해보기로 했다!

 

 

2. Docker를 통한 Celery Executor 구축


Airflow는 다양한 Executor를 제공한다.

  1. Sequential Executor
    Airflow에서 제공되는 기본 Executor로 한번에 하나의 작업만 실행할 수 있다.

  2. Local Executor
    Sequential Executor와 달리 task를 병렬로 실행하는 것이 가능하다.

  3. Celery Executor
    Local Executor와 같이 task를 병렬로 실행할 수 있다. Celery는 추가적으로 Redis와 같은 Message Queue를 필요로 하는데, 이는 Executor가 클러스터 형식으로 구성되고, Message Queue에 있는 task를 실행하는 구조로 동작하기 때문이다. 그리고 클러스터로 구성되었기 때문에 Executor에 대한 High Availability 구성과 Scale out이 자연스럽게 가능하다.
    따라서 실제 운영환경에 적합하다고 판단된다.

 

필자의 경우, 병렬 처리worker 수 증가를 통한 시간 단축을 위해 Celery Executor를 구축했다.

또한 Airflow 구축을 위해 Airflow에서 제공되는 docker-compose 파일을 활용하여 환경을 구축했다.
dockerdocker-compose 파일를 활용하면 아래와 같은 장점이 있다.

  1. 실행환경 구축이 간단하다.
  2. Host와 독립된 환경에 설치됨으로 충돌을 방지할 수 있고, 쉽게 처음부터 다시 환경을 구축할 수 있다.
  3. Celery Executor 구축을 위해 webserver, scheduler, worker 등 많은 컨테이너 구축이 필요하다. 이러한 과정을 docker-compose.yaml에 정의하여 한 번에 해결할 수 있다. 또한 컨테이너들의 설정값과 의존성을 정의할 수 있다.

 

Airflow에서 공식적으로 나와있는 문서에서 docker-compose.yaml 파일과 명령어를 확인할 수 있다.

https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html

 

Running Airflow in Docker — Airflow Documentation

 

airflow.apache.org

 

🤔 그런데.... 필자의 코드를 실행하기 위해서는 pip로 설치해야하는 패키지도 많고,,,, Spark 실행을 위해 JAVA 환경 설정도 필요하다.... 그래서 기본 docker-compose.yaml 내용에서 튜닝이 필요가 있었다. 

필자가 활용했던 Docker 구축 환경을 공유하고자 한다.

 

1. docker-compose.yaml

 

2. Dockerfile

위에서 build 위치에 생성한다. 컨테이너 구축 시, 실행할 코드를 정의한다.

 

requirements.txt는 아래와 같이 작성하여 docker-compose.yaml과 같은 폴더에 위치시키면 된다.

pandas==1.3.5
selenium
selenium-requests
bs4
webdriver-manager
pymongo
psycopg2
pyspark
pyhive
thrift
thrift-sasl
sasl
konlpy

 

3. Docker 빌드

그 후, Docker 환경을 구축하고 싶다면 아래 명령어를 입력한다.

docker-compose up -d --scale airflow-worker=n --build
// -d: docker 구축이 백그라운드로 실행된다.
// --scale: 특정 image를 n개 생성한다.
// --build: 캐싱된 이미지를 체크하지 않고, 무조건 빌드를 하고 시작한다.

 

최종적으로 생성된 DAG는 아래와 같으며, 기존에 1시간 걸리던 작업이 30분으로 단축되었다. 💪

 

참고
https://umbum.dev/779
https://dydwnsekd.tistory.com/98