๋ฐ๋ธŒ์ฝ”์Šค_๋ฐ์ดํ„ฐ์—”์ง€๋‹ˆ์–ด๋ง

[Week10 Airflow] TIL 43์ผ์ฐจ Airflow DAG, task์˜ ์ •์˜์™€ ์‚ฌ์šฉ

๐Ÿช„ํ•˜๋ฃจ๐Ÿช„ 2023. 12. 15. 17:42
728x90

โ… . ์šฉ์–ด ์„ค๋ฉ

1. DAG๋ž€?

Directed Acyclic Graph(๋ฐฉํ–ฅ์ด ์žˆ๋Š” ๋น„์ˆœํ™˜ ๊ทธ๋ž˜ํ”„)๋กœ Airflow์˜ ์ž‘์—…์˜ ๋‹จ์œ„๋ฅผ ๋งํ•œ๋‹ค.

DAG๋Š” ์—ฌ๋Ÿฌ task๋กœ ๊ตฌ์„ฑ๋˜์–ด ์žˆ๋Š”๋ฐ, task์˜ ์‹คํ–‰๋ฐฉ๋ฒ•, ์‹คํ–‰์ˆœ์„œ, ์žฌ์‹œ๋„ ํšŸ์ˆ˜, ์‹œ๊ฐ„ ์ดˆ๊ณผ ๋“ฑ์˜ ์ •๋ณด๋ฅผ ๋‹ด๊ณ  ์žˆ๋‹ค.

2. Task๋ž€?

Workflow์˜ ์ตœ์†Œ ์‹คํ–‰ ๋‹จ์œ„์ด๋‹ค. ์›์น™์ ์œผ๋กœ๋Š” ํ•˜๋‚˜์˜ Operator์„ ๊ฐ€์ง„๋‹ค(๋‹จ, ํŠน๋ณ„ํ•œ ๊ฒฝ์šฐ์— ์—ฌ๋Ÿฌ Operator๋ฅผ ์‚ฌ์šฉํ•˜๊ณ  ์‹ถ๋‹ค๋ฉด, PythonOperator๋‚˜ BranchPythonOperator๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ Python ํ•จ์ˆ˜๋ฅผ ์ •์˜ํ•˜๊ณ , ํ•ด๋‹น ํ•จ์ˆ˜ ๋‚ด์—์„œ ์—ฌ๋Ÿฌ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•˜๋„๋ก ๊ตฌํ˜„ํ•ด์•ผ ํ•œ๋‹ค.)

3. Operator๋ž€?

Operator๋Š” Airflow์—์„œ ์–ด๋–ค ์ž‘์—…์„ ์ˆ˜ํ–‰ํ• ์ง€๋ฅผ ์ •์˜ํ•˜๋Š” ํด๋ž˜์Šค์ด๋‹ค. Task์˜ ํ…œํ”Œ๋ฆฟ์ด๋ผ๊ณ  ์ƒ๊ฐํ•˜๋ฉด ๋œ๋‹ค.

์˜คํผ๋ ˆ์ดํ„ฐ์˜ ์ข…๋ฅ˜๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค.

 

Operators — Airflow Documentation

 

airflow.apache.org

 

์ข…๋ฅ˜

BashOperator:

๋ช…๋ น ์ค„ ๋ช…๋ น์–ด๋‚˜ ์Šคํฌ๋ฆฝํŠธ๋ฅผ ์‹คํ–‰

PythonOperator: + @test์™€ ํ•จ๊ป˜ ์‚ฌ์šฉ!

Python ํ•จ์ˆ˜๋ฅผ ์‹คํ–‰ํ•˜๋Š” Operator๋กœ, ์‚ฌ์šฉ์ž ์ •์˜ Python ์ฝ”๋“œ๋ฅผ ์‹คํ–‰

DummyOperator:

์•„๋ฌด ์ž‘์—…๋„ ์ˆ˜ํ–‰ํ•˜์ง€ ์•Š๋Š” Operator๋กœ, ์ฃผ๋กœ DAG ๋‚ด์—์„œ ์กฐ๊ฑด๋ถ€ ๋…ผ๋ฆฌ๋ฅผ ๊ตฌ์„ฑํ•  ๋•Œ ์‚ฌ์šฉ

HttpSensor:

HTTP ์—”๋“œํฌ์ธํŠธ์˜ ์ƒํƒœ๋ฅผ ํ™•์ธํ•˜์—ฌ ์„ผ์„œ ์—ญํ• ์„ ํ•˜๋Š” Operator

BranchPythonOperator:

Python ํ•จ์ˆ˜์˜ ๋ฐ˜ํ™˜ ๊ฐ’์— ๋”ฐ๋ผ ๋‹ค๋ฅธ ๊ฒฝ๋กœ๋กœ DAG๋ฅผ ๋ถ„๊ธฐ

SubDagOperator:

SubDAG๋ฅผ ํ˜ธ์ถœํ•˜๋Š” Operator๋กœ, ํ•˜์œ„ DAG๋ฅผ ์‹คํ–‰ํ•  ๋•Œ ์‚ฌ์šฉ

TriggerDagRunOperator:

๋‹ค๋ฅธ DAG๋ฅผ ์‹คํ–‰ํ•˜๊ธฐ ์œ„ํ•ด ์‚ฌ์šฉ๋˜๋ฉฐ, ์™ธ๋ถ€ DAG๋ฅผ ํŠธ๋ฆฌ๊ฑฐํ•  ์ˆ˜ ์žˆ์Œ

S3FileTransformOperator:

AWS S3 ํŒŒ์ผ์„ ๋ณ€ํ™˜ํ•˜๋Š” Operator๋กœ, ์ฃผ๋กœ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ์ž‘์—…์— ์‚ฌ์šฉ

EmailOperator:

์ด๋ฉ”์ผ์„ ๋ณด๋‚ด๋Š” Operator๋กœ, ์ž‘์—… ์™„๋ฃŒ ๋˜๋Š” ์˜ค๋ฅ˜ ์•Œ๋ฆผ์— ์‚ฌ์šฉ

SqlSensor:

SQL ์งˆ์˜ ๊ฒฐ๊ณผ๋ฅผ ํ™•์ธํ•˜์—ฌ ์„ผ์„œ ์—ญํ• ์„ ํ•˜๋Š” Operator

DockerOperator:

Docker ์ปจํ…Œ์ด๋„ˆ ๋‚ด์—์„œ ๋ช…๋ น์„ ์‹คํ–‰ํ•˜๋Š” Operator

KubernetesPodOperator:

Kubernetes ํด๋Ÿฌ์Šคํ„ฐ ๋‚ด์—์„œ ํŒŒ๋“œ๋ฅผ ์‹คํ–‰ํ•˜๋Š” Operator๋กœ, ์ปจํ…Œ์ด๋„ˆ ์˜ค์ผ€์ŠคํŠธ๋ ˆ์ด์…˜์— ์‚ฌ์šฉ

 

 

โ…ก.  DAG๋ฅผ ์„ ์–ธํ•˜๋Š” 3๊ฐ€์ง€ ๋ฐฉ๋ฒ•

0. DAG ํŒŒ๋ผ๋ฏธํ„ฐ ์ข…๋ฅ˜

ํ•„์ˆ˜ ๋งค๊ฐœ๋ณ€์ˆ˜

1) dag_id

DAG์˜ ์ด๋ฆ„์œผ๋กœ, ๊ณ ์œ ํ•˜๊ฒŒ ์‹๋ณ„ ๊ฐ€๋Šฅํ•œ ๋ฌธ์ž์—ด์ด์–ด์•ผ ํ•œ๋‹ค.

2) start_date

DAG์˜ ์‹œ์ž‘ ๋‚ ์งœ ๋ฐ ์‹œ๊ฐ„์„ ๋‚˜ํƒ€๋‚ธ๋‹ค.

3) schedule

์–ผ๋งˆ๋‚˜ ์ž์ฃผ ์‹คํ–‰๋ ์ง€๋ฅผ ๊ฒฐ์ •ํ•˜๋Š” DAG ์‹คํ–‰ ์ฃผ๊ธฐ๋กœ timedelta ๊ฐ์ฒด๋กœ ์ „๋‹ฌํ•ด์•ผ ํ•œ๋‹ค.

ํ˜•์‹

" Minute[0-59] Hour[0-23] Day[1-31] Month[1-12] Week[1-7 start from monday] "

ํ˜น์€

@none, @once, @hourly, @daily, @weekly, @monthly, @yearly

์„ ํƒ์  ๋งค๊ฐœ๋ณ€์ˆ˜

1) default_args

DAG์™€ ์—ฐ๊ด€๋œ ๊ณตํ†ต ์ ์šฉ ์„ค์ •์— ๊ด€ํ•œ ์ •๋ณด์ด๋‹ค. ๋”•์…”๋„ˆ๋ฆฌ ํ˜•ํƒœ๋กœ ์ „๋‹ฌํ•ด์•ผ ํ•œ๋‹ค.

2) catchup

startdate๊ฐ€ ๊ณผ๊ฑฐ์ด๊ณ , DAGํ™œ์„ฑํ™”๋ฅผ ๊ทธ ์ดํ›„์— ํ–ˆ์„ ๋•Œ, ์ด์ „ ์‹คํ–‰๋“ค์„ ์žก์•„๋‚ด๊ธฐ(catch up) ์—ฌ๋ถ€๋ฅผ ๊ฒฐ์ •ํ•˜๋Š” ๋งค๊ฐœ๋ณ€์ˆ˜์ด๋‹ค.

์ด์ „์˜ ๋ฐ€๋ฆฐ ๋‚ ์งœ๋“ค๋งˆ๋‹ค schedule๋Œ€๋กœ DAG๋ฅผ ์‹คํ–‰ํ•  ๊ฒƒ์ด๋‹ค. ๊ธฐ๋ณธ ์„ค์ •์€ True์ด๋‹ค.

Full refresh๋Š” ์˜๋ฏธ๊ฐ€ ์—†๊ณ  Incremental update๋ฅผ ์ˆ˜ํ–‰ํ•˜๋Š” DAG์—๋งŒ ์˜๋ฏธ๊ฐ€ ์žˆ๋‹ค. 

3) tags

ํƒœ๊ทธ์— ๋Œ€ํ•œ ์ •๋ณด๋กœ ํ•ด๋‹น DAG์˜ ํƒœ๊ทธ ๋ชฉ๋ก์„ ์ „๋‹ฌํ•œ๋‹ค.

์ดํ›„์— ๋งŽ์€ DAG๊ฐ€ ์ƒ์„ฑ๋˜๋ฉด, ๋ณดํ†ต ํƒœ๊ทธ๋กœ ๋ฌถ์–ด์„œ ๊ด€๋ฆฌํ•œ๋‹ค(๊ฒ€์ƒ‰)

4) end_date

DAG์˜ ์ข…๋ฃŒ ๋‚ ์งœ์— ๋Œ€ํ•œ ๋งค๊ฐœ๋ณ€์ˆ˜์ด๋‹ค.

5) max_active_runs

๋™์‹œ์— ์‹คํ–‰๋˜๋Š” ์ตœ๋Œ€ DAG ์ธ์Šคํ„ด์Šค ์ˆ˜๋ฅผ ์ œํ•œํ•ฉ๋‹ˆ๋‹ค. (backfill์„ ์ˆ˜ํ–‰ํ•˜๋Š” ๊ฒฝ์šฐ ์—ฌ๋Ÿฌ ๊ฐœ์˜ DAG๋ฅผ ์‹คํ–‰ํ•ด์•ผ ํ•œ๋‹ค. ์ด๋•Œ, ๋ณ‘๋ ฌ๋กœ ์—ฌ๋Ÿฌ DAG๋ฅผ ์ˆ˜ํ–‰ํ•œ๋‹ค.) 

6) max_active_tasks

๋™์‹œ์— ์‹คํ–‰๋˜๋Š” ์ตœ๋Œ€ task ์ˆ˜๋ฅผ ์ œํ•œํ•ฉ๋‹ˆ๋‹ค. (์ผ๋ ฌ๋กœ task๊ฐ€ ๋ฐฐ์น˜๋œ ๊ฒฝ์šฐ ์˜๋ฏธ๊ฐ€ ์—†์œผ๋‚˜ ๋ณ‘๋ ฌ task๊ฐ€ ์กด์žฌํ•˜๋Š” ๊ฒฝ์šฐ ์˜๋ฏธ๊ฐ€ ์žˆ๋‹ค.)

5) ๋ฒˆ, 6) ๋ฒˆ์˜ ๊ฒฝ์šฐ ์•„๋ฌด๋ฆฌ ๋งŽ์€ ์ˆ˜์˜ DAG, task๋ฅผ ์ง€์ •ํ•˜์—ฌ๋„, ์‹ค์ œ๋กœ๋Š” airflow worker ๋…ธ๋“œ์— ์ ์šฉ๋œ CPU๊ฐœ์ˆ˜ ์ด์ƒ์œผ๋กœ ํ• ๋‹น๋˜์ง„ ์•Š๋Š”๋‹ค. 

7) retries

์‹คํŒจ ์‹œ ๋‹ค์‹œ ์‹œ๋„ํ•˜๋Š” ํšŸ์ˆ˜

8) retry_delay

์‹คํŒจ ์‹œ ๋‹ค์‹œ ์‹œ๋„ํ•  ๋•Œ์˜ ๋”œ๋ ˆ์ด์— ๋Œ€ํ•œ ์‹œ๊ฐ„(๋ถ„ ๋‹จ์œ„)

9) on_failure_callback 

task ์‹คํŒจ ์‹œ ์ง€์ •๋œ ํ•จ์ˆ˜๊ฐ€ ํ˜ธ์ถœ

๋ณดํ†ต, slack๊ณผ ์—ฐ๋™ํ•˜์—ฌ ์—๋Ÿฌ๋ฅผ ๋ช…์‹œ์ ์œผ๋กœ ์•Œ ์ˆ˜ ์žˆ๊ฒŒ ํ•œ๋‹ค.

10) on_success_callback

์„ฑ๊ณต ์‹œ ์ง€์ •๋œ ํ•จ์ˆ˜๊ฐ€ ํ˜ธ์ถœ

 

1. Context Manager ์‚ฌ์šฉ

: with ๋ฌธ์„ ์‚ฌ์šฉํ•˜์—ฌ ์ฝ”๋“œ ๋ธ”๋ก์„ ๊ฐ์‹ธ๋Š” ํŒŒ์ด์ฌ ๊ฐ์ฒด

 import datetime

 from airflow import DAG
 from airflow.operators.empty import EmptyOperator

 with DAG(
    dag_id='DAG์˜ ์ด๋ฆ„'
    start_date=datetime(๋…„๋„, ์›”, ์ผ)
    schedule="@daily",
 ):
 ์˜คํผ๋ ˆ์ดํ„ฐ์ด๋ฆ„(task_id="TASK์˜ ID")

 

2. Standard constructor ์‚ฌ์šฉ

: ์˜คํผ๋ ˆ์ดํ„ฐ์— dag๋งค๊ฐœ๋ณ€์ˆ˜๋ฅผ ์ด์šฉํ•˜๋Š” ๊ฒƒ

 import datetime

 from airflow import DAG
 from airflow.operators.empty import EmptyOperator

 my_dag=with DAG(
    dag_id='DAG์˜ ์ด๋ฆ„'
    start_date=datetime(๋…„๋„, ์›”, ์ผ)
    schedule="@daily",
 )
 ์˜คํผ๋ ˆ์ดํ„ฐ์ด๋ฆ„(task_id="TASK์˜ ID", dag=my_dag)

 

3. @dag ๋ฐ์ฝ”๋ ˆ์ดํ„ฐ ์‚ฌ์šฉ

import datetime

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator


@dag(start_date=datetime.datetime(๋…„๋„, ์›”, ์ผ), schedule="@daily")
def generate_dag():
    ์˜คํผ๋ ˆ์ดํ„ฐ(task_id="TASK์˜ ID")

generate_dag()

 

โ…ข. ์‹ค์Šต

์‹ค์Šต์€ ์„œ๋ฒ„์— airflow๋ฅผ ์ง์ ‘ ์„ค์น˜ํ•˜๋Š” ๊ฒฝ์šฐ, docker์„ ์ด์šฉํ•˜๋Š” ๊ฒฝ์šฐ ๋‘ ๊ฐ€์ง€๋กœ ์ง„ํ–‰ํ•˜์˜€๋‹ค.

์•ž์˜ ๊ฒฝ์šฐ๋Š” AWS EC2 ์„œ๋ฒ„๋ฅผ ์ด์šฉํ•˜์˜€๊ณ  ๋’ค์˜ ๊ฒฝ์šฐ๋Š” GCP VM ์—”์ง„์— docker๋ฅผ ์„ค์น˜ํ•˜์—ฌ ์‚ฌ์šฉํ•œ๋‹ค.

AWS, GCP ๋ชจ๋‘ ๊ฒฝํ—˜ํ•˜๊ณ  ์‹ถ์—ˆ๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค.

 

0. dag ๊ธฐ๋ณธ ์ €์žฅ ์œ„์น˜ ํ™•์ธ airflow.cfg

aws์˜ ๊ฒฝ์šฐ

dags_folder = /var/lib/airflow/dags

 

docker์˜ ๊ฒฝ์šฐ

dags_folder = /docker/airflow-setup/dags

 

ํ•ด๋‹น ์œ„์น˜์— dag ๋“ค์„ ์œ„์น˜ํ•ด์•ผ Airflow๊ฐ€ ์ž๋™์œผ๋กœ dag๋กœ ์ธ์‹์„ ํ•˜๊ฒŒ ๋œ๋‹ค.

 

1. AWS EC2 ๊ณ„์ •์— ์ง์ ‘ airflow๋ฅผ ์„ค์น˜ํ•˜๋Š” ๊ฒฝ์šฐ

1) EC2 ๊ณ„์ • ๋กœ๊ทธ์ธ

ssh -i "airflow-dev.pem" ubuntu@~

2) DAG ๋ชฉ๋ก ํ™•์ธ

airflow dags list

3) DAG์˜ task๋ชฉ๋ก ํ™•์ธ

airflow tasks list DAG์ด๋ฆ„

4) task ์‹คํ–‰

airflow task [text | run] DAG์ด๋ฆ„ task์ด๋ฆ„ ๋‚ ์งœ(YYYY-MM-DD)

  • test : task์˜ ์‹คํ–‰์ด ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ DB์— ๊ธฐ๋ก O
  • run : task์˜ ์‹คํ–‰์ด ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ DB์— ๊ธฐ๋ก O

 

2. Docker ์‹ค์Šต ์‹œ(GCP ์ด์šฉ)

1) Airflow schedular์— ๋กœ๊ทธ์ธ

- airflow schedular ์ปจํ…Œ์ด๋„ˆ ID ํ™•์ธ(airflow-setup-airflow-schedular-1)

docker ps

- airflow schedular ์ปจํ…Œ์ด๋„ˆ์— ์‰˜์Šคํฌ๋ฆฝํŠธ ๋„์šฐ๊ธฐ

docker exec -it ์ปจํ…Œ์ด๋„ˆ ID sh

(์ดํ›„์˜ ๊ณผ์ •์€ EC2์™€ ๋™์ผํ•˜๋‹ค)

2) DAG ๋ชฉ๋ก ํ™•์ธ

airflow dags list

3) DAG์˜ task๋ชฉ๋ก ํ™•์ธ

airflow tasks list DAG์ด๋ฆ„

4) task ์‹คํ–‰

airflow task [text | run] DAG์ด๋ฆ„ task์ด๋ฆ„ ๋‚ ์งœ(YYYY-MM-DD)

  • test : task์˜ ์‹คํ–‰์ด ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ DB์— ๊ธฐ๋ก O
  • run : task์˜ ์‹คํ–‰์ด ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ DB์— ๊ธฐ๋ก O
728x90