Ⅰ. Integrating Redshift with Google Sheets
1. Basic GCP Setup
Log in to Google Cloud:
https://console.cloud.google.com/welcome?project=programmers-devcourse-docker
Create a project.
Select APIs & Services > Credentials > Create Credentials > Service account.
Service account name: enter "Gsheet".
Grant this service account access to the project: select the Editor role.
Next, back on the APIs & Services > Credentials page, select the email of the Gsheet service account created above.
Click Keys > Add Key > JSON to download a new key.
Two APIs need to be enabled. Select APIs & Services > Enabled APIs & Services > Enable APIs and Services.
Click the Enable button for the Google Sheets API.
Enable the Google Drive API the same way.
If these APIs are not enabled, the following error appears.
Error message:
{
  "code": 403,
  "message": "Access Not Configured. Drive API has not been used in project {your project} before or it is disabled. Enable it by visiting https://console.developers.google.com/apis/api/drive.googleapis.com/overview?project={your project} then retry. If you enabled this API recently, wait a few minutes for the action to propagate to our systems and retry."
}
2. Registering an Airflow Variable
Register the following Variable:
key: google_sheet_access_token
val: the contents of the JSON key downloaded in step 1
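In a DAG, the key can then be pulled with Variable.get("google_sheet_access_token") and handed to the Google client libraries, which expect a key file on disk. A minimal sketch of that step — the validation helper and the dummy key below are illustrative, not part of the course code:

```python
import json
import os
import tempfile

# Dummy stand-in for Variable.get("google_sheet_access_token"):
# a GCP service-account key file is JSON with these top-level fields.
dummy_key = json.dumps({
    "type": "service_account",
    "project_id": "my-project",  # hypothetical values throughout
    "private_key_id": "abc123",
    "private_key": "-----BEGIN PRIVATE KEY-----\nMIIE...\n-----END PRIVATE KEY-----\n",
    "client_email": "gsheet@my-project.iam.gserviceaccount.com",
})

def materialize_key(raw_json, path):
    """Validate the key JSON and write it to a file the Google client libs can read."""
    key = json.loads(raw_json)  # raises if the Variable content is not valid JSON
    for field in ("type", "private_key", "client_email"):
        if field not in key:
            raise ValueError("not a service-account key: missing " + field)
    with open(path, "w") as f:
        f.write(raw_json)
    return key["client_email"]

path = os.path.join(tempfile.gettempdir(), "sa_key.json")
print(materialize_key(dummy_key, path))  # → gsheet@my-project.iam.gserviceaccount.com
```

Writing the Variable out to a temp file like this is a common pattern because the gspread/Google auth helpers take a file path rather than a raw JSON string.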
Ⅱ. Saving Google Sheet Contents to a Redshift Table
1. Creating the DAG: Gsheet_to_Redshift
This DAG consists of three tasks.
- download_tab_in_gsheet (PythonOperator): downloads the data in a specific tab of the Google Sheet to a CSV file
def download_tab_in_gsheet(**context):
    url = context["params"]["url"]
    tab = context["params"]["tab"]
    table = context["params"]["table"]
    data_dir = Variable.get("DATA_DIR")
    # gsheet is the course's helper module: saves the tab as a local CSV file
    gsheet.get_google_sheet_to_csv(
        url,
        tab,
        data_dir + '{}.csv'.format(table)
    )

download_tab_in_gsheet = PythonOperator(
    task_id = 'download_{}_in_gsheet'.format(sheet["table"]),
    python_callable = download_tab_in_gsheet,
    params = sheet,
    dag = dag)
- copy_to_s3 (PythonOperator): uploads the CSV file to AWS S3
def copy_to_s3(**context):
    table = context["params"]["table"]
    s3_key = context["params"]["s3_key"]
    s3_conn_id = "aws_conn_id"
    s3_bucket = "grepp-data-engineering"
    data_dir = Variable.get("DATA_DIR")
    local_files_to_upload = [data_dir + '{}.csv'.format(table)]
    replace = True
    # s3 is the course's helper module wrapping S3Hook uploads
    s3.upload_to_s3(s3_conn_id, s3_bucket, s3_key, local_files_to_upload, replace)

copy_to_s3 = PythonOperator(
    task_id = 'copy_{}_to_s3'.format(sheet["table"]),
    python_callable = copy_to_s3,
    params = {
        "table": sheet["table"],
        "s3_key": s3_key
    },
    dag = dag)
- run_copy_sql (S3ToRedshiftOperator): bulk-loads (COPY) the CSV file stored in S3 into Redshift
run_copy_sql = S3ToRedshiftOperator(
    task_id = 'run_copy_sql_{}'.format(sheet["table"]),
    s3_bucket = "grepp-data-engineering",
    s3_key = s3_key,
    schema = sheet["schema"],
    table = sheet["table"],
    copy_options = ['csv', 'IGNOREHEADER 1'],
    method = 'REPLACE',
    redshift_conn_id = "redshift_dev_db",
    aws_conn_id = 'aws_conn_id',
    dag = dag
)
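The snippets above all read from a `sheet` dict and build their task ids from sheet["table"], which suggests the DAG loops over a list of sheet configurations and creates the three chained tasks per sheet. A sketch of that naming pattern — the config values below are hypothetical, not the course's actual sheet:

```python
# Hypothetical sheet configs; the real DAG would define url/tab/schema/table per sheet.
sheets = [
    {
        "url": "https://docs.google.com/spreadsheets/d/SPREADSHEET_ID/",
        "tab": "SheetToRedshift",
        "schema": "my_schema",
        "table": "spreadsheet_copy_testing",
    },
]

def task_ids_for(sheet):
    """The three task ids the operators above would generate for one sheet config."""
    table = sheet["table"]
    return [
        'download_{}_in_gsheet'.format(table),
        'copy_{}_to_s3'.format(table),
        'run_copy_sql_{}'.format(table),
    ]

for sheet in sheets:
    print(task_ids_for(sheet))
```

In the DAG itself, the three operators for each sheet would then be wired in order, e.g. download_tab_in_gsheet >> copy_to_s3 >> run_copy_sql.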
Ⅲ. Writing Redshift Table Contents Back to Google Sheets
1. Creating the DAG: SQL_to_Sheet
This DAG consists of a single task.
- update_sql_to_sheet1: a task that extracts data from a Redshift table with SQL and writes it to the Google Sheet
def update_gsheet(**context):
    sql = context["params"]["sql"]
    sheetfilename = context["params"]["sheetfilename"]
    sheetgid = context["params"]["sheetgid"]
    # gsheet helper: runs the SQL against the given Redshift connection
    # and writes the result into the named sheet tab
    gsheet.update_sheet(sheetfilename, sheetgid, sql, "redshift_dev_db")

sheet_update = PythonOperator(
    dag = dag,
    task_id = 'update_sql_to_sheet1',
    python_callable = update_gsheet,
    params = {
        "sql": "SELECT * FROM analytics.nps_summary",
        "sheetfilename": "spreadsheet-copy-testing",
        "sheetgid": "RedshiftToSheet"
    }
)
Ⅳ. The Airflow API and Monitoring
Airflow's REST API can be used to retrieve information about the Airflow deployment.
1. Enabling the Airflow API
airflow.cfg
[api]
auth_backend = airflow.api.auth.backend.basic_auth
To override this cfg setting at docker compose up time, edit the yaml file instead:
docker-compose.yaml
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
Check that it is set correctly:
docker exec -it learn-airflow-airflow-scheduler-1 airflow config get-value api auth_backend
2. Creating a New Airflow User for Security
The airflow user is granted every permission, so exposing its credentials can cause real damage.
Instead, create a new user with fewer permissions, dedicated to monitoring.
In the Airflow Web UI, select Security > List Users > the + button.
3. Calling the Airflow API
Reference: Airflow REST API documentation (airflow.apache.org)
1) Triggering a specific DAG via the API
curl -X POST --user "your_id:your_password" \
  -H 'Content-Type: application/json' \
  -d '{"execution_date":"2023-05-24T00:00:00Z"}' \
  "http://localhost:8080/api/v1/dags/<dag_id>/dagRuns"
Result: if a run for that DAG and date already completed successfully:
{
  "detail": "DAGRun with DAG ID: 'HelloWorld' and DAGRun logical date: '2023-05-24 00:00:00+00:00' already exists",
  "status": 409,
  "title": "Conflict",
  "type": "https://airflow.apache.org/docs/apache-airflow/2.5.1/stable-rest-api-ref.html#section/Errors/AlreadyExists"
}
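The same trigger can be issued from Python; since the request itself needs a live Airflow, only the response handling is sketched here, using the 409 body above as the sample (the helper name and outcome strings are mine):

```python
import json

# The 409 body returned above when the run already exists.
response_body = json.loads("""
{
  "detail": "DAGRun with DAG ID: 'HelloWorld' and DAGRun logical date: '2023-05-24 00:00:00+00:00' already exists",
  "status": 409,
  "title": "Conflict",
  "type": "https://airflow.apache.org/docs/apache-airflow/2.5.1/stable-rest-api-ref.html#section/Errors/AlreadyExists"
}
""")

def classify_trigger(body):
    """Map a dagRuns POST response body to a short outcome string."""
    status = body.get("status")
    if status == 409:
        return "already-ran"   # re-trigger needs a different execution_date
    if status in (401, 403):
        return "auth-error"
    if status is None:         # success bodies describe the new run and carry no "status"
        return "triggered"
    return "error"

print(classify_trigger(response_body))  # → already-ran
```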
2) Listing all DAGs
curl -X GET --user "your_id:your_password" http://localhost:8080/api/v1/dags
Result: returns information about every DAG; one entry looks like this:
{
"dag_id": "SQL_to_Sheet",
"default_view": "grid",
"description": null,
"file_token": "...",
"fileloc": "/opt/airflow/dags/SQL_to_Sheet.py",
"has_import_errors": false,
"has_task_concurrency_limits": false,
"is_active": true,
"is_paused": true,
"is_subdag": false,
"last_expired": null,
"last_parsed_time": "2023-06-18T05:21:34.266335+00:00",
"last_pickled": null,
"max_active_runs": 16,
"max_active_tasks": 16,
"next_dagrun": "2022-06-18T00:00:00+00:00",
"next_dagrun_create_after": "2022-06-18T00:00:00+00:00",
"next_dagrun_data_interval_end": "2022-06-18T00:00:00",
"next_dagrun_data_interval_start": "2022-06-18T00:00:00",
"owners": [ "airflow" ],
"pickle_id": null,
"root_dag_id": null,
"schedule_interval": {
"__type": "CronExpression",
"value": "@once"
},
"scheduler_lock": null,
"tags": [ { "name": "example" }],
"timetable_description": "Once, as soon as possible"
}
3) Listing all Variables
curl -X GET --user "your_id:your_password" http://localhost:8080/api/v1/variables
Result: returns information about every Variable.
{
  "total_entries": 7,
  "variables": [
    {
      "description": null,
      "key": "api_token",
      "value": "12345667"
    },
    {
      "description": null,
      "key": "csv_url",
      "value": "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
    },
    ...
  ]
}
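That response is easy to post-process; a small sketch flattening the two entries shown above into a plain dict. Note that the token value comes back in clear text, which is one more reason to call the API with a restricted monitoring user:

```python
import json

# The two entries from the /variables response above.
payload = json.loads("""
{
  "total_entries": 2,
  "variables": [
    {"description": null, "key": "api_token", "value": "12345667"},
    {"description": null, "key": "csv_url",
     "value": "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"}
  ]
}
""")

def variables_to_dict(payload):
    """Flatten the /variables response into a key -> value dict."""
    return {v["key"]: v["value"] for v in payload["variables"]}

print(variables_to_dict(payload)["api_token"])  # → 12345667
```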
4) Listing the Config
curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/config
Result: instead of returning the config, an error occurs (a security measure).
{
  "detail": "Your Airflow administrator chose not to expose the configuration, most likely for security reasons.",
  "status": 403,
  "title": "Forbidden",
  "type": "https://airflow.apache.org/docs/apache-airflow/2.5.1/stable-rest-api-ref.html#section/Errors/PermissionDenied"
}
Q: Config information is blocked by default. How can we open it up?
5) Calling the Airflow health endpoint
The /health API reports the status of Airflow's metadata DB and scheduler.
curl -X GET --user "monitor:MonitorUser1" http://localhost:8080/health
Result:
{
  "metadatabase": {"status": "healthy"},
  "scheduler": {
    "latest_scheduler_heartbeat": "2024-01-04T15:45:24.935355+00:00",
    "status": "healthy"
  }
}
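A monitoring script would typically not just check the two status strings but also how fresh the scheduler heartbeat is. A sketch of that check — the helper name and the 5-minute threshold are my choices, not an Airflow convention:

```python
import json
from datetime import datetime, timedelta, timezone

# The /health payload shown above.
health = json.loads("""
{"metadatabase": {"status": "healthy"},
 "scheduler": {"latest_scheduler_heartbeat": "2024-01-04T15:45:24.935355+00:00", "status": "healthy"}}
""")

def is_healthy(health, now=None, max_heartbeat_age=timedelta(minutes=5)):
    """Both components must report healthy, and the scheduler heartbeat must be recent."""
    if health["metadatabase"]["status"] != "healthy":
        return False
    sched = health["scheduler"]
    if sched["status"] != "healthy":
        return False
    beat = datetime.fromisoformat(sched["latest_scheduler_heartbeat"])
    now = now or datetime.now(timezone.utc)
    return now - beat <= max_heartbeat_age

# Evaluate as of a moment just after the sample heartbeat:
print(is_healthy(health, now=datetime(2024, 1, 4, 15, 46, tzinfo=timezone.utc)))  # → True
```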
4. Setting and Fetching Variables and Connections from the Command Line (via JSON)
Limitation: Variables and Connections registered through environment variables are not included.
The files are stored at AIRFLOW_HOME/variables.json and AIRFLOW_HOME/connections.json.
airflow variables export variables.json
airflow variables import variables.json
airflow connections export connections.json
airflow connections import connections.json
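As far as I recall, `airflow variables export` writes a flat JSON object of key-value pairs, so the exported file can be inspected or edited with plain json before re-importing. A round-trip sketch with hypothetical contents:

```python
import json
import os
import tempfile

# Hypothetical contents mirroring Variables used earlier in these notes.
variables = {
    "DATA_DIR": "/opt/airflow/data/",
    "csv_url": "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv",
}

path = os.path.join(tempfile.gettempdir(), "variables.json")
with open(path, "w") as f:
    json.dump(variables, f, indent=2)  # same flat shape `airflow variables import` accepts

with open(path) as f:
    loaded = json.load(f)
print(loaded == variables)  # → True
```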
Ⅴ. Exercises
1. Write Python code that lists the DAGs that are active (not paused)
get_active_dags.py
import requests
from requests.auth import HTTPBasicAuth

url = "http://localhost:8080/api/v1/dags"
dags = requests.get(url, auth=HTTPBasicAuth("airflow", "airflow"))
for d in dags.json()["dags"]:
    if not d["is_paused"]:
        print(d["dag_id"])
2. Fix the config API security restriction
1) airflow.cfg
[webserver]
expose_config = True
2) docker-compose.yaml: override the expose_config setting to true when running up.
x-airflow-common:
&airflow-common
…
environment:
&airflow-common-env
AIRFLOW__WEBSERVER__EXPOSE_CONFIG: 'true'
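This override works because Airflow maps any environment variable named AIRFLOW__{SECTION}__{KEY} (double underscores around the section) onto the corresponding cfg option, as both examples in these notes show. A small helper illustrating the naming rule:

```python
def airflow_env_name(section, key):
    """Build the environment-variable name Airflow reads for a cfg option."""
    return "AIRFLOW__{}__{}".format(section.upper(), key.upper())

print(airflow_env_name("webserver", "expose_config"))  # → AIRFLOW__WEBSERVER__EXPOSE_CONFIG
print(airflow_env_name("api", "auth_backends"))        # → AIRFLOW__API__AUTH_BACKENDS
```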
3. Check whether the variables API also returns Variables defined as environment variables
curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/variables
It does not return Variables that were defined as environment variables in the yaml file.