

[Airflow] Building Airflow with Docker

DeepFlame 2022. 4. 18. 20:00

ํ•„์ž๋Š” ๊ฐœ์ธ ํ”„๋กœ์ ํŠธ๋ฅผ ์ง„ํ–‰ํ•˜๋ฉฐ, Airflow๋ฅผ ํ†ตํ•œ ์›Œํฌํ”Œ๋กœ์šฐ ์ž๋™ํ™”๋ฅผ ๊ตฌ์ถ•ํ–ˆ๋‹ค. 

https://github.com/DeepFlame-JR/Stock_and_Forum

 

GitHub - DeepFlame-JR/Stock_and_Forum: collects stock prices and posts from Naver's stock discussion forum on an hourly basis.

 

One day, a Task that had already finished all of its work stayed stuck in the Running state.
After some Googling, I found reports of a bug where this happens when a single task uses a lot of resources.

My single Task was doing quite a lot of work, so I decided it needed to be improved. 😅

 

So the first thing I did was split that one job into several tasks.

1. Distributing the Task workload


Airflow์—์„œ๋Š” ๋‹ค์–‘ํ•œ Operator๊ฐ€ ์žˆ๋‹ค. ์ด๊ฒƒ์„ ํ†ตํ•ด ๋‹จ์ผ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ๋‹ค.
๊ทธ ์ข…๋ฅ˜๊ฐ€ ๋‹ค์–‘ํ•œ๋ฐ, ์•„๋ž˜ ๋งํฌ์—์„œ ํ™•์ธ๊ฐ€๋Šฅํ•˜๋‹ค.

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/index.html

 


 

Let's look at the two most common ones, BashOperator and PythonOperator.

  1. BashOperator: runs a Bash shell command.
  2. PythonOperator: calls a Python function you define.

 

๊ธฐ์กด์—๋Š” BashOperator๋ฅผ ํ†ตํ•ด์„œ ์ž‘์—…์„ ์‹คํ–‰ํ–ˆ๋‹ค.

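from airflow.operators.bash import BashOperator  # Airflow 2.x import path

# Runs the entire forum crawl as one shell command (src_folder and dag are defined elsewhere in the DAG file)
forum = BashOperator(task_id='get_forum',
                     bash_command='python3 %s/data/forum.py' % src_folder,
                     dag=dag)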

 

However, I needed to split that same job into several tasks, so I switched to a PythonOperator, which can pass parameters to the function it calls.

The parameters (start, end, port) are passed through op_kwargs.

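from airflow.operators.python import PythonOperator  # Airflow 2.x import path

def execute_forum(start, end, port, **kwargs):
    # Imported inside the callable so the module loads when the task runs,
    # not every time the scheduler parses this DAG file
    from data import forum
    forum.main_get_forum(start, end, port)

forum_tasks = {}
for i, f in enumerate(["f1", "f2", "f3", "f4", "f5"]):
    task = PythonOperator(
        task_id=f"forum_{i+1}",
        python_callable=execute_forum,
        # Each task handles its own slice of the 50 items and uses its own port
        op_kwargs={"start": i*11, "end": min(50, (i+1)*11), "port": 4444+i},
        dag=dag,
    )
    forum_tasks[f] = task

# Chain the five tasks so each one starts only after the previous one finishes
forum_tasks["f1"] >> forum_tasks["f2"] >> forum_tasks["f3"] >> forum_tasks["f4"] >> forum_tasks["f5"]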
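To double-check the slicing, here is a plain-Python stand-in for the loop above that needs no Airflow install. It rebuilds the same op_kwargs dicts and calls a placeholder function the way PythonOperator does, by unpacking op_kwargs as keyword arguments (the `**kwargs` in execute_forum additionally lets Airflow 2 pass its context variables). `build_op_kwargs` and `fake_execute_forum` are hypothetical names for illustration only. Note that five chunks of 11 would cover 55 items, so `min(50, ...)` trims the last task to start 44, end 50.

```python
# Plain-Python stand-in for the PythonOperator loop (no Airflow required).
# TOTAL and CHUNK mirror the hard-coded 50 and 11 in the DAG above;
# fake_execute_forum is a hypothetical placeholder for execute_forum.

TOTAL = 50  # number of items to crawl (hard-coded as 50 in the DAG)
CHUNK = 11  # items per task (hard-coded as 11 in the DAG)


def build_op_kwargs(n_tasks: int = 5) -> list:
    """Reproduce the op_kwargs dicts generated for forum_1 .. forum_n."""
    return [
        {"start": i * CHUNK, "end": min(TOTAL, (i + 1) * CHUNK), "port": 4444 + i}
        for i in range(n_tasks)
    ]


def fake_execute_forum(start, end, port, **kwargs):
    # Stands in for forum.main_get_forum; just reports the slice it was given.
    return f"start={start}, end={end}, port={port}"


if __name__ == "__main__":
    for i, kw in enumerate(build_op_kwargs(), start=1):
        # PythonOperator passes op_kwargs to python_callable as keyword arguments.
        print(f"forum_{i}:", fake_execute_forum(**kw))
```

The last line printed is `forum_5: start=44, end=50, port=4448`, confirming the clipped final slice.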

 

์™„์„ฑ๋œ DAG๋Š” ์•„๋ž˜์™€ ๊ฐ™๋‹ค. ์ด๋ ‡๊ฒŒ task๋ฅผ ์—ฌ๋Ÿฌ ๊ฐœ๋กœ ๋‚˜๋ˆ„๋Š” ์ž‘์—…์„ ์™„๋ฃŒํ–ˆ๋‹ค.

 

🤔 But... these tasks don't depend on one another and could be processed asynchronously, so lining them up like this is quite inefficient. (The whole run took about an hour.)

So I decided to run them in parallel!

 

 

2. Building a Celery Executor with Docker


Airflow offers several Executors.

  1. Sequential Executor
    The default Executor that ships with Airflow. It can run only one task at a time.

  2. Local Executor
    Unlike the Sequential Executor, it can run tasks in parallel (as separate processes on a single machine).

  3. Celery Executor
    Like the Local Executor, it runs tasks in parallel. Celery additionally needs a message queue such as Redis, because the executor is organized as a cluster of workers that pick up and run the tasks placed on that queue. And because it is a cluster, high availability and scale-out for the executor come naturally.
    That makes it a good fit for production environments.

 

ํ•„์ž์˜ ๊ฒฝ์šฐ, ๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ์™€ worker ์ˆ˜ ์ฆ๊ฐ€๋ฅผ ํ†ตํ•œ ์‹œ๊ฐ„ ๋‹จ์ถ•์„ ์œ„ํ•ด Celery Executor๋ฅผ ๊ตฌ์ถ•ํ–ˆ๋‹ค.

๋˜ํ•œ Airflow ๊ตฌ์ถ•์„ ์œ„ํ•ด Airflow์—์„œ ์ œ๊ณต๋˜๋Š” docker-compose ํŒŒ์ผ์„ ํ™œ์šฉํ•˜์—ฌ ํ™˜๊ฒฝ์„ ๊ตฌ์ถ•ํ–ˆ๋‹ค.
docker์™€ docker-compose ํŒŒ์ผ๋ฅผ ํ™œ์šฉํ•˜๋ฉด ์•„๋ž˜์™€ ๊ฐ™์€ ์žฅ์ ์ด ์žˆ๋‹ค.

  1. ์‹คํ–‰ํ™˜๊ฒฝ ๊ตฌ์ถ•์ด ๊ฐ„๋‹จํ•˜๋‹ค.
  2. Host์™€ ๋…๋ฆฝ๋œ ํ™˜๊ฒฝ์— ์„ค์น˜๋จ์œผ๋กœ ์ถฉ๋Œ์„ ๋ฐฉ์ง€ํ•  ์ˆ˜ ์žˆ๊ณ , ์‰ฝ๊ฒŒ ์ฒ˜์Œ๋ถ€ํ„ฐ ๋‹ค์‹œ ํ™˜๊ฒฝ์„ ๊ตฌ์ถ•ํ•  ์ˆ˜ ์žˆ๋‹ค.
  3. Celery Executor ๊ตฌ์ถ•์„ ์œ„ํ•ด webserver, scheduler, worker ๋“ฑ ๋งŽ์€ ์ปจํ…Œ์ด๋„ˆ ๊ตฌ์ถ•์ด ํ•„์š”ํ•˜๋‹ค. ์ด๋Ÿฌํ•œ ๊ณผ์ •์„ docker-compose.yaml์— ์ •์˜ํ•˜์—ฌ ํ•œ ๋ฒˆ์— ํ•ด๊ฒฐํ•  ์ˆ˜ ์žˆ๋‹ค. ๋˜ํ•œ ์ปจํ…Œ์ด๋„ˆ๋“ค์˜ ์„ค์ •๊ฐ’๊ณผ ์˜์กด์„ฑ์„ ์ •์˜ํ•  ์ˆ˜ ์žˆ๋‹ค.

 

Airflow์—์„œ ๊ณต์‹์ ์œผ๋กœ ๋‚˜์™€์žˆ๋Š” ๋ฌธ์„œ์—์„œ docker-compose.yaml ํŒŒ์ผ๊ณผ ๋ช…๋ น์–ด๋ฅผ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html

 


 

🤔 But... my code needs a lot of packages installed with pip, and Spark also needs a Java environment. So the default docker-compose.yaml needed some tuning.

Here is the Docker setup I used.

 

1. docker-compose.yaml

 

2. Dockerfile

์œ„์—์„œ build ์œ„์น˜์— ์ƒ์„ฑํ•œ๋‹ค. ์ปจํ…Œ์ด๋„ˆ ๊ตฌ์ถ• ์‹œ, ์‹คํ–‰ํ•  ์ฝ”๋“œ๋ฅผ ์ •์˜ํ•œ๋‹ค.

 

Write requirements.txt as shown below and place it in the same folder as docker-compose.yaml.

pandas==1.3.5
selenium
selenium-requests
bs4
webdriver-manager
pymongo
psycopg2
pyspark
pyhive
thrift
thrift-sasl
sasl
konlpy
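Since the image has to carry all of these packages plus Java for Spark, a small check run inside a worker container after the build can confirm nothing is missing. This is a hypothetical helper (`check_env.py` is an assumed file name), and the pip-name to import-name mapping is my assumption for this requirements.txt (for example, `webdriver-manager` imports as `webdriver_manager`). It relies only on `importlib.util.find_spec`, `shutil.which`, and `os.environ` from the standard library.

```python
# Hypothetical post-build sanity check (e.g. saved as check_env.py).
# The pip-name -> import-name mapping is an assumption for this
# requirements.txt; adjust it if your package list differs.
import importlib.util
import os
import shutil

IMPORT_NAMES = {
    "pandas": "pandas",
    "selenium": "selenium",
    "selenium-requests": "seleniumrequests",
    "bs4": "bs4",
    "webdriver-manager": "webdriver_manager",
    "pymongo": "pymongo",
    "psycopg2": "psycopg2",
    "pyspark": "pyspark",
    "pyhive": "pyhive",
    "thrift": "thrift",
    "thrift-sasl": "thrift_sasl",
    "sasl": "sasl",
    "konlpy": "konlpy",
}


def missing_modules(module_names) -> list:
    """Return the top-level modules this interpreter cannot find."""
    return [m for m in module_names if importlib.util.find_spec(m) is None]


def java_available() -> bool:
    """Spark's launcher uses JAVA_HOME if set, otherwise `java` on PATH."""
    return bool(os.environ.get("JAVA_HOME")) or shutil.which("java") is not None


if __name__ == "__main__":
    missing = missing_modules(IMPORT_NAMES.values())
    print("missing python modules:", missing or "none")
    print("java available:", java_available())
```

If the script is mounted into the container, something like `docker-compose exec airflow-worker python check_env.py` would run it on one worker.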

 

3. Docker build

Then, to bring up the Docker environment, run the following command (replace n with the number of workers you want).

docker-compose up -d --scale airflow-worker=n --build
# -d: runs the containers in the background (detached mode).
# --scale: starts n instances of the given service (here, airflow-worker).
# --build: builds the images before starting the containers instead of reusing existing ones.
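If you change the worker count often, a tiny helper can assemble the same command for a given n. `compose_up_cmd` is a hypothetical name; it only builds the argument list and does not run Docker, so you would pass the result to `subprocess.run(..., check=True)` when you actually want to start the stack.

```python
# Hypothetical helper that assembles the docker-compose command above
# for a chosen number of Celery workers. It only builds the argv list.
def compose_up_cmd(workers: int, service: str = "airflow-worker") -> list:
    """Build the argv for `docker-compose up` with `workers` copies of `service`."""
    if workers < 1:
        raise ValueError("need at least one worker")
    return [
        "docker-compose", "up",
        "-d",                               # detached: run in the background
        "--scale", f"{service}={workers}",  # start `workers` instances of the service
        "--build",                          # build images before starting containers
    ]


if __name__ == "__main__":
    # Prints: docker-compose up -d --scale airflow-worker=3 --build
    print(" ".join(compose_up_cmd(3)))
```

Keeping the command as a list (rather than one shell string) avoids quoting issues if it is later handed to `subprocess.run`.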

 

์ตœ์ข…์ ์œผ๋กœ ์ƒ์„ฑ๋œ DAG๋Š” ์•„๋ž˜์™€ ๊ฐ™์œผ๋ฉฐ, ๊ธฐ์กด์— 1์‹œ๊ฐ„ ๊ฑธ๋ฆฌ๋˜ ์ž‘์—…์ด 30๋ถ„์œผ๋กœ ๋‹จ์ถ•๋˜์—ˆ๋‹ค. ๐Ÿ’ช

 

References
https://umbum.dev/779
https://dydwnsekd.tistory.com/98