import asyncio
import logging
import os
import zipfile
from typing import Optional, Tuple

import httpx
from sqlalchemy import delete
from sqlalchemy.ext.asyncio import AsyncSession

from app.db.database import SessionLocal
from app.db.models import MasterStock

logger = logging.getLogger(__name__)


class MasterService:
    """Download the KOSPI/KOSDAQ master files and stage their rows as ``MasterStock``."""

    BASE_URL = "https://new.real.download.dws.co.kr/common/master"
    FILES = {
        "KOSPI": "kospi_code.mst.zip",
        "KOSDAQ": "kosdaq_code.mst.zip",
    }
    TMP_DIR = "./tmp_master"

    # Lines shorter than this are treated as invalid and skipped.
    MIN_LINE_LENGTH = 250
    # Number of trailing characters (including the line terminator) that follow
    # the name field. The name field has a variable character length once the
    # file is decoded, so it is located by trimming this fixed-width tail.
    # NOTE(review): 228 for KOSPI / 222 for KOSDAQ follows the vendor's
    # published sample parsers -- confirm against the current file layout.
    TAIL_WIDTHS = {"KOSPI": 228, "KOSDAQ": 222}
    DEFAULT_TAIL_WIDTH = 222

    async def sync_master_data(self, db: AsyncSession) -> None:
        """Download and parse every file in ``FILES`` and commit the result.

        A market whose download fails is logged and skipped; the remaining
        markets are still processed. Any other error is logged and the
        session is rolled back. No exception is propagated to the caller.

        Args:
            db: Session the parsed ``MasterStock`` rows are added to and
                committed on.
        """
        logger.info("MasterService: Starting Master Data Sync...")
        os.makedirs(self.TMP_DIR, exist_ok=True)

        try:
            # NOTE(review): existing rows are not cleared before inserting
            # (the ``delete(MasterStock)`` call was left disabled), so a
            # repeated sync may duplicate rows or violate a key constraint,
            # depending on the MasterStock schema -- confirm intended behavior.
            total_count = 0

            # NOTE(review): verify=False disables TLS certificate verification
            # for these downloads. Left unchanged because it may be deliberate
            # for this host, but it should be confirmed.
            async with httpx.AsyncClient(verify=False) as client:
                for market, filename in self.FILES.items():
                    url = f"{self.BASE_URL}/{filename}"
                    dest = os.path.join(self.TMP_DIR, filename)

                    # 1. Download
                    logger.info("Downloading %s from %s...", market, url)
                    try:
                        resp = await client.get(url, timeout=60.0)
                        resp.raise_for_status()
                        with open(dest, "wb") as f:
                            f.write(resp.content)
                    except Exception as e:
                        # Best effort: one failed market must not abort the others.
                        logger.exception("Failed to download %s: %s", market, e)
                        continue

                    # 2. Unzip & parse
                    total_count += await self._process_zip(dest, market, db)

            await db.commit()
            logger.info(
                "MasterService: Sync Complete. Total %s stocks.", total_count
            )
        except Exception as e:
            logger.exception("MasterService: Fatal Error: %s", e)
            await db.rollback()

    async def _process_zip(self, zip_path: str, market: str, db: AsyncSession) -> int:
        """Extract the master file from ``zip_path`` and stage its rows.

        Args:
            zip_path: Path of the downloaded ZIP archive.
            market: Market label stored on each row.
            db: Session the rows are added to.

        Returns:
            Number of rows staged, or 0 if the archive could not be processed.
        """
        try:
            with zipfile.ZipFile(zip_path, "r") as zf:
                names = zf.namelist()
                if not names:
                    logger.error(
                        "Error processing ZIP %s: archive is empty", zip_path
                    )
                    return 0
                # Prefer the .mst member; fall back to the first entry.
                mst_filename = next(
                    (n for n in names if n.lower().endswith(".mst")), names[0]
                )
                # extract() returns the path actually written, which can differ
                # from a naive join when the member name gets sanitised.
                mst_path = zf.extract(mst_filename, self.TMP_DIR)
            return await self._parse_mst(mst_path, market, db)
        except Exception as e:
            logger.exception("Error processing ZIP %s: %s", zip_path, e)
            return 0

    async def _parse_mst(self, mst_path: str, market: str, db: AsyncSession) -> int:
        """Parse a fixed-width master file and add one ``MasterStock`` per valid line.

        The file is read completely before anything is added to the session,
        so a read error cannot leave a partially staged market behind.

        Args:
            mst_path: Path of the extracted master file (decoded as cp949).
            market: Market label stored on each row; also selects the tail width.
            db: Session the rows are added to. Nothing is committed here.

        Returns:
            Number of rows added to the session.
        """
        tail_width = self.TAIL_WIDTHS.get(market, self.DEFAULT_TAIL_WIDTH)

        stocks = []
        with open(mst_path, "r", encoding="cp949", errors="replace") as f:
            for line in f:
                parsed = self._parse_line(line, tail_width)
                if parsed is None:
                    continue
                short_code, name = parsed
                stocks.append(MasterStock(code=short_code, name=name, market=market))

        db.add_all(stocks)
        return len(stocks)

    @classmethod
    def _parse_line(cls, line: str, tail_width: int) -> Optional[Tuple[str, str]]:
        """Split one master-file line into ``(short_code, name)``.

        Layout used:
            line[0:9]               short code
            line[9:21]              standard code (not used)
            line[21:-tail_width]    name

        Args:
            line: Raw line as read from the file, line terminator included
                (the terminator counts towards ``tail_width``).
            tail_width: Number of trailing characters following the name.

        Returns:
            ``(short_code, name)``, or ``None`` when the line is too short or
            either field is blank.
        """
        if len(line) < cls.MIN_LINE_LENGTH:
            return None

        short_code = line[0:9].strip()
        name = line[21:len(line) - tail_width].strip()
        if not short_code or not name:
            return None
        return short_code, name


master_service = MasterService()