Airflow를 사용할 때 DAG(Directed Acyclic Graph)를 수정하는 과정에서 현재 실행 중인 작업에 어떤 영향이 있는지 그리고 코드 변경 시 발생할 수 있는 잠재적인 문제들을 어떻게 방지할 수 있는지에 대해 알아본다.
1. 현재 실행 중인 DAG를 수정하면 영향이 있을까?
현재 DAG Run에 영향 미치지 않음
- 현재 실행 중인 DAG Run: 이미 시작된 DAG Run은 그 시점에 파싱된 DAG 정의를 기반으로 Instance가 생성되어 실행된다. 따라서 DAG 파일을 수정해도 수정 전의 DAG를 기반으로 Inscance가 생성되었기 때문에 현재 실행 중인 DAG Run에는 전혀 영향을 주지 않는다. 그러므로 다음 작업 스케줄에만 영향 있고 현재 스케줄은 영향 없다.
- 수정된 내용의 적용 시점: DAG 파일이 수정되면 Airflow는 주기적으로 파일을 재파싱하여 변경 사항을 인식한다. 이후에 스케줄되는 DAG Run부터 새로운 정의가 적용된다.
* Airflow는 기본적으로 30초마다 DAG 파일을 재파싱한다.(min_file_process_interval 설정에 따라 다름).
2. 만약 코드 변경 시점과 작업 스케줄 시간이 우연히 겹친다면 어떻게 될까?
이런 상황에서 Airflow의 작동 방식을 이해하면 문제가 발생하지 않음을 알 수 있다.
Airflow의 동작 원리
- DAG 파싱 시점의 코드 사용: Airflow 스케줄러는 일정한 주기로 DAG 파일을 파싱하기 때문에 이때의 DAG 정의를 기반으로 DAG Run Instance와 Task Instance를 생성한다.
- 원자성 보장: DAG 파싱은 파싱 중 코드가 변경되어도 에어플로우는 변경된 것을 인지하지 못한 상태로 현재 파싱 과정에는 영향을 주지 않고 다음 파싱 주기가 되어야 반영이 된다.
발생할 수 있는 상황
- 코드 변경 전에 파싱된 경우
- 스케줄러가 코드 변경 이전의 DAG 파일을 파싱하여 DAG Run을 생성한다.
- 이 경우 이전 코드로 작업이 실행된다.
- 코드 변경 후에 파싱된 경우:
- 스케줄러가 변경된 DAG 파일을 파싱하여 새로운 DAG Run을 생성한다.
- 이 경우 수정된 코드로 작업이 실행된다.
* DAG Run은 파싱 시점의 코드 상태를 기준으로 실행되므로 코드가 섞일 가능성은 없다.
주의해야 할 사항
매우 희박하지만 vi, vim 등 편집기를 이용할 경우 파일 저장 중간 상태를 파싱할 가능성이 있다.
Vim 저장 방식과 중간 상태의 영향
Vim에서 파일을 저장하는 과정은 다음과 같다.
- 메모리 버퍼의 내용을 임시 파일에 기록.
- 기존 파일 삭제.
- 임시 파일을 기존 파일 이름으로 변경.
이 과정이 매우 빠르게 이루어지지만 파일이 쓰이는 순간에 Airflow가 해당 파일을 파싱하려고 시도하면 문제가 발생할 수 있다.
가능성 낮은 이유
- Vim의 저장 속도: Vim은 파일 저장을 밀리초(ms) 단위로 처리하며 저장 완료 전까지 파일은 완전히 쓰이지 않는다.
- Airflow의 파싱 주기: Airflow는 DAG 디렉토리를 매 초마다 스캔하지 않는다. 기본적으로 scheduler는 파일 변경을 감지한 후 파싱을 수행하므로 파일 저장 중에 정확히 충돌할 확률은 매우 낮다.
하지만 불가능하지 않은 이유
- 운 나쁜 타이밍: Airflow가 파일을 읽으려는 시점과 Vim이 파일을 쓰는 시점이 겹칠 경우 Airflow가 불완전한 파일을 읽어 Syntax Error를 발생시킬 수 있다.
희박한 확률로 파일 저장 중간 상태를 파싱해도 결국 파일이 완전히 저장되면 다음 파싱 주기에서 Airflow는 새로 파일을 읽으므로 이전 파싱 오류(Syntax Error 등)는 고쳐진다.
3. 예상치 못한 장애로 서버가 갑자기 재시작되면 DAG Run은 어떻게 될까?
서버가 갑자기 재시작되면 메모리에 있던 DAG RunInstance와 Task Instance는 소멸된다. 하지만 Airflow는 메타데이터 데이터베이스에 모든 DAG Run Instance와 Task Instance의 상태를 저장하고 있기 때문에 서버가 다시 시작되면 이 정보를 바탕으로 DAG Run을 다시 생성하고 재시작할 수 있다.
서버 재시작 전 상태 저장
- DAG Run과 Task Instance의 모든 상태 정보는 메타데이터 데이터베이스에 저장된다. ( 메모리의 상태를 실시간으로 DB에 저장 및 업데이트 )
- 특정 Task Instance가 실행 중이었다면 그 상태(예: "실행 중")가 데이터베이스에 기록된다.
서버 재시작 후
- Airflow 스케줄러가 다시 시작되면 메타데이터 데이터베이스에서 이전에 저장된 DAG Run과 Task Instance 정보를 읽어온다.
- 스케줄러는 이 정보를 기반으로 DAG Run과 Task Instance를 재생성하고 필요한 작업을 재시작하거나 재할당한다.
- 예를 들어 실행 중이던 Task Instance가 "실행 중" 상태로 데이터베이스에 기록되어 있다면 스케줄러는 이를 인식하고 해당 Task를 다시 큐에 넣어 실행을 계속한다.
4. 서버 재시작 시 실행 중이던 외부 리소스(DB 커넥션 등) 는 어떻게 될까?
DAG Run과 Task Instance는 메타데이터 데이터베이스에 상태를 저장하여 서버 재시작 후에도 작업을 다시 시도할 수 지만 데이터베이스(DB) 커넥션과 같은 외부 리소스는 메모리에만 존재했다가 서버가 재기동되면 소멸되기 때문에 롤백 처리나 데이터의 중복 처리를 방지하는 추가적인 관리가 필요하다.
- 트랜잭션 관리
- 각 작업에서 트랜잭션을 시작하고 작업이 성공적으로 완료되면 커밋한다.
- 오류가 발생하면 트랜잭션을 롤백하여 데이터 일관성을 유지한다.
- 멱등성(Idempotency) 작업 설계
- 동일한 작업을 여러 번 수행해도 결과가 동일하도록 설계하여 데이터 중복을 방지한다.
- 재시도 정책 설정
- Airflow의 재시도 메커니즘을 활용하여 작업 실패 시 자동으로 재시도하도록 설정하여 일시적인 네트워크 문제나 서버 재시작 후에도 작업이 안정적으로 완료될 수 있도록 한다.
# 재시도 처리
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
with DAG(
'example_retry_dag',
start_date=days_ago(1),
schedule_interval='@daily',
) as dag:
retry_task = PythonOperator(
task_id='connect_and_execute',
python_callable=connect_and_execute, # 위의 transactional_task 함수 호출
retries=3, # 최대 3회 재시도
retry_delay=timedelta(minutes=5), # 5분 간격으로 재시도
provide_context=True,
)
'Airflow' 카테고리의 다른 글
Airflow PostgreSQL 최대 커넥션 개수 docker-compose.yaml 파일 설정 (2) | 2024.11.17 |
---|---|
Docker에서 Airflow를 설정하고 서버 간 로그 공유하기 (0) | 2024.08.01 |
Airflow 로그와 DAG 설정: 환경 변수 사용 예시 (0) | 2024.07.13 |
파이썬 스크립트 실행을 위한 Airflow DAG 설정하기 (0) | 2024.04.12 |
Airflow 에어플로우 Docker 컨테이너화 (1) | 2024.02.24 |