Code 실전

에이전트 실행 순서, DAG와 위상 정렬로 코드에 새기기

seunghyeonlab 2026. 5. 12. 18:02

hero

Claude Code로 멀티 에이전트 시스템을 짜다 보면 어느 순간 "에이전트가 몇 개 안 되는데 왜 이렇게 엉키지?"라는 벽에 부딪힌다. 순서 문제다. 이 글은 포트폴리오 평가 시스템에서 6개 에이전트를 운영하다 22분짜리 파이프라인을 9분으로 줄인 경험을 다룬다. DAG 선언, 위상 정렬 구현, 그리고 레이스 컨디션 삽질까지 순서대로 정리한다.

전체 실행 흐름 — 직렬에서 DAG 병렬로


왜 순서가 깨지는가

에이전트를 그냥 나열해서 호출하면 된다고 생각했다. 3개까지는 실제로 그게 통한다.

문제는 의존성이 생기는 순간이다. '리포트 생성 에이전트'가 '데이터 수집 에이전트' 결과를 참조해야 하고, '알림 에이전트'는 리포트와 리스크 계산 둘 다 끝난 뒤에야 실행되어야 한다면? 이건 단순한 실행 순서가 아니다. 의존성 그래프 문제다.

Mac Mini 4호 n8n 2.8.4 환경에서 포트폴리오 평가 에이전트 6개를 순차 실행하자 총 소요가 22분이었다. 각 에이전트가 앞 결과를 기다리며 줄 서 있으니 병렬로 돌릴 수 있는 구간도 그냥 직렬로 소모하고 있었던 것이다.

직렬 실행의 문제 — 기다림이 쌓이는 구조


DAG 구조 설계: 의존성을 코드로 선언한다

요리에 비유하면 이렇다. 육수 끓이기와 채소 썰기는 동시에 할 수 있다. 완성 담기는 둘 다 끝나야 가능하다. 에이전트 DAG도 정확히 같은 원리다.

아래가 실전에서 쓰는 의존성 선언 구조다.

tasks = {
    'collect': {'deps': [],               'fn': run_collect},
    'risk':    {'deps': ['collect'],      'fn': run_risk},
    'report':  {'deps': ['collect'],      'fn': run_report},
    'notify':  {'deps': ['risk', 'report'], 'fn': run_notify},
}

notifyriskreport 둘 다 완료돼야 실행된다. riskreportcollect만 끝나면 동시에 출발한다. 딕셔너리 선언 하나가 실행 순서 전체를 결정한다. 에이전트를 추가할 때도 이 딕셔너리만 수정하면 된다.


실행기 구현: 위상 정렬로 순서를 자동 확정

DAG를 선언했으면 실행 순서를 자동으로 뽑아야 한다. 위상 정렬(Topological Sort)이 그 역할이다. 진입 차수(in-degree)가 0인 노드부터 큐에 넣고, 하나씩 꺼내며 연결된 노드의 진입 차수를 줄여가는 방식이다.

from collections import deque

def topo_sort(tasks):
    in_degree = {t: len(v['deps']) for t, v in tasks.items()}
    queue = deque([t for t, d in in_degree.items() if d == 0])
    order = []
    while queue:
        node = queue.popleft()
        order.append(node)
        for t, v in tasks.items():
            if node in v['deps']:
                in_degree[t] -= 1
                if in_degree[t] == 0:
                    queue.append(t)
    if len(order) != len(tasks):
        raise ValueError('순환 의존성 감지: 실행 불가')
    return order

순환 의존성(A→B→A)이 생기면 ValueError를 즉시 던진다. 에이전트들이 서로를 기다리며 무한 대기하는 상황을 원천에서 막는다.

위상 정렬 결과가 ['collect', 'risk', 'report', 'notify']로 나오면, 같은 레벨의 에이전트는 asyncio.gather로 묶어서 병렬 실행한다.

import asyncio

async def run_dag(tasks):
    results = {}
    order = topo_sort(tasks)
    # 레벨별로 묶어서 병렬 실행
    pending = set(order)
    while pending:
        ready = [
            t for t in pending
            if all(d in results for d in tasks[t]['deps'])
        ]
        batch_results = await asyncio.gather(
            *[tasks[t]['fn'](results) for t in ready]
        )
        for t, r in zip(ready, batch_results):
            results[t] = r
            pending.remove(t)
    return results

DAG 병렬 실행 — 레벨 단위 처리


실측: 22분 → 9분, 그리고 레이스 컨디션 삽질

Mac Mini 4호 기준 직렬 실행 22분 3초, DAG 병렬 적용 후 9분 11초. 59% 단축이다.

처음 적용했을 때 한 가지 함정이 있었다. riskreport가 동시에 같은 수집 결과 파일에 쓰기를 시도한 것이다. 레이스 컨디션이 터졌고, 그 파일이 두 번 오염됐다.

문제 원인 증상 해결 방법
두 에이전트가 같은 파일에 동시 쓰기 결과 파일 오염, 데이터 누락 수집 결과를 메모리 객체로만 전달
파일 I/O 공유 비결정적 오류 (재현 어려움) 파일 쓰기를 notify 단 하나에서만 처리

해결은 단순했다. 수집 에이전트가 반환한 데이터를 메모리 객체로 넘기고, 파일 쓰기는 notify 단계 한 곳에서만 하도록 구조를 고정했다.

병렬 실행의 핵심은 속도가 아니다. '누가 무엇을 언제 쓰는가'를 설계 단계에서 미리 분리하는 것이다.

레이스 컨디션 발생과 해결 구조


운영 팁: 에이전트가 늘어날수록 이 구조가 더 빛난다

에이전트 3개 이하라면 솔직히 그냥 순차 실행해도 괜찮다. 4개를 넘는 순간부터 의존성이 머릿속에서 관리 불가 상태가 된다.

몇 가지 운영 중에 발견한 함정을 정리한다.

  • Mac vs Linux 차이: Mac Mini에서는 asyncio 이벤트 루프가 기본값이지만, Docker 컨테이너(Linux) 안에서 n8n Python 노드를 쓸 경우 uvloop을 명시적으로 설치해야 성능이 나온다.
  • 에이전트 타임아웃 설정: asyncio.wait_for로 각 에이전트에 개별 타임아웃을 걸어두지 않으면, 하나가 죽었을 때 전체 DAG가 무한 대기한다.
  • deps 키 오타: 'deps': ['colect']처럼 오타가 나면 위상 정렬이 그 태스크를 영영 큐에 올리지 못한다. 선언 시점에 deps 값이 tasks 키에 실제로 존재하는지 검증하는 코드를 별도로 붙이는 게 안전하다.
# deps 유효성 검증 (선언 직후 실행)
def validate_deps(tasks):
    for name, v in tasks.items():
        for dep in v['deps']:
            if dep not in tasks:
                raise KeyError(f"'{name}'의 deps에 없는 태스크: '{dep}'")

마무리

에이전트가 3개를 넘는 순간, 실행 순서는 머릿속에서 관리할 수 없다. DAG 선언 + 위상 정렬 + 병렬 실행, 이 세 가지를 묶으면 시스템이 알아서 순서를 강제한다.

다음 글에서는 에이전트 중 하나가 실패했을 때 DAG를 어떻게 부분 재시작하는지, 체크포인트 저장 전략을 다룰 예정이다.


🐦 X에서 더 빠르게: @baegseungh7061
📚 이 시리즈 더 보기: Code 실전
💌 새 글 알림: X 팔로우 또는 블로그 RSS 구독