Airflow 기본적인 Operator
2022. 11. 14. 19:05ㆍBook/Apache Airflow 기반의 데이터 파이프라인
728x90
반응형
BashOperator
Bash shell 스크립트를 실행하는 오퍼레이터. 리눅스 명령어, 프로그램 실행이 가능함
# 예시
import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
dag = DAG(
dag_id="listing_4_01",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@hourly",
)
get_data = BashOperator(
task_id="get_data",
# bash 명령어 입력
bash_command=(
"curl -o /tmp/wikipageviews.gz "
"https://dumps.wikimedia.org/other/pageviews/"
# jinja 템플릿을 활용해 인수를 전달할 수 있다.
# 모든 오퍼레이터의 template_fields 속성에 의해 설정
# 예시: https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/operators/bash.html#BashOperator.template_fields
"{{ execution_date.year }}/"
"{{ execution_date.year }}-{{ '{:02}'.format(execution_date.month) }}/"
"pageviews-{{ execution_date.year }}"
"{{ '{:02}'.format(execution_date.month) }}"
"{{ '{:02}'.format(execution_date.day) }}-"
"{{ '{:02}'.format(execution_date.hour) }}0000.gz"
),
dag=dag,
)
PythonOperator
파이썬 코드를 실행하기 위한 오퍼레이터
def _delete(**context):
result = context["task_instance"].xcom_pull(task_ids='union')
if result != "이미 합쳐짐":
args = f"hdfs dfs -rm -r /user/jjwani/FIFA4/data/match/{datetime.now().date() - timedelta(days=1)}/temporary"
proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
s_output, s_err = proc.communicate()
return print(s_output)
else:print('이미 합쳐짐')
delete = PythonOperator(
task_id="delete",
python_callable=_delete,
dag=dag
)
DummyOperator
아무 작업도 하지 않는 연산자로 여러 다른 작업들을 그룹화 하는데 사용
start = DummyOperator(task_id = 'start', dag = dag)
728x90
반응형
'Book > Apache Airflow 기반의 데이터 파이프라인' 카테고리의 다른 글
Airflow XCom (0) | 2022.11.30 |
---|---|
Airflow 의존성 (0) | 2022.11.30 |
Airflow 스케줄링 (0) | 2022.11.14 |
Airflow 시작 (0) | 2022.09.03 |
Airflow 소개 (0) | 2022.09.02 |