"""Token and WebSocket approval-key management for the KIS Open API."""

import logging
from contextlib import asynccontextmanager
from datetime import datetime, timedelta

import httpx
from sqlalchemy import select

from app.core.crypto import decrypt_str, encrypt_str
from app.db.database import SessionLocal
from app.db.models import ApiSettings


class KisAuth:
    """Issue KIS API credentials and cache them in the ``ApiSettings`` row.

    Both the REST access token and the WebSocket approval key are stored
    encrypted on the settings row with ``id == 1`` and are re-issued on
    demand from the stored app key / app secret.
    """

    BASE_URL_REAL = "https://openapi.koreainvestment.com:9443"
    # BASE_URL_VIRTUAL = "https://openapivts.koreainvestment.com:29443"

    # Value that a ``decrypt_str`` result is compared against to detect a
    # failed decryption.
    _DECRYPT_FAILED = "[Decryption Failed]"

    # A cached access token is reused only if it stays valid for at least
    # this long, so callers never receive a token about to expire.
    _EXPIRY_BUFFER = timedelta(minutes=10)

    def __init__(self):
        self.logger = logging.getLogger(self.__class__.__name__)

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    @asynccontextmanager
    async def _session_scope(self, db_session=None):
        """Yield a DB session, rolling back on error.

        A session created here (because ``db_session`` is ``None``) is also
        closed on exit; a caller-supplied session is left open for the
        caller to manage.
        """
        owns_session = db_session is None
        session = SessionLocal() if owns_session else db_session
        try:
            yield session
        except Exception:
            await session.rollback()
            raise
        finally:
            if owns_session:
                await session.close()

    @classmethod
    def _is_usable(cls, decrypted) -> bool:
        """Return True if ``decrypted`` is non-empty and not the failure marker."""
        return bool(decrypted) and decrypted != cls._DECRYPT_FAILED

    async def _load_settings(self, session):
        """Fetch the ``ApiSettings`` row with id 1 and require app credentials.

        Raises:
            ValueError: If the row is missing or has no app key / app secret.
        """
        stmt = select(ApiSettings).where(ApiSettings.id == 1)
        result = await session.execute(stmt)
        settings_obj = result.scalar_one_or_none()

        if not settings_obj or not settings_obj.appKey or not settings_obj.appSecret:
            raise ValueError("KIS API Credentials not configured.")
        return settings_obj

    def _decrypt_credentials(self, settings_obj):
        """Return ``(app_key, app_secret)`` decrypted from ``settings_obj``.

        Raises:
            ValueError: If either value cannot be decrypted. Without this
                check the failure marker itself would be sent to the API
                as a credential.
        """
        app_key = decrypt_str(settings_obj.appKey)
        app_secret = decrypt_str(settings_obj.appSecret)
        if not self._is_usable(app_key) or not self._is_usable(app_secret):
            raise ValueError("KIS API Credentials could not be decrypted.")
        return app_key, app_secret

    # ------------------------------------------------------------------
    # Access token
    # ------------------------------------------------------------------
    async def get_access_token(self, db_session=None) -> str:
        """Return a valid access token, issuing a new one if needed.

        The cached token is reused when it decrypts successfully and its
        stored expiry is more than 10 minutes away. Otherwise a new token
        is requested, encrypted, saved with its expiry and committed.

        Args:
            db_session: Optional session to use. When omitted, a session is
                created from ``SessionLocal`` and closed before returning.

        Returns:
            The plaintext access token.

        Raises:
            ValueError: If credentials are not configured or cannot be
                decrypted.
            httpx.HTTPStatusError: If the token endpoint returns an error
                status.
        """
        async with self._session_scope(db_session) as session:
            # 1. Get settings
            settings_obj = await self._load_settings(session)

            # 2. Reuse the cached token while it is comfortably valid.
            # Expiry is stored and compared as a naive ``datetime.now()``.
            if settings_obj.accessToken and settings_obj.tokenExpiry:
                cached_token = decrypt_str(settings_obj.accessToken)
                still_valid = (
                    settings_obj.tokenExpiry > datetime.now() + self._EXPIRY_BUFFER
                )
                if self._is_usable(cached_token) and still_valid:
                    return cached_token

            # 3. Issue a new token
            self.logger.info("Access Token Expired or Missing. Issuing New Token...")
            app_key, app_secret = self._decrypt_credentials(settings_obj)
            token_data = await self._issue_token(app_key, app_secret)

            # 4. Save to DB (token is stored encrypted).
            # ``expires_in`` is a lifetime in seconds.
            access_token = token_data["access_token"]
            settings_obj.accessToken = encrypt_str(access_token)
            settings_obj.tokenExpiry = datetime.now() + timedelta(
                seconds=int(token_data["expires_in"])
            )
            await session.commit()

            return access_token

    async def _issue_token(self, app_key: str, app_secret: str) -> dict:
        """POST the credentials to ``/oauth2/tokenP`` and return the JSON body.

        Raises:
            httpx.HTTPStatusError: If the response status is an error.
        """
        url = f"{self.BASE_URL_REAL}/oauth2/tokenP"
        payload = {
            "grant_type": "client_credentials",
            "appkey": app_key,
            "appsecret": app_secret,
        }

        async with httpx.AsyncClient() as client:
            resp = await client.post(
                url, json=payload, headers={"Content-Type": "application/json"}
            )
            resp.raise_for_status()
            return resp.json()

    # ------------------------------------------------------------------
    # WebSocket approval key
    # ------------------------------------------------------------------
    async def get_approval_key(self, db_session=None) -> str:
        """Return the WebSocket approval key, issuing a new one if missing.

        A stored key is returned whenever it decrypts successfully; no
        expiry is tracked for it. Otherwise a new key is requested,
        encrypted, saved and committed.

        Args:
            db_session: Optional session to use. When omitted, a session is
                created from ``SessionLocal`` and closed before returning.

        Returns:
            The plaintext approval key.

        Raises:
            ValueError: If credentials are not configured or cannot be
                decrypted.
            httpx.HTTPStatusError: If the approval endpoint returns an error
                status.
        """
        async with self._session_scope(db_session) as session:
            settings_obj = await self._load_settings(session)

            if settings_obj.websocketApprovalKey:
                cached_key = decrypt_str(settings_obj.websocketApprovalKey)
                if self._is_usable(cached_key):
                    return cached_key

            # Issue a new key
            app_key, app_secret = self._decrypt_credentials(settings_obj)
            approval_key = await self._issue_approval_key(app_key, app_secret)

            settings_obj.websocketApprovalKey = encrypt_str(approval_key)
            await session.commit()

            return approval_key

    async def _issue_approval_key(self, app_key: str, app_secret: str) -> str:
        """POST the credentials to ``/oauth2/Approval`` and return the key.

        Raises:
            httpx.HTTPStatusError: If the response status is an error.
        """
        url = f"{self.BASE_URL_REAL}/oauth2/Approval"
        payload = {
            "grant_type": "client_credentials",
            "appkey": app_key,
            # Note: this endpoint names the secret "secretkey", whereas the
            # token endpoint uses "appsecret".
            "secretkey": app_secret,
        }

        async with httpx.AsyncClient() as client:
            resp = await client.post(
                url, json=payload, headers={"Content-Type": "application/json"}
            )
            resp.raise_for_status()
            data = resp.json()
            return data["approval_key"]


kis_auth = KisAuth()