โ . ์ฉ์ด ์ค๋ฉ
1. DAG๋?
Directed Acyclic Graph(๋ฐฉํฅ์ด ์๋ ๋น์ํ ๊ทธ๋ํ)๋ก Airflow์ ์์ ์ ๋จ์๋ฅผ ๋งํ๋ค.
DAG๋ ์ฌ๋ฌ task๋ก ๊ตฌ์ฑ๋์ด ์๋๋ฐ, task์ ์คํ๋ฐฉ๋ฒ, ์คํ์์, ์ฌ์๋ ํ์, ์๊ฐ ์ด๊ณผ ๋ฑ์ ์ ๋ณด๋ฅผ ๋ด๊ณ ์๋ค.
2. Task๋?
Workflow์ ์ต์ ์คํ ๋จ์์ด๋ค. ์์น์ ์ผ๋ก๋ ํ๋์ Operator์ ๊ฐ์ง๋ค(๋จ, ํน๋ณํ ๊ฒฝ์ฐ์ ์ฌ๋ฌ Operator๋ฅผ ์ฌ์ฉํ๊ณ ์ถ๋ค๋ฉด, PythonOperator๋ BranchPythonOperator๋ฅผ ์ฌ์ฉํ์ฌ Python ํจ์๋ฅผ ์ ์ํ๊ณ , ํด๋น ํจ์ ๋ด์์ ์ฌ๋ฌ ์์ ์ ์ํํ๋๋ก ๊ตฌํํด์ผ ํ๋ค.)
3. Operator๋?
Operator๋ Airflow์์ ์ด๋ค ์์ ์ ์ํํ ์ง๋ฅผ ์ ์ํ๋ ํด๋์ค์ด๋ค. Task์ ํ ํ๋ฆฟ์ด๋ผ๊ณ ์๊ฐํ๋ฉด ๋๋ค.
์คํผ๋ ์ดํฐ์ ์ข ๋ฅ๋ ๋ค์๊ณผ ๊ฐ๋ค.
Operators — Airflow Documentation
airflow.apache.org
์ข ๋ฅ
BashOperator:
๋ช ๋ น ์ค ๋ช ๋ น์ด๋ ์คํฌ๋ฆฝํธ๋ฅผ ์คํ
PythonOperator: + @test์ ํจ๊ป ์ฌ์ฉ!
Python ํจ์๋ฅผ ์คํํ๋ Operator๋ก, ์ฌ์ฉ์ ์ ์ Python ์ฝ๋๋ฅผ ์คํ
DummyOperator:
์๋ฌด ์์ ๋ ์ํํ์ง ์๋ Operator๋ก, ์ฃผ๋ก DAG ๋ด์์ ์กฐ๊ฑด๋ถ ๋ ผ๋ฆฌ๋ฅผ ๊ตฌ์ฑํ ๋ ์ฌ์ฉ
HttpSensor:
HTTP ์๋ํฌ์ธํธ์ ์ํ๋ฅผ ํ์ธํ์ฌ ์ผ์ ์ญํ ์ ํ๋ Operator
BranchPythonOperator:
Python ํจ์์ ๋ฐํ ๊ฐ์ ๋ฐ๋ผ ๋ค๋ฅธ ๊ฒฝ๋ก๋ก DAG๋ฅผ ๋ถ๊ธฐ
SubDagOperator:
SubDAG๋ฅผ ํธ์ถํ๋ Operator๋ก, ํ์ DAG๋ฅผ ์คํํ ๋ ์ฌ์ฉ
TriggerDagRunOperator:
๋ค๋ฅธ DAG๋ฅผ ์คํํ๊ธฐ ์ํด ์ฌ์ฉ๋๋ฉฐ, ์ธ๋ถ DAG๋ฅผ ํธ๋ฆฌ๊ฑฐํ ์ ์์
S3FileTransformOperator:
AWS S3 ํ์ผ์ ๋ณํํ๋ Operator๋ก, ์ฃผ๋ก ๋ฐ์ดํฐ ์ฒ๋ฆฌ ์์ ์ ์ฌ์ฉ
EmailOperator:
์ด๋ฉ์ผ์ ๋ณด๋ด๋ Operator๋ก, ์์ ์๋ฃ ๋๋ ์ค๋ฅ ์๋ฆผ์ ์ฌ์ฉ
SqlSensor:
SQL ์ง์ ๊ฒฐ๊ณผ๋ฅผ ํ์ธํ์ฌ ์ผ์ ์ญํ ์ ํ๋ Operator
DockerOperator:
Docker ์ปจํ ์ด๋ ๋ด์์ ๋ช ๋ น์ ์คํํ๋ Operator
KubernetesPodOperator:
Kubernetes ํด๋ฌ์คํฐ ๋ด์์ ํ๋๋ฅผ ์คํํ๋ Operator๋ก, ์ปจํ ์ด๋ ์ค์ผ์คํธ๋ ์ด์ ์ ์ฌ์ฉ
โ ก. DAG๋ฅผ ์ ์ธํ๋ 3๊ฐ์ง ๋ฐฉ๋ฒ
0. DAG ํ๋ผ๋ฏธํฐ ์ข ๋ฅ
ํ์ ๋งค๊ฐ๋ณ์
1) dag_id
DAG์ ์ด๋ฆ์ผ๋ก, ๊ณ ์ ํ๊ฒ ์๋ณ ๊ฐ๋ฅํ ๋ฌธ์์ด์ด์ด์ผ ํ๋ค.
2) start_date
DAG์ ์์ ๋ ์ง ๋ฐ ์๊ฐ์ ๋ํ๋ธ๋ค.
3) schedule
์ผ๋ง๋ ์์ฃผ ์คํ๋ ์ง๋ฅผ ๊ฒฐ์ ํ๋ DAG ์คํ ์ฃผ๊ธฐ๋ก timedelta ๊ฐ์ฒด๋ก ์ ๋ฌํด์ผ ํ๋ค.
ํ์
" Minute[0-59] Hour[0-23] Day[1-31] Month[1-12] Week[1-7 start from monday] "
ํน์
@none, @once, @hourly, @daily, @weekly, @monthly, @yearly
์ ํ์ ๋งค๊ฐ๋ณ์
1) default_args
DAG์ ์ฐ๊ด๋ ๊ณตํต ์ ์ฉ ์ค์ ์ ๊ดํ ์ ๋ณด์ด๋ค. ๋์ ๋๋ฆฌ ํํ๋ก ์ ๋ฌํด์ผ ํ๋ค.
2) catchup
startdate๊ฐ ๊ณผ๊ฑฐ์ด๊ณ , DAGํ์ฑํ๋ฅผ ๊ทธ ์ดํ์ ํ์ ๋, ์ด์ ์คํ๋ค์ ์ก์๋ด๊ธฐ(catch up) ์ฌ๋ถ๋ฅผ ๊ฒฐ์ ํ๋ ๋งค๊ฐ๋ณ์์ด๋ค.
์ด์ ์ ๋ฐ๋ฆฐ ๋ ์ง๋ค๋ง๋ค schedule๋๋ก DAG๋ฅผ ์คํํ ๊ฒ์ด๋ค. ๊ธฐ๋ณธ ์ค์ ์ True์ด๋ค.
Full refresh๋ ์๋ฏธ๊ฐ ์๊ณ Incremental update๋ฅผ ์ํํ๋ DAG์๋ง ์๋ฏธ๊ฐ ์๋ค.
3) tags
ํ๊ทธ์ ๋ํ ์ ๋ณด๋ก ํด๋น DAG์ ํ๊ทธ ๋ชฉ๋ก์ ์ ๋ฌํ๋ค.
์ดํ์ ๋ง์ DAG๊ฐ ์์ฑ๋๋ฉด, ๋ณดํต ํ๊ทธ๋ก ๋ฌถ์ด์ ๊ด๋ฆฌํ๋ค(๊ฒ์)
4) end_date
DAG์ ์ข ๋ฃ ๋ ์ง์ ๋ํ ๋งค๊ฐ๋ณ์์ด๋ค.
5) max_active_runs
๋์์ ์คํ๋๋ ์ต๋ DAG ์ธ์คํด์ค ์๋ฅผ ์ ํํฉ๋๋ค. (backfill์ ์ํํ๋ ๊ฒฝ์ฐ ์ฌ๋ฌ ๊ฐ์ DAG๋ฅผ ์คํํด์ผ ํ๋ค. ์ด๋, ๋ณ๋ ฌ๋ก ์ฌ๋ฌ DAG๋ฅผ ์ํํ๋ค.)
6) max_active_tasks
๋์์ ์คํ๋๋ ์ต๋ task ์๋ฅผ ์ ํํฉ๋๋ค. (์ผ๋ ฌ๋ก task๊ฐ ๋ฐฐ์น๋ ๊ฒฝ์ฐ ์๋ฏธ๊ฐ ์์ผ๋ ๋ณ๋ ฌ task๊ฐ ์กด์ฌํ๋ ๊ฒฝ์ฐ ์๋ฏธ๊ฐ ์๋ค.)
5) ๋ฒ, 6) ๋ฒ์ ๊ฒฝ์ฐ ์๋ฌด๋ฆฌ ๋ง์ ์์ DAG, task๋ฅผ ์ง์ ํ์ฌ๋, ์ค์ ๋ก๋ airflow worker ๋ ธ๋์ ์ ์ฉ๋ CPU๊ฐ์ ์ด์์ผ๋ก ํ ๋น๋์ง ์๋๋ค.
7) retries
์คํจ ์ ๋ค์ ์๋ํ๋ ํ์
8) retry_delay
์คํจ ์ ๋ค์ ์๋ํ ๋์ ๋๋ ์ด์ ๋ํ ์๊ฐ(๋ถ ๋จ์)
9) on_failure_callback
task ์คํจ ์ ์ง์ ๋ ํจ์๊ฐ ํธ์ถ
๋ณดํต, slack๊ณผ ์ฐ๋ํ์ฌ ์๋ฌ๋ฅผ ๋ช ์์ ์ผ๋ก ์ ์ ์๊ฒ ํ๋ค.
10) on_success_callback
์ฑ๊ณต ์ ์ง์ ๋ ํจ์๊ฐ ํธ์ถ
1. Context Manager ์ฌ์ฉ
: with ๋ฌธ์ ์ฌ์ฉํ์ฌ ์ฝ๋ ๋ธ๋ก์ ๊ฐ์ธ๋ ํ์ด์ฌ ๊ฐ์ฒด
import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
with DAG(
dag_id='DAG์ ์ด๋ฆ'
start_date=datetime(๋
๋, ์, ์ผ)
schedule="@daily",
):
์คํผ๋ ์ดํฐ์ด๋ฆ(task_id="TASK์ ID")
2. Standard constructor ์ฌ์ฉ
: ์คํผ๋ ์ดํฐ์ dag๋งค๊ฐ๋ณ์๋ฅผ ์ด์ฉํ๋ ๊ฒ
import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
my_dag=with DAG(
dag_id='DAG์ ์ด๋ฆ'
start_date=datetime(๋
๋, ์, ์ผ)
schedule="@daily",
)
์คํผ๋ ์ดํฐ์ด๋ฆ(task_id="TASK์ ID", dag=my_dag)
3. @dag ๋ฐ์ฝ๋ ์ดํฐ ์ฌ์ฉ
import datetime
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
@dag(start_date=datetime.datetime(๋
๋, ์, ์ผ), schedule="@daily")
def generate_dag():
์คํผ๋ ์ดํฐ(task_id="TASK์ ID")
generate_dag()
โ ข. ์ค์ต
์ค์ต์ ์๋ฒ์ airflow๋ฅผ ์ง์ ์ค์นํ๋ ๊ฒฝ์ฐ, docker์ ์ด์ฉํ๋ ๊ฒฝ์ฐ ๋ ๊ฐ์ง๋ก ์งํํ์๋ค.
์์ ๊ฒฝ์ฐ๋ AWS EC2 ์๋ฒ๋ฅผ ์ด์ฉํ์๊ณ ๋ค์ ๊ฒฝ์ฐ๋ GCP VM ์์ง์ docker๋ฅผ ์ค์นํ์ฌ ์ฌ์ฉํ๋ค.
AWS, GCP ๋ชจ๋ ๊ฒฝํํ๊ณ ์ถ์๊ธฐ ๋๋ฌธ์ด๋ค.
0. dag ๊ธฐ๋ณธ ์ ์ฅ ์์น ํ์ธ airflow.cfg
aws์ ๊ฒฝ์ฐ
dags_folder = /var/lib/airflow/dags
docker์ ๊ฒฝ์ฐ
dags_folder = /docker/airflow-setup/dags
ํด๋น ์์น์ dag ๋ค์ ์์นํด์ผ Airflow๊ฐ ์๋์ผ๋ก dag๋ก ์ธ์์ ํ๊ฒ ๋๋ค.
1. AWS EC2 ๊ณ์ ์ ์ง์ airflow๋ฅผ ์ค์นํ๋ ๊ฒฝ์ฐ
1) EC2 ๊ณ์ ๋ก๊ทธ์ธ
ssh -i "airflow-dev.pem" ubuntu@~
2) DAG ๋ชฉ๋ก ํ์ธ
airflow dags list
3) DAG์ task๋ชฉ๋ก ํ์ธ
airflow tasks list DAG์ด๋ฆ
4) task ์คํ
airflow task [text | run] DAG์ด๋ฆ task์ด๋ฆ ๋ ์ง(YYYY-MM-DD)
- test : task์ ์คํ์ด ๋ฉํ๋ฐ์ดํฐ DB์ ๊ธฐ๋ก O
- run : task์ ์คํ์ด ๋ฉํ๋ฐ์ดํฐ DB์ ๊ธฐ๋ก O
2. Docker ์ค์ต ์(GCP ์ด์ฉ)
1) Airflow schedular์ ๋ก๊ทธ์ธ
- airflow schedular ์ปจํ ์ด๋ ID ํ์ธ(airflow-setup-airflow-schedular-1)
docker ps
- airflow schedular ์ปจํ ์ด๋์ ์์คํฌ๋ฆฝํธ ๋์ฐ๊ธฐ
docker exec -it ์ปจํ ์ด๋ ID sh
(์ดํ์ ๊ณผ์ ์ EC2์ ๋์ผํ๋ค)
2) DAG ๋ชฉ๋ก ํ์ธ
airflow dags list
3) DAG์ task๋ชฉ๋ก ํ์ธ
airflow tasks list DAG์ด๋ฆ
4) task ์คํ
airflow task [text | run] DAG์ด๋ฆ task์ด๋ฆ ๋ ์ง(YYYY-MM-DD)
- test : task์ ์คํ์ด ๋ฉํ๋ฐ์ดํฐ DB์ ๊ธฐ๋ก O
- run : task์ ์คํ์ด ๋ฉํ๋ฐ์ดํฐ DB์ ๊ธฐ๋ก O