๋ฐ๋ธŒ์ฝ”์Šค_๋ฐ์ดํ„ฐ์—”์ง€๋‹ˆ์–ด๋ง

[Week10 Airflow] TIL 44์ผ์ฐจ Airflow DAG, task ์‹ค์Šต๊ณผ ๊ณ ๋„ํ™”

๐Ÿช„ํ•˜๋ฃจ๐Ÿช„ 2023. 12. 15. 17:43

์˜ค๋Š˜์€ ์•ž์„œ ์ž‘์„ฑํ–ˆ๋˜ ETL์ฝ”๋“œ๋ฅผ airflow๋กœ ๊ด€๋ฆฌํ•  ์ˆ˜ ์žˆ๋„๋ก ๋ณ€๊ฒฝํ•ด ๋ณด์ž.

 

ํ•ด๋‹น ์‹ค์Šต์—์„œ๋Š” AWS redshift๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

AWS S3์˜ csv ํŒŒ์ผ์„ extract, transform, load ํ•จ์ˆ˜๋ฅผ ์ด์šฉํ•ด airflow์— ์ ์žฌํ•˜๋Š” ๋ฐ์ดํ„ฐ ํŒŒ์ดํ”„๋ผ์ธ์„ ๋งŒ๋“ ๋‹ค.

Full refresh ๋ฐฉ์‹์„ ์‚ฌ์šฉํ•œ๋‹ค.

โ… . ์‹ค์Šต1 : csv ํŒŒ์ผ ์ด์šฉํ•œ ๋ฐ์ดํ„ฐ ํŒŒ์ดํ”„๋ผ์ธ ๋งŒ๋“ค๊ธฐ

1. ์ดˆ๊ธฐ ์ฝ”๋“œ

AWS S3์˜ ํŒŒ์ผ์„ load, transform, extractํ•ด์„œ AWS Redshift์— ์ ์žฌ

import psycopg2

# 1. Connect to Redshift
def get_Redshift_connection():
    host = "redshift-host"
    redshift_user = "username"
    redshift_pass = "password"
    port = 5439
    dbname = "dev"
    conn = psycopg2.connect("dbname={dbname} user={user} host={host} password={password} port={port}".format(
        dbname=dbname,
        user=redshift_user,
        password=redshift_pass,
        host=host,
        port=port
    ))
    conn.set_session(autocommit=True)
    return conn.cursor()
    
# 2. ETL ํ•จ์ˆ˜
import requests
def extract(url):
    f = requests.get(url)
    return (f.text)
def transform(text):
    lines = text.strip().split("\n")[1:] #์ฒซ๋ฒˆ์งธ ๋ผ์ธ(ํ—ค๋”)๋Š” ์ œ์™ธํ•˜๊ณ  ์ฒ˜๋ฆฌ
    records = []
    for l in lines:
      (name, gender) = l.split(",") #l = "์ด๋ฆ„,์„ฑ๋ณ„" -> [ '์ด๋ฆ„', '์„ฑ๋ณ„' ]
      records.append([name, gender])
    return records
def load(records):
    # wrapping the SQL in BEGIN/END makes the statements one transaction, which is safer
    schema = "schema-name"
    cur = get_Redshift_connection()
    # run the SQL as one transaction
    try:
        cur.execute("BEGIN;")
        # run DELETE FROM first -> this is what makes it a Full Refresh
        cur.execute(f"DELETE FROM {schema}.name_gender;")
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;")
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")

 

์‹คํ–‰

link = "url-of-the-csv-file-on-s3"
data = extract(link)
lines = transform(data)
load(lines)

 

2. ๊ฐœ์„ 1-์ค‘์š”ํ•œ ์ •๋ณด๋Š” ๋…ธ์ถœ๋˜์ง€ ์•Š๊ฒŒ ๋”ฐ๋กœ ์ €์žฅํ•œ๋‹ค.

: airflow-admin-connections ์ด์šฉ

 

airflow-admin-connections์— ์ €์žฅํ•  ์ •๋ณด๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค.

Connection-id : ์•„์ด๋”” ์ง€์ •

Connection-type : Amazon Redshift

Database : schema์ด๋ฆ„

Port : 5439

from airflow.providers.postgres.hooks.postgres import PostgresHook

def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()
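
As a quick sanity check, a minimal sketch (it assumes the connection id redshift_dev_db has already been registered in Admin > Connections):

cur = get_Redshift_connection()
cur.execute("SELECT 1;")
print(cur.fetchone())  # prints (1,) if the connection works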

 

2. ๊ฐœ์„ 2-๊ฐ๊ฐ์„ ํ•˜๋‚˜์˜ task๋กœ ๋งŒ๋“ค์–ด(์ด 3๊ฐœ์˜ task) DAG์— ์ง€์ •ํ•œ๋‹ค.

๋ณดํ†ต ํ•˜๋‚˜์˜ task๋Š” ํ•˜๋‚˜์˜ operator์„ ์‚ฌ์šฉํ•œ๋‹ค.

3๊ฐ€์ง€ ํ•จ์ˆ˜๋ฅผ ํ•˜๋‚˜์˜ task๋กœ ๋งŒ๋“ค๋•Œ๋Š” ํ•จ์ˆ˜์˜ ๊ฒฐ๊ณผ๋ฅผ ๋‹ค์Œ ํ•จ์ˆ˜์˜ ์ž…๋ ฅ์œผ๋กœ ์ง์ ‘ ์ง€์ •ํ•  ์ˆ˜๊ฐ€ ์žˆ๋‹ค.

ํ•˜์ง€๋งŒ, ๊ฐ๊ฐ์˜ ํ•จ์ˆ˜๋ฅผ ํ•˜๋‚˜์˜ task๋กœ ๋งŒ๋“ค๋•Œ์—๋Š”Xcom๊ธฐ๋Šฅ์„ ์‚ฌ์šฉํ•˜์—ฌ ํ•œ ํ•จ์ˆ˜์˜ ์ถœ๋ ฅ์„ ๋‹ค์Œ ํ•จ์ˆ˜์˜ ์ž…๋ ฅ์œผ๋กœ ์ง€์ •ํ•œ๋‹ค.

 

Xcom์ด๋ž€?

๊ฐ task๋“ค์ด ์„œ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์ฃผ๊ณ ๋ฐ›๋Š” ๋ฐฉ์‹์œผ๋กœ,  ๋ณดํ†ต Operator์˜ ์ถœ๋ ฅ๊ฐ’์„ ๋‹ค๋ฅธ Operator๊ฐ€ ์ฝ์–ด๊ฐ€๋Š” ํ˜•ํƒœ์ด๋‹ค.

Airflow์˜ ๊ฒฝ์šฐ task์˜ return๊ฐ’์„ id์™€ ํ•จ๊ป˜ ์ €์žฅํ•˜๊ธฐ ๋•Œ๋ฌธ์— ๊ฐ€๋Šฅํ•œ ๋ฐฉ๋ฒ•์ด๋‹ค.

๋‹ค๋งŒ, ์ด ๊ฐ’์€ ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ DB์— ์ €์žฅ์ด ๋˜๊ธฐ ๋•Œ๋ฌธ์— ํฐ ๋ฐ์ดํ„ฐ๋ฅผ ์ฃผ๊ณ ๋ฐ›๋Š”๋ฐ ์‚ฌ์šฉํ•˜๊ธฐ๋Š” ์–ด๋ ต๋‹ค.(๋”ฐ๋ผ์„œ ๋ณดํ†ต ํฐ ๋ฐ์ดํ„ฐ๋ผ๋ฉด S3์— ๋กœ๋“œํ•œ ๋’ค, ํ•ด๋‹น ์œ„์น˜๋ฅผ ์ฃผ๊ณ ๋ฐ›๋Š” ํ˜•ํƒœ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์ฃผ๊ณ ๋ฐ›๋Š”๋‹ค.)

 

context["task_instance"].xcom_pull(key="return_value", task_ids="upstream_task_id")
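
For illustration, a minimal sketch of two callables exchanging a value through XCom (the task ids producer and consumer are ours):

def producer(**context):
    # a task's return value is pushed to XCom under the key "return_value"
    return "hello"

def consumer(**context):
    # pull the upstream task's return value by its task id
    msg = context["task_instance"].xcom_pull(key="return_value", task_ids="producer")
    print(msg)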

 

3. ๊ฐœ์„ 3-operator์˜ params๋ฅผ ์ด์šฉํ•˜์—ฌ ๋งค๊ฐœ๋ณ€์ˆ˜๋กœ ์ •๋ณด๋ฅผ ์ „๋‹ฌํ•˜์ž.

airflow-admin-variables ์ด์šฉ (**context)

 

๋ณดํ†ต open_api_key, csv_url ๋“ฑ์˜ ์ •๋ณด๋ฅผ ์ €์žฅํ•œ๋‹ค.

from airflow.models import Variable

  • get : key๋ฅผ ์ด์šฉํ•ด airflow์—์„œ ๊ฐ’์„ ๊ฐ€์ ธ์˜ด
  • set : key, value๋ฅผ airflow์— ์ €์žฅ
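
Using the Variable class imported above, a quick sketch of get and set (the key names are illustrative):

csv_url = Variable.get("csv_url")                          # raises KeyError if the key is missing
api_key = Variable.get("open_api_key", default_var=None)   # returns None instead of raising
Variable.set("last_backfill", "2023-12-15")                # stores/overwrites a key
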
import logging
import requests
import psycopg2

def extract(**context):
    link = context["params"]["url"]
    task_instance = context['task_instance']
    execution_date = context['execution_date']

    logging.info(execution_date)
    f = requests.get(link)
    return (f.text)


def transform(**context):
    logging.info("Transform started")
    text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
    lines = text.strip().split("\n")[1:]  # skip the first line (header)
    records = []
    for l in lines:
        (name, gender) = l.split(",")  # l = "name,gender" -> ['name', 'gender']
        records.append([name, gender])
    logging.info("Transform ended")
    return records


def load(**context):
    logging.info("load started")
    schema = context["params"]["schema"]
    table = context["params"]["table"]

    records = context["task_instance"].xcom_pull(key="return_value", task_ids="transform")

    # wrapping the SQL in BEGIN/END makes the statements one transaction, which is safer
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        # run DELETE FROM first -> this is what makes it a Full Refresh
        cur.execute(f"DELETE FROM {schema}.{table};")
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.{table} VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;")
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")
    logging.info("load done")

 

 

from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id = 'your_dag_id',
    start_date = datetime(2023,4,6),  # if this date is in the future, the DAG will not run
    schedule = '0 2 * * *',  # adjust as needed
    catchup = False,
    max_active_runs = 1,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
)
extract = PythonOperator(
    task_id = 'extract',
    python_callable = extract,
    params = {
        'url':  Variable.get("csv_url")
    },
    dag = dag)

transform = PythonOperator(
    task_id = 'transform',
    python_callable = transform,
    params = { 
    },  
    dag = dag)

load = PythonOperator(
    task_id = 'load',
    python_callable = load,
    params = {
        'schema': '์Šคํ‚ค๋งˆ์ด๋ฆ„',
        'table': 'ํ…Œ์ด๋ธ”์ด๋ฆ„'
    },
    dag = dag)

extract >> transform >> load

 

์œ„์˜ ๊ฐœ์„ ์‚ฌํ•ญ๋“ค์— test decorator๋ฅผ ์‚ฌ์šฉํ•ด ๋ณด์ž.

ํ•ด๋‹น ๋ฐ์ฝ”๋ ˆ์ดํ„ฐ๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด, ๊ฐ€๋…์„ฑ ์žˆ๊ฒŒ ์ฝ”๋“œ๋ฅผ ๋งŒ๋“ค ์ˆ˜ ์žˆ๊ณ  Xcom์„ ์‚ฌ์šฉํ•˜์—ฌ ๋ฐ์ดํ„ฐ๋ฅผ ์ฃผ๊ณ ๋ฐ›์ง€ ์•Š์•„๋„ ๋œ๋‹ค.

 

4. ๊ฐœ์„ 4 - ๊ฐ€๋…์„ฑ ์žˆ๋Š” ์ฝ”๋“œ๋ฅผ ๋งŒ๋“ค์–ด ๋ณด์ž.

task decorator๋ฅผ ์‚ฌ์šฉ

import logging
import requests
import psycopg2
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook

def get_Redshift_connection(autocommit=True):
    # 1. use an Airflow connection
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()

#2. Decorator๋ฅผ ์ด์šฉํ•ด ๊ฐ๊ฐ์˜ ํ•จ์ˆ˜๋ฅผ task๋กœ ์ง€์ • + ํŒŒ๋ผ๋ฏธํ„ฐ ๋Œ€์‹  ๋งค๊ฐœ๋ณ€์ˆ˜ ์ด์šฉ 
@task
def extract(url):
    logging.info(datetime.utcnow())
    f = requests.get(url)
    return f.text


@task
def transform(text):
    lines = text.strip().split("\n")[1:]  # skip the first line (header)
    records = []
    for l in lines:
        (name, gender) = l.split(",")  # l = "Keeyong,M" -> ['Keeyong', 'M']
        records.append([name, gender])
    logging.info("Transform ended")
    return records


@task
def load(schema, table, records):
    logging.info("load started")
    cur = get_Redshift_connection()
    # wrapping the SQL in BEGIN/END makes the statements one transaction, which is safer
    try:
        cur.execute("BEGIN;")
        # run DELETE FROM first -> this is what makes it a Full Refresh
        cur.execute(f"DELETE FROM {schema}.{table};")
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.{table} VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;")
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")
    logging.info("load done")


with DAG(
    dag_id='namegender_v5',
    start_date=datetime(2022, 10, 6),  # if this date is in the future, the DAG will not run
    schedule='0 2 * * *',  # adjust as needed
    max_active_runs=1,
    catchup=False,
    default_args={
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
        # 'on_failure_callback': slack.on_failure_callback,
    }
) as dag:
    # 3. use an Airflow variable
    url = Variable.get("csv_url")
    schema = 'your_schema'    ## change to your own schema
    table = 'name_gender'     ## table name
    # 2. thanks to the decorator, results are passed as ordinary function calls (no explicit XCom)
    lines = transform(extract(url))
    load(schema, table, lines)
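
Note that each call to a @task-decorated function here returns an XComArg rather than a real value; passing it into the next call is what wires the extract >> transform >> load dependency, and the actual values still travel through XCom behind the scenes.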

 

 

5. ์œ„์˜ ์ฝ”๋“œ๊ฐ€ ์ €์žฅ๋œ ๊นƒํ—ˆ๋ธŒ์—์„œ ํŒŒ์ผ๋“ค์„ ๋ณต์‚ฌํ•ด์„œ ์‹ค์Šต

(AWS-airflow, GCP-Docker-airflow ๋‘ ๊ฒฝ์šฐ ๋ชจ๋‘ ์‹ค์Šต)

 

1. ๊นƒํ—ˆ๋ธŒ์—์„œ ์ฝ”๋“œ ๋ณต์‚ฌ(ํด๋ก +๋ชจ๋“  ํด๋”/ํŒŒ์ผ ๋ณต์‚ฌ)

git clone ๋ ˆํฌ์ง€ํ† ๋ฆฌ ์ฃผ์†Œ

cp -r ๋ ˆํฌ์ง€ํ† ๋ฆฌํŒŒ์ผ(ํด๋”)๊ฒฝ๋กœ/* ๋ณต์‚ฌํ•  ํด๋” ๊ฒฝ๋กœ(๋ณดํ†ต airflow๊ณ„์ •์˜ dags ํด๋” ์•ˆ์—)

 

2. dag ๊ธฐ๋ณธ ์ €์žฅ ์œ„์น˜ ํ™•์ธ airflow.cfg

aws์˜ ๊ฒฝ์šฐ

dags_folder = /var/lib/airflow/dags

 

docker์˜ ๊ฒฝ์šฐ

dags_folder = /docker/airflow-setup/dags

 

ํ•ด๋‹น ์œ„์น˜์— dag ๋“ค์„ ์œ„์น˜ํ•ด์•ผ Airflow๊ฐ€ ์ž๋™์œผ๋กœ dag๋กœ ์ธ์‹์„ ํ•˜๊ฒŒ ๋œ๋‹ค.

 

3-1. Web์œผ๋กœ Airflow ์ ‘์†

airflow์˜ DAG๋Š” ์›น์—์„œ ์กฐ์ž‘ํ•  ์ˆ˜๋„ ์žˆ๊ณ , shell์—์„œ ์กฐ์ž‘ํ•  ์ˆ˜๋„ ์žˆ๋‹ค. 

aws์˜ ๊ฒฝ์šฐ

ec2์ฃผ์†Œ:8080

 

docker์˜ ๊ฒฝ์šฐ

VM์™ธ๋ถ€ip์ฃผ์†Œ:8080

 

4. Unpause the DAG and check that it runs correctly

If a DAG run ends up in the failed state → open the failed task and check its log

 

3-2. shell์—์„œ ํ™•์ธ

EC2-airflow ์ด์šฉํ•˜๋Š” ๊ฒฝ์šฐ

(๊ณตํ†ต ๋ช…๋ น์–ด๋ฅผ ์ด์šฉํ•œ๋‹ค)

 

When using GCP + Docker + Airflow:

- Check the running containers

docker ps

- Attach to the airflow-scheduler container

docker exec -it <airflow-scheduler-container-id> sh

(then use the common commands below)

 

(๊ณตํ†ต ๋ช…๋ น์–ด)

- ๋ชจ๋“  dags ๋ชฉ๋ก ํ™•์ธ

airflow dags list

- ํŠน์ • dag์˜ task ๋ชฉ๋ก ํ™•์ธ

airflow tasks list dagid์ด๋ฆ„

- variables ๋ชฉ๋ก ํ™•์ธ

airflow variables list

- ํŠน์ • dag ์‹คํ–‰

airflow task [text | run] DAG์ด๋ฆ„ task์ด๋ฆ„ ๋‚ ์งœ(YYYY-MM-DD)

  • test : task์˜ ์‹คํ–‰์ด ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ DB์— ๊ธฐ๋ก O
  • run : task์˜ ์‹คํ–‰์ด ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ DB์— ๊ธฐ๋ก O

 

 

โ…ก. ์‹ค์Šต2 : api ํŒŒ์ผ ์ด์šฉํ•œ ๋ฐ์ดํ„ฐ ํŒŒ์ดํ”„๋ผ์ธ ๋งŒ๋“ค๊ธฐ

Yahoo finance api๋ฅผ ํ™œ์šฉํ•ด ์ฃผ์‹์˜ ์ •๋ณด๋ฅผ airflow๋ฅผ ํ™œ์šฉํ•ด ์ˆ˜์ง‘ํ•ด์˜ค์ž.

yfinance api๋Š” ์ง€๋‚œ 30์ผ์˜ ์ •๋ณด๋ฅผ ๋ฐ›์•„์˜ฌ ์ˆ˜ ์žˆ๋‹ค(ํŠน์ • ๋‚ ์งœ์— ๋Œ€ํ•œ ์ •๋ณด๋Š” ์–ด๋ ต๋‹ค) 

 

๊ตฌํ˜„ํ•ด์•ผ ํ•˜๋Š” ๊ธฐ๋Šฅ๋“ค์€ ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค.

1. Task1 : Extract, Transform

2. Task2-1 : Load(Full refresh) UpdateSymbol.py

  • delete * from table : ๋ชจ๋“  ๋ ˆ์ฝ”๋“œ ์‚ญ์ œ(ํŠธ๋žœ์žญ์…˜)
  • drop table : ํ…Œ์ด๋ธ”์„ ํ†ต์งธ๋กœ ์‚ญ์ œ(์ด ๋ฐฉ๋ฒ• ์‚ฌ์šฉ)

3. Task2-2 : Load (Incremental Update) UpdateSymbol_v2.py

1) Create a temporary table (CTAS): copy the existing table

cur.execute(f"CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};")

 

2) ์ž„์‹œ ํ…Œ์ด๋ธ”์— finanace api ๋ ˆ์ฝ”๋“œ ์ ์žฌ

for r in records:
    sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
    cur.execute(sql)

 

3) ์›๋ณธ ํ…Œ์ด๋ธ” ์‚ญ์ œํ•˜๊ณ  ์ƒˆ๋กœ ์ƒ์„ฑ

_create_table(cur, schema, table, True)

 

4) ์›๋ณธ ํ…Œ์ด๋ธ”์— ์ž„์‹œ ํ…Œ์ด๋ธ” ๋ณต์‚ฌ(๋‹จ CTASelect distinc ์‚ฌ์šฉํ•˜์—ฌ ์ค‘๋ณต ์ œ๊ฑฐ)

cur.execute(f"INSERT INTO {schema}.{table} SELECT DISTINCT * FROM t;")
cur.execute("COMMIT;")   # cur.execute("END;")

 

 

Task 3. DAG ์‹คํ–‰

1. airflow web UI ์—์„œ ์‹คํ–‰

2. shell์—์„œ ์‹คํ–‰

airflow dats test dagid์ด๋ฆ„ 2023-05-30

 

 

โ…ก. SQL ๋ช…๋ น์–ด์— ๋Œ€ํ•œ ๊ฒฌํ•ด

1. FULL Refresh์ผ๋•Œ DDL๊ณผ DML์˜ ์‚ฌ์šฉ

๋ฐ์ดํ„ฐ๊ฐ€ ๋งŽ์€ ๊ฒฝ์šฐ delete ~ from table, truncate ~ from table, drop table ์ค‘ ์–ด๋Š ๋ช…๋ น์–ด๋ฅผ ์‚ฌ์šฉํ• ์ง€ ๊ณ ๋ฏผ์ด ๋œ๋‹ค.

 

์—ฌ๊ธฐ์—์„œ๋Š” delete์™€ drop ๋ช…๋ น์–ด์˜ ์‚ฌ์šฉ์— ๋Œ€ํ•ด ๋น„๊ตํ•ด ๋ณด์ž.

 

DELETE๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒฝ์šฐ:

  • ํŠน์ • ์กฐ๊ฑด์— ๋งž๋Š” ํ–‰์„ ์‚ญ์ œํ•  ๋•Œ
  • ํŠน์ • ํ–‰์„ ์‚ญ์ œํ•œ ํ›„์—๋„ ํ…Œ์ด๋ธ”์˜ ๊ตฌ์กฐ๋ฅผ ์œ ์ง€ํ•˜๊ณ  ์‹ถ์„ ๋•Œ
  • ๋กค๋ฐฑ์ด ํ•„์š”ํ•  ๋•Œ(ํŠธ๋žœ์žญ์…˜์„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Œ).

DROP์„ ์‚ฌ์šฉํ•˜๋Š” ๊ฒฝ์šฐ:

  • ํ…Œ์ด๋ธ” ์ „์ฒด๋ฅผ ์‚ญ์ œํ•˜๊ณ  ํ•ด๋‹น ํ…Œ์ด๋ธ”์˜ ๊ตฌ์กฐ๊นŒ์ง€ ์™„์ „ํžˆ ์ œ๊ฑฐํ•  ๋•Œ
  • ํ…Œ์ด๋ธ”์„ ์™„์ „ํžˆ ์žฌ์ƒ์„ฑํ•ด์•ผ ํ•  ๋•Œ

DROP ๋ช…๋ น์–ด๋ฅผ ์ด์šฉํ•˜๋ฉด ํ…Œ์ด๋ธ” ์ „์ฒด๊ฐ€ ์‚ญ์ œ๋˜๊ธฐ ๋•Œ๋ฌธ์— ๋ณดํ†ต ๋” ๋น ๋ฅด๋‹ค.

ํ•˜์ง€๋งŒ, ๋ฐ์ดํ„ฐ๊ฐ€ ๋งŽ์€ ๊ฒฝ์šฐ ๋ณต๊ตฌํ•ด์•ผํ•  ํ•„์š”์„ฑ์ด ์žˆ์„ ์ˆ˜๋„ ์žˆ์œผ๋ฏ€๋กœ ๋ฐ์ดํ„ฐ์˜ ์“ฐ์ž„์— ๋Œ€ํ•ด ๊ณ ๋ฏผํ•œ ๋’ค, ๋ช…๋ น์–ด๋ฅผ ๊ณ ๋ฅด์ž! 

 

 

โ…ฆ. ์ˆ™์ œ2) ๋ฐ์ดํ„ฐ ํŒŒ์ดํ”„๋ผ์ธ ์ฝ”๋“œ ์ž‘์„ฑํ•˜๊ธฐ
