113 lines
4.0 KiB
Python
113 lines
4.0 KiB
Python
import os
|
|
import zipfile
|
|
import httpx
|
|
import logging
|
|
import asyncio
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import delete
|
|
from app.db.models import MasterStock
|
|
from app.db.database import SessionLocal
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
class MasterService:
    """Download the KOSPI/KOSDAQ master files and stage them as ``MasterStock`` rows.

    Each market is published as a ZIP archive under ``BASE_URL``.  The archive
    holds a cp949-encoded, fixed-width ``.mst`` text file whose lines are laid
    out as::

        line[0:9]    short code (e.g. "005930   ")
        line[9:21]   standard code
        line[21:-T]  name
        line[-T:]    fixed-width attribute block (T depends on the market)

    NOTE(review): rows are only *added* to the session; existing ``MasterStock``
    rows are never deleted or updated here.  Whether a repeated sync creates
    duplicates or fails on a constraint depends on the table definition --
    confirm before scheduling this periodically.
    """

    BASE_URL = "https://new.real.download.dws.co.kr/common/master"
    FILES = {
        "KOSPI": "kospi_code.mst.zip",
        "KOSDAQ": "kosdaq_code.mst.zip",
    }
    TMP_DIR = "./tmp_master"

    # Seconds allowed for each archive download.
    DOWNLOAD_TIMEOUT = 60.0

    # NOTE(review): TLS certificate verification is disabled for the download
    # client, which exposes the downloads to tampering in transit.  Kept as-is
    # to preserve behaviour; confirm the endpoint really requires it.
    VERIFY_TLS = False

    # Width, in characters and counting the trailing newline, of the attribute
    # block that follows the name on every line.  The KIS sample parsers use
    # 228 for KOSPI and 222 for KOSDAQ; applying 222 to KOSPI leaves six
    # attribute characters glued to the end of every name.
    TAIL_WIDTHS = {
        "KOSPI": 228,
        "KOSDAQ": 222,
    }
    DEFAULT_TAIL_WIDTH = 222

    # Lines shorter than this are treated as invalid and skipped.
    MIN_LINE_LENGTH = 250

    async def sync_master_data(self, db: "AsyncSession"):
        """Download every configured master file and load it through ``db``.

        A market whose download or parsing fails is logged and skipped; the
        remaining markets are still processed.  The session is committed once
        at the end.  Any other exception is logged with its traceback and the
        session is rolled back; the exception is not re-raised.

        Args:
            db: Async session used to stage and commit the ``MasterStock`` rows.
        """
        logger.info("MasterService: Starting Master Data Sync...")
        os.makedirs(self.TMP_DIR, exist_ok=True)

        try:
            total_count = 0

            async with httpx.AsyncClient(verify=self.VERIFY_TLS) as client:
                for market, filename in self.FILES.items():
                    # 1. Download
                    dest = await self._download(client, market, filename)
                    if dest is None:
                        continue

                    # 2. Unzip & Parse
                    total_count += await self._process_zip(dest, market, db)

            await db.commit()
            logger.info(f"MasterService: Sync Complete. Total {total_count} stocks.")

            # Downloaded and extracted files are left in TMP_DIR; no cleanup
            # is performed.
        except Exception as e:
            # logger.exception keeps the traceback, which logger.error dropped.
            logger.exception(f"MasterService: Fatal Error: {e}")
            await db.rollback()

    async def _download(self, client: "httpx.AsyncClient", market: str, filename: str):
        """Fetch one archive into ``TMP_DIR``.

        Returns:
            The path of the saved archive, or ``None`` if the download failed
            (the failure is logged).
        """
        url = f"{self.BASE_URL}/{filename}"
        dest = os.path.join(self.TMP_DIR, filename)

        logger.info(f"Downloading {market} from {url}...")
        try:
            resp = await client.get(url, timeout=self.DOWNLOAD_TIMEOUT)
            resp.raise_for_status()
            with open(dest, "wb") as f:
                f.write(resp.content)
        except Exception as e:
            logger.error(f"Failed to download {market}: {e}")
            return None
        return dest

    async def _process_zip(self, zip_path: str, market: str, db: "AsyncSession") -> int:
        """Extract the ``.mst`` member of ``zip_path`` and load its rows.

        Returns:
            The number of rows staged, or 0 if the archive could not be
            processed (the error is logged, not raised).
        """
        try:
            with zipfile.ZipFile(zip_path, "r") as zf:
                members = zf.namelist()
                if not members:
                    logger.error(
                        f"Error processing ZIP {zip_path}: archive contains no files"
                    )
                    return 0

                # Usually there is only one member; prefer the .mst file and
                # fall back to the first entry otherwise.
                mst_member = next(
                    (m for m in members if m.lower().endswith(".mst")),
                    members[0],
                )
                # extract() returns the path it actually wrote, which can
                # differ from a naive join when the member name is sanitised.
                mst_path = zf.extract(mst_member, self.TMP_DIR)

            return await self._parse_mst(mst_path, market, db)
        except Exception as e:
            logger.error(f"Error processing ZIP {zip_path}: {e}")
            return 0

    async def _parse_mst(self, mst_path: str, market: str, db: "AsyncSession") -> int:
        """Parse ``mst_path`` and add one ``MasterStock`` per valid line to ``db``.

        The whole file is parsed before anything is added, so a failure while
        reading cannot leave a partial set of rows pending in the session.
        Nothing is committed here; the caller commits.

        Returns:
            The number of rows added to the session.
        """
        rows = self._read_mst_rows(mst_path, market)
        db.add_all(
            [MasterStock(code=code, name=name, market=market) for code, name in rows]
        )
        return len(rows)

    def _read_mst_rows(self, mst_path: str, market: str) -> list:
        """Return the ``(short_code, name)`` pairs of every valid line in the file."""
        tail_width = self.TAIL_WIDTHS.get(market, self.DEFAULT_TAIL_WIDTH)
        rows = []
        # The file is decoded as cp949; undecodable bytes are replaced rather
        # than aborting the whole sync.
        with open(mst_path, "r", encoding="cp949", errors="replace") as f:
            for line in f:
                parsed = self._split_mst_line(line, tail_width)
                if parsed is not None:
                    rows.append(parsed)
        return rows

    @classmethod
    def _split_mst_line(cls, line: str, tail_width: int = DEFAULT_TAIL_WIDTH):
        """Split one master-file line into ``(short_code, name)``.

        Args:
            line: A decoded line, including its trailing newline if present.
            tail_width: Number of trailing characters (newline included) that
                belong to the attribute block after the name.

        Returns:
            ``(short_code, name)`` with surrounding whitespace removed, or
            ``None`` when the line is too short or either field is blank.
        """
        if len(line) < cls.MIN_LINE_LENGTH:
            return None

        short_code = line[0:9].strip()
        # The name is sliced relative to the end of the line because its width
        # in characters is not fixed, while the trailing block's width is.
        name = line[21:len(line) - tail_width].strip()

        if not short_code or not name:
            return None
        return short_code, name
|
|
|
|
# Module-level MasterService instance, created when this module is imported.
master_service = MasterService()
|