동시 세션이 늘어날수록 스트리밍 버퍼가 터지는 문제를 직접 겪었다. asyncio.Queue 하나로 메모리를 60% 줄이고 P99 응답 지연을 절반 이하로 낮춘 과정을 기록한다.
문제: 스트리밍이 빠를수록 좋다는 착각
Mac Mini 4대 클러스터에 Claude Agent SDK를 붙이고 풀 스트리밍으로 돌렸다. 처음엔 빠르고 쾌적했다. 동시 세션이 8개를 넘는 순간, 소비자 쪽 버퍼가 폭발했다. 응답 일부가 조용히 유실됐고, 로그에는 OOM 경고가 쌓였다.
원인은 단순했다. Agent SDK는 생성 속도에 맞춰 토큰을 밀어낸다(push). DB 저장, 알림 발송, 렌더링 같은 소비자 로직이 조금이라도 느리면 큐가 쌓인다. 수도꼭지를 최대로 열고 컵 하나로 받으면 물이 넘치는 것과 같다.
실측해보니 M2 Pro 1대 기준, 소비자가 50ms 처리 지연만 가져도 분당 약 1,200토큰이 유실됐다. 이게 세션 12개로 불어나면 피해 규모는 선형으로 커진다.
해결책: 밀기(push)에서 당기기(pull)로
역압(Back-pressure)의 핵심은 소비자가 준비됐을 때만 생산자가 다음 청크를 보내는 구조다. 소비자가 '지금 받을 수 있어'라는 신호를 보낼 때까지 생산자가 기다린다.
asyncio.Queue에 maxsize를 지정하면 이 구조가 자동으로 작동한다. 큐가 꽉 차면 queue.put()이 블로킹되고, 소비자가 하나를 꺼낸 뒤에야 생산자가 다음 토큰을 넣을 수 있다. 티켓팅 사이트의 대기열과 동일한 원리다.
import asyncio
from anthropic import AsyncAnthropic
client = AsyncAnthropic()
async def producer(queue: asyncio.Queue, stream):
async for event in stream:
if event.type == 'content_block_delta':
await queue.put(event.delta.text) # 큐가 꽉 차면 여기서 자동 대기
await queue.put(None) # 종료 신호
async def consumer(queue: asyncio.Queue):
while True:
token = await queue.get() # 준비됐을 때만 꺼냄
if token is None:
break
await process_token(token)
queue.task_done()
async def run_with_backpressure(prompt: str):
queue = asyncio.Queue(maxsize=16) # 버퍼 상한선 고정
async with client.messages.stream(
model='claude-opus-4-5',
max_tokens=1024,
messages=[{'role': 'user', 'content': prompt}]
) as stream:
await asyncio.gather(
producer(queue, stream),
consumer(queue)
)
maxsize=16이 핵심이다. 이 숫자가 없으면 큐는 무제한으로 커지고, 버퍼 폭발 문제는 그대로 남는다.
Mac Mini 클러스터 실측: 역압 제어 전후 비교
동시 세션 12개 기준으로 측정했다.
| 지표 | 제어 전 | 제어 후 | 변화 |
|---|---|---|---|
| 세션당 메모리 | 220MB | 88MB | -60% |
| P99 응답 지연 | 3,400ms | 1,200ms | -65% |
| 버퍼 오버플로우 | 빈번 | 0건 | 완전 제거 |
maxsize=16은 내 클러스터에서 실측한 최적값이다. 소비자 처리가 느릴수록 이 값을 낮춰야 한다. 환경마다 다르므로 직접 측정이 필요하다.
멀티 세션 확장: Semaphore로 전체 동시성 잡기
세션 수가 늘어나면 asyncio.Semaphore로 전체 동시성 상한을 추가로 걸어야 한다.
semaphore = asyncio.Semaphore(4) # 물리 코어 수의 절반 기준
async def consume_stream(stream):
async with semaphore: # 슬롯 확보 후에만 처리 시작
async for event in stream:
if event.type == 'content_block_delta':
await process_token(event.delta.text)
await asyncio.sleep(0) # 이벤트 루프에 제어권 반환
asyncio.Semaphore(4)는 M2 Pro 기준 물리 코어 8개의 절반으로 잡았다. Ollama 셀프호스팅 환경과 혼합 운용할 때는 모델별로 별도 Semaphore를 두는 게 낫다.
Claude Agent SDK가 상위 오케스트레이터면 하위 Ollama 호출도 같은 역압 패턴을 물려받게 설계했다. 구조를 '소비자 주도(consumer-driven)'로 고정하면 어떤 하위 모델을 붙여도 동일하게 동작한다.
운영 팁과 주의할 함정
maxsize 튜닝 기준: 소비자 process_token() 평균 처리 시간(ms) × 목표 초당 토큰 수로 최솟값을 추정한 뒤, 실측으로 보정한다.
asyncio.sleep(0) 의 역할: 이걸 빼면 소비자가 이벤트 루프를 독점해 다른 코루틴이 굶는다. 한 줄이지만 필수다.
Docker 환경 주의: 컨테이너에서 cgroup 메모리 제한이 걸려 있으면 OOM killer가 maxsize와 무관하게 프로세스를 죽인다. -m 플래그 설정과 maxsize를 함께 조율해야 한다.
Linux vs macOS: macOS의 asyncio 이벤트 루프는 Linux보다 컨텍스트 전환 비용이 크다. 같은 maxsize여도 Linux 서버에서 처리량이 15~20% 더 높게 나왔다.
스트리밍을 빠르게 쏘는 게 목표가 아니다. 소비자가 안전하게 처리할 수 있는 속도로 흐름을 맞추는 게 실전 설계다. asyncio.Queue(maxsize=N) 한 줄이 버퍼 오버플로우를 막고, Semaphore가 세션 수를 잡는다. 이 패턴을 고정하면 클러스터 규모를 키워도 메모리 사용이 선형을 유지한다.
다음 글에서는 역압 제어가 적용된 멀티 에이전트 환경에서 에러 복구(retry with backoff) 전략을 다룬다.
🐦 X에서 더 빠르게: @baegseungh7061
📚 이 시리즈 더 보기: Code 실전
💌 새 글 알림: X 팔로우 또는 블로그 RSS 구독
'Code 실전' 카테고리의 다른 글
| 에이전트 실행 순서, DAG와 위상 정렬로 코드에 새기기 (0) | 2026.05.12 |
|---|---|
| Claude API 프롬프트 캐싱으로 API 비용 89% 줄이는 법 — Mac Mini 클러스터 실측 (1) | 2026.05.11 |
| Claude Code Pre/Post 훅을 이벤트 버스로 연결해 비동기 파이프라인 구성하기 (0) | 2026.05.09 |
| Claude Code 실행 이력을 JSON으로 수집해 이상 탐지 자동화하기 (0) | 2026.05.08 |
| MCP 서버 내부 상태를 실시간으로 들여다보는 법 — 스냅샷 엔드포인트로 툴 호출 흐름 시각화하기 (0) | 2026.05.07 |