카테고리 없음

플러그인 성능 프로파일러 직접 만들기: 실행 시간·메모리·CPU 스파이크 자동 추적

seunghyeonlab 2026. 5. 18. 13:03

플러그인 수가 늘어날수록 어느 플러그인이 전체 파이프라인을 느리게 만드는지 육안으로 파악하기 어려워진다. 타이머 몇 줄 붙이는 것으로는 부족하다. 비동기 스택 추적, 메모리 스냅샷, CPU 점유율까지 한꺼번에 잡아야 병목이 보인다. 이 글은 플러그인 코드를 건드리지 않고 로더 레벨에서 계측하는 방법을 처음부터 구현한다.

전체 프로파일링 흐름 다이어그램


왜 단순 타이머로는 부족한가

처음에는 time.perf_counter()를 플러그인 앞뒤에 심는 방식으로 시작했다. 숫자는 나왔다. 문제는 이 숫자가 무엇을 의미하는지 해석이 안 됐다는 것이다.

wall_time이 500ms가 나왔을 때, 이게 실제 연산이 느린 건지 I/O 대기인지 알 수 없었다. 메모리는 아예 측정이 안 됐다. 플러그인 A가 항상 느렸지만 나머지가 다 30ms 이하라면 A의 500ms가 이상치인지도 판단 기준이 없었다.

필요한 건 세 가지였다.

측정 항목 도구 판단 기준
실행 시간 (wall) time.perf_counter() z-score > 2.0
CPU 연산 시간 psutil / time.thread_time_ns() wall 대비 비율
메모리 증감 tracemalloc 스냅샷 차분 피크 절대값

wall time과 cpu time 괴리 패턴


계측 레이어 구현

플러그인 코드를 건드리지 않으려면 로더 자체에 미들웨어를 두는 구조가 맞다. 데코레이터로 각 플러그인 함수를 래핑하되, 래핑은 등록 시점에 자동으로 이루어지게 했다.

import time
import tracemalloc
import psutil
import os
from functools import wraps
from dataclasses import dataclass, field
from typing import Callable, Any

@dataclass
class PluginProfile:
    name: str
    wall_time_ms: float = 0.0
    cpu_time_ms: float = 0.0
    memory_delta_kb: float = 0.0
    peak_memory_kb: float = 0.0
    call_count: int = 0
    errors: list = field(default_factory=list)

class PluginProfiler:
    def __init__(self):
        self._profiles: dict[str, PluginProfile] = {}
        self._process = psutil.Process(os.getpid())

    def wrap(self, name: str, fn: Callable) -> Callable:
        if name not in self._profiles:
            self._profiles[name] = PluginProfile(name=name)

        @wraps(fn)
        async def _measured(*args, **kwargs):
            profile = self._profiles[name]
            tracemalloc.start()
            cpu_before = self._process.cpu_times()
            wall_start = time.perf_counter()

            try:
                result = await fn(*args, **kwargs)
                return result
            except Exception as e:
                profile.errors.append(str(e))
                raise
            finally:
                wall_elapsed = (time.perf_counter() - wall_start) * 1000
                cpu_after = self._process.cpu_times()
                _, peak = tracemalloc.get_traced_memory()
                tracemalloc.stop()

                cpu_elapsed = (
                    (cpu_after.user - cpu_before.user) +
                    (cpu_after.system - cpu_before.system)
                ) * 1000

                profile.wall_time_ms += wall_elapsed
                profile.cpu_time_ms += cpu_elapsed
                profile.peak_memory_kb = max(profile.peak_memory_kb, peak / 1024)
                profile.call_count += 1

        return _measured

finally 블록에 측정을 몰아넣은 이유는 예외 발생 시에도 계측을 빠뜨리지 않기 위해서다. 에러 케이스일수록 느리거나 메모리를 과다 점유하는 경우가 많았다.


멀티스레드 환경의 CPU 시간 분리

psutil.Process.cpu_times()는 프로세스 전체 누계라 플러그인별로 분리가 안 된다. 스레드가 여럿 돌아가는 환경이라면 time.thread_time_ns()를 스레드 로컬로 관리해야 플러그인 단위 CPU 시간이 정확하게 나온다.

import threading

_thread_cpu = threading.local()

def thread_cpu_start():
    _thread_cpu.start = time.thread_time_ns()

def thread_cpu_elapsed_ms() -> float:
    return (time.thread_time_ns() - _thread_cpu.start) / 1e6

싱글스레드 asyncio 루프라면 psutil 방식으로 충분하다. 멀티프로세스(multiprocessing)라면 각 프로세스에서 별도 PluginProfiler 인스턴스를 두고 결과를 큐로 모으는 구조가 된다.


메모리 스냅샷 차분으로 원인 추적

실행 시간만 봐서는 놓치는 게 있다. 빠른데 메모리를 과다 점유하는 플러그인은 시간 지표에서는 정상으로 분류된다. 호출 전후로 스냅샷을 찍어 차분하면 어느 줄에서 어떤 객체를 얼마나 만들었는지 보인다.

def capture_memory_diff(before: tracemalloc.Snapshot,
                        after: tracemalloc.Snapshot,
                        top_n=5):
    stats = after.compare_to(before, 'lineno')
    return [
        {
            "file": str(stat.traceback[0]),
            "size_diff_kb": stat.size_diff / 1024,
            "count_diff": stat.count_diff,
        }
        for stat in stats[:top_n]
    ]

실제로 이 방법으로 한 플러그인이 매 호출마다 bytes 객체를 400KB씩 새로 할당하고 해제하는 패턴을 잡았다. 캐싱으로 해결했고 GC 압력이 눈에 띄게 줄었다.

메모리 스냅샷 차분 흐름


병목 자동 판정: z-score 기반

절대 기준값("100ms 넘으면 느림")은 플러그인마다 기대치가 달라 맞지 않았다. 전체 분포에서 얼마나 벗어났는지를 보는 z-score가 더 실용적이었다.

import statistics

def detect_bottlenecks(profiles: dict[str, PluginProfile],
                       z_threshold=2.0):
    times = [p.wall_time_ms / max(p.call_count, 1) for p in profiles.values()]
    if len(times) < 2:
        return []

    mean = statistics.mean(times)
    stdev = statistics.stdev(times)

    bottlenecks = []
    for name, profile in profiles.items():
        avg_time = profile.wall_time_ms / max(profile.call_count, 1)
        z = (avg_time - mean) / stdev if stdev > 0 else 0
        if z > z_threshold:
            bottlenecks.append({
                "plugin": name,
                "avg_wall_ms": round(avg_time, 2),
                "z_score": round(z, 2),
                "peak_mem_kb": round(profile.peak_memory_kb, 1),
                "errors": len(profile.errors),
            })

    return sorted(bottlenecks, key=lambda x: x["z_score"], reverse=True)

z-score 임계값 2.0은 전체의 약 상위 2.5%를 잡는다. 플러그인 수가 적을 때는 1.5로 낮추는 게 감도가 좋았다.


플러그인 로더에 통합하기

class PluginLoader:
    def __init__(self, profiler: PluginProfiler):
        self._plugins = {}
        self._profiler = profiler

    def register(self, name: str, plugin_fn: Callable):
        self._plugins[name] = self._profiler.wrap(name, plugin_fn)

    async def invoke(self, name: str, *args, **kwargs) -> Any:
        if name not in self._plugins:
            raise KeyError(f"Plugin '{name}' not registered")
        return await self._plugins[name](*args, **kwargs)

    def report(self):
        bottlenecks = detect_bottlenecks(self._profiler._profiles)
        for b in bottlenecks:
            print(
                f"[BOTTLENECK] {b['plugin']} "
                f"| avg {b['avg_wall_ms']}ms "
                f"| z={b['z_score']} "
                f"| mem={b['peak_mem_kb']}KB"
            )
        return bottlenecks

register() 한 번으로 끝이다. 이후 모든 invoke() 호출에서 자동으로 계측이 붙는다.


CPU 스파이크 실시간 감시

장기 실행 에이전트는 호출 단위 계측만으로는 부족하다. 특정 시간대에 CPU가 치솟는 패턴은 백그라운드 루프로 잡아야 한다.

import asyncio
from contextvars import ContextVar

current_plugin: ContextVar[str] = ContextVar('current_plugin', default='unknown')

async def cpu_spike_monitor(interval=1.0, spike_pct=80.0):
    process = psutil.Process(os.getpid())
    while True:
        await asyncio.sleep(interval)
        cpu = process.cpu_percent(interval=None)
        if cpu > spike_pct:
            plugin = current_plugin.get()
            print(f"[CPU SPIKE] {cpu:.1f}% — 실행 중 플러그인: {plugin}")

_measured 래퍼 안에서 current_plugin.set(name)을 호출해두면, 스파이크 발생 시점에 어느 플러그인이 실행 중이었는지 정확히 매핑된다.

CPU 스파이크 감지와 플러그인 매핑


운영 환경 투입 시 주의사항

tracemalloc은 CPython 전용이고, 활성화하면 메모리 오버헤드가 약 10~30% 붙는다. 프로덕션에서는 항상 켜두면 안 된다. 환경 변수로 제어하는 패턴이 실용적이다.

# 의심 구간에만 활성화
PLUGIN_PROFILING=1 python agent.py
PROFILING_ENABLED = os.getenv("PLUGIN_PROFILING", "0") == "1"

수집한 프로파일 데이터는 OpenTelemetry span으로 내보내면 Jaeger나 Grafana Tempo와 연동해 분산 추적 뷰로 확장할 수 있다. 플러그인 호출을 span으로 감싸면 avg_wall_ms, peak_mem_kb를 span attribute로 붙여 대시보드에서 시계열로 볼 수 있다.


계측 레이어를 로더에 두면 플러그인 코드 수정 없이 모든 플러그인의 성능 데이터를 일괄 수집할 수 있다. wall_timecpu_time 차이로 I/O 병목과 연산 병목을 구분하고, tracemalloc 차분으로 메모리 과다 할당 위치를 줄 단위로 좁히는 것이 핵심이다. 다음 글에서는 이 데이터를 OpenTelemetry로 내보내 Grafana에서 실시간 플러그인 히트맵을 구성하는 과정을 다룬다.


🐦 X에서 더 빠르게: @baegseungh7061
📚 이 시리즈 더 보기: 전체 글 목록
💌 새 글 알림: X 팔로우 또는 블로그 RSS 구독