118 lines
4.2 KiB
Python
118 lines
4.2 KiB
Python
import httpx
|
|
from datetime import datetime, timedelta
|
|
from sqlalchemy import select
|
|
from app.db.database import SessionLocal
|
|
from app.db.models import ApiSettings
|
|
|
|
class KisAuth:
|
|
BASE_URL_REAL = "https://openapi.koreainvestment.com:9443"
|
|
# BASE_URL_VIRTUAL = "https://openapivts.koreainvestment.com:29443"
|
|
|
|
def __init__(self):
|
|
pass
|
|
|
|
async def get_access_token(self, db_session=None) -> str:
|
|
"""
|
|
Returns valid access token. Issues new one if expired or missing.
|
|
"""
|
|
local_session = False
|
|
if not db_session:
|
|
db_session = SessionLocal()
|
|
local_session = True
|
|
|
|
try:
|
|
# 1. Get Settings
|
|
stmt = select(ApiSettings).where(ApiSettings.id == 1)
|
|
result = await db_session.execute(stmt)
|
|
settings_obj = result.scalar_one_or_none()
|
|
|
|
if not settings_obj or not settings_obj.appKey or not settings_obj.appSecret:
|
|
raise ValueError("KIS API Credentials not configured.")
|
|
|
|
# 2. Check Expiry (Buffer 10 mins)
|
|
if settings_obj.accessToken and settings_obj.tokenExpiry:
|
|
if settings_obj.tokenExpiry > datetime.now() + timedelta(minutes=10):
|
|
return settings_obj.accessToken
|
|
|
|
# 3. Issue New Token
|
|
token_data = await self._issue_token(settings_obj.appKey, settings_obj.appSecret)
|
|
|
|
# 4. Save to DB
|
|
settings_obj.accessToken = token_data['access_token']
|
|
# expires_in is seconds (usually 86400)
|
|
settings_obj.tokenExpiry = datetime.now() + timedelta(seconds=int(token_data['expires_in']))
|
|
|
|
await db_session.commit()
|
|
return settings_obj.accessToken
|
|
|
|
except Exception as e:
|
|
await db_session.rollback()
|
|
raise e
|
|
finally:
|
|
if local_session:
|
|
await db_session.close()
|
|
|
|
async def _issue_token(self, app_key: str, app_secret: str) -> dict:
|
|
url = f"{self.BASE_URL_REAL}/oauth2/tokenP"
|
|
payload = {
|
|
"grant_type": "client_credentials",
|
|
"appkey": app_key,
|
|
"appsecret": app_secret
|
|
}
|
|
|
|
async with httpx.AsyncClient() as client:
|
|
resp = await client.post(url, json=payload, headers={"Content-Type": "application/json"})
|
|
resp.raise_for_status()
|
|
return resp.json()
|
|
|
|
async def get_approval_key(self, db_session=None) -> str:
|
|
"""
|
|
Returns WebSocket Approval Key. Issues new one if missing.
|
|
"""
|
|
local_session = False
|
|
if not db_session:
|
|
db_session = SessionLocal()
|
|
local_session = True
|
|
|
|
try:
|
|
stmt = select(ApiSettings).where(ApiSettings.id == 1)
|
|
result = await db_session.execute(stmt)
|
|
settings_obj = result.scalar_one_or_none()
|
|
|
|
if not settings_obj or not settings_obj.appKey or not settings_obj.appSecret:
|
|
raise ValueError("KIS API Credentials not configured.")
|
|
|
|
if settings_obj.websocketApprovalKey:
|
|
return settings_obj.websocketApprovalKey
|
|
|
|
# Issue New Key
|
|
approval_key = await self._issue_approval_key(settings_obj.appKey, settings_obj.appSecret)
|
|
|
|
settings_obj.websocketApprovalKey = approval_key
|
|
await db_session.commit()
|
|
|
|
return approval_key
|
|
|
|
except Exception as e:
|
|
await db_session.rollback()
|
|
raise e
|
|
finally:
|
|
if local_session:
|
|
await db_session.close()
|
|
|
|
async def _issue_approval_key(self, app_key: str, app_secret: str) -> str:
|
|
url = f"{self.BASE_URL_REAL}/oauth2/Approval"
|
|
payload = {
|
|
"grant_type": "client_credentials",
|
|
"appkey": app_key,
|
|
"secretkey": app_secret # Note: Parameter name difference
|
|
}
|
|
|
|
async with httpx.AsyncClient() as client:
|
|
resp = await client.post(url, json=payload, headers={"Content-Type": "application/json"})
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
return data['approval_key']
|
|
|
|
kis_auth = KisAuth()
|