210 lines
6.9 KiB
Python
210 lines
6.9 KiB
Python
#!/usr/bin/env python3
|
|
"""KIS 실시간 주가 구독 예제.
|
|
|
|
환경 변수:
|
|
- KIS_APP_KEY
|
|
- KIS_APP_SECRET
|
|
- KIS_STOCK_CODE (선택, 기본값: 005930)
|
|
- KIS_ENV (선택, real 또는 mock. 기본값: real)
|
|
|
|
주의:
|
|
- 실시간 시세 WebSocket 접속 주소/헤더/메시지 포맷은 KIS 공식 문서 기준으로 작성했다.
|
|
- 계정/상품 권한에 따라 실시간 시세 이용 가능 여부가 달라질 수 있다.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import sys
|
|
from typing import Any, Dict, List
|
|
|
|
import requests
|
|
import websockets
|
|
|
|
|
|
REST_BASE_REAL = "https://openapi.koreainvestment.com:9443"
|
|
REST_BASE_MOCK = "https://openapivts.koreainvestment.com:29443"
|
|
WS_URL_REAL = "ws://ops.koreainvestment.com:21000"
|
|
WS_URL_MOCK = "ws://ops.koreainvestment.com:31000"
|
|
|
|
TR_ID = "H0STCNT0" # 실시간 주식 체결가
|
|
TR_KEY_DEFAULT = "005930"
|
|
|
|
|
|
class KISConfigError(Exception):
|
|
pass
|
|
|
|
|
|
def get_env(name: str, required: bool = True, default: str | None = None) -> str:
|
|
value = os.getenv(name, default)
|
|
if required and not value:
|
|
raise KISConfigError(f"환경 변수 {name} 가 필요합니다.")
|
|
return value or ""
|
|
|
|
|
|
def get_base_urls(env_name: str) -> tuple[str, str]:
|
|
env_name = env_name.lower().strip()
|
|
if env_name == "mock":
|
|
return REST_BASE_MOCK, WS_URL_MOCK
|
|
return REST_BASE_REAL, WS_URL_REAL
|
|
|
|
|
|
def get_access_token(app_key: str, app_secret: str, rest_base: str) -> str:
|
|
"""OAuth access token 발급."""
|
|
url = f"{rest_base}/oauth2/tokenP"
|
|
headers = {"content-type": "application/json; charset=UTF-8"}
|
|
payload = {
|
|
"grant_type": "client_credentials",
|
|
"appkey": app_key,
|
|
"appsecret": app_secret,
|
|
}
|
|
|
|
response = requests.post(url, headers=headers, json=payload, timeout=10)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
|
|
access_token = data.get("access_token")
|
|
if not access_token:
|
|
raise RuntimeError(f"access_token 발급 실패: {data}")
|
|
return access_token
|
|
|
|
|
|
def build_subscribe_message(app_key: str, app_secret: str, stock_code: str) -> str:
|
|
"""KIS WebSocket 구독 메시지 생성."""
|
|
message = {
|
|
"header": {
|
|
"approval_key": "",
|
|
"custtype": "P",
|
|
"tr_type": "1",
|
|
"content-type": "utf-8",
|
|
},
|
|
"body": {
|
|
"input": {
|
|
"tr_id": TR_ID,
|
|
"tr_key": stock_code,
|
|
}
|
|
},
|
|
}
|
|
|
|
# 일부 문서/환경에서는 approval_key 사전 발급을 요구한다.
|
|
# 여기서는 토큰 방식 예제를 유지하되, approval_key가 필요하면 별도 API 연동으로 교체 가능.
|
|
_ = (app_key, app_secret)
|
|
return json.dumps(message)
|
|
|
|
|
|
def parse_realtime_message(raw: str) -> Dict[str, Any] | None:
|
|
"""실시간 수신 메시지 파싱.
|
|
|
|
KIS 실시간 체결 데이터는 보통 '|' 구분 후 '^' 필드 구분 형태가 온다.
|
|
예: 0|H0STCNT0|001|...^...^
|
|
"""
|
|
if not raw:
|
|
return None
|
|
|
|
# JSON 제어 메시지 처리
|
|
if raw.startswith("{"):
|
|
try:
|
|
return {"type": "json", "data": json.loads(raw)}
|
|
except json.JSONDecodeError:
|
|
return {"type": "text", "raw": raw}
|
|
|
|
parts = raw.split("|")
|
|
if len(parts) < 4:
|
|
return {"type": "text", "raw": raw}
|
|
|
|
data_flag, tr_id, _, payload = parts[0], parts[1], parts[2], parts[3]
|
|
fields: List[str] = payload.split("^")
|
|
|
|
result: Dict[str, Any] = {
|
|
"type": "realtime",
|
|
"data_flag": data_flag,
|
|
"tr_id": tr_id,
|
|
"raw": raw,
|
|
"fields": fields,
|
|
}
|
|
|
|
if tr_id == TR_ID:
|
|
# 공식 문서 기준 주요 필드만 우선 매핑
|
|
# 인덱스는 문서 개정에 따라 바뀔 수 있어 필요 시 조정해야 함
|
|
result["stock_code"] = fields[0] if len(fields) > 0 else None
|
|
result["trade_time"] = fields[1] if len(fields) > 1 else None
|
|
result["current_price"] = fields[2] if len(fields) > 2 else None
|
|
result["compare_sign"] = fields[3] if len(fields) > 3 else None
|
|
result["compare_price"] = fields[4] if len(fields) > 4 else None
|
|
result["change_rate"] = fields[5] if len(fields) > 5 else None
|
|
result["weighted_avg_price"] = fields[6] if len(fields) > 6 else None
|
|
result["open_price"] = fields[7] if len(fields) > 7 else None
|
|
result["high_price"] = fields[8] if len(fields) > 8 else None
|
|
result["low_price"] = fields[9] if len(fields) > 9 else None
|
|
result["trade_volume"] = fields[12] if len(fields) > 12 else None
|
|
result["accum_volume"] = fields[13] if len(fields) > 13 else None
|
|
result["accum_trade_value"] = fields[14] if len(fields) > 14 else None
|
|
|
|
return result
|
|
|
|
|
|
def print_realtime_data(parsed: Dict[str, Any]) -> None:
|
|
if parsed.get("type") == "json":
|
|
print("[JSON]", json.dumps(parsed["data"], ensure_ascii=False))
|
|
return
|
|
|
|
if parsed.get("type") != "realtime":
|
|
print("[RAW]", parsed.get("raw"))
|
|
return
|
|
|
|
if parsed.get("tr_id") != TR_ID:
|
|
print("[OTHER]", parsed.get("raw"))
|
|
return
|
|
|
|
print(
|
|
"[체결] "
|
|
f"종목={parsed.get('stock_code')} "
|
|
f"시간={parsed.get('trade_time')} "
|
|
f"현재가={parsed.get('current_price')} "
|
|
f"대비={parsed.get('compare_price')} "
|
|
f"등락률={parsed.get('change_rate')} "
|
|
f"거래량={parsed.get('trade_volume')}"
|
|
)
|
|
|
|
|
|
async def subscribe_realtime(app_key: str, app_secret: str, stock_code: str, ws_url: str) -> None:
|
|
subscribe_message = build_subscribe_message(app_key, app_secret, stock_code)
|
|
|
|
async with websockets.connect(ws_url, ping_interval=30, ping_timeout=10) as websocket:
|
|
print(f"WebSocket 연결 성공: {ws_url}")
|
|
await websocket.send(subscribe_message)
|
|
print(f"실시간 구독 요청 전송: TR={TR_ID}, 종목코드={stock_code}")
|
|
|
|
async for message in websocket:
|
|
parsed = parse_realtime_message(message)
|
|
if parsed:
|
|
print_realtime_data(parsed)
|
|
|
|
|
|
def main() -> None:
|
|
try:
|
|
app_key = get_env("KIS_APP_KEY")
|
|
app_secret = get_env("KIS_APP_SECRET")
|
|
stock_code = get_env("KIS_STOCK_CODE", required=False, default=TR_KEY_DEFAULT)
|
|
env_name = get_env("KIS_ENV", required=False, default="real")
|
|
rest_base, ws_url = get_base_urls(env_name)
|
|
|
|
access_token = get_access_token(app_key, app_secret, rest_base)
|
|
print("OAuth access token 발급 성공")
|
|
# 현재 예제는 토큰 발급 절차를 먼저 수행해 인증 정보 유효성을 확인한다.
|
|
# WebSocket 구독 시 실제 운영 환경에서 approval_key가 필요하면 관련 API 추가 구현이 필요하다.
|
|
_ = access_token
|
|
|
|
asyncio.run(subscribe_realtime(app_key, app_secret, stock_code, ws_url))
|
|
except KeyboardInterrupt:
|
|
print("\n종료합니다.")
|
|
except Exception as exc:
|
|
print(f"오류 발생: {exc}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|