DAG 1. 선수 데이터 DAG
2023. 1. 2. 21:51ㆍProject/FIFA Online 4 데이터 Airflow로 ETL 구성
728x90
반응형
라이브러리 불러오기
import requests
import pandas as pd
import io
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow import DAG
from airflow.hooks.S3_hook import S3Hook
from datetime import datetime
DAG 구성
매달마다 새로운 시즌이 나오기 때문에 한 달에 한 번 돌리도록 DAG를 구성
dag = DAG(
dag_id = "player_data_write",
description = "Player Data Crolling",
start_date = datetime.now().date(),
schedule_interval = "0 0 25 * *"
)
Task 1 ( start )
DummyOperator
아무 작업도 하지 않는 연산자로 여러 다른 작업들을 그룹화 하는데 사용
start = DummyOperator(task_id = 'start', dag = dag)
Task 2 ( season_class_crolling )
PythonOperator
Python Source를 실행하기 위한 Operator
def _Season_Class_Crolling():
season_class = {}
headers = {'Authorization' : '='}
season_url = "https://static.api.nexon.co.kr/fifaonline4/latest/seasonid.json"
data = requests.get(season_url, headers=headers)
season = data.json()
for s in season:
season_class[s["seasonId"]] = s["className"].split(" (")[0]
return season_class
season_class_crolling = PythonOperator(
task_id="season_class_crolling",
python_callable=_Season_Class_Crolling,
dag=dag
)
Task 3 ( player_data_write )
XCOM
- Xcom은 DAG 내의 task 사이에서 데이터를 전달하기 위해서 사용되는데, Xcom은 DAG내에서만 공유할 수 있는 변수라는 점이다.
- Xcom을 이용해 데이터를 전달하는 경우 DataFrame이나 많은 양의 데이터를 전달하는 것은 지원하지 않으며, 소량의 데이터만 전달하는 것을 권장한다.
- ti = task_instance 로 축약 가능
XCOM 사용 방법
- PythonOperator return 값을 이용한 Xcom 사용
- push-pull을 이용한 Xcom 사용
- Jinja templates을 이용한 Xcom 사용
def _player_data_write(**context):
headers = {'Authorization' : '='}
player_url = "https://static.api.nexon.co.kr/fifaonline4/latest/spid.json"
data = requests.get(player_url, headers=headers)
player = data.json()
player_data =[]
season_class = context['ti'].xcom_pull(task_ids='season_class_crolling')
for p in player:
player_data.append({'spid' : p["id"], 'name' : p["name"], 'season_class_name' : season_class[str(int(p["id"])//1000000)]})
return player_data
player_data_write = PythonOperator(
task_id="player_data_write",
python_callable=_player_data_write,
dag=dag
)
Task 4 ( upload )
io.BytesIO()
바이트 배열을 이진 파일로 다룰 수 있게 해주는 클래스
buffer.seek(0, 0)
buffer의 첫번째 위치로 설정
S3Hook
hook.load_file_obj()
- file_obj : 파일
- key : 저장하고자하는 파일위치와 이름
- bucket_name : S3의 버킷이름
- replace : 덮어쓰기
def _upload_to_s3(**context):
hook = S3Hook('fifaonline4')
player_data = context['ti'].xcom_pull(task_ids='player_data_write')
df = pd.DataFrame(player_data)
buffer = io.BytesIO()
df.to_parquet(buffer, index=False)
buffer.seek(0, 0)
hook.load_file_obj(file_obj = buffer, key='data/player_data/player_data_df.parquet', bucket_name='fifaonline4', replace=True)
upload = PythonOperator(
task_id = 'upload',
python_callable = _upload_to_s3,
dag = dag
)
Task 5 ( end )
이메일 전송으로 수정 예정
start = DummyOperator(task_id = 'start', dag = dag)
728x90
반응형
'Project > FIFA Online 4 데이터 Airflow로 ETL 구성' 카테고리의 다른 글
[Mac] HDFS에 있는 Parquet이 파일 읽기 (0) | 2023.03.29 |
---|---|
[Mac] NameNode 나 DataNode가 실행되지 않을때 (0) | 2023.03.22 |
FIFA Online 4 데이터 수집 (0) | 2023.01.02 |