암호화폐 거래를 자동화하고 싶다면, 봇 하나에 세 가지 I/O 흐름을 억지로 우겨넣는 실수부터 피해야 한다. 이 글은 텔레그램 명령 수신, 거래소 실시간 시세 스트림, 주문 실행을 각각 독립된 레이어로 분리해서 asyncio 단일 이벤트 루프 안에 올리는 과정을 처음부터 따라갈 수 있게 정리했다. Python 비동기 프로그래밍 기본 문법은 알고 있다고 가정한다.
왜 레이어를 분리해야 하는가
처음에는 단순하게 시작하고 싶다. 텔레그램 핸들러 안에서 시세도 가져오고, 조건 만족하면 주문까지 때리는 구조. 실제로 짜보면 금방 문제가 드러난다.
첫 번째 함정은 블로킹이다. 텔레그램 polling이 응답을 기다리는 동안 WebSocket 메시지가 유실된다. 두 번째는 순서 보장 실패다. 시세 이벤트와 사용자 명령이 같은 스레드에서 뒤섞이면 어느 타이밍에 주문이 나갔는지 추적이 안 된다.
세 가지 레이어를 명확히 나누면 이 두 문제가 구조적으로 해소된다.
- TelegramGateway — 사용자 명령을 이벤트로 변환해 큐에 넣는다
- ExchangeStream — 거래소 WebSocket 연결을 유지하며 tick을 큐에 넣는다
- Router — 큐에서 꺼내 구독자에게 dispatch한다
세 컴포넌트 모두 asyncio.Queue 하나를 공유한다. 실제 처리 순서는 큐가 보장한다.
의존성 설치와 프로젝트 구조
pip install python-telegram-bot[webhooks] websockets ccxt aiohttp pydantic
python-telegram-bot v20 이상은 내부가 asyncio 기반으로 재작성되어 있다. 별도 스레드나 ThreadPoolExecutor 없이 메인 이벤트 루프에 그냥 붙는다.
디렉터리는 이렇게 잡는다.
bot/
├── main.py
├── gateway/
│ ├── telegram_gw.py
│ └── exchange_ws.py
├── core/
│ ├── router.py
│ └── events.py
├── strategy/
│ └── base.py
└── config.py
이벤트 타입 먼저 정의하기
레이어 간 통신은 데이터클래스 이벤트 객체를 통해서만 한다. 타입을 먼저 정해두면 각 컴포넌트가 서로를 직접 참조하지 않아도 된다.
# core/events.py
from dataclasses import dataclass, field
from decimal import Decimal
from datetime import datetime
from enum import Enum
class EventType(Enum):
COMMAND = "command"
TICK = "tick"
ORDER_ACK = "order_ack"
@dataclass
class TickEvent:
type: EventType = field(default=EventType.TICK, init=False)
symbol: str
price: Decimal
volume: Decimal
ts: datetime
@dataclass
class CommandEvent:
type: EventType = field(default=EventType.COMMAND, init=False)
user_id: int
command: str
args: list[str]
float 대신 Decimal을 쓰는 이유가 있다. 주문 수량 계산에서 0.1 + 0.2 == 0.30000000000000004 같은 부동소수점 오차는 실제 돈 차이로 이어진다. 거래소 API는 수량 소수점 자릿수를 엄격하게 검증한다. 나중에 고치기 너무 번거로우니 처음부터 Decimal로 박아두는 게 낫다.
텔레그램 게이트웨이
# gateway/telegram_gw.py
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes
from core.events import CommandEvent
import asyncio
class TelegramGateway:
def __init__(self, token: str, event_queue: asyncio.Queue):
self.app = Application.builder().token(token).build()
self.queue = event_queue
self._register_handlers()
def _register_handlers(self):
for cmd in ["start", "stop", "status", "buy", "sell"]:
self.app.add_handler(CommandHandler(cmd, self._dispatch))
async def _dispatch(self, update: Update, ctx: ContextTypes.DEFAULT_TYPE):
msg = update.message
args = ctx.args or []
evt = CommandEvent(
user_id=msg.from_user.id,
command=msg.text.split()[0].lstrip("/"),
args=args,
)
await self.queue.put(evt)
await msg.reply_text(f"✓ {evt.command} queued")
async def start(self):
await self.app.initialize()
await self.app.start()
await self.app.updater.start_polling(drop_pending_updates=True)
async def stop(self):
await self.app.updater.stop()
await self.app.stop()
await self.app.shutdown()
drop_pending_updates=True가 중요하다. 봇을 재시작했을 때 다운타임 동안 밀려 있던 명령들을 모조리 무시한다. 이 옵션 없이 재시작하면, 몇 시간 전에 누군가 보낸 /buy 명령이 그대로 실행되는 사고가 난다.
거래소 WebSocket — 재연결 루프가 핵심
# gateway/exchange_ws.py
import asyncio, json, websockets
from decimal import Decimal
from datetime import datetime, timezone
from core.events import TickEvent
BINANCE_WS = "wss://stream.binance.com:9443/stream?streams={}"
class ExchangeStream:
def __init__(self, symbols: list[str], event_queue: asyncio.Queue):
self.symbols = [s.lower() for s in symbols]
self.queue = event_queue
self._running = False
def _build_url(self) -> str:
streams = "/".join(f"{s}@bookTicker" for s in self.symbols)
return BINANCE_WS.format(streams)
async def _reconnect_loop(self):
backoff = 1
while self._running:
try:
async with websockets.connect(
self._build_url(),
ping_interval=20,
ping_timeout=10,
) as ws:
backoff = 1
async for raw in ws:
data = json.loads(raw).get("data", {})
symbol = data.get("s", "")
if not symbol:
continue
evt = TickEvent(
symbol=symbol,
price=Decimal(data["b"]), # best bid
volume=Decimal(data["B"]),
ts=datetime.now(timezone.utc),
)
await self.queue.put(evt)
except (websockets.WebSocketException, ConnectionError):
await asyncio.sleep(min(backoff, 60))
backoff *= 2
async def start(self):
self._running = True
asyncio.create_task(self._reconnect_loop())
async def stop(self):
self._running = False
지수 백오프 재연결을 빠뜨리면 나중에 반드시 후회한다. 바이낸스 WebSocket은 서버 측에서 24시간마다 연결을 강제 종료한다. 네트워크 순단도 수시로 생긴다. 재연결 로직 없이 프로덕션에 올리면 밤사이 연결이 끊기고 봇은 소리 없이 멈춰버린다.
backoff *= 2로 최대 60초까지 대기 간격을 늘린다. 거래소 서버가 장애 상태일 때 미친 듯이 재연결을 시도해 IP 밴을 맞는 상황도 이렇게 방지한다.
라우터와 메인 루프 조립
# core/router.py
import asyncio
from core.events import EventType
class Router:
def __init__(self, queue: asyncio.Queue):
self.queue = queue
self._subscribers: dict[EventType, list] = {}
def subscribe(self, etype: EventType, handler):
self._subscribers.setdefault(etype, []).append(handler)
async def run(self):
while True:
evt = await self.queue.get()
for handler in self._subscribers.get(evt.type, []):
asyncio.create_task(handler(evt))
self.queue.task_done()
핸들러를 asyncio.create_task()로 감싸는 이유가 있다. 느린 전략 계산이 라우터 루프 자체를 블로킹하지 않게 하기 위해서다. 핸들러가 직접 await로 실행되면, 하나가 느려질 때 나머지 이벤트 처리 전체가 밀린다.
# main.py
import asyncio
from config import Config
from gateway.telegram_gw import TelegramGateway
from gateway.exchange_ws import ExchangeStream
from core.router import Router
from core.events import EventType
async def on_tick(evt):
print(f"[TICK] {evt.symbol} {evt.price}")
async def on_command(evt):
print(f"[CMD] /{evt.command} {evt.args}")
async def main():
queue = asyncio.Queue(maxsize=1000)
router = Router(queue)
router.subscribe(EventType.TICK, on_tick)
router.subscribe(EventType.COMMAND, on_command)
tg = TelegramGateway(Config.TG_TOKEN, queue)
ws = ExchangeStream(["BTCUSDT", "ETHUSDT"], queue)
await tg.start()
await ws.start()
await router.run()
if __name__ == "__main__":
asyncio.run(main())
asyncio.Queue(maxsize=1000)으로 상한을 걸어두는 이유가 중요하다. 전략 처리가 느려질 때 이벤트가 무한정 쌓이면 메모리가 터진다. 큐가 꽉 차면 queue.put()이 블로킹되거나 put_nowait()이 QueueFull을 던진다. 이 예외를 잡아 텔레그램으로 경보를 보내는 backpressure 알림 로직은 다음 에피소드에서 붙인다.
시크릿 관리와 실행 확인
# config.py
import os
from dataclasses import dataclass
@dataclass(frozen=True)
class Config:
TG_TOKEN: str = os.environ["TG_BOT_TOKEN"]
TG_CHAT_ID: int = int(os.environ["TG_CHAT_ID"])
ENV: str = os.getenv("ENV", "dev")
.env 파일을 커밋하는 순간 API 키가 노출된다. 환경변수로만 주입한다.
TG_BOT_TOKEN=xxx TG_CHAT_ID=yyy python main.py
실행 후 텔레그램에서 /status를 보내면 터미널에 아래 두 줄이 동시에 보여야 한다.
[CMD] /status []
[TICK] BTCUSDT 68423.12000
[TICK] ETHUSDT 3124.56000
...
명령 처리와 시세 스트림이 서로 막지 않고 동시에 흐르면 뼈대는 완성이다.
다음 단계에서 붙일 것들
이 뼈대 위에 다음 에피소드에서 StrategyEngine을 올린다. 이동평균 크로스오버를 실시간 tick으로 계산하는 로직, ccxt를 통한 실제 주문 실행, 그리고 QueueFull 예외 시 텔레그램 경보까지 연결할 예정이다.
핵심은 하나다 — 이벤트 큐 하나가 전체 비동기 흐름을 중재한다. 컴포넌트끼리 직접 호출하지 않고 큐를 통해서만 통신하면, 나중에 전략 엔진을 교체하거나 거래소를 바꿀 때 건드릴 파일이 최소화된다.
🐦 X에서 더 빠르게: @baegseungh7061
📚 이 시리즈 더 보기: 전체 글 목록
💌 새 글 알림: X 팔로우 또는 블로그 RSS 구독