Data

Airflow 정리 execution_date, data_interval_start, logical_date, start_date

jw92 2022. 11. 3. 11:13

airflow 사용 자체는 어렵지 않다.

용어나 date 처리 등에 대해서만 정리한다.

 

start_date: DAG가 시작되는 기준 시점. 해당 시점을 기준으로 interval적용

execution_date: Task가 실행되는 시점이 아닌 Time Window의 시작 지점.

schedule_interval: data처리 간격. cron 또는 date형식

(Timetable 형식이 도입되었다. https://airflow.apache.org/docs/apache-airflow/stable/concepts/timetable.html)

 

예를 들어, interval이 하루인 경우, execution_date = 2022-01-01 0시 인 작업은 2022-01-02 0시에 시작한다.

 

이러한 개념이 혼란을 주기 쉽기 떄문에,

 

airflow AIP 39 버전(2.2)에서 execution_date를 deprecated 시키고, 다음과 같은 용어를 사용한다.

data_interval_start, data_interval_end, logical_date

 

하위 호환성을 위해 execution_date는 그대로 사용할 수 있지만,

execution_date대신 logical_date라는 용어를 사용한다.

// 즉, 아래와 같이 변경된다.
execution_date -> data_interval_start, logical_date
next_execution_date -> next_logical_date, data_interval_end
prev_execution_date -> prev_logical_date

또한, tomorrow_ds, tomorrow_ds_nodash, yestertday_ds, yestertday_ds_nodash는 derpecated 되었다.

(맨 아래 출처 airflow 공식 문서 참고)

 

이를 그림으로 보면 아래와 같다.

 

2022-01-03 0시 ~ 2022-01-04 0시 데이터를 처리하는 Task

 

 

위에 사용한 용어들을 기반으로 실제 코드에서 date 값을 형식에 맞게 가져올 수 있다.

 

pendulum.DateTime 형식으로 가져오기 위해서는 interval_start = {{ data_interval_start }}

 

string 형식으로 가져오기 위해서는

date 변환 {{ macros.datetime.strftime(date_interval_start, '%Y%m%d') }}

date 시간 추가 {{ macros.datetime.strftime(date_interval_start + macros.timedelta(hours=8), '%Y%m%d:%H%M%s) }}

// 예를 들어, python에 아래와 같이 작성하기만 하면 된다.
interval_start = {{ macros.datetime.strftime(date_interval_start, '%Y%m%d') }}
after_eight_hours = {{ macros.datetime.strftime(date_interval_start + macros.timedelta(hours=8), '%Y%m%d:%H%M%s) }}

// pendulum.DateTime 형식(data type)으로 가져오기 위해서는 아래와 같이 수행한다.
interval_start = {{ data_interval_start }}

 

출처:

https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html