Files
KisStock0/backend/kis_api.py
2026-02-04 00:16:34 +09:00

380 lines
14 KiB
Python

import requests
import json
import datetime
import os
import time
import copy
from config import get_kis_config
import logging
# Basic Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("KIS_API")
class KisApi:
def __init__(self):
self.config = get_kis_config()
self.app_key = self.config.get("app_key")
self.app_secret = self.config.get("app_secret")
self.account_no = str(self.config.get("account_no", "")).replace("-", "").strip() # 8 digits
self.account_prod = str(self.config.get("account_prod", "01")).strip() # 2 digits
self.is_paper = self.config.get("is_paper", True)
self.htsid = self.config.get("htsid", "")
logger.info(f"Initialized KIS API: Account={self.account_no}, Prod={self.account_prod}, Paper={self.is_paper}")
if self.is_paper:
self.base_url = "https://openapivts.koreainvestment.com:29443"
else:
self.base_url = "https://openapi.koreainvestment.com:9443"
self.token_file = "kis_token.tmp"
self.access_token = None
self.token_expired = None
self.last_req_time = 0
self._auth()
def _auth(self):
# Clean outdated token file if needed or load existing
if os.path.exists(self.token_file):
with open(self.token_file, 'r') as f:
data = json.load(f)
expired = datetime.datetime.strptime(data['expired'], "%Y-%m-%d %H:%M:%S")
if expired > datetime.datetime.now():
self.access_token = data['token']
self.token_expired = expired
logger.info("Loaded credentials from cache.")
return
# Request new token
url = f"{self.base_url}/oauth2/tokenP"
headers = {"content-type": "application/json"}
body = {
"grant_type": "client_credentials",
"appkey": self.app_key,
"appsecret": self.app_secret
}
res = requests.post(url, headers=headers, data=json.dumps(body))
if res.status_code == 200:
data = res.json()
self.access_token = data['access_token']
self.token_expired = datetime.datetime.strptime(data['access_token_token_expired'], "%Y-%m-%d %H:%M:%S")
# Save to file
with open(self.token_file, 'w') as f:
json.dump({
"token": self.access_token,
"expired": data['access_token_token_expired']
}, f)
logger.info("Issued new access token.")
else:
logger.error(f"Auth Failed: {res.text}")
raise Exception("Authentication Failed")
def get_websocket_key(self):
"""
Get Approval Key for WebSocket
"""
url = f"{self.base_url}/oauth2/Approval"
headers = {"content-type": "application/json"}
body = {
"grant_type": "client_credentials",
"appkey": self.app_key,
"secretkey": self.app_secret
}
res = requests.post(url, headers=headers, data=json.dumps(body))
if res.status_code == 200:
return res.json().get("approval_key")
else:
logger.error(f"WS Key Failed: {res.text}")
return None
def _get_header(self, tr_id=None):
header = {
"Content-Type": "application/json",
"authorization": f"Bearer {self.access_token}",
"appkey": self.app_key,
"appsecret": self.app_secret,
"tr_id": tr_id,
"custtype": "P"
}
return header
def _request(self, method, path, tr_id=None, x_tr_id_buy=None, x_tr_id_sell=None, **kwargs):
"""
Centralized request handler with auto-token refresh logic and 500ms throttling.
"""
# Throttling
now = time.time()
diff = now - self.last_req_time
if diff < 0.5:
time.sleep(0.5 - diff)
self.last_req_time = time.time()
url = f"{self.base_url}{path}"
# Determine TR ID
curr_tr_id = tr_id
if x_tr_id_buy and x_tr_id_sell:
pass
# Prepare headers
headers = self._get_header(curr_tr_id)
# Execute Request
try:
if method.upper() == "GET":
res = requests.get(url, headers=headers, **kwargs)
else:
res = requests.post(url, headers=headers, **kwargs)
# Check for Token Expiration (EGW00123)
if res.status_code == 200:
data = res.json()
if isinstance(data, dict):
msg_cd = data.get('msg_cd', '')
if msg_cd == 'EGW00123': # Expired Token
logger.warning("Token expired (EGW00123). Refreshing token and retrying...")
# Remove token file
if os.path.exists(self.token_file):
os.remove(self.token_file)
# Re-auth
self._auth()
# Update headers with new token
headers = self._get_header(curr_tr_id)
# Retry
if method.upper() == "GET":
res = requests.get(url, headers=headers, **kwargs)
else:
res = requests.post(url, headers=headers, **kwargs)
return res.json()
return data
else:
logger.error(f"API Request Failed [{res.status_code}]: {res.text}")
return None
except Exception as e:
logger.error(f"Request Exception: {e}")
return None
def get_current_price(self, code):
"""
inquire-price
"""
tr_id = "FHKST01010100"
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": code
}
res = self._request("GET", "/uapi/domestic-stock/v1/quotations/inquire-price", tr_id=tr_id, params=params)
return res.get('output', {}) if res else None
def get_balance(self, source=None):
"""
inquire-balance
Cached for 2 seconds to prevent rate limit (EGW00201)
"""
# Check cache
now = time.time()
if hasattr(self, '_balance_cache') and self._balance_cache:
last_time, data = self._balance_cache
if now - last_time < 2.0: # 2 Seconds TTL
return data
log_source = f" [Source: {source}]" if source else ""
logger.info(f"get_balance{log_source}: Account={self.account_no}, Paper={self.is_paper}")
tr_id = "VTTC8434R" if self.is_paper else "TTTC8434R"
params = {
"CANO": self.account_no,
"ACNT_PRDT_CD": self.account_prod,
"AFHR_FLPR_YN": "N",
"OFL_YN": "",
"INQR_DVSN": "02", # 02: By Stock
"UNPR_DVSN": "01",
"FUND_STTL_ICLD_YN": "N",
"FNCG_AMT_AUTO_RDPT_YN": "N",
"PRCS_DVSN": "00",
"CTX_AREA_FK100": "",
"CTX_AREA_NK100": ""
}
res = self._request("GET", "/uapi/domestic-stock/v1/trading/inquire-balance", tr_id=tr_id, params=params)
# Update Cache if success
if res and res.get('rt_cd') == '0':
self._balance_cache = (now, res)
return res
def get_overseas_balance(self, exchange="NASD"):
"""
overseas-stock inquire-balance
"""
# For overseas, we need to know the exchange code often, but inquire-balance might return all if configured?
# Typically requires an exchange code in some params or TR IDs specific to exchange?
# Looking at docs: TTTS3012R is for "Overseas Stock Balance"
tr_id = "VTTS3012R" if self.is_paper else "TTTS3012R"
# Paper env TR ID is tricky, usually V... but let's assume VTTS3012R or JTTT3012R?
# Common pattern: Real 'T...' -> Paper 'V...'
params = {
"CANO": self.account_no,
"ACNT_PRDT_CD": self.account_prod,
"OVRS_EXCG_CD": exchange, # NASD, NYSE, AMEX, HKS, TSE, etc.
"TR_CRCY_CD": "USD", # Transaction Currency
"CTX_AREA_FK200": "",
"CTX_AREA_NK200": ""
}
return self._request("GET", "/uapi/overseas-stock/v1/trading/inquire-balance", tr_id=tr_id, params=params)
def place_order(self, code, type, qty, price):
"""
order-cash
type: 'buy' or 'sell'
"""
if self.is_paper:
tr_id = "VTTC0012U" if type == 'buy' else "VTTC0011U"
else:
tr_id = "TTTC0012U" if type == 'buy' else "TTTC0011U"
# 00: Limit (Specified Price), 01: Market Price
ord_dvsn = "00" if int(price) > 0 else "01"
body = {
"CANO": self.account_no,
"ACNT_PRDT_CD": self.account_prod,
"PDNO": code,
"ORD_DVSN": ord_dvsn,
"ORD_QTY": str(qty),
"ORD_UNPR": str(price),
"EXCG_ID_DVSN_CD": "KRX", # KRX for Exchange
"SLL_TYPE": "01", # 01: Normal Sell
"CNDT_PRIC": ""
}
return self._request("POST", "/uapi/domestic-stock/v1/trading/order-cash", tr_id=tr_id, data=json.dumps(body))
def place_overseas_order(self, code, type, qty, price, market="NASD"):
"""
overseas-stock order
"""
# Checks for Paper vs Real and Buy vs Sell
# TR_ID might vary by country. Assuming US (NASD, NYSE, AMEX).
if self.is_paper:
tr_id = "VTTT1002U" if type == 'buy' else "VTTT1006U"
else:
# US Real: JTTT1002U (Buy), JTTT1006U (Sell)
# Note: This TR ID is for US Night Market (Main).
tr_id = "JTTT1002U" if type == 'buy' else "JTTT1006U"
# Price '0' or empty is usually Market Price, but overseas api often requires specific handling.
# US Market Price Order usually uses ord_dvsn="00" and price="0" or empty?
# KIS Docs: US Limit="00", Market="00"?? No, usually "00" is Limit.
# "32": LOO, "33": LOC, "34": MOO, "35": MOC...
# Let's stick to Limit ("00") for now. If price is 0, user might mean Market, but US API requires price for limit.
# If user sends 0, let's try to assume "00" (Limit) with price 0 (might fail) or valid Market ("01"? No).
# Safe bet: US Market Order is "00" (Limit) with Price 0? No.
# Use "00" (Limit) as default. If price is 0, we can't easily do Market on US via standard "01" like domestic.
ord_dvsn = "00" # Limit
body = {
"CANO": self.account_no,
"ACNT_PRDT_CD": self.account_prod,
"OVRS_EXCG_CD": market,
"PDNO": code,
"ORD_QTY": str(qty),
"OVRS_ORD_UNPR": str(price),
"ORD_SVR_DVSN_CD": "0",
"ORD_DVSN": ord_dvsn
}
return self._request("POST", "/uapi/overseas-stock/v1/trading/order", tr_id=tr_id, data=json.dumps(body))
def get_daily_orders(self, start_date=None, end_date=None, expanded_code=None):
"""
inquire-daily-ccld
"""
if self.is_paper:
tr_id = "VTTC0081R" # 3-month inner
else:
tr_id = "TTTC0081R" # 3-month inner
if not start_date:
start_date = datetime.datetime.now().strftime("%Y%m%d")
if not end_date:
end_date = datetime.datetime.now().strftime("%Y%m%d")
params = {
"CANO": self.account_no,
"ACNT_PRDT_CD": self.account_prod,
"INQR_STRT_DT": start_date,
"INQR_END_DT": end_date,
"SLL_BUY_DVSN_CD": "00", # All
"PDNO": "",
"CCLD_DVSN": "00", # All (Executed + Unexecuted)
"INQR_DVSN": "00", # Reverse Order
"INQR_DVSN_3": "00", # All
"ORD_GNO_BRNO": "",
"ODNO": "",
"INQR_DVSN_1": "",
"CTX_AREA_FK100": "",
"CTX_AREA_NK100": ""
}
return self._request("GET", "/uapi/domestic-stock/v1/trading/inquire-daily-ccld", tr_id=tr_id, params=params)
def get_cancelable_orders(self):
"""
inquire-psbl-rvsecncl
"""
tr_id = "VTTC0084R" if self.is_paper else "TTTC0084R"
params = {
"CANO": self.account_no,
"ACNT_PRDT_CD": self.account_prod,
"INQR_DVSN_1": "0", # 0: Order No order
"INQR_DVSN_2": "0", # 0: All
"CTX_AREA_FK100": "",
"CTX_AREA_NK100": ""
}
return self._request("GET", "/uapi/domestic-stock/v1/trading/inquire-psbl-rvsecncl", tr_id=tr_id, params=params)
def cancel_order(self, org_no, order_no, qty, is_buy, price="0", total=True):
"""
order-rvsecncl
"""
if self.is_paper:
tr_id = "VTTC0013U"
else:
tr_id = "TTTC0013U"
rvse_cncl_dvsn_cd = "02"
qty_all = "Y" if total else "N"
body = {
"CANO": self.account_no,
"ACNT_PRDT_CD": self.account_prod,
"KRX_FWDG_ORD_ORGNO": org_no,
"ORGN_ODNO": order_no,
"ORD_DVSN": "00",
"RVSE_CNCL_DVSN_CD": rvse_cncl_dvsn_cd,
"ORD_QTY": str(qty),
"ORD_UNPR": str(price),
"QTY_ALL_ORD_YN": qty_all,
"EXCG_ID_DVSN_CD": "KRX"
}
return self._request("POST", "/uapi/domestic-stock/v1/trading/order-rvsecncl", tr_id=tr_id, data=json.dumps(body))
# Singleton Instance
kis = KisApi()