Devcourse_Data Engineering

[Week 12 Advanced Airflow] TIL Day 52: Integrating Gsheet & Using the Airflow API

๐Ÿช„ํ•˜๋ฃจ๐Ÿช„ 2024. 1. 3. 23:26

โ… . Integrating Redshift and Gsheet

1. Basic GCP setup

Log in to Google Cloud.

https://console.cloud.google.com/welcome?project=programmers-devcourse-docker

 

Google ํด๋ผ์šฐ๋“œ ํ”Œ๋žซํผ

๋กœ๊ทธ์ธ Google ํด๋ผ์šฐ๋“œ ํ”Œ๋žซํผ์œผ๋กœ ์ด๋™

accounts.google.com

 

ํ”„๋กœ์ ํŠธ๋ฅผ ์ƒ์„ฑํ•œ๋‹ค.

 

Go to APIs & Services > Credentials > Create Credentials and select Service account.

 

 

Service account name : enter Gsheet

Grant this service account access to the project : select the Editor role.

 

 

๊ทธ๋‹ค์Œ ๋‹ค์‹œ API ๋ฐ ์„œ๋น„์Šค>์‚ฌ์šฉ์ž ์ธ์ฆ ์ •๋ณด ํŽ˜์ด์ง€์—์„œ

์•ž์„œ ๋งŒ๋“  Gsheet ์„œ๋น„์Šค๊ณ„์ •์˜ ์ด๋ฉ”์ผ์„ ์„ ํƒํ•œ๋‹ค.

 

Go to Keys > Add Key > JSON to download a new key.

 

2๊ฐœ์˜ API๋ฅผ ํ—ˆ์šฉํ•œ๋‹ค.API ๋ฐ ์„œ๋น„์Šค>์‚ฌ์šฉ ์„ค์ •๋œ API ๋ฐ ์„œ๋น„์Šค>API ๋ฐ ์„œ๋น„์Šค ์‚ฌ์šฉ ์„ค์ •์„ ์„ ํƒํ•œ๋‹ค.

 

Click the Enable button for the Google Sheets API.

 

 

Enable the Google Drive API as well.


 

If this API is not enabled, the following error occurs.

 

Error message:

{
  "code": 403,
  "message": "Access Not Configured. Drive API has not been used in project {your project} before or it is disabled. Enable it by visiting https://console.developers.google.com/apis/api/drive.googleapis.com/overview?project={your project} then retry. If you enabled this API recently, wait a few minutes for the action to propagate to our systems and retry."
}

 

 

2. Registering the Airflow Variable

key : google_sheet_access_token

Val : the contents of the JSON key downloaded in step 1
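
As a quick sanity check, the Variable should deserialize back into the service-account key. A minimal sketch (assuming the key JSON was pasted in verbatim); note that the target sheet must be shared with the key's client_email:

import json

from airflow.models import Variable

# The Variable holds the raw JSON key downloaded in step 1 (assumption).
key_info = json.loads(Variable.get("google_sheet_access_token"))

# Share the target Google Sheet with this address, or reads will fail.
print(key_info["client_email"])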

 

 

โ…ก. Saving Gsheet contents to a Redshift table

1. Creating the DAG : Gsheet_to_Redshift

This DAG consists of three tasks:

  • download_tab_in_gsheet (PythonOperator) : downloads the contents of a specific Gsheet tab as a CSV file (a sketch of the helper it calls appears after this list)

def download_tab_in_gsheet(**context):
    url = context["params"]["url"]
    tab = context["params"]["tab"]
    table = context["params"]["table"]
    data_dir = Variable.get("DATA_DIR")

    gsheet.get_google_sheet_to_csv(
        url,
        tab,
        data_dir + '{}.csv'.format(table)
    )

# Define the callable first, then the task; the task variable gets its own
# name so it does not shadow the callable.
download_tab = PythonOperator(
    task_id = 'download_{}_in_gsheet'.format(sheet["table"]),
    python_callable = download_tab_in_gsheet,
    params = sheet,
    dag = dag
)
  • copy_to_s3 (PythonOperator) : uploads the CSV file to AWS S3 (helper sketch after this list)

def copy_to_s3(**context):
    table = context["params"]["table"]
    s3_key = context["params"]["s3_key"]

    s3_conn_id = "aws_conn_id"
    s3_bucket = "grepp-data-engineering"
    data_dir = Variable.get("DATA_DIR")
    local_files_to_upload = [ data_dir + '{}.csv'.format(table) ]
    replace = True

    s3.upload_to_s3(s3_conn_id, s3_bucket, s3_key, local_files_to_upload, replace)

copy_to_s3_task = PythonOperator(
    task_id = 'copy_{}_to_s3'.format(sheet["table"]),
    python_callable = copy_to_s3,
    params = {
        "table": sheet["table"],
        "s3_key": s3_key
    },
    dag = dag
)

 

  • run_copy_sql (S3ToRedshiftOperator) : bulk-loads (COPY) the CSV file stored in S3 into Redshift

# from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
run_copy_sql = S3ToRedshiftOperator(
    task_id = 'run_copy_sql_{}'.format(sheet["table"]),
    s3_bucket = "grepp-data-engineering",
    s3_key = s3_key,
    schema = sheet["schema"],
    table = sheet["table"],
    copy_options = ['csv', 'IGNOREHEADER 1'],
    method = 'REPLACE',
    redshift_conn_id = "redshift_dev_db",
    aws_conn_id = 'aws_conn_id',
    dag = dag
)
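
The gsheet and s3 modules used by these tasks are course-provided plugins whose source is not shown in this post. Below is a minimal sketch of what the two helper calls might wrap, assuming gspread for the Sheets side and the Amazon provider's S3Hook for the upload; the names and details here are assumptions:

import csv
import json

import gspread
from airflow.models import Variable
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def get_google_sheet_to_csv(sheet_url, tab, local_csv_path):
    # Authorize with the service-account key stored in the Airflow Variable.
    key_info = json.loads(Variable.get("google_sheet_access_token"))
    gc = gspread.service_account_from_dict(key_info)

    # Read every populated cell from the requested tab and dump it to CSV.
    rows = gc.open_by_url(sheet_url).worksheet(tab).get_all_values()
    with open(local_csv_path, "w", newline="") as f:
        csv.writer(f).writerows(rows)


def upload_to_s3(s3_conn_id, s3_bucket, s3_key, local_files_to_upload, replace):
    # Thin wrapper around the provider hook; one upload per local file.
    hook = S3Hook(aws_conn_id=s3_conn_id)
    for local_file in local_files_to_upload:
        hook.load_file(
            filename=local_file,
            key=s3_key,
            bucket_name=s3_bucket,
            replace=replace,
        )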

 

 

โ…ข. Writing Redshift table contents back to the GSheet

1. Creating the DAG : SQL_to_Sheet

This DAG consists of a single task:

  • update_sql_to_sheet1 (PythonOperator) : extracts data from a Redshift table with SQL, then writes it to the GSheet (a sketch of the helper follows the code)

def update_gsheet(**context):
    sql = context["params"]["sql"]
    sheetfilename = context["params"]["sheetfilename"]
    sheetgid = context["params"]["sheetgid"]

    gsheet.update_sheet(sheetfilename, sheetgid, sql, "redshift_dev_db")

sheet_update = PythonOperator(
    dag = dag,
    task_id = 'update_sql_to_sheet1',
    python_callable = update_gsheet,
    params = {
        "sql": "SELECT * FROM analytics.nps_summary",
        "sheetfilename": "spreadsheet-copy-testing",
        "sheetgid": "RedshiftToSheet"
    }
)
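
gsheet.update_sheet is also part of the course plugin. A minimal sketch of what it might do, assuming gspread plus the Postgres hook (which also speaks to Redshift); the implementation details are assumptions:

import json

import gspread
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook


def update_sheet(sheetfilename, sheetgid, sql, conn_id):
    # Run the query against Redshift through the Postgres hook.
    hook = PostgresHook(postgres_conn_id=conn_id)
    df = hook.get_pandas_df(sql)

    # Authorize with the service-account key stored as an Airflow Variable.
    key_info = json.loads(Variable.get("google_sheet_access_token"))
    gc = gspread.service_account_from_dict(key_info)

    # Overwrite the target worksheet with header + rows.
    ws = gc.open(sheetfilename).worksheet(sheetgid)
    ws.clear()
    ws.update(range_name="A1",
              values=[df.columns.tolist()] + df.values.tolist())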

 

 

โ…ฃ. The Airflow API and Monitoring

The REST API can be used to pull information out of Airflow.

1. Enabling the Airflow API

airflow.cfg

[api]
auth_backend = airflow.api.auth.backend.basic_auth

 

If you want to override the cfg setting when running docker compose up, modify the yaml file as follows (environment variables named AIRFLOW__{SECTION}__{KEY} override the matching cfg entries).

docker-compose.yaml

AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'

 

Let's check that it is configured correctly.

docker exec -it learn-airflow-airflow-scheduler-1 airflow config get-value api auth_backend
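
If the setting took effect, the command should print the configured backend, e.g. airflow.api.auth.backend.basic_auth (with the yaml override, the session backend is listed as well).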

 

2. ๋ณด์•ˆ์„ ์œ„ํ•ด Airflow์—์„œ ์ƒˆ๋กœ์šด ์‚ฌ์šฉ์ž๋ฅผ ์ƒ์„ฑ

airflow ์‚ฌ์šฉ์ž๋Š” ๋ชจ๋“  ๊ถŒํ•œ์— ๋Œ€ํ•ด ํ—ˆ์šฉ๋˜์–ด ์žˆ์œผ๋ฏ€๋กœ ๋…ธ์ถœ๋˜๋ฉด ์ž์นซ ๋ฌธ์ œ๊ฐ€ ์ƒ๊ธธ ์ˆ˜ ์žˆ๋‹ค.

๋”ฐ๋ผ์„œ ๋” ์ ์€ ๊ถŒํ•œ์„ ๊ฐ€์ง€๋Š” ๋ชจ๋‹ˆํ„ฐ๋ง์„ ์œ„ํ•œ ์ƒˆ๋กœ์šด ์‚ฌ์šฉ์ž๋ฅผ ์ƒ์„ฑํ•ด ๋ณด์ž.

 

In the Airflow Web UI, go to Security > List Users and click the + button.

 

 

 

3. Calling the Airflow API

Reference: Airflow REST API documentation (airflow.apache.org)

 

1) Triggering a specific DAG via the API

curl -X POST --user "<username>:<password>" \
  -H 'Content-Type: application/json' \
  -d '{"execution_date":"2023-05-24T00:00:00Z"}' \
  "http://localhost:8080/api/v1/dags/<dag_id>/dagRuns"

 

๊ฒฐ๊ณผ : ์ด๋ฏธ ์„ฑ๊ณต์ ์œผ๋กœ ์‹คํ–‰๋˜์—ˆ๋˜ DAG

{
  "detail": "DAGRun with DAG ID: 'HelloWorld' and DAGRun logical date: '2023-05-24 00:00:00+00:00' already exists",
  "status": 409,
  "title": "Conflict",
  "type": "https://airflow.apache.org/docs/apache-airflow/2.5.1/stable-rest-api-ref.html#section/Errors/AlreadyExists"
}
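
The same call can be made from Python with requests. This sketch simply mirrors the curl command above, using the docker-compose default airflow:airflow credentials and the HelloWorld DAG from the response:

import requests
from requests.auth import HTTPBasicAuth

url = "http://localhost:8080/api/v1/dags/HelloWorld/dagRuns"
resp = requests.post(
    url,
    auth=HTTPBasicAuth("airflow", "airflow"),
    json={"execution_date": "2023-05-24T00:00:00Z"},
)
# Expect 200 on a fresh trigger, 409 if that logical date already ran.
print(resp.status_code, resp.json())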

 

2) Listing all DAGs

curl -X GET --user "<username>:<password>" http://localhost:8080/api/v1/dags

 

Result: returns information about every DAG. One entry is shown below:

{
  "dag_id": "SQL_to_Sheet",
  "default_view": "grid",
  "description": null,
  "file_token": "...",
  "fileloc": "/opt/airflow/dags/SQL_to_Sheet.py",
  "has_import_errors": false,
  "has_task_concurrency_limits": false,
  "is_active": true,
  "is_paused": true,
  "is_subdag": false,
  "last_expired": null,
  "last_parsed_time": "2023-06-18T05:21:34.266335+00:00",
  "last_pickled": null,
  "max_active_runs": 16,
  "max_active_tasks": 16,
  "next_dagrun": "2022-06-18T00:00:00+00:00",
  "next_dagrun_create_after": "2022-06-18T00:00:00+00:00",
  "next_dagrun_data_interval_end": "2022-06-18T00:00:00",
  "next_dagrun_data_interval_start": "2022-06-18T00:00:00",
  "owners": [ "airflow" ],
  "pickle_id": null,
  "root_dag_id": null,
  "schedule_interval": {
    "__type": "CronExpression",
    "value": "@once"
  },
  "scheduler_lock": null,
  "tags": [ { "name": "example" } ],
  "timetable_description": "Once, as soon as possible"
}

 

3) Listing all Variables

curl -X GET --user "<username>:<password>" http://localhost:8080/api/v1/variables

 

Result: returns information about every Variable (response truncated below):

{
  "total_entries": 7,
  "variables": [
    {
      "description": null,
      "key": "api_token",
      "value": "12345667"
    },
    {
      "description": null,
      "key": "csv_url",
      "value": "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
    },
    ...
  ]
}

 

4) Listing the config

curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/config

 

Result: instead of returning the config, an error occurs (for security reasons).

{
  "detail": "Your Airflow administrator chose not to expose the configuration, most likely for security reasons.",
  "status": 403,
  "title": "Forbidden",
  "type": "https://airflow.apache.org/docs/apache-airflow/2.5.1/stable-rest-api-ref.html#section/Errors/PermissionDenied"
}

 

Q: Access to the config is blocked by default. How can it be opened up? (Homework 2 below answers this.)

 

5) Calling the Airflow health endpoint

The /health endpoint reports the status of Airflow's metadata DB and scheduler.

curl -X GET --user "monitor:MonitorUser1" http://localhost:8080/health

 

Result:

{"metadatabase": {"status": "healthy"}, 
"scheduler": {"latest_scheduler_heartbeat": "2024-01-04T15:45:24.935355+00:00", "status": "healthy"}}

 

 

4. Setting and retrieving Variables and Connections from the command line (using JSON)

๋‹จ์ ) ํ™˜๊ฒฝ๋ณ€์ˆ˜๋กœ ๋“ฑ๋ก์ด ๋œ Variable, Connection์ •๋ณด๋Š” ์•Œ ์ˆ˜ ์—†๋‹ค.

The files are saved at AIRFLOW_HOME/variables.json and AIRFLOW_HOME/connections.json.

airflow variables export variables.json
airflow variables import variables.json
airflow connections export connections.json
airflow connections import connections.json
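
For reference, variables export writes a flat key/value JSON map, roughly {"api_token": "12345667", "csv_url": "..."} for the Variables listed earlier, while connections export writes one JSON object per connection (conn_type, host, login, and so on).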

 

 

โ…ค. Homework

1. Write Python code that lists the active (unpaused) DAGs

get_active_dags.py

import requests
from requests.auth import HTTPBasicAuth

url = "http://localhost:8080/api/v1/dags"
dags = requests.get(url, auth=HTTPBasicAuth("airflow", "airflow"))

# An "active" DAG here means one that is not paused in the UI.
for d in dags.json()["dags"]:
    if not d["is_paused"]:
        print(d["dag_id"])

 

 

2. Fix the config API security issue

1) airflow.cfg

[webserver]
expose_config = True

 

2) docker-compose.yaml : override the cfg expose_config value to true when running up.

x-airflow-common:
  &airflow-common
  …
  environment:
    &airflow-common-env
    AIRFLOW__WEBSERVER__EXPOSE_CONFIG: 'true'

 

3. Check whether the variables API also returns Variables defined as environment variables

curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/variables

 

yaml ํŒŒ์ผ์—์„œ ํ™˜๊ฒฝ๋ณ€์ˆ˜๋กœ ์ง€์ •๋œ ๊ฒƒ์€ ๋ฆฌํ„ดํ•˜์ง€ ์•Š๋Š”๋‹ค.
