Airflow 기본적인 Operator

2022. 11. 14. 19:05Book/Apache Airflow 기반의 데이터 파이프라인

728x90
반응형

BashOperator

Bash shell 스크립트를 실행하는 오퍼레이터. 리눅스 명령어, 프로그램 실행이 가능함

# 예시
import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator

dag = DAG(
    dag_id="listing_4_01",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@hourly",
)

get_data = BashOperator(
    task_id="get_data",
		# bash 명령어 입력
    bash_command=(
        "curl -o /tmp/wikipageviews.gz "
        "https://dumps.wikimedia.org/other/pageviews/"
        # jinja 템플릿을 활용해 인수를 전달할 수 있다.
        # 모든 오퍼레이터의 template_fields 속성에 의해 설정
				# 예시: https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/operators/bash.html#BashOperator.template_fields
        "{{ execution_date.year }}/"
        "{{ execution_date.year }}-{{ '{:02}'.format(execution_date.month) }}/"
        "pageviews-{{ execution_date.year }}"
        "{{ '{:02}'.format(execution_date.month) }}"
        "{{ '{:02}'.format(execution_date.day) }}-"
        "{{ '{:02}'.format(execution_date.hour) }}0000.gz"
    ),
    dag=dag,
)

PythonOperator

파이썬 코드를 실행하기 위한 오퍼레이터

def _delete(**context):
    result = context["task_instance"].xcom_pull(task_ids='union')
    if result != "이미 합쳐짐":
        args = f"hdfs dfs -rm -r /user/jjwani/FIFA4/data/match/{datetime.now().date() - timedelta(days=1)}/temporary"
        proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        s_output, s_err = proc.communicate()
        return print(s_output)
    else:print('이미 합쳐짐')


delete = PythonOperator(
task_id="delete",
python_callable=_delete,
dag=dag
)

DummyOperator

아무 작업도 하지 않는 연산자로 여러 다른 작업들을 그룹화 하는데 사용

start = DummyOperator(task_id = 'start', dag = dag)

 

728x90
반응형

'Book > Apache Airflow 기반의 데이터 파이프라인' 카테고리의 다른 글

Airflow XCom  (0) 2022.11.30
Airflow 의존성  (0) 2022.11.30
Airflow 스케줄링  (0) 2022.11.14
Airflow 시작  (0) 2022.09.03
Airflow 소개  (0) 2022.09.02