본문 바로가기

데이터

[airflow] excution_date, macros를 이용해 DAG 실행 시점 지정하기

execution_date

  • 기준 : execution_date의 기준은 utc
  • 필요성 : datetime.now()로 설정하면 특정일에 dag이 실행되지 않아서 다음날 실행하는 경우,  backfill로 과거 날짜를 실행하는 경우 등에 문제가 될 수 있음 

execution_date에 저장되는 값 

  • 전날의 데이터를 배치하고 싶으면 그냥 execution_date를 사용하면 된다.
  • ds, ds_nodash도 마찬가지 → 전날의 데이터를 가져오기 위해 yesterday_ds를 사용하지 않아도 된다.  

ex) 아래와 같이 설정된 DAG에서 

default_args = {
    'start_date': datetime(2021, 10, 18),
    ...
}
  • → 처음 실행 시점 : 2021-10-18
  • → 그때의 execution_date : 2021-10-17

macros 

https://airflow.apache.org/docs/apache-airflow/1.10.9/macros.html

  • macros를 이용해서 실행 시점에 대한 시각을 원하는 형태로 가져올 수 있다. 
  • 실행 시점에 대한 정보 뿐만 아니라 dag object, params, run_id등 실행하는 dag에 대한 정보를 가져올 수 있다. 

Some Default Variables

PythonOperator에서 사용 방법 

전체 가져오기

  • provide_context를 True로 지정해줘야 한다. 
t1 = PythonOperator(
    task_id='temp_task',
    python_callable=temp_def,
    provide_context=True,
    dag=dag)

def temp_def(**kwargs):
    execution_date = kwargs['execution_date']
    ds = kwargs['ds']

특정 값만 args로 가져오기

  • op_args로 가져오고 싶은 값을 받아온다. 
  • op_args에 넣는 값은 BashOperator에서 bash_command설정할 때와 같이 사용하고 {{}}로 감싸준다. 
  • 아래 처럼 단일 변수만 사용할 수도 있고, no_dash = {{execution_date.strftime("%Y%m%d"}}와 같이 메소드를 포함해도 된다. 
ds = '{{ ds }}'
mm = '{{ execution_date }}'

t1 = PythonOperator(
    task_id='temp_task',
    python_callable=temp_def,
    op_args=[mm , ds],
    provide_context=False,
    dag=dag)

Airflow Timezone

https://airflow.apache.org/docs/apache-airflow/stable/timezone.html#templates

 

반응형