DAG 1. 선수 데이터 DAG

2023. 1. 2. 21:51Project/FIFA Online 4 데이터 Airflow로 ETL 구성

728x90
반응형

라이브러리 불러오기

import requests
import pandas as pd
import io
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow import DAG
from airflow.hooks.S3_hook import S3Hook
from datetime import datetime

DAG 구성

매달마다 새로운 시즌이 나오기 때문에 한 달에 한 번 돌리도록 DAG를 구성

dag = DAG(
    dag_id = "player_data_write",
    description = "Player Data Crolling",
    start_date = datetime.now().date(),
    schedule_interval = "0 0 25 * *"
)

Task 1 ( start )

DummyOperator

아무 작업도 하지 않는 연산자로 여러 다른 작업들을 그룹화 하는데 사용

start = DummyOperator(task_id = 'start', dag = dag)

Task 2 ( season_class_crolling )

PythonOperator

Python Source를 실행하기 위한 Operator

def _Season_Class_Crolling():
    season_class = {}
    headers = {'Authorization' : '='} 
    season_url = "https://static.api.nexon.co.kr/fifaonline4/latest/seasonid.json"
    data = requests.get(season_url, headers=headers)
    season = data.json()
    for s in season:
        season_class[s["seasonId"]] = s["className"].split(" (")[0]
    return season_class

season_class_crolling = PythonOperator(
    task_id="season_class_crolling",
    python_callable=_Season_Class_Crolling,
    dag=dag
)

Task 3 ( player_data_write )

XCOM

- Xcom은 DAG 내의 task 사이에서 데이터를 전달하기 위해서 사용되는데, Xcom은 DAG내에서만 공유할 수 있는 변수라는 점이다.

- Xcom을 이용해 데이터를 전달하는 경우 DataFrame이나 많은 양의 데이터를 전달하는 것은 지원하지 않으며, 소량의 데이터만 전달하는 것을 권장한다.

- ti = task_instance 로 축약 가능

XCOM 사용 방법

  • PythonOperator return 값을 이용한 Xcom 사용
  • push-pull을 이용한 Xcom 사용
  • Jinja templates을 이용한 Xcom 사용
def _player_data_write(**context):
    headers = {'Authorization' : '='} 
    player_url = "https://static.api.nexon.co.kr/fifaonline4/latest/spid.json"
    data = requests.get(player_url, headers=headers)
    player = data.json()
    player_data =[]
    season_class = context['ti'].xcom_pull(task_ids='season_class_crolling')
    for p in player:
        player_data.append({'spid' : p["id"], 'name' : p["name"], 'season_class_name' : season_class[str(int(p["id"])//1000000)]}) 	

    return player_data

player_data_write = PythonOperator(
    task_id="player_data_write", 
    python_callable=_player_data_write, 
    dag=dag
)

Task 4 ( upload )

io.BytesIO()

바이트 배열을 이진 파일로 다룰 수 있게 해주는 클래스

buffer.seek(0, 0)

buffer의 첫번째 위치로 설정

S3Hook

hook.load_file_obj()

  • file_obj : 파일
  • key : 저장하고자하는 파일위치와 이름
  • bucket_name : S3의 버킷이름
  • replace : 덮어쓰기
def _upload_to_s3(**context):
    hook = S3Hook('fifaonline4')
    player_data = context['ti'].xcom_pull(task_ids='player_data_write')
    df = pd.DataFrame(player_data)  
    buffer = io.BytesIO()
    df.to_parquet(buffer, index=False)
    buffer.seek(0, 0)
    hook.load_file_obj(file_obj = buffer, key='data/player_data/player_data_df.parquet', bucket_name='fifaonline4', replace=True)

upload = PythonOperator(
    task_id = 'upload',
    python_callable = _upload_to_s3,
    dag = dag
)

Task 5 ( end )

이메일 전송으로 수정 예정

start = DummyOperator(task_id = 'start', dag = dag)
728x90
반응형