#!/usr/bin/env python3 """KIS 실시간 주가 구독 예제. 환경 변수: - KIS_APP_KEY - KIS_APP_SECRET - KIS_STOCK_CODE (선택, 기본값: 005930) - KIS_ENV (선택, real 또는 mock. 기본값: real) 주의: - 실시간 시세 WebSocket 접속 주소/헤더/메시지 포맷은 KIS 공식 문서 기준으로 작성했다. - 계정/상품 권한에 따라 실시간 시세 이용 가능 여부가 달라질 수 있다. """ from __future__ import annotations import asyncio import json import os import sys from typing import Any, Dict, List import requests import websockets REST_BASE_REAL = "https://openapi.koreainvestment.com:9443" REST_BASE_MOCK = "https://openapivts.koreainvestment.com:29443" WS_URL_REAL = "ws://ops.koreainvestment.com:21000" WS_URL_MOCK = "ws://ops.koreainvestment.com:31000" TR_ID = "H0STCNT0" # 실시간 주식 체결가 TR_KEY_DEFAULT = "005930" class KISConfigError(Exception): pass def get_env(name: str, required: bool = True, default: str | None = None) -> str: value = os.getenv(name, default) if required and not value: raise KISConfigError(f"환경 변수 {name} 가 필요합니다.") return value or "" def get_base_urls(env_name: str) -> tuple[str, str]: env_name = env_name.lower().strip() if env_name == "mock": return REST_BASE_MOCK, WS_URL_MOCK return REST_BASE_REAL, WS_URL_REAL def get_access_token(app_key: str, app_secret: str, rest_base: str) -> str: """OAuth access token 발급.""" url = f"{rest_base}/oauth2/tokenP" headers = {"content-type": "application/json; charset=UTF-8"} payload = { "grant_type": "client_credentials", "appkey": app_key, "appsecret": app_secret, } response = requests.post(url, headers=headers, json=payload, timeout=10) response.raise_for_status() data = response.json() access_token = data.get("access_token") if not access_token: raise RuntimeError(f"access_token 발급 실패: {data}") return access_token def build_subscribe_message(app_key: str, app_secret: str, stock_code: str) -> str: """KIS WebSocket 구독 메시지 생성.""" message = { "header": { "approval_key": "", "custtype": "P", "tr_type": "1", "content-type": "utf-8", }, "body": { "input": { "tr_id": TR_ID, "tr_key": stock_code, } }, } # 일부 문서/환경에서는 approval_key 사전 발급을 요구한다. # 여기서는 토큰 방식 예제를 유지하되, approval_key가 필요하면 별도 API 연동으로 교체 가능. _ = (app_key, app_secret) return json.dumps(message) def parse_realtime_message(raw: str) -> Dict[str, Any] | None: """실시간 수신 메시지 파싱. KIS 실시간 체결 데이터는 보통 '|' 구분 후 '^' 필드 구분 형태가 온다. 예: 0|H0STCNT0|001|...^...^ """ if not raw: return None # JSON 제어 메시지 처리 if raw.startswith("{"): try: return {"type": "json", "data": json.loads(raw)} except json.JSONDecodeError: return {"type": "text", "raw": raw} parts = raw.split("|") if len(parts) < 4: return {"type": "text", "raw": raw} data_flag, tr_id, _, payload = parts[0], parts[1], parts[2], parts[3] fields: List[str] = payload.split("^") result: Dict[str, Any] = { "type": "realtime", "data_flag": data_flag, "tr_id": tr_id, "raw": raw, "fields": fields, } if tr_id == TR_ID: # 공식 문서 기준 주요 필드만 우선 매핑 # 인덱스는 문서 개정에 따라 바뀔 수 있어 필요 시 조정해야 함 result["stock_code"] = fields[0] if len(fields) > 0 else None result["trade_time"] = fields[1] if len(fields) > 1 else None result["current_price"] = fields[2] if len(fields) > 2 else None result["compare_sign"] = fields[3] if len(fields) > 3 else None result["compare_price"] = fields[4] if len(fields) > 4 else None result["change_rate"] = fields[5] if len(fields) > 5 else None result["weighted_avg_price"] = fields[6] if len(fields) > 6 else None result["open_price"] = fields[7] if len(fields) > 7 else None result["high_price"] = fields[8] if len(fields) > 8 else None result["low_price"] = fields[9] if len(fields) > 9 else None result["trade_volume"] = fields[12] if len(fields) > 12 else None result["accum_volume"] = fields[13] if len(fields) > 13 else None result["accum_trade_value"] = fields[14] if len(fields) > 14 else None return result def print_realtime_data(parsed: Dict[str, Any]) -> None: if parsed.get("type") == "json": print("[JSON]", json.dumps(parsed["data"], ensure_ascii=False)) return if parsed.get("type") != "realtime": print("[RAW]", parsed.get("raw")) return if parsed.get("tr_id") != TR_ID: print("[OTHER]", parsed.get("raw")) return print( "[체결] " f"종목={parsed.get('stock_code')} " f"시간={parsed.get('trade_time')} " f"현재가={parsed.get('current_price')} " f"대비={parsed.get('compare_price')} " f"등락률={parsed.get('change_rate')} " f"거래량={parsed.get('trade_volume')}" ) async def subscribe_realtime(app_key: str, app_secret: str, stock_code: str, ws_url: str) -> None: subscribe_message = build_subscribe_message(app_key, app_secret, stock_code) async with websockets.connect(ws_url, ping_interval=30, ping_timeout=10) as websocket: print(f"WebSocket 연결 성공: {ws_url}") await websocket.send(subscribe_message) print(f"실시간 구독 요청 전송: TR={TR_ID}, 종목코드={stock_code}") async for message in websocket: parsed = parse_realtime_message(message) if parsed: print_realtime_data(parsed) def main() -> None: try: app_key = get_env("KIS_APP_KEY") app_secret = get_env("KIS_APP_SECRET") stock_code = get_env("KIS_STOCK_CODE", required=False, default=TR_KEY_DEFAULT) env_name = get_env("KIS_ENV", required=False, default="real") rest_base, ws_url = get_base_urls(env_name) access_token = get_access_token(app_key, app_secret, rest_base) print("OAuth access token 발급 성공") # 현재 예제는 토큰 발급 절차를 먼저 수행해 인증 정보 유효성을 확인한다. # WebSocket 구독 시 실제 운영 환경에서 approval_key가 필요하면 관련 API 추가 구현이 필요하다. _ = access_token asyncio.run(subscribe_realtime(app_key, app_secret, stock_code, ws_url)) except KeyboardInterrupt: print("\n종료합니다.") except Exception as exc: print(f"오류 발생: {exc}", file=sys.stderr) sys.exit(1) if __name__ == "__main__": main()