# -*- coding: utf-8 -*- # ====| (REST) 접근 토큰 / (Websocket) 웹소켓 접속키 발급 에 필요한 API 호출 샘플 아래 참고하시기 바랍니다. |===================== # ====| API 호출 공통 함수 포함 |===================== import asyncio import copy import json import logging import os import time from base64 import b64decode from collections import namedtuple from collections.abc import Callable from datetime import datetime from io import StringIO import pandas as pd # pip install requests (패키지설치) import requests # 웹 소켓 모듈을 선언한다. import websockets # pip install PyYAML (패키지설치) import yaml from Crypto.Cipher import AES # pip install pycryptodome from Crypto.Util.Padding import unpad clearConsole = lambda: os.system("cls" if os.name in ("nt", "dos") else "clear") key_bytes = 32 config_root = os.path.join(os.path.expanduser("~"), "KIS", "config") # config_root = "$HOME/KIS/config/" # 토큰 파일이 저장될 폴더, 제3자가 찾기 어렵도록 경로 설정하시기 바랍니다. # token_tmp = config_root + 'KIS000000' # 토큰 로컬저장시 파일 이름 지정, 파일이름을 토큰값이 유추가능한 파일명은 삼가바랍니다. # token_tmp = config_root + 'KIS' + datetime.today().strftime("%Y%m%d%H%M%S") # 토큰 로컬저장시 파일명 년월일시분초 token_tmp = os.path.join( config_root, f"KIS{datetime.today().strftime("%Y%m%d")}" ) # 토큰 로컬저장시 파일명 년월일 # 접근토큰 관리하는 파일 존재여부 체크, 없으면 생성 if os.path.exists(token_tmp) == False: f = open(token_tmp, "w+") # 앱키, 앱시크리트, 토큰, 계좌번호 등 저장관리, 자신만의 경로와 파일명으로 설정하시기 바랍니다. # pip install PyYAML (패키지설치) with open(os.path.join(config_root, "kis_devlp.yaml"), encoding="UTF-8") as f: _cfg = yaml.load(f, Loader=yaml.FullLoader) _TRENV = tuple() _last_auth_time = datetime.now() _autoReAuth = False _DEBUG = False _isPaper = False _smartSleep = 0.1 # 기본 헤더값 정의 _base_headers = { "Content-Type": "application/json", "Accept": "text/plain", "charset": "UTF-8", "User-Agent": _cfg["my_agent"], } # 토큰 발급 받아 저장 (토큰값, 토큰 유효시간,1일, 6시간 이내 발급신청시는 기존 토큰값과 동일, 발급시 알림톡 발송) def save_token(my_token, my_expired): # print(type(my_expired), my_expired) valid_date = datetime.strptime(my_expired, "%Y-%m-%d %H:%M:%S") # print('Save token date: ', valid_date) with open(token_tmp, "w", encoding="utf-8") as f: f.write(f"token: {my_token}\n") f.write(f"valid-date: {valid_date}\n") # 토큰 확인 (토큰값, 토큰 유효시간_1일, 6시간 이내 발급신청시는 기존 토큰값과 동일, 발급시 알림톡 발송) def read_token(): try: # 토큰이 저장된 파일 읽기 with open(token_tmp, encoding="UTF-8") as f: tkg_tmp = yaml.load(f, Loader=yaml.FullLoader) # 토큰 만료 일,시간 exp_dt = datetime.strftime(tkg_tmp["valid-date"], "%Y-%m-%d %H:%M:%S") # 현재일자,시간 now_dt = datetime.today().strftime("%Y-%m-%d %H:%M:%S") # print('expire dt: ', exp_dt, ' vs now dt:', now_dt) # 저장된 토큰 만료일자 체크 (만료일시 > 현재일시 인경우 보관 토큰 리턴) if exp_dt > now_dt: return tkg_tmp["token"] else: # print('Need new token: ', tkg_tmp['valid-date']) return None except Exception: # print('read token error: ', e) return None # 토큰 유효시간 체크해서 만료된 토큰이면 재발급처리 def _getBaseHeader(): if _autoReAuth: reAuth() return copy.deepcopy(_base_headers) # 가져오기 : 앱키, 앱시크리트, 종합계좌번호(계좌번호 중 숫자8자리), 계좌상품코드(계좌번호 중 숫자2자리), 토큰, 도메인 def _setTRENV(cfg): nt1 = namedtuple( "KISEnv", ["my_app", "my_sec", "my_acct", "my_prod", "my_htsid", "my_token", "my_url", "my_url_ws"], ) d = { "my_app": cfg["my_app"], # 앱키 "my_sec": cfg["my_sec"], # 앱시크리트 "my_acct": cfg["my_acct"], # 종합계좌번호(8자리) "my_prod": cfg["my_prod"], # 계좌상품코드(2자리) "my_htsid": cfg["my_htsid"], # HTS ID "my_token": cfg["my_token"], # 토큰 "my_url": cfg[ "my_url" ], # 실전 도메인 (https://openapi.koreainvestment.com:9443) "my_url_ws": cfg["my_url_ws"], } # 모의 도메인 (https://openapivts.koreainvestment.com:29443) # print(cfg['my_app']) global _TRENV _TRENV = nt1(**d) def isPaperTrading(): # 모의투자 매매 return _isPaper # 실전투자면 'prod', 모의투자면 'vps'를 셋팅 하시기 바랍니다. def changeTREnv(token_key, svr="prod", product=_cfg["my_prod"]): cfg = dict() global _isPaper if svr == "prod": # 실전투자 ak1 = "my_app" # 실전투자용 앱키 ak2 = "my_sec" # 실전투자용 앱시크리트 _isPaper = False _smartSleep = 0.05 elif svr == "vps": # 모의투자 ak1 = "paper_app" # 모의투자용 앱키 ak2 = "paper_sec" # 모의투자용 앱시크리트 _isPaper = True _smartSleep = 0.5 cfg["my_app"] = _cfg[ak1] cfg["my_sec"] = _cfg[ak2] if svr == "prod" and product == "01": # 실전투자 주식투자, 위탁계좌, 투자계좌 cfg["my_acct"] = _cfg["my_acct_stock"] elif svr == "prod" and product == "03": # 실전투자 선물옵션(파생) cfg["my_acct"] = _cfg["my_acct_future"] elif svr == "prod" and product == "08": # 실전투자 해외선물옵션(파생) cfg["my_acct"] = _cfg["my_acct_future"] elif svr == "prod" and product == "22": # 실전투자 개인연금저축계좌 cfg["my_acct"] = _cfg["my_acct_stock"] elif svr == "prod" and product == "29": # 실전투자 퇴직연금계좌 cfg["my_acct"] = _cfg["my_acct_stock"] elif svr == "vps" and product == "01": # 모의투자 주식투자, 위탁계좌, 투자계좌 cfg["my_acct"] = _cfg["my_paper_stock"] elif svr == "vps" and product == "03": # 모의투자 선물옵션(파생) cfg["my_acct"] = _cfg["my_paper_future"] cfg["my_prod"] = product cfg["my_htsid"] = _cfg["my_htsid"] cfg["my_url"] = _cfg[svr] try: my_token = _TRENV.my_token except AttributeError: my_token = "" cfg["my_token"] = my_token if token_key else token_key cfg["my_url_ws"] = _cfg["ops" if svr == "prod" else "vops"] # print(cfg) _setTRENV(cfg) def _getResultObject(json_data): _tc_ = namedtuple("res", json_data.keys()) return _tc_(**json_data) # Token 발급, 유효기간 1일, 6시간 이내 발급시 기존 token값 유지, 발급시 알림톡 무조건 발송 # 모의투자인 경우 svr='vps', 투자계좌(01)이 아닌경우 product='XX' 변경하세요 (계좌번호 뒤 2자리) def auth(svr="prod", product=_cfg["my_prod"], url=None): p = { "grant_type": "client_credentials", } # 개인 환경파일 "kis_devlp.yaml" 파일을 참조하여 앱키, 앱시크리트 정보 가져오기 # 개인 환경파일명과 위치는 고객님만 아는 위치로 설정 바랍니다. if svr == "prod": # 실전투자 ak1 = "my_app" # 앱키 (실전투자용) ak2 = "my_sec" # 앱시크리트 (실전투자용) elif svr == "vps": # 모의투자 ak1 = "paper_app" # 앱키 (모의투자용) ak2 = "paper_sec" # 앱시크리트 (모의투자용) # 앱키, 앱시크리트 가져오기 p["appkey"] = _cfg[ak1] p["appsecret"] = _cfg[ak2] # 기존 발급된 토큰이 있는지 확인 saved_token = read_token() # 기존 발급 토큰 확인 # print("saved_token: ", saved_token) if saved_token is None: # 기존 발급 토큰 확인이 안되면 발급처리 url = f"{_cfg[svr]}/oauth2/tokenP" res = requests.post( url, data=json.dumps(p), headers=_getBaseHeader() ) # 토큰 발급 rescode = res.status_code if rescode == 200: # 토큰 정상 발급 my_token = _getResultObject(res.json()).access_token # 토큰값 가져오기 my_expired = _getResultObject( res.json() ).access_token_token_expired # 토큰값 만료일시 가져오기 save_token(my_token, my_expired) # 새로 발급 받은 토큰 저장 else: print("Get Authentification token fail!\nYou have to restart your app!!!") return else: my_token = saved_token # 기존 발급 토큰 확인되어 기존 토큰 사용 # 발급토큰 정보 포함해서 헤더값 저장 관리, API 호출시 필요 changeTREnv(my_token, svr, product) _base_headers["authorization"] = f"Bearer {my_token}" _base_headers["appkey"] = _TRENV.my_app _base_headers["appsecret"] = _TRENV.my_sec global _last_auth_time _last_auth_time = datetime.now() if _DEBUG: print(f"[{_last_auth_time}] => get AUTH Key completed!") # end of initialize, 토큰 재발급, 토큰 발급시 유효시간 1일 # 프로그램 실행시 _last_auth_time에 저장하여 유효시간 체크, 유효시간 만료시 토큰 발급 처리 def reAuth(svr="prod", product=_cfg["my_prod"]): n2 = datetime.now() if (n2 - _last_auth_time).seconds >= 86400: # 유효시간 1일 auth(svr, product) def getEnv(): return _cfg def smart_sleep(): if _DEBUG: print(f"[RateLimit] Sleeping {_smartSleep}s ") time.sleep(_smartSleep) def getTREnv(): return _TRENV # 주문 API에서 사용할 hash key값을 받아 header에 설정해 주는 함수 # 현재는 hash key 필수 사항아님, 생략가능, API 호출과정에서 변조 우려를 하는 경우 사용 # Input: HTTP Header, HTTP post param # Output: None def set_order_hash_key(h, p): url = f"{getTREnv().my_url}/uapi/hashkey" # hashkey 발급 API URL res = requests.post(url, data=json.dumps(p), headers=h) rescode = res.status_code if rescode == 200: h["hashkey"] = _getResultObject(res.json()).HASH else: print("Error:", rescode) # API 호출 응답에 필요한 처리 공통 함수 class APIResp: def __init__(self, resp): self._rescode = resp.status_code self._resp = resp self._header = self._setHeader() self._body = self._setBody() self._err_code = self._body.msg_cd self._err_message = self._body.msg1 def getResCode(self): return self._rescode def _setHeader(self): fld = dict() for x in self._resp.headers.keys(): if x.islower(): fld[x] = self._resp.headers.get(x) _th_ = namedtuple("header", fld.keys()) return _th_(**fld) def _setBody(self): _tb_ = namedtuple("body", self._resp.json().keys()) return _tb_(**self._resp.json()) def getHeader(self): return self._header def getBody(self): return self._body def getResponse(self): return self._resp def isOK(self): try: if self.getBody().rt_cd == "0": return True else: return False except: return False def getErrorCode(self): return self._err_code def getErrorMessage(self): return self._err_message def printAll(self): print("
") for x in self.getHeader()._fields: print(f"\t-{x}: {getattr(self.getHeader(), x)}") print("") for x in self.getBody()._fields: print(f"\t-{x}: {getattr(self.getBody(), x)}") def printError(self, url): print( "-------------------------------\nError in response: ", self.getResCode(), " url=", url, ) print( "rt_cd : ", self.getBody().rt_cd, "/ msg_cd : ", self.getErrorCode(), "/ msg1 : ", self.getErrorMessage(), ) print("-------------------------------") # end of class APIResp class APIRespError(APIResp): def __init__(self, status_code, error_text): # 부모 생성자 호출하지 않고 직접 초기화 self.status_code = status_code self.error_text = error_text self._error_code = str(status_code) self._error_message = error_text def isOK(self): return False def getErrorCode(self): return self._error_code def getErrorMessage(self): return self._error_message def getBody(self): # 빈 객체 리턴 (속성 접근 시 AttributeError 방지) class EmptyBody: def __getattr__(self, name): return None return EmptyBody() def getHeader(self): # 빈 객체 리턴 class EmptyHeader: tr_cont = "" def __getattr__(self, name): return "" return EmptyHeader() def printAll(self): print(f"=== ERROR RESPONSE ===") print(f"Status Code: {self.status_code}") print(f"Error Message: {self.error_text}") print(f"======================") def printError(self, url=""): print(f"Error Code : {self.status_code} | {self.error_text}") if url: print(f"URL: {url}") ########### API call wrapping : API 호출 공통 def _url_fetch( api_url, ptr_id, tr_cont, params, appendHeaders=None, postFlag=False, hashFlag=True ): url = f"{getTREnv().my_url}{api_url}" headers = _getBaseHeader() # 기본 header 값 정리 # 추가 Header 설정 tr_id = ptr_id if ptr_id[0] in ("T", "J", "C"): # 실전투자용 TR id 체크 if isPaperTrading(): # 모의투자용 TR id 식별 tr_id = "V" + ptr_id[1:] headers["tr_id"] = tr_id # 트랜젝션 TR id headers["custtype"] = "P" # 일반(개인고객,법인고객) "P", 제휴사 "B" headers["tr_cont"] = tr_cont # 트랜젝션 TR id if appendHeaders is not None: if len(appendHeaders) > 0: for x in appendHeaders.keys(): headers[x] = appendHeaders.get(x) if _DEBUG: print("< Sending Info >") print(f"URL: {url}, TR: {tr_id}") print(f"
\n{headers}") print(f"\n{params}") if postFlag: # if (hashFlag): set_order_hash_key(headers, params) res = requests.post(url, headers=headers, data=json.dumps(params)) else: res = requests.get(url, headers=headers, params=params) if res.status_code == 200: ar = APIResp(res) if _DEBUG: ar.printAll() return ar else: print("Error Code : " + str(res.status_code) + " | " + res.text) return APIRespError(res.status_code, res.text) # auth() # print("Pass through the end of the line") ########### New - websocket 대응 _base_headers_ws = { "content-type": "utf-8", } def _getBaseHeader_ws(): if _autoReAuth: reAuth_ws() return copy.deepcopy(_base_headers_ws) def auth_ws(svr="prod", product=_cfg["my_prod"]): p = {"grant_type": "client_credentials"} if svr == "prod": ak1 = "my_app" ak2 = "my_sec" elif svr == "vps": ak1 = "paper_app" ak2 = "paper_sec" p["appkey"] = _cfg[ak1] p["secretkey"] = _cfg[ak2] url = f"{_cfg[svr]}/oauth2/Approval" res = requests.post(url, data=json.dumps(p), headers=_getBaseHeader()) # 토큰 발급 rescode = res.status_code if rescode == 200: # 토큰 정상 발급 approval_key = _getResultObject(res.json()).approval_key else: print("Get Approval token fail!\nYou have to restart your app!!!") return changeTREnv(None, svr, product) _base_headers_ws["approval_key"] = approval_key global _last_auth_time _last_auth_time = datetime.now() if _DEBUG: print(f"[{_last_auth_time}] => get AUTH Key completed!") def reAuth_ws(svr="prod", product=_cfg["my_prod"]): n2 = datetime.now() if (n2 - _last_auth_time).seconds >= 86400: auth_ws(svr, product) def data_fetch(tr_id, tr_type, params, appendHeaders=None) -> dict: headers = _getBaseHeader_ws() # 기본 header 값 정리 headers["tr_type"] = tr_type headers["custtype"] = "P" if appendHeaders is not None: if len(appendHeaders) > 0: for x in appendHeaders.keys(): headers[x] = appendHeaders.get(x) if _DEBUG: print("< Sending Info >") print(f"TR: {tr_id}") print(f"
\n{headers}") inp = { "tr_id": tr_id, } inp.update(params) return {"header": headers, "body": {"input": inp}} # iv, ekey, encrypt 는 각 기능 메소드 파일에 저장할 수 있도록 dict에서 return 하도록 def system_resp(data): isPingPong = False isUnSub = False isOk = False tr_msg = None tr_key = None encrypt, iv, ekey = None, None, None rdic = json.loads(data) tr_id = rdic["header"]["tr_id"] if tr_id != "PINGPONG": tr_key = rdic["header"]["tr_key"] encrypt = rdic["header"]["encrypt"] if rdic.get("body", None) is not None: isOk = True if rdic["body"]["rt_cd"] == "0" else False tr_msg = rdic["body"]["msg1"] # 복호화를 위한 key 를 추출 if "output" in rdic["body"]: iv = rdic["body"]["output"]["iv"] ekey = rdic["body"]["output"]["key"] isUnSub = True if tr_msg[:5] == "UNSUB" else False else: isPingPong = True if tr_id == "PINGPONG" else False nt2 = namedtuple( "SysMsg", [ "isOk", "tr_id", "tr_key", "isUnSub", "isPingPong", "tr_msg", "iv", "ekey", "encrypt", ], ) d = { "isOk": isOk, "tr_id": tr_id, "tr_key": tr_key, "tr_msg": tr_msg, "isUnSub": isUnSub, "isPingPong": isPingPong, "iv": iv, "ekey": ekey, "encrypt": encrypt, } return nt2(**d) def aes_cbc_base64_dec(key, iv, cipher_text): if key is None or iv is None: raise AttributeError("key and iv cannot be None") cipher = AES.new(key.encode("utf-8"), AES.MODE_CBC, iv.encode("utf-8")) return bytes.decode(unpad(cipher.decrypt(b64decode(cipher_text)), AES.block_size)) ##### open_map: dict = {} def add_open_map( name: str, request: Callable[[str, str, ...], (dict, list[str])], data: str | list[str], kwargs: dict = None, ): if open_map.get(name, None) is None: open_map[name] = { "func": request, "items": [], "kwargs": kwargs, } if type(data) is list: open_map[name]["items"] += data elif type(data) is str: open_map[name]["items"].append(data) data_map: dict = {} def add_data_map( tr_id: str, columns: list = None, encrypt: str = None, key: str = None, iv: str = None, ): if data_map.get(tr_id, None) is None: data_map[tr_id] = {"columns": [], "encrypt": False, "key": None, "iv": None} if columns is not None: data_map[tr_id]["columns"] = columns if encrypt is not None: data_map[tr_id]["encrypt"] = encrypt if key is not None: data_map[tr_id]["key"] = key if iv is not None: data_map[tr_id]["iv"] = iv class KISWebSocket: api_url: str = "" on_result: Callable[ [websockets.ClientConnection, str, pd.DataFrame, dict], None ] = None result_all_data: bool = False retry_count: int = 0 amx_retries: int = 0 # init def __init__(self, api_url: str, max_retries: int = 3): self.api_url = api_url self.max_retries = max_retries # private async def __subscriber(self, ws: websockets.ClientConnection): async for raw in ws: logging.info("received message >> %s" % raw) show_result = False df = pd.DataFrame() if raw[0] in ["0", "1"]: d1 = raw.split("|") if len(d1) < 4: raise ValueError("data not found...") tr_id = d1[1] dm = data_map[tr_id] d = d1[3] if dm.get("encrypt", None) == "Y": d = aes_cbc_base64_dec(dm["key"], dm["iv"], d) df = pd.read_csv( StringIO(d), header=None, sep="^", names=dm["columns"], dtype=object ) show_result = True else: rsp = system_resp(raw) tr_id = rsp.tr_id add_data_map( tr_id=rsp.tr_id, encrypt=rsp.encrypt, key=rsp.ekey, iv=rsp.iv ) if rsp.isPingPong: print(f"### RECV [PINGPONG] [{raw}]") await ws.pong(raw) print(f"### SEND [PINGPONG] [{raw}]") if self.result_all_data: show_result = True if show_result is True and self.on_result is not None: self.on_result(ws, tr_id, df, data_map[tr_id]) async def __runner(self): if len(open_map.keys()) > 40: raise ValueError("Subscription's max is 40") url = f"{getTREnv().my_url_ws}{self.api_url}" while self.retry_count < self.max_retries: try: async with websockets.connect(url) as ws: # request subscribe for name, obj in open_map.items(): await self.send_multiple( ws, obj["func"], "1", obj["items"], obj["kwargs"] ) # subscriber await asyncio.gather( self.__subscriber(ws), ) except Exception as e: print("Connection exception >> ", e) self.retry_count += 1 await asyncio.sleep(1) # func @classmethod async def send( cls, ws: websockets.ClientConnection, request: Callable[[str, str, ...], (dict, list[str])], tr_type: str, data: str, kwargs: dict = None, ): k = {} if kwargs is None else kwargs msg, columns = request(tr_type, data, **k) add_data_map(tr_id=msg["body"]["input"]["tr_id"], columns=columns) logging.info("send message >> %s" % json.dumps(msg)) await ws.send(json.dumps(msg)) smart_sleep() async def send_multiple( self, ws: websockets.ClientConnection, request: Callable[[str, str, ...], (dict, list[str])], tr_type: str, data: list | str, kwargs: dict = None, ): if type(data) is str: await self.send(ws, request, tr_type, data, kwargs) elif type(data) is list: for d in data: await self.send(ws, request, tr_type, d, kwargs) else: raise ValueError("data must be str or list") @classmethod def subscribe( cls, request: Callable[[str, str, ...], (dict, list[str])], data: list | str, kwargs: dict = None, ): add_open_map(request.__name__, request, data, kwargs) def unsubscribe( self, ws: websockets.ClientConnection, request: Callable[[str, str, ...], (dict, list[str])], data: list | str, ): self.send_multiple(ws, request, "2", data) # start def start( self, on_result: Callable[ [websockets.ClientConnection, str, pd.DataFrame, dict], None ], result_all_data: bool = False, ): self.on_result = on_result self.result_all_data = result_all_data try: asyncio.run(self.__runner()) except KeyboardInterrupt: print("Closing by KeyboardInterrupt")