airflow 사용 자체는 어렵지 않다.
용어나 date 처리 등에 대해서만 정리한다.
start_date: DAG가 시작되는 기준 시점. 해당 시점을 기준으로 interval적용
execution_date: Task가 실행되는 시점이 아닌 Time Window의 시작 지점.
schedule_interval: data처리 간격. cron 또는 date형식
(Timetable 형식이 도입되었다. https://airflow.apache.org/docs/apache-airflow/stable/concepts/timetable.html)
예를 들어, interval이 하루인 경우, execution_date = 2022-01-01 0시 인 작업은 2022-01-02 0시에 시작한다.
이러한 개념이 혼란을 주기 쉽기 떄문에,
airflow AIP 39 버전(2.2)에서 execution_date를 deprecated 시키고, 다음과 같은 용어를 사용한다.
data_interval_start, data_interval_end, logical_date
하위 호환성을 위해 execution_date는 그대로 사용할 수 있지만,
execution_date대신 logical_date라는 용어를 사용한다.
// 즉, 아래와 같이 변경된다.
execution_date -> data_interval_start, logical_date
next_execution_date -> next_logical_date, data_interval_end
prev_execution_date -> prev_logical_date
또한, tomorrow_ds, tomorrow_ds_nodash, yestertday_ds, yestertday_ds_nodash는 derpecated 되었다.
(맨 아래 출처 airflow 공식 문서 참고)
이를 그림으로 보면 아래와 같다.
위에 사용한 용어들을 기반으로 실제 코드에서 date 값을 형식에 맞게 가져올 수 있다.
pendulum.DateTime 형식으로 가져오기 위해서는 interval_start = {{ data_interval_start }}
string 형식으로 가져오기 위해서는
date 변환 {{ macros.datetime.strftime(date_interval_start, '%Y%m%d') }}
date 시간 추가 {{ macros.datetime.strftime(date_interval_start + macros.timedelta(hours=8), '%Y%m%d:%H%M%s) }}
// 예를 들어, python에 아래와 같이 작성하기만 하면 된다.
interval_start = {{ macros.datetime.strftime(date_interval_start, '%Y%m%d') }}
after_eight_hours = {{ macros.datetime.strftime(date_interval_start + macros.timedelta(hours=8), '%Y%m%d:%H%M%s) }}
// pendulum.DateTime 형식(data type)으로 가져오기 위해서는 아래와 같이 수행한다.
interval_start = {{ data_interval_start }}
출처:
https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
'Data' 카테고리의 다른 글
Airflow 파헤치기 - 아키텍쳐 구조 (0) | 2024.11.13 |
---|---|
트랜잭션 격리 수준 / Transaction Isolation Level (0) | 2024.07.07 |
Spring JPA Transactional과 Transaction Isolation Level 격리수준 실습 (1) | 2024.06.21 |
데이터베이스 정규화 - 제1, 제2, 제3정규형과 BCNF (RDB, Database Normalization) (0) | 2022.11.03 |