Airflow에는 4가지 종류의 로그가 존재한다.
Task/Scheduler/WebServer/Worker
하지만 Task 로그를 제외하고는
정작 Airflow에 문제가 발생했을 때, 내가 원하는 로그가 대체 어디에 출력되는 건지 알기가 힘들다.
이에 각 상황별 로그가 어디로 흘러가는지에 대해 정리하고자 한다.
Airflow Architecture에 대한 이해
이를 위해서 우선적으로 해야할 것은 Airflow를 이해하는 것이다.
Web Server
- Web Server는 Airflow의 UI(User Interface)로, 사용자가 DAG 및 태스크의 상태를 모니터링하고 관리
- 이를 통해 DAG을 수동으로 트리거하거나, 실패한 태스크를 재실행하고, 로그를 확인하는 등의 작업이 가능
- 웹 서버는 Flask 프레임워크를 기반으로 하며, HTTP 요청을 통해 유저와 상호작용
Scheduler
- Scheduler는 Airflow의 핵심 컴포넌트 중 하나로, DAG와 태스크의 실행을 관리하고 트리거
- DAG 파일을 주기적으로 읽어 DAG과 태스크의 상태를 점검하고, 실행이 필요한 태스크들을 큐에 넣어줌
- 기본적으로는 순차적인 DAG 실행을 지원하지만, 필요에 따라 동시 실행을 관리하는 등 복잡한 스케줄링을 지원
Executor
- Executor는 Scheduler가 큐에 넣은 태스크를 실제로 실행하는 역할
- Airflow는 다양한 Executor를 지원하며, 각각의 환경에 따라 적합한 Executor를 선택 가능
- AWS 서비스인 MWAA의 경우 SQS 기반의 CeleryQueue를 이용
Worker
- Worker는 실제로 태스크를 수행하는 프로세스입니다. CeleryExecutor나 KubernetesExecutor와 같은 분산형 Executor를 사용할 때 활성화
- Worker는 각 태스크의 실행을 담당하고, 완료되면 결과를 Scheduler에게 전달
- 이 구성은 필요에 따라 Worker의 수를 확장하거나 축소할 수 있어 확장성이 높음
Metadata Database
- Metadata Database는 Airflow의 모든 상태 정보와 메타데이터를 저장하는 데이터베이스
- 여기에는 DAG, 태스크, 실행 이력, 스케줄 정보, 로그 등이 저장되며, PostgreSQL이나 MySQL과 같은 RDBMS를 주로 사용
- Scheduler와 Web Server는 이 데이터베이스와 상호작용하여 필요한 정보를 주고받고, 작업 진행 상태를 갱신합니다.
Dags Folder
- DAGs Folder는 사용자가 정의한 DAG 파일들이 저장되는 디렉토리
- Airflow는 정기적으로 이 폴더를 스캔하여 DAG 정의 파일을 읽고 스케줄링
- 이 파일들은 Python 코드로 작성되며, DAG의 구성, 태스크, 의존성 등이 정의
Airflow Architecture의 동작원리
Dag
Airflow를 자세히보면 다음과 같은 단계로 나눌 수 있다.
Scheduler가 DAG 파일을 스캔하고 DAG 및 태스크를 스케줄링
1. DAGs Folder 스캔
- DAGs Folder는 Airflow 시스템에서 DAG 파일을 저장하는 기본 디렉토리.
이 디렉토리는 airflow.cfg의 dags_folder 설정을 통해 지정 - Scheduler는 DAGs Folder를 주기적으로 스캔하여 새로운 DAG 파일이나 변경된 파일이 있는지 확인
이 스캔은 일정한 시간 간격으로 수행되며, scheduler 설정의 min_file_process_interval 값으로 조정 - 스캔 과정에서는 각 DAG 파일을 파싱하고, DAG의 구조와 태스크 간 의존성을 분석하여 메모리에 저장
메모리에 적재된 DAG 정보는 이후 스케줄링에 사용
2. 새로운 DAG 및 태스크의 인스턴스 생성
- DAG 파일을 스캔하여 새로운 DAG을 발견하면, Scheduler는 이 DAG에 정의된 태스크의 실행 일정을 확인
- 각 태스크는 정의된 스케줄에 따라 실행되어야 하므로,
Scheduler는 DAG과 태스크의 start_date, end_date, schedule_interval 등의 파라미터를 기반으로 다음 실행 시점을 결정 - 만약 현재 시간에 실행할 새로운 태스크가 있다면, Scheduler는 해당 태스크의 Task Instance를 생성
이 Task Instance는 특정 실행 날짜와 연결된 태스크의 구체적인 실행 인스턴스이며, 태스크가 언제 실행되어야 하고, 어떤 상태인지를 나타냄
3. Task Queue에 태스크를 등록
- Scheduler는 실행할 Task Instance를 Task Queue에 등록
이때 Task Queue는 CeleryExecutor나 KubernetesExecutor와 같은 Executor가 처리할 수 있도록 설정 - 각 Task Instance는 스케줄러가 지정한 큐로 등록되며,
이후 큐에 있는 태스크는 해당 Executor의 Worker가 가져가 실행 - 이 과정에서 태스크의 상태는 queued로 업데이트되며, 큐에 등록된 태스크는 곧 Worker에 의해 실행될 준비
4. 태스크 의존성 및 DAG 실행 흐름 관리
- DAG 내 태스크는 서로 의존성이 있을 수 있기 때문에, Scheduler는 이를 고려하여 실행 순서를 관리
- 예를 들어, 태스크 B가 태스크 A의 실행 후에만 실행될 수 있도록 DAG에 정의되어 있다면, Scheduler는 A가 성공적으로 완료될 때까지 B를 대기 상태로 유지
- 태스크의 상태(성공, 실패 등)를 지속적으로 확인하며, 각 태스크가 의존성에 맞게 실행되도록 DAG의 전체 실행 흐름을 조정
5. 메타데이터 데이터베이스와의 상호작용
- Scheduler는 DAG과 태스크의 스케줄링 정보를 메타데이터 데이터베이스에 저장
이 데이터베이스는 DAG과 태스크의 상태, 스케줄링 정보, 실행 기록 등을 저장하는 중앙 데이터베이스 - 데이터베이스에 저장된 정보를 통해 Web Server는 DAG의 현재 상태를 실시간으로 확인하고 사용자에게 표시
- 이를 통해 사용자나 관리자도 웹 UI를 통해 DAG의 실행 상태를 확인하고,
현재 스케줄링된 태스크와 실행 로그를 쉽게 모니터링
Scheduler가 태스크를 큐에 넣고, Executor가 큐에 있는 태스크를 Worker에게 할당
Worker가 태스크를 받아 실제로 작업을 수행
태스크 실행 후 결과가 Scheduler로 전달되고 상태가 Metadata Database에 저장
Web Server를 통해 UI에서 DAG 및 태스크의 상태를 모니터링하고 관리
출처
'Data' 카테고리의 다른 글
트랜잭션 격리 수준 / Transaction Isolation Level (0) | 2024.07.07 |
---|---|
Spring JPA Transactional과 Transaction Isolation Level 격리수준 실습 (1) | 2024.06.21 |
데이터베이스 정규화 - 제1, 제2, 제3정규형과 BCNF (RDB, Database Normalization) (0) | 2022.11.03 |
Airflow 정리 execution_date, data_interval_start, logical_date, start_date (0) | 2022.11.03 |