Files
KisStock/backend/app/services/kis_auth.py

129 lines
4.9 KiB
Python

import httpx
from datetime import datetime, timedelta
from sqlalchemy import select
from app.db.database import SessionLocal
from app.db.models import ApiSettings
from app.db.models import ApiSettings
from app.core.crypto import decrypt_str, encrypt_str
class KisAuth:
    """Issue and cache KIS Open API credentials (REST token, WebSocket key).

    Credentials are read from the ``ApiSettings`` row with ``id == 1``.
    Issued values are encrypted with ``encrypt_str`` and written back to
    that same row so later calls can reuse them.
    """

    BASE_URL_REAL = "https://openapi.koreainvestment.com:9443"
    # BASE_URL_VIRTUAL = "https://openapivts.koreainvestment.com:29443"

    # Value this module treats as "decrypt_str could not decrypt the input".
    _DECRYPT_FAILED = "[Decryption Failed]"
    # A cached token is only reused if it stays valid for at least this long.
    _EXPIRY_BUFFER = timedelta(minutes=10)

    def __init__(self):
        pass

    @classmethod
    def _is_usable(cls, value) -> bool:
        """Return True if *value* is non-empty and not the decrypt-failure marker."""
        return bool(value) and value != cls._DECRYPT_FAILED

    async def _load_settings(self, db_session) -> "ApiSettings":
        """Fetch the ApiSettings row (id == 1), requiring appKey and appSecret.

        Raises:
            ValueError: if the row is missing or either credential is empty.
        """
        stmt = select(ApiSettings).where(ApiSettings.id == 1)
        result = await db_session.execute(stmt)
        settings_obj = result.scalar_one_or_none()
        if not settings_obj or not settings_obj.appKey or not settings_obj.appSecret:
            raise ValueError("KIS API Credentials not configured.")
        return settings_obj

    def _decrypt_credentials(self, settings_obj) -> tuple:
        """Decrypt the stored app key/secret and return ``(app_key, app_secret)``.

        Raises:
            ValueError: if either value cannot be decrypted. Failing here
                avoids sending the failure marker to the remote API as if
                it were a real credential.
        """
        app_key_dec = decrypt_str(settings_obj.appKey)
        app_secret_dec = decrypt_str(settings_obj.appSecret)
        if not (self._is_usable(app_key_dec) and self._is_usable(app_secret_dec)):
            raise ValueError("KIS API Credentials could not be decrypted.")
        return app_key_dec, app_secret_dec

    async def get_access_token(self, db_session=None) -> str:
        """Return a valid access token, issuing a new one if expired or missing.

        Args:
            db_session: Optional session to use. When omitted, a session is
                created from ``SessionLocal`` and closed before returning.

        Returns:
            The plaintext access token.

        Raises:
            ValueError: if credentials are not configured or cannot be
                decrypted.
            Exception: any error from the database or the HTTP request is
                re-raised after the session has been rolled back.
        """
        local_session = db_session is None
        if local_session:
            db_session = SessionLocal()
        try:
            # 1. Get Settings
            settings_obj = await self._load_settings(db_session)

            # 2. Check Expiry (Buffer 10 mins)
            if settings_obj.accessToken and settings_obj.tokenExpiry:
                token_dec = decrypt_str(settings_obj.accessToken)
                if (
                    self._is_usable(token_dec)
                    and settings_obj.tokenExpiry > datetime.now() + self._EXPIRY_BUFFER
                ):
                    return token_dec

            # 3. Issue New Token
            app_key_dec, app_secret_dec = self._decrypt_credentials(settings_obj)
            token_data = await self._issue_token(app_key_dec, app_secret_dec)

            # 4. Save to DB (Encrypt Token)
            settings_obj.accessToken = encrypt_str(token_data['access_token'])
            # expires_in is seconds (usually 86400)
            settings_obj.tokenExpiry = datetime.now() + timedelta(
                seconds=int(token_data['expires_in'])
            )
            await db_session.commit()
            return token_data['access_token']
        except Exception:
            await db_session.rollback()
            # Bare raise keeps the original traceback intact.
            raise
        finally:
            if local_session:
                await db_session.close()

    async def _issue_token(self, app_key: str, app_secret: str) -> dict:
        """POST the credentials to ``/oauth2/tokenP`` and return the JSON body.

        Raises:
            httpx.HTTPStatusError: if the response status is an error.
        """
        url = f"{self.BASE_URL_REAL}/oauth2/tokenP"
        payload = {
            "grant_type": "client_credentials",
            "appkey": app_key,
            "appsecret": app_secret,
        }
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                url, json=payload, headers={"Content-Type": "application/json"}
            )
            resp.raise_for_status()
            return resp.json()

    async def get_approval_key(self, db_session=None) -> str:
        """Return the WebSocket approval key, issuing a new one if missing.

        A stored key is reused whenever it decrypts successfully; no expiry
        is tracked for it here.

        Args:
            db_session: Optional session to use. When omitted, a session is
                created from ``SessionLocal`` and closed before returning.

        Returns:
            The plaintext approval key.

        Raises:
            ValueError: if credentials are not configured or cannot be
                decrypted.
            Exception: any error from the database or the HTTP request is
                re-raised after the session has been rolled back.
        """
        local_session = db_session is None
        if local_session:
            db_session = SessionLocal()
        try:
            settings_obj = await self._load_settings(db_session)

            if settings_obj.websocketApprovalKey:
                approval_key_dec = decrypt_str(settings_obj.websocketApprovalKey)
                if self._is_usable(approval_key_dec):
                    return approval_key_dec

            # Issue New Key
            app_key_dec, app_secret_dec = self._decrypt_credentials(settings_obj)
            approval_key = await self._issue_approval_key(app_key_dec, app_secret_dec)
            settings_obj.websocketApprovalKey = encrypt_str(approval_key)
            await db_session.commit()
            return approval_key
        except Exception:
            await db_session.rollback()
            # Bare raise keeps the original traceback intact.
            raise
        finally:
            if local_session:
                await db_session.close()

    async def _issue_approval_key(self, app_key: str, app_secret: str) -> str:
        """POST the credentials to ``/oauth2/Approval`` and return ``approval_key``.

        Raises:
            httpx.HTTPStatusError: if the response status is an error.
            KeyError: if the response JSON has no ``approval_key`` field.
        """
        url = f"{self.BASE_URL_REAL}/oauth2/Approval"
        payload = {
            "grant_type": "client_credentials",
            "appkey": app_key,
            "secretkey": app_secret,  # Note: Parameter name difference
        }
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                url, json=payload, headers={"Content-Type": "application/json"}
            )
            resp.raise_for_status()
            data = resp.json()
            return data['approval_key']
# Module-level KisAuth instance, created once when this module is imported.
kis_auth = KisAuth()