import logging from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, delete from datetime import datetime from app.db.database import SessionLocal from app.db.models import AccountStatus, Holding from app.services.kis_client import kis_client logger = logging.getLogger(__name__) class SyncService: async def sync_account(self, db: AsyncSession): """ Fetches balance from KIS and updates DB (AccountStatus & Holdings). Currently supports Domestic only. """ logger.info("SyncService: Starting Account Sync...") try: # 1. Fetch Domestic Balance # kis_client.get_balance returns dict with 'output1', 'output2' or None res = await kis_client.get_balance("Domestic") if not res: logger.error("SyncService: Failed to fetch balance (API Error or No Data).") return # output1: Holdings List # output2: Account Summary output1 = res.get('output1', []) output2 = res.get('output2', []) # KIS API returns output2 as a LIST of 1 dict usually summary_data = output2[0] if output2 else {} # --- Update AccountStatus --- # Map KIS fields to AccountStatus model # tot_evlu_amt: 총평가금액 (Total Assets) # dnca_tot_amt: 예수금총액 (Buying Power) # evlu_pfls_smt_tl: 평가손익합계 (Daily Profit - approximation) # evlu_pfls_rt: 수익률 total_assets = float(summary_data.get('tot_evlu_amt', 0) or 0) buying_power = float(summary_data.get('dnca_tot_amt', 0) or 0) daily_profit = float(summary_data.get('evlu_pfls_smt_tl', 0) or 0) # Calculate daily profit rate if not provided directly # profit_rate = float(summary_data.get('evlu_pfls_rt', 0)) # Sometimes available daily_profit_rate = 0.0 if total_assets > 0: daily_profit_rate = (daily_profit / total_assets) * 100 # Upsert AccountStatus (ID=1) stmt = select(AccountStatus).where(AccountStatus.id == 1) result = await db.execute(stmt) status = result.scalar_one_or_none() if not status: status = AccountStatus(id=1) db.add(status) status.totalAssets = total_assets status.buyingPower = buying_power status.dailyProfit = daily_profit status.dailyProfitRate = daily_profit_rate # --- Update Holdings --- # Strategy: Delete all existing holdings (refresh) or Upsert? # Refresh is safer to remove sold items. await db.execute(delete(Holding)) for item in output1: # Map fields # pdno: 종목번호 # prdt_name: 종목명 # hldg_qty: 보유수량 # pchs_avg_pric: 매입평균가격 # prpr: 현재가 # evlu_pfls_amt: 평가손익금액 # evlu_pfls_rt: 평가손익율 # evlu_amt: 평가금액 code = item.get('pdno') if not code: continue h = Holding( stockCode=code, stockName=item.get('prdt_name', 'Unknown'), quantity=int(item.get('hldg_qty', 0) or 0), avgPrice=float(item.get('pchs_avg_pric', 0) or 0), currentPrice=float(item.get('prpr', 0) or 0), profit=float(item.get('evlu_pfls_amt', 0) or 0), profitRate=float(item.get('evlu_pfls_rt', 0) or 0), marketValue=float(item.get('evlu_amt', 0) or 0) ) db.add(h) await db.commit() logger.info(f"SyncService: Account Sync Complete. Assets: {total_assets}, Holdings: {len(output1)}") except Exception as e: await db.rollback() logger.error(f"SyncService: Error during sync: {e}") sync_service = SyncService()