์ค๋์ ์์ ์์ฑํ๋ ETL์ฝ๋๋ฅผ airflow๋ก ๊ด๋ฆฌํ ์ ์๋๋ก ๋ณ๊ฒฝํด ๋ณด์.
ํด๋น ์ค์ต์์๋ AWS redshift๋ฅผ ์ฌ์ฉํ๋ค.
AWS S3์ csv ํ์ผ์ extract, transform, load ํจ์๋ฅผ ์ด์ฉํด airflow์ ์ ์ฌํ๋ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์ ๋ง๋ ๋ค.
Full refresh ๋ฐฉ์์ ์ฌ์ฉํ๋ค.
โ . ์ค์ต1 : csv ํ์ผ ์ด์ฉํ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ ๋ง๋ค๊ธฐ
1. ์ด๊ธฐ ์ฝ๋
AWS S3์ ํ์ผ์ load, transform, extractํด์ AWS Redshift์ ์ ์ฌ
import psycopg2
# 1. Redshift ์ฐ๊ฒฐ
def get_Redshift_connection():
host = "redshift์ฃผ์"
redshift_user = "์ฌ์ฉ์์ด๋ฆ"
redshift_pass = "๋น๋ฐ๋ฒํธ"
port = 5439
dbname = "dev"
conn = psycopg2.connect("dbname={dbname} user={user} host={host} password={password} port={port}".format(
dbname=dbname,
user=redshift_user,
password=redshift_pass,
host=host,
port=port
))
conn.set_session(autocommit=True)
return conn.cursor()
# 2. ETL ํจ์
import requests
def extract(url):
f = requests.get(url)
return (f.text)
def transform(text):
lines = text.strip().split("\n")[1:] #์ฒซ๋ฒ์งธ ๋ผ์ธ(ํค๋)๋ ์ ์ธํ๊ณ ์ฒ๋ฆฌ
records = []
for l in lines:
(name, gender) = l.split(",") #l = "์ด๋ฆ,์ฑ๋ณ" -> [ '์ด๋ฆ', '์ฑ๋ณ' ]
records.append([name, gender])
return records
def load(records):
# BEGIN๊ณผ END๋ฅผ ์ฌ์ฉํด์ SQL ๊ฒฐ๊ณผ๋ฅผ ํธ๋์ญ์
์ผ๋ก ๋ง๋ค์ด์ฃผ๋ ๊ฒ์ด ์ข์
schema="์คํค๋ง์ด๋ฆ"
cur = get_Redshift_connection()
# SQL ํธ๋์ญ์
์ผ๋ก ์ํ
try:
cur.execute("BEGIN")
# DELETE FROM์ ๋จผ์ ์ํ -> FULL REFRESH์ ํ๋ ํํ
cur.execute(f"DELETE FROM {schema}.name_gender;")
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
cur.execute(sql)
cur.execute("COMMIT;")
except(Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
์คํ
link = "AWS s3์ csv ์ฃผ์"
data = extract(link)
lines = transform(data)
load(lines)
2. ๊ฐ์ 1-์ค์ํ ์ ๋ณด๋ ๋ ธ์ถ๋์ง ์๊ฒ ๋ฐ๋ก ์ ์ฅํ๋ค.
: airflow-admin-connections ์ด์ฉ
airflow-admin-connections์ ์ ์ฅํ ์ ๋ณด๋ ๋ค์๊ณผ ๊ฐ๋ค.
Connection-id : ์์ด๋ ์ง์
Connection-type : Amazon Redshift
Database : schema์ด๋ฆ
Port : 5439
from airflow.providers.postgres.hooks.postgres import PostgresHook
def get_Redshift_connection(autocommit=True):
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()
2. ๊ฐ์ 2-๊ฐ๊ฐ์ ํ๋์ task๋ก ๋ง๋ค์ด(์ด 3๊ฐ์ task) DAG์ ์ง์ ํ๋ค.
๋ณดํต ํ๋์ task๋ ํ๋์ operator์ ์ฌ์ฉํ๋ค.
3๊ฐ์ง ํจ์๋ฅผ ํ๋์ task๋ก ๋ง๋ค๋๋ ํจ์์ ๊ฒฐ๊ณผ๋ฅผ ๋ค์ ํจ์์ ์ ๋ ฅ์ผ๋ก ์ง์ ์ง์ ํ ์๊ฐ ์๋ค.
ํ์ง๋ง, ๊ฐ๊ฐ์ ํจ์๋ฅผ ํ๋์ task๋ก ๋ง๋ค๋์๋Xcom๊ธฐ๋ฅ์ ์ฌ์ฉํ์ฌ ํ ํจ์์ ์ถ๋ ฅ์ ๋ค์ ํจ์์ ์ ๋ ฅ์ผ๋ก ์ง์ ํ๋ค.
Xcom์ด๋?
๊ฐ task๋ค์ด ์๋ก ๋ฐ์ดํฐ๋ฅผ ์ฃผ๊ณ ๋ฐ๋ ๋ฐฉ์์ผ๋ก, ๋ณดํต Operator์ ์ถ๋ ฅ๊ฐ์ ๋ค๋ฅธ Operator๊ฐ ์ฝ์ด๊ฐ๋ ํํ์ด๋ค.
Airflow์ ๊ฒฝ์ฐ task์ return๊ฐ์ id์ ํจ๊ป ์ ์ฅํ๊ธฐ ๋๋ฌธ์ ๊ฐ๋ฅํ ๋ฐฉ๋ฒ์ด๋ค.
๋ค๋ง, ์ด ๊ฐ์ ๋ฉํ๋ฐ์ดํฐ DB์ ์ ์ฅ์ด ๋๊ธฐ ๋๋ฌธ์ ํฐ ๋ฐ์ดํฐ๋ฅผ ์ฃผ๊ณ ๋ฐ๋๋ฐ ์ฌ์ฉํ๊ธฐ๋ ์ด๋ ต๋ค.(๋ฐ๋ผ์ ๋ณดํต ํฐ ๋ฐ์ดํฐ๋ผ๋ฉด S3์ ๋ก๋ํ ๋ค, ํด๋น ์์น๋ฅผ ์ฃผ๊ณ ๋ฐ๋ ํํ๋ก ๋ฐ์ดํฐ๋ฅผ ์ฃผ๊ณ ๋ฐ๋๋ค.)
context["task_instance"].xcom_pull(key="return_value", task_ids="ํจ์์ด๋ฆ")
3. ๊ฐ์ 3-operator์ params๋ฅผ ์ด์ฉํ์ฌ ๋งค๊ฐ๋ณ์๋ก ์ ๋ณด๋ฅผ ์ ๋ฌํ์.
airflow-admin-variables ์ด์ฉ (**context)
๋ณดํต open_api_key, csv_url ๋ฑ์ ์ ๋ณด๋ฅผ ์ ์ฅํ๋ค.
from airflow.models import Variable
- get : key๋ฅผ ์ด์ฉํด airflow์์ ๊ฐ์ ๊ฐ์ ธ์ด
- set : key, value๋ฅผ airflow์ ์ ์ฅ
def extract(**context):
link = context["params"]["url"]
task_instance = context['task_instance']
execution_date = context['execution_date']
logging.info(execution_date)
f = requests.get(link)
return (f.text)
def transform(**context):
logging.info("Transform started")
text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
lines = text.strip().split("\n")[1:] # ์ฒซ ๋ฒ์งธ ๋ผ์ธ์ ์ ์ธํ๊ณ ์ฒ๋ฆฌ
records = []
for l in lines:
(name, gender) = l.split(",") # l = "์ด๋ฆ, ์ฑ๋ณ" -> [ '์ด๋ฆ', '์ฑ๋ณ' ]
records.append([name, gender])
logging.info("Transform ended")
return records
def load(**context):
logging.info("load started")
schema = context["params"]["schema"]
table = context["params"]["table"]
lines = context["task_instance"].xcom_pull(key="return_value", task_ids="transform")
# BEGIN๊ณผ END๋ฅผ ์ฌ์ฉํด์ SQL ๊ฒฐ๊ณผ๋ฅผ ํธ๋์ญ์
์ผ๋ก ๋ง๋ค์ด์ฃผ๋ ๊ฒ์ด ์ข์
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
# DELETE FROM์ ๋จผ์ ์ํ -> FULL REFRESH์ ํ๋ ํํ
cur.execute(f"DELETE FROM {schema}.name_gender;")
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
cur.execute(sql)
cur.execute("COMMIT;") # cur.execute("END;")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
logging.info("load done")
dag = DAG(
dag_id = '์์ด๋',
start_date = datetime(2023,4,6), # ๋ ์ง๊ฐ ๋ฏธ๋์ธ ๊ฒฝ์ฐ ์คํ์ด ์๋จ
schedule = '0 2 * * *', # ์ ๋นํ ์กฐ์
catchup = False,
max_active_runs = 1,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
extract = PythonOperator(
task_id = 'extract',
python_callable = extract,
params = {
'url': Variable.get("csv_url")
},
dag = dag)
transform = PythonOperator(
task_id = 'transform',
python_callable = transform,
params = {
},
dag = dag)
load = PythonOperator(
task_id = 'load',
python_callable = load,
params = {
'schema': '์คํค๋ง์ด๋ฆ',
'table': 'ํ
์ด๋ธ์ด๋ฆ'
},
dag = dag)
extract >> transform >> load
์์ ๊ฐ์ ์ฌํญ๋ค์ test decorator๋ฅผ ์ฌ์ฉํด ๋ณด์.
ํด๋น ๋ฐ์ฝ๋ ์ดํฐ๋ฅผ ์ฌ์ฉํ๋ฉด, ๊ฐ๋ ์ฑ ์๊ฒ ์ฝ๋๋ฅผ ๋ง๋ค ์ ์๊ณ Xcom์ ์ฌ์ฉํ์ฌ ๋ฐ์ดํฐ๋ฅผ ์ฃผ๊ณ ๋ฐ์ง ์์๋ ๋๋ค.
4. ๊ฐ์ 4 - ๊ฐ๋ ์ฑ ์๋ ์ฝ๋๋ฅผ ๋ง๋ค์ด ๋ณด์.
task decorator๋ฅผ ์ฌ์ฉ
def get_Redshift_connection(autocommit=True):
#1. AIRFLOW connection ํ์ฉ
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()
#2. Decorator๋ฅผ ์ด์ฉํด ๊ฐ๊ฐ์ ํจ์๋ฅผ task๋ก ์ง์ + ํ๋ผ๋ฏธํฐ ๋์ ๋งค๊ฐ๋ณ์ ์ด์ฉ
@task
def extract(url):
logging.info(datetime.utcnow())
f = requests.get(url)
return f.text
@task
def transform(text):
lines = text.strip().split("\n")[1:] # ์ฒซ ๋ฒ์งธ ๋ผ์ธ์ ์ ์ธํ๊ณ ์ฒ๋ฆฌ
records = []
for l in lines:
(name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
records.append([name, gender])
logging.info("Transform ended")
return records
@task
def load(schema, table, records):
logging.info("load started")
cur = get_Redshift_connection()
# BEGIN๊ณผ END๋ฅผ ์ฌ์ฉํด์ SQL ๊ฒฐ๊ณผ๋ฅผ ํธ๋์ญ์
์ผ๋ก ๋ง๋ค์ด์ฃผ๋ ๊ฒ์ด ์ข์
try:
cur.execute("BEGIN;")
# DELETE FROM์ ๋จผ์ ์ํ -> FULL REFRESH์ ํ๋ ํํ
cur.execute(f"DELETE FROM {schema}.name_gender;")
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
cur.execute(sql)
cur.execute("COMMIT;") # cur.execute("END;")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
logging.info("load done")
with DAG(
dag_id='namegender_v5',
start_date=datetime(2022, 10, 6), # ๋ ์ง๊ฐ ๋ฏธ๋์ธ ๊ฒฝ์ฐ ์คํ์ด ์๋จ
schedule='0 2 * * *', # ์ ๋นํ ์กฐ์
max_active_runs=1,
catchup=False,
default_args={
'retries': 1,
'retry_delay': timedelta(minutes=3),
# 'on_failure_callback': slack.on_failure_callback,
}
) as dag:
#3. Airflow variable ์ด์ฉ
url = Variable.get("csv_url")
schema = '์คํค๋ง๋ช
' ## ์์ ์ ์คํค๋ง๋ก ๋ณ๊ฒฝ
table = 'name_gender' ## ํ
์ด๋ธ๋ช
#2. ๋ฐ์ฝ๋ฐ์ดํฐ๋ฅผ ์ฌ์ฉํ์ฌ ์ด๋ ๊ฒ ํจ์ํํ๋ก ์ ๋ฌO
lines = transform(extract(url))
load(schema, table, lines)
5. ์์ ์ฝ๋๊ฐ ์ ์ฅ๋ ๊นํ๋ธ์์ ํ์ผ๋ค์ ๋ณต์ฌํด์ ์ค์ต
(AWS-airflow, GCP-Docker-airflow ๋ ๊ฒฝ์ฐ ๋ชจ๋ ์ค์ต)
1. ๊นํ๋ธ์์ ์ฝ๋ ๋ณต์ฌ(ํด๋ก +๋ชจ๋ ํด๋/ํ์ผ ๋ณต์ฌ)
git clone ๋ ํฌ์งํ ๋ฆฌ ์ฃผ์
cp -r ๋ ํฌ์งํ ๋ฆฌํ์ผ(ํด๋)๊ฒฝ๋ก/* ๋ณต์ฌํ ํด๋ ๊ฒฝ๋ก(๋ณดํต airflow๊ณ์ ์ dags ํด๋ ์์)
2. dag ๊ธฐ๋ณธ ์ ์ฅ ์์น ํ์ธ airflow.cfg
aws์ ๊ฒฝ์ฐ
dags_folder = /var/lib/airflow/dags
docker์ ๊ฒฝ์ฐ
dags_folder = /docker/airflow-setup/dags
ํด๋น ์์น์ dag ๋ค์ ์์นํด์ผ Airflow๊ฐ ์๋์ผ๋ก dag๋ก ์ธ์์ ํ๊ฒ ๋๋ค.
3-1. Web์ผ๋ก Airflow ์ ์
airflow์ DAG๋ ์น์์ ์กฐ์ํ ์๋ ์๊ณ , shell์์ ์กฐ์ํ ์๋ ์๋ค.
aws์ ๊ฒฝ์ฐ
ec2์ฃผ์:8080
docker์ ๊ฒฝ์ฐ
VM์ธ๋ถip์ฃผ์:8080
4. Unpause DAG ์ ํ ํ ์ ์์ ์ผ๋ก ์๋๋๋์ง ํ์ธ
๋ง์ฝ, ํน์ DAG ์ํ๊ฐ failed์ด๋ฉด → failed์ธ task์ ๋ค์ด๊ฐ์ log ํ์ธ
3-2. shell์์ ํ์ธ
EC2-airflow ์ด์ฉํ๋ ๊ฒฝ์ฐ
(๊ณตํต ๋ช ๋ น์ด๋ฅผ ์ด์ฉํ๋ค)
GCP-Docker-airflow ์ด์ฉํ๋ ๊ฒฝ์ฐ
- docker ์ปจํ ์ด๋ ์ ๋ณด ํ์ธ
docker -ps
- airflow-scheduler ์ปจํ ์ด๋ ์ ์
docker exec -it airflow-scheduler-์ปจํ ์ด๋id sh
(๊ณตํต ๋ช ๋ น์ด๋ฅผ ์ด์ฉํ๋ค)
(๊ณตํต ๋ช ๋ น์ด)
- ๋ชจ๋ dags ๋ชฉ๋ก ํ์ธ
airflow dags list
- ํน์ dag์ task ๋ชฉ๋ก ํ์ธ
airflow tasks list dagid์ด๋ฆ
- variables ๋ชฉ๋ก ํ์ธ
airflow variables list
- ํน์ dag ์คํ
airflow task [text | run] DAG์ด๋ฆ task์ด๋ฆ ๋ ์ง(YYYY-MM-DD)
- test : task์ ์คํ์ด ๋ฉํ๋ฐ์ดํฐ DB์ ๊ธฐ๋ก O
- run : task์ ์คํ์ด ๋ฉํ๋ฐ์ดํฐ DB์ ๊ธฐ๋ก O
โ ก. ์ค์ต2 : api ํ์ผ ์ด์ฉํ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ ๋ง๋ค๊ธฐ
Yahoo finance api๋ฅผ ํ์ฉํด ์ฃผ์์ ์ ๋ณด๋ฅผ airflow๋ฅผ ํ์ฉํด ์์งํด์ค์.
yfinance api๋ ์ง๋ 30์ผ์ ์ ๋ณด๋ฅผ ๋ฐ์์ฌ ์ ์๋ค(ํน์ ๋ ์ง์ ๋ํ ์ ๋ณด๋ ์ด๋ ต๋ค)
๊ตฌํํด์ผ ํ๋ ๊ธฐ๋ฅ๋ค์ ๋ค์๊ณผ ๊ฐ๋ค.
1. Task1 : Extract, Transform
2. Task2-1 : Load(Full refresh) UpdateSymbol.py
- delete * from table : ๋ชจ๋ ๋ ์ฝ๋ ์ญ์ (ํธ๋์ญ์ )
- drop table : ํ ์ด๋ธ์ ํต์งธ๋ก ์ญ์ (์ด ๋ฐฉ๋ฒ ์ฌ์ฉ)
3. Task2-2 : Load(Incremental Update) UpdateSymbol_v2.py
1) ์์ ํ ์ด๋ธ ์์ฑ(CTAS) : ๊ธฐ์กด ํ ์ด๋ธ ๋ณต์ฌ
cur.execute(f"CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};")
2) ์์ ํ ์ด๋ธ์ finanace api ๋ ์ฝ๋ ์ ์ฌ
for r in records:
sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
cur.execute(sql)
3) ์๋ณธ ํ ์ด๋ธ ์ญ์ ํ๊ณ ์๋ก ์์ฑ
_create_table(cur, schema, table, True)
4) ์๋ณธ ํ ์ด๋ธ์ ์์ ํ ์ด๋ธ ๋ณต์ฌ(๋จ CTASelect distinc ์ฌ์ฉํ์ฌ ์ค๋ณต ์ ๊ฑฐ)
cur.execute(f"INSERT INTO {schema}.{table} SELECT DISTINCT * FROM t;")
cur.execute("COMMIT;") # cur.execute("END;")
Task 3. DAG ์คํ
1. airflow web UI ์์ ์คํ
2. shell์์ ์คํ
airflow dats test dagid์ด๋ฆ 2023-05-30
โ ก. SQL ๋ช ๋ น์ด์ ๋ํ ๊ฒฌํด
1. FULL Refresh์ผ๋ DDL๊ณผ DML์ ์ฌ์ฉ
๋ฐ์ดํฐ๊ฐ ๋ง์ ๊ฒฝ์ฐ delete ~ from table, truncate ~ from table, drop table ์ค ์ด๋ ๋ช ๋ น์ด๋ฅผ ์ฌ์ฉํ ์ง ๊ณ ๋ฏผ์ด ๋๋ค.
์ฌ๊ธฐ์์๋ delete์ drop ๋ช ๋ น์ด์ ์ฌ์ฉ์ ๋ํด ๋น๊ตํด ๋ณด์.
DELETE๋ฅผ ์ฌ์ฉํ๋ ๊ฒฝ์ฐ:
- ํน์ ์กฐ๊ฑด์ ๋ง๋ ํ์ ์ญ์ ํ ๋
- ํน์ ํ์ ์ญ์ ํ ํ์๋ ํ ์ด๋ธ์ ๊ตฌ์กฐ๋ฅผ ์ ์งํ๊ณ ์ถ์ ๋
- ๋กค๋ฐฑ์ด ํ์ํ ๋(ํธ๋์ญ์ ์ ์ฌ์ฉํ ์ ์์).
DROP์ ์ฌ์ฉํ๋ ๊ฒฝ์ฐ:
- ํ ์ด๋ธ ์ ์ฒด๋ฅผ ์ญ์ ํ๊ณ ํด๋น ํ ์ด๋ธ์ ๊ตฌ์กฐ๊น์ง ์์ ํ ์ ๊ฑฐํ ๋
- ํ ์ด๋ธ์ ์์ ํ ์ฌ์์ฑํด์ผ ํ ๋
DROP ๋ช ๋ น์ด๋ฅผ ์ด์ฉํ๋ฉด ํ ์ด๋ธ ์ ์ฒด๊ฐ ์ญ์ ๋๊ธฐ ๋๋ฌธ์ ๋ณดํต ๋ ๋น ๋ฅด๋ค.
ํ์ง๋ง, ๋ฐ์ดํฐ๊ฐ ๋ง์ ๊ฒฝ์ฐ ๋ณต๊ตฌํด์ผํ ํ์์ฑ์ด ์์ ์๋ ์์ผ๋ฏ๋ก ๋ฐ์ดํฐ์ ์ฐ์์ ๋ํด ๊ณ ๋ฏผํ ๋ค, ๋ช ๋ น์ด๋ฅผ ๊ณ ๋ฅด์!