chore: update workspace config and memory

This commit is contained in:
arin
2026-03-30 19:30:25 +09:00
commit f3726b39d1
3479 changed files with 346874 additions and 0 deletions

209
kis_realtime.py Normal file
View File

@@ -0,0 +1,209 @@
#!/usr/bin/env python3
"""KIS 실시간 주가 구독 예제.
환경 변수:
- KIS_APP_KEY
- KIS_APP_SECRET
- KIS_STOCK_CODE (선택, 기본값: 005930)
- KIS_ENV (선택, real 또는 mock. 기본값: real)
주의:
- 실시간 시세 WebSocket 접속 주소/헤더/메시지 포맷은 KIS 공식 문서 기준으로 작성했다.
- 계정/상품 권한에 따라 실시간 시세 이용 가능 여부가 달라질 수 있다.
"""
from __future__ import annotations
import asyncio
import json
import os
import sys
from typing import Any, Dict, List
import requests
import websockets
REST_BASE_REAL = "https://openapi.koreainvestment.com:9443"
REST_BASE_MOCK = "https://openapivts.koreainvestment.com:29443"
WS_URL_REAL = "ws://ops.koreainvestment.com:21000"
WS_URL_MOCK = "ws://ops.koreainvestment.com:31000"
TR_ID = "H0STCNT0" # 실시간 주식 체결가
TR_KEY_DEFAULT = "005930"
class KISConfigError(Exception):
pass
def get_env(name: str, required: bool = True, default: str | None = None) -> str:
value = os.getenv(name, default)
if required and not value:
raise KISConfigError(f"환경 변수 {name} 가 필요합니다.")
return value or ""
def get_base_urls(env_name: str) -> tuple[str, str]:
env_name = env_name.lower().strip()
if env_name == "mock":
return REST_BASE_MOCK, WS_URL_MOCK
return REST_BASE_REAL, WS_URL_REAL
def get_access_token(app_key: str, app_secret: str, rest_base: str) -> str:
"""OAuth access token 발급."""
url = f"{rest_base}/oauth2/tokenP"
headers = {"content-type": "application/json; charset=UTF-8"}
payload = {
"grant_type": "client_credentials",
"appkey": app_key,
"appsecret": app_secret,
}
response = requests.post(url, headers=headers, json=payload, timeout=10)
response.raise_for_status()
data = response.json()
access_token = data.get("access_token")
if not access_token:
raise RuntimeError(f"access_token 발급 실패: {data}")
return access_token
def build_subscribe_message(app_key: str, app_secret: str, stock_code: str) -> str:
"""KIS WebSocket 구독 메시지 생성."""
message = {
"header": {
"approval_key": "",
"custtype": "P",
"tr_type": "1",
"content-type": "utf-8",
},
"body": {
"input": {
"tr_id": TR_ID,
"tr_key": stock_code,
}
},
}
# 일부 문서/환경에서는 approval_key 사전 발급을 요구한다.
# 여기서는 토큰 방식 예제를 유지하되, approval_key가 필요하면 별도 API 연동으로 교체 가능.
_ = (app_key, app_secret)
return json.dumps(message)
def parse_realtime_message(raw: str) -> Dict[str, Any] | None:
"""실시간 수신 메시지 파싱.
KIS 실시간 체결 데이터는 보통 '|' 구분 후 '^' 필드 구분 형태가 온다.
예: 0|H0STCNT0|001|...^...^
"""
if not raw:
return None
# JSON 제어 메시지 처리
if raw.startswith("{"):
try:
return {"type": "json", "data": json.loads(raw)}
except json.JSONDecodeError:
return {"type": "text", "raw": raw}
parts = raw.split("|")
if len(parts) < 4:
return {"type": "text", "raw": raw}
data_flag, tr_id, _, payload = parts[0], parts[1], parts[2], parts[3]
fields: List[str] = payload.split("^")
result: Dict[str, Any] = {
"type": "realtime",
"data_flag": data_flag,
"tr_id": tr_id,
"raw": raw,
"fields": fields,
}
if tr_id == TR_ID:
# 공식 문서 기준 주요 필드만 우선 매핑
# 인덱스는 문서 개정에 따라 바뀔 수 있어 필요 시 조정해야 함
result["stock_code"] = fields[0] if len(fields) > 0 else None
result["trade_time"] = fields[1] if len(fields) > 1 else None
result["current_price"] = fields[2] if len(fields) > 2 else None
result["compare_sign"] = fields[3] if len(fields) > 3 else None
result["compare_price"] = fields[4] if len(fields) > 4 else None
result["change_rate"] = fields[5] if len(fields) > 5 else None
result["weighted_avg_price"] = fields[6] if len(fields) > 6 else None
result["open_price"] = fields[7] if len(fields) > 7 else None
result["high_price"] = fields[8] if len(fields) > 8 else None
result["low_price"] = fields[9] if len(fields) > 9 else None
result["trade_volume"] = fields[12] if len(fields) > 12 else None
result["accum_volume"] = fields[13] if len(fields) > 13 else None
result["accum_trade_value"] = fields[14] if len(fields) > 14 else None
return result
def print_realtime_data(parsed: Dict[str, Any]) -> None:
if parsed.get("type") == "json":
print("[JSON]", json.dumps(parsed["data"], ensure_ascii=False))
return
if parsed.get("type") != "realtime":
print("[RAW]", parsed.get("raw"))
return
if parsed.get("tr_id") != TR_ID:
print("[OTHER]", parsed.get("raw"))
return
print(
"[체결] "
f"종목={parsed.get('stock_code')} "
f"시간={parsed.get('trade_time')} "
f"현재가={parsed.get('current_price')} "
f"대비={parsed.get('compare_price')} "
f"등락률={parsed.get('change_rate')} "
f"거래량={parsed.get('trade_volume')}"
)
async def subscribe_realtime(app_key: str, app_secret: str, stock_code: str, ws_url: str) -> None:
subscribe_message = build_subscribe_message(app_key, app_secret, stock_code)
async with websockets.connect(ws_url, ping_interval=30, ping_timeout=10) as websocket:
print(f"WebSocket 연결 성공: {ws_url}")
await websocket.send(subscribe_message)
print(f"실시간 구독 요청 전송: TR={TR_ID}, 종목코드={stock_code}")
async for message in websocket:
parsed = parse_realtime_message(message)
if parsed:
print_realtime_data(parsed)
def main() -> None:
try:
app_key = get_env("KIS_APP_KEY")
app_secret = get_env("KIS_APP_SECRET")
stock_code = get_env("KIS_STOCK_CODE", required=False, default=TR_KEY_DEFAULT)
env_name = get_env("KIS_ENV", required=False, default="real")
rest_base, ws_url = get_base_urls(env_name)
access_token = get_access_token(app_key, app_secret, rest_base)
print("OAuth access token 발급 성공")
# 현재 예제는 토큰 발급 절차를 먼저 수행해 인증 정보 유효성을 확인한다.
# WebSocket 구독 시 실제 운영 환경에서 approval_key가 필요하면 관련 API 추가 구현이 필요하다.
_ = access_token
asyncio.run(subscribe_realtime(app_key, app_secret, stock_code, ws_url))
except KeyboardInterrupt:
print("\n종료합니다.")
except Exception as exc:
print(f"오류 발생: {exc}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()