# Source file: KisStock/backend/app/services/master_service.py
# (113 lines, 4.0 KiB, Python)

import os
import zipfile
import httpx
import logging
import asyncio
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import delete
from app.db.models import MasterStock
from app.db.database import SessionLocal
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class MasterService:
    """Download the KOSPI/KOSDAQ master files and load them into ``MasterStock``.

    Each market's zipped ``.mst`` file is fetched from ``BASE_URL``, unpacked
    into ``TMP_DIR``, parsed as fixed-width cp949 text, and every valid record
    is added to the supplied database session as a ``MasterStock`` row.
    """

    BASE_URL = "https://new.real.download.dws.co.kr/common/master"
    FILES = {
        "KOSPI": "kospi_code.mst.zip",
        "KOSDAQ": "kosdaq_code.mst.zip",
    }
    TMP_DIR = "./tmp_master"

    # Record layout: 9-char short code, 12-char standard code, variable-width
    # name, then a fixed-width tail of attribute columns.  The tail width
    # differs per market: the KIS sample parsers cut 228 characters for KOSPI
    # and 222 for KOSDAQ, both counting the trailing newline.  The widths
    # below exclude that newline.
    HEADER_WIDTH = 21
    TAIL_WIDTHS = {"KOSPI": 227, "KOSDAQ": 221}
    # Fallback for a market without an entry in TAIL_WIDTHS (the width this
    # parser historically used for every market).
    DEFAULT_TAIL_WIDTH = 221
    # Shortest record (newline excluded) that is still treated as valid.
    MIN_RECORD_LENGTH = 249

    async def sync_master_data(self, db: "AsyncSession"):
        """Download, parse and stage all configured master files, then commit.

        A market whose download fails is logged and skipped so the remaining
        markets are still processed.  Any other failure is logged and the
        session is rolled back; nothing is raised to the caller.

        Args:
            db: Async session that receives the ``MasterStock`` rows.
        """
        logger.info("MasterService: Starting Master Data Sync...")
        os.makedirs(self.TMP_DIR, exist_ok=True)
        try:
            # NOTE(review): rows are only added here; existing MasterStock rows
            # are neither cleared nor updated, so a repeated sync may duplicate
            # or conflict with them -- confirm against the table's keys.
            total_count = 0
            # NOTE(review): verify=False disables TLS certificate validation
            # for the download host -- confirm this is still required.
            async with httpx.AsyncClient(verify=False) as client:
                for market, filename in self.FILES.items():
                    url = f"{self.BASE_URL}/{filename}"
                    dest = os.path.join(self.TMP_DIR, filename)

                    # 1. Download (best effort: one market failing must not
                    # stop the other from syncing).
                    logger.info(f"Downloading {market} from {url}...")
                    try:
                        resp = await client.get(url, timeout=60.0)
                        resp.raise_for_status()
                        with open(dest, "wb") as f:
                            f.write(resp.content)
                    except Exception as e:
                        logger.error(f"Failed to download {market}: {e}")
                        continue

                    # 2. Unzip & parse
                    total_count += await self._process_zip(dest, market, db)

            await db.commit()
            logger.info(f"MasterService: Sync Complete. Total {total_count} stocks.")
            # Downloaded and extracted files are left in TMP_DIR; no cleanup
            # is performed here.
        except Exception as e:
            logger.error(f"MasterService: Fatal Error: {e}")
            await db.rollback()

    async def _process_zip(self, zip_path: str, market: str, db: "AsyncSession") -> int:
        """Extract the master file from ``zip_path`` and parse it.

        Args:
            zip_path: Path of the downloaded ZIP archive.
            market: Market label stored on every created row.
            db: Async session that receives the rows.

        Returns:
            Number of rows staged, or 0 if the archive could not be processed
            (the error is logged, not raised).
        """
        try:
            with zipfile.ZipFile(zip_path, "r") as zf:
                members = zf.namelist()
                if not members:
                    logger.error(f"Error processing ZIP {zip_path}: archive is empty")
                    return 0
                # Prefer the .mst member; fall back to the first entry as before.
                mst_filename = next(
                    (m for m in members if m.lower().endswith(".mst")),
                    members[0],
                )
                # extract() returns the path it actually wrote, which may
                # differ from a naive join when the member name is sanitised.
                mst_path = zf.extract(mst_filename, self.TMP_DIR)
            return await self._parse_mst(mst_path, market, db)
        except Exception as e:
            logger.error(f"Error processing ZIP {zip_path}: {e}")
            return 0

    async def _parse_mst(self, mst_path: str, market: str, db: "AsyncSession") -> int:
        """Parse a fixed-width ``.mst`` file and stage one row per valid record.

        The whole file is parsed before anything is added to the session, so a
        failure while reading does not leave a partial market staged.

        Args:
            mst_path: Path of the extracted ``.mst`` file (read as cp949).
            market: Market label stored on every created row.
            db: Async session that receives the rows (not committed here).

        Returns:
            Number of rows added to the session.
        """
        # Undecodable bytes are replaced rather than aborting the whole file.
        with open(mst_path, "r", encoding="cp949", errors="replace") as f:
            parsed = (self._parse_line(line, market) for line in f)
            records = [rec for rec in parsed if rec is not None]

        db.add_all(
            [MasterStock(code=code, name=name, market=market) for code, name in records]
        )
        return len(records)

    @classmethod
    def _parse_line(cls, line: str, market: str) -> "tuple[str, str] | None":
        """Split one master-file record into ``(short_code, name)``.

        Args:
            line: One raw line of the file, with or without its newline.
            market: Market label used to look up the tail width.

        Returns:
            ``(short_code, name)``, or ``None`` when the line is too short or
            either field is blank.
        """
        # Drop the newline first so a final line without one parses the same.
        record = line.rstrip("\n")
        if len(record) < cls.MIN_RECORD_LENGTH:
            return None

        tail_width = cls.TAIL_WIDTHS.get(market, cls.DEFAULT_TAIL_WIDTH)
        short_code = record[:9].strip()
        # The name has no fixed width, so it is located by cutting the
        # fixed-width tail off the end of the record.
        name = record[cls.HEADER_WIDTH:len(record) - tail_width].strip()
        if not short_code or not name:
            return None
        return short_code, name
# Module-level MasterService instance, created when this module is imported
# (presumably the shared instance other modules import -- confirm with callers).
master_service = MasterService()