라이브 트레이딩 리스크 오버레이 — Kill Switch, 오더 실패율, 익스포저 캡
일별/주별 손실 한도, 슬라이딩 윈도우 실패율, 섹터별 익스포저 비율 제어까지 계좌 레벨 안전장치
전략 레벨 stop loss와 별개로, 계좌 전체를 보호하는 리스크 오버레이가 필요하다. 전략이 여러 개 돌아갈 때 개별 전략 stop만으로는 전체 손실을 제어할 수 없다. 또 브로커 API 오류나 네트워크 장애가 연속으로 나면 잘못된 주문이 누적될 수 있다.
RiskManager는 세 가지를 본다.
- 일별/주별 손실이 한도를 넘으면 kill switch
- 주문 실패율이 임계치를 넘으면 kill switch
- 포지션이 심볼/섹터/전체 한도를 초과하면 비율 조정
Kill Switch 조건
@dataclass
class RiskLimits:
daily_loss_limit: float = -0.02 # 하루 -2%
weekly_loss_limit: float = -0.05 # 주간 -5%
failure_rate_limit: float = 0.20 # 5분 내 실패율 20%
failure_window_minutes: int = 5
min_orders_in_window_for_rate: int = 20
consecutive_failures_limit: int = 3 # 연속 실패 3회
absolute_failures_in_window_limit: int = 5 # 절대 실패 5건
손실 한도
def mark_equity(self, ts: datetime, equity: float) -> None:
day = ts.date()
week = self._week_key(ts)
if st.day_anchor is None or st.day_anchor.date() != day:
st.day_anchor = ts
st.day_start_equity = float(equity)
if st.week_anchor is None or self._week_key(st.week_anchor) != week:
st.week_anchor = ts
st.week_start_equity = float(equity)
day_pnl = float(equity / st.day_start_equity - 1.0)
if day_pnl <= self.limits.daily_loss_limit:
self._activate_kill_switch(ts, f"daily_loss_limit:{day_pnl:.4f}", "daily_loss_limit")
week_pnl = float(equity / st.week_start_equity - 1.0)
if week_pnl <= self.limits.weekly_loss_limit:
self._activate_kill_switch(ts, f"weekly_loss_limit:{week_pnl:.4f}", "weekly_loss_limit")
봉이 들어올 때마다 mark_equity를 호출한다. 일/주 시작 기준 equity를 기억해 두고 현재 대비 낙폭을 계산한다. 날짜가 바뀌면 자동으로 앵커를 갱신한다.
주문 실패율
오더를 낼 때마다 결과를 기록한다. deque로 슬라이딩 윈도우를 관리한다.
def record_order_result(self, ts: datetime, success: bool,
status_code: int | None = None,
error_type: str | None = None) -> None:
self.state.order_events.append((ts, success, status_code, error_type))
self._trim_order_events(ts) # 윈도우 밖 이벤트 제거
self._refresh_consecutive_failures()
# 401/403은 즉시 kill switch (인증 오류는 일시적이 아님)
if not success and status_code == 401:
self._activate_kill_switch(ts, "fatal_order_error_status:401", "http_401")
return
if not success and status_code == 403 and error_type == "auth_error":
self._activate_kill_switch(ts, "fatal_order_error_status:403", "http_403")
return
n, failures = self._failure_stats(ts)
# 절대 실패 건수 초과
if not success and failures >= self.limits.absolute_failures_in_window_limit:
self._activate_kill_switch(ts, f"absolute_failures:{failures}", "absolute_failures")
return
# 연속 실패
if not success and self.state.consecutive_failures >= self.limits.consecutive_failures_limit:
self._activate_kill_switch(ts, f"consecutive_failures:{self.state.consecutive_failures}",
"consecutive_failures")
return
# 비율 초과 (최소 20건 이상일 때만 평가)
if n >= self.limits.min_orders_in_window_for_rate and n > 0:
failure_rate = failures / n
if failure_rate > self.limits.failure_rate_limit:
self._activate_kill_switch(ts, f"order_failure_rate:{failure_rate:.4f}", "order_failure_rate")
_trim_order_events는 윈도우 밖 이벤트를 deque 앞에서 제거한다.
def _trim_order_events(self, ts: datetime) -> None:
cutoff = ts - timedelta(minutes=self.limits.failure_window_minutes)
while self.state.order_events and self.state.order_events[0][0] < cutoff:
self.state.order_events.popleft()
연속 실패 카운터는 최근 이벤트부터 역순으로 성공이 나올 때까지 실패를 센다.
def _refresh_consecutive_failures(self) -> None:
c = 0
for _, ok, _, _ in reversed(self.state.order_events):
if ok:
break
c += 1
self.state.consecutive_failures = c
세 조건을 병렬로 본다.
- 비율: 통계적 노이즈를 무시하려면 최소 20건이 필요
- 절대값: 건수가 적어도 5건 연속 실패면 위험
- 연속: 마지막 3건이 전부 실패하면 즉시 중단
자동 복구
kill switch가 걸려도 원인에 따라 자동 복구를 허용한다.
def maybe_auto_recover(self, ts: datetime) -> bool:
fatal_reason_codes = {"daily_loss_limit", "weekly_loss_limit", "http_401", "http_403"}
if self.state.reason_code in fatal_reason_codes:
return False # 손실 한도 / 인증 오류는 수동 해제만
cooldown = timedelta(minutes=self.limits.recover_cooldown_minutes)
if ts - self.state.last_kill_switch_ts < cooldown:
return False # 5분 쿨다운
n, failures = self._failure_stats(ts)
self._refresh_consecutive_failures()
if failures == 0 and self.state.consecutive_failures == 0:
# 윈도우 내 실패가 사라졌으면 복구
self.state.kill_switch = False
...
return True
손실 한도와 인증 오류는 자동 복구 금지다. 운영자가 직접 확인하고 clear_kill_switch()를 호출해야 한다. 일시적인 주문 실패율 초과는 5분 쿨다운 후 윈도우에서 실패가 사라지면 자동 복구된다.
익스포저 캡
목표 포지션을 직접 실행하기 전에 apply_exposure_caps를 거친다.
@dataclass
class RiskLimits:
max_symbol_exposure: float = 0.15 # 종목당 최대 15%
max_total_exposure: float = 0.60 # 전체 포지션 최대 60%
max_sector_exposure: float = 0.20 # 섹터당 최대 20%
def apply_exposure_caps(self, target_positions: dict[str, float]) -> dict[str, float]:
# 1단계: 종목별 캡
capped = {}
for sym, w in target_positions.items():
capped[sym] = max(-self.limits.max_symbol_exposure,
min(self.limits.max_symbol_exposure, float(w)))
# 2단계: 섹터별 캡
capped = self._apply_sector_caps(capped)
# 3단계: 전체 총합 캡
total = sum(abs(w) for w in capped.values())
if total > self.limits.max_total_exposure and total > 0:
scale = self.limits.max_total_exposure / total
capped = {k: v * scale for k, v in capped.items()}
return capped
섹터 캡은 종목별 캡 이후에 적용된다. 섹터 내 총 익스포저가 한도를 넘으면 섹터 내 모든 종목을 비율 축소한다.
def _apply_sector_caps(self, capped: dict[str, float]) -> dict[str, float]:
for sec, syms in by_sector.items():
sec_cap = float(overrides.get(sec, self.limits.max_sector_exposure))
gross = sum(abs(capped[sym]) for sym in syms)
if gross > sec_cap and gross > 0:
scale = sec_cap / gross
for sym in syms:
capped[sym] = capped[sym] * scale
return capped
sector_exposure_overrides로 섹터별로 다른 한도를 줄 수 있다. 에너지 섹터는 10%, 기술 섹터는 30%처럼.
데이터 신선도 체크
주문 전에 시장 데이터가 최신인지 확인한다.
def assess_data_freshness(
now: datetime,
last_15m: datetime | None,
last_60m: datetime | None,
stale_15m_bars: int = 2,
stale_60m_bars: int = 1,
) -> FreshnessStatus:
if last_15m is None or last_60m is None:
return FreshnessStatus(status="missing", can_trade=False, reason="missing_bar_data")
if now_utc - last_15m > timedelta(minutes=15 * stale_15m_bars):
return FreshnessStatus(status="stale", can_trade=False, reason="stale_15m")
if now_utc - last_60m > timedelta(minutes=60 * stale_60m_bars):
return FreshnessStatus(status="stale", can_trade=False, reason="stale_60m")
return FreshnessStatus(status="fresh", can_trade=True, reason="fresh")
15분봉은 2봉(30분), 60분봉은 1봉(60분) 이상 오지 않으면 stale로 판단한다. 데이터 피드가 끊겼을 때 오래된 신호로 주문이 나가는 걸 막는다.
US 정규장 여부도 별도로 체크한다.
def is_us_regular_hours(now: datetime) -> bool:
ts = now.astimezone(ZoneInfo("America/New_York"))
if ts.weekday() >= 5:
return False
mins = ts.hour * 60 + ts.minute
return 570 <= mins < 960 # 09:30 ~ 16:00
실제 적용 흐름
봉 수신
↓
mark_equity(ts, current_equity) ← 손실 한도 체크
↓
assess_data_freshness(...) ← 데이터 신선도 체크
↓
can_trade() == True? ← kill switch 확인
↓
전략 신호 생성
↓
apply_exposure_caps(raw_positions) ← 익스포저 조정
↓
주문 실행
↓
record_order_result(ts, success, ...) ← 실패율 업데이트
↓
maybe_auto_recover(ts) ← 복구 가능 여부 확인
kill switch가 걸리면 신규 주문을 내지 않고 기존 포지션 청산 로직만 실행한다. reason_code를 로그에 남겨두면 kill switch가 왜 발동됐는지 사후에 추적할 수 있다.