initial commit
This commit is contained in:
@@ -0,0 +1,4 @@
|
||||
from .kis import setup_kis_config
|
||||
from .environment import setup_environment, EnvironmentConfig
|
||||
from .master_file import MasterFileManager
|
||||
from .database import DatabaseEngine, Database
|
||||
540
한국투자증권(API)/MCP/Kis Trading MCP/module/plugin/database.py
Normal file
540
한국투자증권(API)/MCP/Kis Trading MCP/module/plugin/database.py
Normal file
@@ -0,0 +1,540 @@
|
||||
from typing import Any, Dict, List, Optional, Type, Union
|
||||
from sqlalchemy import create_engine, Engine
|
||||
from sqlalchemy.orm import sessionmaker, Session
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DatabaseEngine:
|
||||
"""1 SQLite 파일 : 1 엔진을 관리하는 클래스"""
|
||||
|
||||
def __init__(self, db_path: str, models: List[Type]):
|
||||
"""
|
||||
Args:
|
||||
db_path: SQLite 파일 경로
|
||||
models: 해당 데이터베이스에 포함될 모델 클래스들의 리스트
|
||||
"""
|
||||
self.db_path = db_path
|
||||
self.models = models
|
||||
self.engine: Optional[Engine] = None
|
||||
self.SessionLocal: Optional[sessionmaker] = None
|
||||
self._initialize_engine()
|
||||
|
||||
def _initialize_engine(self):
|
||||
"""데이터베이스 엔진 초기화"""
|
||||
try:
|
||||
# SQLite 연결 문자열 생성
|
||||
db_url = f"sqlite:///{self.db_path}"
|
||||
|
||||
# 엔진 생성
|
||||
self.engine = create_engine(
|
||||
db_url,
|
||||
echo=False, # SQL 로그 출력 여부
|
||||
pool_pre_ping=True, # 연결 상태 확인
|
||||
connect_args={"check_same_thread": False} # SQLite 멀티스레드 지원
|
||||
)
|
||||
|
||||
# 세션 팩토리 생성
|
||||
self.SessionLocal = sessionmaker(
|
||||
autocommit=False,
|
||||
autoflush=False,
|
||||
bind=self.engine
|
||||
)
|
||||
|
||||
# 테이블 생성
|
||||
self._create_tables()
|
||||
|
||||
logger.info(f"Database engine initialized: {self.db_path}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize database engine {self.db_path}: {e}")
|
||||
raise
|
||||
|
||||
def _create_tables(self):
|
||||
"""모든 모델의 테이블 생성"""
|
||||
try:
|
||||
from model.base import Base
|
||||
Base.metadata.create_all(bind=self.engine)
|
||||
logger.info(f"Tables created for {self.db_path}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create tables for {self.db_path}: {e}")
|
||||
raise
|
||||
|
||||
def get_session(self) -> Session:
|
||||
"""새로운 데이터베이스 세션 반환"""
|
||||
if not self.SessionLocal:
|
||||
raise RuntimeError("Database engine not initialized")
|
||||
return self.SessionLocal()
|
||||
|
||||
def insert(self, model_instance: Any) -> Any:
|
||||
"""
|
||||
모델 인스턴스를 데이터베이스에 삽입
|
||||
|
||||
Args:
|
||||
model_instance: 삽입할 모델 인스턴스
|
||||
|
||||
Returns:
|
||||
삽입된 모델 인스턴스 (ID 포함)
|
||||
"""
|
||||
session = self.get_session()
|
||||
try:
|
||||
session.add(model_instance)
|
||||
session.commit()
|
||||
session.refresh(model_instance)
|
||||
logger.info(f"Inserted record: {type(model_instance).__name__}")
|
||||
return model_instance
|
||||
except SQLAlchemyError as e:
|
||||
session.rollback()
|
||||
logger.error(f"Failed to insert record: {e}")
|
||||
raise
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
def update(self, model_class: Type, record_id: int, update_data: Dict[str, Any]) -> Optional[Any]:
|
||||
"""
|
||||
ID로 레코드 업데이트
|
||||
|
||||
Args:
|
||||
model_class: 업데이트할 모델 클래스
|
||||
record_id: 업데이트할 레코드의 ID
|
||||
update_data: 업데이트할 필드와 값의 딕셔너리
|
||||
|
||||
Returns:
|
||||
업데이트된 모델 인스턴스 또는 None
|
||||
"""
|
||||
session = self.get_session()
|
||||
try:
|
||||
# 레코드 조회
|
||||
record = session.query(model_class).filter(model_class.id == record_id).first()
|
||||
if not record:
|
||||
logger.warning(f"Record not found: {model_class.__name__} ID {record_id}")
|
||||
return None
|
||||
|
||||
# 필드 업데이트
|
||||
for field, value in update_data.items():
|
||||
if hasattr(record, field):
|
||||
setattr(record, field, value)
|
||||
else:
|
||||
logger.warning(f"Field '{field}' not found in {model_class.__name__}")
|
||||
|
||||
session.commit()
|
||||
session.refresh(record)
|
||||
logger.info(f"Updated record: {model_class.__name__} ID {record_id}")
|
||||
return record
|
||||
|
||||
except SQLAlchemyError as e:
|
||||
session.rollback()
|
||||
logger.error(f"Failed to update record: {e}")
|
||||
raise
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
def delete(self, model_class: Type, record_id: int) -> bool:
|
||||
"""
|
||||
ID로 레코드 삭제
|
||||
|
||||
Args:
|
||||
model_class: 삭제할 모델 클래스
|
||||
record_id: 삭제할 레코드의 ID
|
||||
|
||||
Returns:
|
||||
삭제 성공 여부
|
||||
"""
|
||||
session = self.get_session()
|
||||
try:
|
||||
# 레코드 조회
|
||||
record = session.query(model_class).filter(model_class.id == record_id).first()
|
||||
if not record:
|
||||
logger.warning(f"Record not found: {model_class.__name__} ID {record_id}")
|
||||
return False
|
||||
|
||||
session.delete(record)
|
||||
session.commit()
|
||||
logger.info(f"Deleted record: {model_class.__name__} ID {record_id}")
|
||||
return True
|
||||
|
||||
except SQLAlchemyError as e:
|
||||
session.rollback()
|
||||
logger.error(f"Failed to delete record: {e}")
|
||||
raise
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
def list(self, model_class: Type, filters: Optional[Dict[str, Any]] = None,
|
||||
limit: Optional[int] = None, offset: Optional[int] = None) -> List[Any]:
|
||||
"""
|
||||
조건에 맞는 레코드 목록 조회
|
||||
|
||||
Args:
|
||||
model_class: 조회할 모델 클래스
|
||||
filters: 필터 조건 딕셔너리 {field: value}
|
||||
limit: 조회할 최대 개수
|
||||
offset: 건너뛸 개수
|
||||
|
||||
Returns:
|
||||
조회된 레코드 리스트
|
||||
"""
|
||||
session = self.get_session()
|
||||
try:
|
||||
query = session.query(model_class)
|
||||
|
||||
# 필터 적용
|
||||
if filters:
|
||||
for field, value in filters.items():
|
||||
if hasattr(model_class, field):
|
||||
query = query.filter(getattr(model_class, field) == value)
|
||||
else:
|
||||
logger.warning(f"Field '{field}' not found in {model_class.__name__}")
|
||||
|
||||
# 페이징 적용
|
||||
if offset:
|
||||
query = query.offset(offset)
|
||||
if limit:
|
||||
query = query.limit(limit)
|
||||
|
||||
results = query.all()
|
||||
logger.info(f"Listed {len(results)} records: {model_class.__name__}")
|
||||
return results
|
||||
|
||||
except SQLAlchemyError as e:
|
||||
logger.error(f"Failed to list records: {e}")
|
||||
raise
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
def get(self, model_class: Type, filters: Dict[str, Any]) -> Optional[Any]:
|
||||
"""
|
||||
조건에 맞는 첫 번째 레코드 조회
|
||||
|
||||
Args:
|
||||
model_class: 조회할 모델 클래스
|
||||
filters: 필터 조건 딕셔너리 {field: value}
|
||||
|
||||
Returns:
|
||||
조회된 레코드 또는 None
|
||||
"""
|
||||
session = self.get_session()
|
||||
try:
|
||||
query = session.query(model_class)
|
||||
|
||||
# 필터 적용
|
||||
for field, value in filters.items():
|
||||
if hasattr(model_class, field):
|
||||
query = query.filter(getattr(model_class, field) == value)
|
||||
else:
|
||||
logger.warning(f"Field '{field}' not found in {model_class.__name__}")
|
||||
|
||||
result = query.first()
|
||||
if result:
|
||||
logger.info(f"Found record: {model_class.__name__}")
|
||||
else:
|
||||
logger.info(f"No record found: {model_class.__name__}")
|
||||
return result
|
||||
|
||||
except SQLAlchemyError as e:
|
||||
logger.error(f"Failed to get record: {e}")
|
||||
raise
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
def count(self, model_class: Type, filters: Optional[Dict[str, Any]] = None) -> int:
|
||||
"""
|
||||
조건에 맞는 레코드 개수 조회
|
||||
|
||||
Args:
|
||||
model_class: 조회할 모델 클래스
|
||||
filters: 필터 조건 딕셔너리 {field: value}
|
||||
|
||||
Returns:
|
||||
레코드 개수
|
||||
"""
|
||||
session = self.get_session()
|
||||
try:
|
||||
query = session.query(model_class)
|
||||
|
||||
# 필터 적용
|
||||
if filters:
|
||||
for field, value in filters.items():
|
||||
if hasattr(model_class, field):
|
||||
query = query.filter(getattr(model_class, field) == value)
|
||||
else:
|
||||
logger.warning(f"Field '{field}' not found in {model_class.__name__}")
|
||||
|
||||
count = query.count()
|
||||
logger.info(f"Counted {count} records: {model_class.__name__}")
|
||||
return count
|
||||
|
||||
except SQLAlchemyError as e:
|
||||
logger.error(f"Failed to count records: {e}")
|
||||
raise
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
def bulk_replace_master_data(self, model_class: Type, data_list: List[Dict], master_name: str) -> int:
|
||||
"""
|
||||
마스터 데이터 추가 (INSERT만) - 카테고리 레벨에서 이미 삭제됨
|
||||
|
||||
Args:
|
||||
model_class: 마스터 데이터 모델 클래스
|
||||
data_list: 삽입할 데이터 리스트 (딕셔너리 리스트)
|
||||
master_name: 마스터파일명 (로깅용)
|
||||
|
||||
Returns:
|
||||
삽입된 레코드 수
|
||||
"""
|
||||
session = self.get_session()
|
||||
try:
|
||||
# 새 데이터 배치 삽입 (카테고리 레벨에서 이미 삭제되었으므로 INSERT만)
|
||||
if data_list:
|
||||
# 배치 크기 설정 (메모리 효율성을 위해 1000개씩)
|
||||
batch_size = 1000
|
||||
total_inserted = 0
|
||||
|
||||
for i in range(0, len(data_list), batch_size):
|
||||
batch = data_list[i:i + batch_size]
|
||||
batch_objects = []
|
||||
|
||||
for data in batch:
|
||||
# 모든 값을 문자열로 강제 변환 (SQLAlchemy 타입 추론 방지)
|
||||
str_data = {key: str(value) if value is not None else None for key, value in data.items()}
|
||||
|
||||
# 딕셔너리를 모델 인스턴스로 변환
|
||||
obj = model_class(**str_data)
|
||||
batch_objects.append(obj)
|
||||
|
||||
# 배치 삽입
|
||||
session.bulk_save_objects(batch_objects)
|
||||
total_inserted += len(batch_objects)
|
||||
|
||||
# 중간 커밋 (메모리 절약)
|
||||
if i + batch_size < len(data_list):
|
||||
session.commit()
|
||||
logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch_objects)} records")
|
||||
|
||||
# 최종 커밋
|
||||
session.commit()
|
||||
logger.info(f"Bulk replace completed: {total_inserted} records inserted into {model_class.__name__}")
|
||||
return total_inserted
|
||||
else:
|
||||
logger.warning(f"No data to insert for {master_name}")
|
||||
return 0
|
||||
|
||||
except SQLAlchemyError as e:
|
||||
session.rollback()
|
||||
logger.error(f"Failed to bulk replace master data for {master_name}: {e}")
|
||||
raise
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
def update_master_timestamp(self, tool_name: str, record_count: int = None) -> bool:
|
||||
"""
|
||||
마스터파일 업데이트 시간 기록
|
||||
|
||||
Args:
|
||||
tool_name: 툴명 (예: domestic_stock, overseas_stock)
|
||||
record_count: 레코드 수 (선택사항)
|
||||
|
||||
Returns:
|
||||
업데이트 성공 여부
|
||||
"""
|
||||
from model.updated import Updated
|
||||
|
||||
session = self.get_session()
|
||||
try:
|
||||
# 기존 레코드 조회
|
||||
existing_record = session.query(Updated).filter(Updated.tool_name == tool_name).first()
|
||||
|
||||
if existing_record:
|
||||
# 기존 레코드 업데이트
|
||||
existing_record.updated_at = datetime.now()
|
||||
logger.info(f"Updated timestamp for {tool_name}")
|
||||
else:
|
||||
# 새 레코드 생성
|
||||
new_record = Updated(
|
||||
tool_name=tool_name,
|
||||
updated_at=datetime.now()
|
||||
)
|
||||
session.add(new_record)
|
||||
logger.info(f"Created new timestamp record for {tool_name}")
|
||||
|
||||
session.commit()
|
||||
return True
|
||||
|
||||
except SQLAlchemyError as e:
|
||||
session.rollback()
|
||||
logger.error(f"Failed to update master timestamp for {tool_name}: {e}")
|
||||
return False
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
def get_master_update_time(self, tool_name: str) -> Optional[datetime]:
|
||||
"""
|
||||
마스터파일 마지막 업데이트 시간 조회
|
||||
|
||||
Args:
|
||||
tool_name: 툴명 (예: domestic_stock, overseas_stock)
|
||||
|
||||
Returns:
|
||||
마지막 업데이트 시간 또는 None
|
||||
"""
|
||||
from model.updated import Updated
|
||||
|
||||
session = self.get_session()
|
||||
try:
|
||||
record = session.query(Updated).filter(Updated.tool_name == tool_name).first()
|
||||
if record:
|
||||
logger.info(f"Found update time for {tool_name}: {record.updated_at}")
|
||||
return record.updated_at
|
||||
else:
|
||||
logger.info(f"No update record found for {tool_name}")
|
||||
return None
|
||||
|
||||
except SQLAlchemyError as e:
|
||||
logger.error(f"Failed to get master update time for {tool_name}: {e}")
|
||||
return None
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
def is_master_data_available(self, model_class: Type) -> bool:
|
||||
"""
|
||||
마스터 데이터 존재 여부 확인
|
||||
|
||||
Args:
|
||||
model_class: 마스터 데이터 모델 클래스
|
||||
|
||||
Returns:
|
||||
데이터 존재 여부
|
||||
"""
|
||||
session = self.get_session()
|
||||
try:
|
||||
count = session.query(model_class).count()
|
||||
available = count > 0
|
||||
logger.info(f"Master data availability check for {model_class.__name__}: {available} ({count} records)")
|
||||
return available
|
||||
except SQLAlchemyError as e:
|
||||
logger.error(f"Failed to check master data availability for {model_class.__name__}: {e}")
|
||||
return False
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
def close(self):
|
||||
"""데이터베이스 연결 종료"""
|
||||
if self.engine:
|
||||
self.engine.dispose()
|
||||
logger.info(f"Database engine closed: {self.db_path}")
|
||||
|
||||
def __repr__(self):
|
||||
return f"DatabaseEngine(db_path='{self.db_path}', models={len(self.models)})"
|
||||
|
||||
|
||||
class Database:
|
||||
"""데이터베이스 엔진들을 관리하는 Singleton 클래스"""
|
||||
|
||||
_instance: Optional['Database'] = None
|
||||
_initialized: bool = False
|
||||
|
||||
def __new__(cls) -> 'Database':
|
||||
"""Singleton 패턴 구현"""
|
||||
if cls._instance is None:
|
||||
cls._instance = super(Database, cls).__new__(cls)
|
||||
return cls._instance
|
||||
|
||||
def __init__(self):
|
||||
"""초기화 (한 번만 실행)"""
|
||||
if not self._initialized:
|
||||
self.dbs: Dict[str, DatabaseEngine] = {}
|
||||
self._initialized = True
|
||||
logger.info("Database singleton instance created")
|
||||
|
||||
def new(self, db_dir: str = "configs/master") -> None:
|
||||
"""
|
||||
마스터 데이터베이스 엔진 초기화
|
||||
|
||||
Args:
|
||||
db_dir: 데이터베이스 파일이 저장될 디렉토리
|
||||
"""
|
||||
try:
|
||||
# 데이터베이스 디렉토리 생성
|
||||
os.makedirs(db_dir, exist_ok=True)
|
||||
|
||||
# 하나의 통합 마스터 데이터베이스 엔진 생성
|
||||
self._create_master_engine(db_dir)
|
||||
|
||||
logger.info(f"Master database engine initialized: '{db_dir}/master.db'")
|
||||
logger.info(f"Available databases: {list(self.dbs.keys())}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize master database: {e}")
|
||||
raise
|
||||
|
||||
def _create_master_engine(self, db_dir: str):
|
||||
"""통합 마스터 데이터베이스 엔진 생성"""
|
||||
from model import ALL_MODELS
|
||||
|
||||
db_path = os.path.join(db_dir, "master.db")
|
||||
|
||||
self.dbs["master"] = DatabaseEngine(db_path, ALL_MODELS)
|
||||
logger.info("Created master database engine with all models")
|
||||
|
||||
def get_by_name(self, name: str) -> DatabaseEngine:
|
||||
"""
|
||||
이름으로 데이터베이스 엔진 조회
|
||||
|
||||
Args:
|
||||
name: 데이터베이스 이름
|
||||
|
||||
Returns:
|
||||
DatabaseEngine 인스턴스
|
||||
|
||||
Raises:
|
||||
KeyError: 해당 이름의 데이터베이스가 없는 경우
|
||||
"""
|
||||
if name not in self.dbs:
|
||||
available_dbs = list(self.dbs.keys())
|
||||
raise KeyError(f"Database '{name}' not found. Available databases: {available_dbs}")
|
||||
|
||||
return self.dbs[name]
|
||||
|
||||
def get_available_databases(self) -> List[str]:
|
||||
"""사용 가능한 데이터베이스 이름 목록 반환"""
|
||||
return list(self.dbs.keys())
|
||||
|
||||
def is_initialized(self) -> bool:
|
||||
"""데이터베이스가 초기화되었는지 확인"""
|
||||
return len(self.dbs) > 0
|
||||
|
||||
def ensure_initialized(self, db_dir: str = "configs/master") -> bool:
|
||||
"""데이터베이스가 초기화되지 않은 경우에만 초기화"""
|
||||
if not self.is_initialized():
|
||||
try:
|
||||
self.new(db_dir)
|
||||
logger.info("Database initialized on demand")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize database on demand: {e}")
|
||||
return False
|
||||
return True
|
||||
|
||||
def close_all(self):
|
||||
"""모든 데이터베이스 연결 종료"""
|
||||
for name, engine in self.dbs.items():
|
||||
try:
|
||||
engine.close()
|
||||
logger.info(f"Closed database: {name}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to close database {name}: {e}")
|
||||
|
||||
self.dbs.clear()
|
||||
logger.info("All database connections closed")
|
||||
|
||||
def __repr__(self):
|
||||
return f"Database(engines={len(self.dbs)}, names={list(self.dbs.keys())})"
|
||||
|
||||
def __del__(self):
|
||||
"""소멸자 - 모든 연결 정리"""
|
||||
if hasattr(self, 'dbs') and self.dbs:
|
||||
self.close_all()
|
||||
43
한국투자증권(API)/MCP/Kis Trading MCP/module/plugin/environment.py
Normal file
43
한국투자증권(API)/MCP/Kis Trading MCP/module/plugin/environment.py
Normal file
@@ -0,0 +1,43 @@
|
||||
import logging
|
||||
import os
|
||||
from collections import namedtuple
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# Environment 설정을 위한 namedtuple 정의
|
||||
EnvironmentConfig = namedtuple('EnvironmentConfig', [
|
||||
'mcp_type', 'mcp_host', 'mcp_port', 'mcp_path'
|
||||
])
|
||||
|
||||
|
||||
def setup_environment(env: str) -> EnvironmentConfig:
|
||||
# get api env
|
||||
if not env:
|
||||
logging.error("Environment variable ENV not defined")
|
||||
exit(1)
|
||||
|
||||
# load .env
|
||||
dotenv_path = os.path.join(os.getcwd(), f".env.{env}")
|
||||
if not os.path.isfile(dotenv_path):
|
||||
logging.error(f"Environment variable file .env.{env} not found")
|
||||
exit(1)
|
||||
|
||||
load_dotenv(dotenv_path=dotenv_path)
|
||||
|
||||
# return environment
|
||||
# MCP_TYPE 검증 및 기본값 설정
|
||||
mcp_type = os.getenv("MCP_TYPE", "stdio")
|
||||
if mcp_type not in ['stdio', 'sse', 'streamable-http']:
|
||||
logging.warning(f"Invalid MCP_TYPE: {mcp_type}, using default: stdio")
|
||||
mcp_type = "stdio"
|
||||
|
||||
# MCP_PORT가 빈 문자열이면 기본값 사용
|
||||
mcp_port_str = os.getenv("MCP_PORT", "8000")
|
||||
mcp_port = int(mcp_port_str) if mcp_port_str.strip() else 8000
|
||||
|
||||
return EnvironmentConfig(
|
||||
mcp_type=mcp_type,
|
||||
mcp_host=os.getenv("MCP_HOST", "localhost"),
|
||||
mcp_port=mcp_port,
|
||||
mcp_path=os.getenv("MCP_PATH", "/mcp")
|
||||
)
|
||||
163
한국투자증권(API)/MCP/Kis Trading MCP/module/plugin/kis.py
Normal file
163
한국투자증권(API)/MCP/Kis Trading MCP/module/plugin/kis.py
Normal file
@@ -0,0 +1,163 @@
|
||||
import logging
|
||||
import os
|
||||
import requests
|
||||
import yaml
|
||||
|
||||
|
||||
def setup_kis_config(force_update=False):
|
||||
"""KIS 설정 파일 자동 생성 (템플릿 다운로드 + 환경변수로 값 덮어쓰기)
|
||||
|
||||
Args:
|
||||
force_update (bool): True면 기존 파일이 있어도 강제로 덮어쓰기
|
||||
"""
|
||||
|
||||
# kis_auth.py와 동일한 경로 생성 방식 사용
|
||||
kis_config_dir = os.path.join(os.path.expanduser("~"), "KIS", "config")
|
||||
|
||||
# KIS 설정 디렉토리 생성
|
||||
os.makedirs(kis_config_dir, exist_ok=True)
|
||||
|
||||
# 설정 파일 경로
|
||||
kis_config_path = os.path.join(kis_config_dir, "kis_devlp.yaml")
|
||||
|
||||
# 기존 파일 존재 확인
|
||||
if os.path.exists(kis_config_path) and not force_update:
|
||||
logging.info(f"✅ KIS 설정 파일이 이미 존재합니다: {kis_config_path}")
|
||||
logging.info("기존 파일을 사용합니다. 강제 업데이트가 필요한 경우 force_update=True 옵션을 사용하세요.")
|
||||
return True
|
||||
|
||||
# 1. kis_devlp.yaml 템플릿 다운로드
|
||||
template_url = "https://raw.githubusercontent.com/koreainvestment/open-trading-api/refs/heads/main/kis_devlp.yaml"
|
||||
|
||||
try:
|
||||
logging.info("KIS 설정 템플릿을 다운로드 중...")
|
||||
response = requests.get(template_url, timeout=30)
|
||||
response.raise_for_status()
|
||||
|
||||
# 원본 템플릿 텍스트 보존
|
||||
template_content = response.text
|
||||
logging.info("✅ KIS 설정 템플릿 다운로드 완료")
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"❌ KIS 설정 템플릿 다운로드 실패: {e}")
|
||||
return False
|
||||
|
||||
# 2. 환경변수로 민감한 정보 덮어쓰기
|
||||
# 필수값 (누락 시 경고)
|
||||
app_key = os.getenv("KIS_APP_KEY")
|
||||
app_secret = os.getenv("KIS_APP_SECRET")
|
||||
|
||||
if not app_key or not app_secret:
|
||||
logging.warning("⚠️ 필수 환경변수가 설정되지 않았습니다:")
|
||||
if not app_key:
|
||||
logging.warning(" - KIS_APP_KEY")
|
||||
if not app_secret:
|
||||
logging.warning(" - KIS_APP_SECRET")
|
||||
logging.warning("실제 거래 API 사용이 불가능할 수 있습니다.")
|
||||
|
||||
# 선택적 값들 (누락 시 빈값 또는 기본값)
|
||||
paper_app_key = os.getenv("KIS_PAPER_APP_KEY", "")
|
||||
paper_app_secret = os.getenv("KIS_PAPER_APP_SECRET", "")
|
||||
hts_id = os.getenv("KIS_HTS_ID", "")
|
||||
acct_stock = os.getenv("KIS_ACCT_STOCK", "")
|
||||
acct_future = os.getenv("KIS_ACCT_FUTURE", "")
|
||||
paper_stock = os.getenv("KIS_PAPER_STOCK", "")
|
||||
paper_future = os.getenv("KIS_PAPER_FUTURE", "")
|
||||
prod_type = os.getenv("KIS_PROD_TYPE", "01") # 기본값: 종합계좌
|
||||
url_rest = os.getenv("KIS_URL_REST", "")
|
||||
url_rest_paper = os.getenv("KIS_URL_REST_PAPER", "")
|
||||
url_ws = os.getenv("KIS_URL_WS", "")
|
||||
url_ws_paper = os.getenv("KIS_URL_WS_PAPER", "")
|
||||
|
||||
# 3. YAML 파싱하여 값 업데이트
|
||||
try:
|
||||
# YAML 파싱 (주석 보존을 위해 ruamel.yaml 사용하거나, 간단히 pyyaml 사용)
|
||||
config = yaml.safe_load(template_content)
|
||||
|
||||
# 환경변수 값이 있으면 해당 필드만 업데이트
|
||||
if app_key:
|
||||
config['my_app'] = app_key
|
||||
logging.info(f"✅ 실전 App Key 설정 완료")
|
||||
if app_secret:
|
||||
config['my_sec'] = app_secret
|
||||
logging.info(f"✅ 실전 App Secret 설정 완료")
|
||||
|
||||
if paper_app_key:
|
||||
config['paper_app'] = paper_app_key
|
||||
logging.info(f"✅ 모의 App Key 설정 완료")
|
||||
if paper_app_secret:
|
||||
config['paper_sec'] = paper_app_secret
|
||||
logging.info(f"✅ 모의 App Secret 설정 완료")
|
||||
|
||||
if hts_id:
|
||||
config['my_htsid'] = hts_id
|
||||
logging.info(f"✅ HTS ID 설정 완료: {hts_id}")
|
||||
else:
|
||||
logging.warning("⚠️ KIS_HTS_ID 환경변수가 설정되지 않았습니다.")
|
||||
|
||||
if acct_stock:
|
||||
config['my_acct_stock'] = acct_stock
|
||||
logging.info(f"✅ 증권계좌 설정 완료")
|
||||
if acct_future:
|
||||
config['my_acct_future'] = acct_future
|
||||
logging.info(f"✅ 선물옵션계좌 설정 완료")
|
||||
if paper_stock:
|
||||
config['my_paper_stock'] = paper_stock
|
||||
logging.info(f"✅ 모의 증권계좌 설정 완료")
|
||||
if paper_future:
|
||||
config['my_paper_future'] = paper_future
|
||||
logging.info(f"✅ 모의 선물옵션계좌 설정 완료")
|
||||
|
||||
if prod_type != "01": # 기본값이 아닌 경우만 업데이트
|
||||
config['my_prod'] = prod_type
|
||||
logging.info(f"✅ 계좌상품코드 설정 완료: {prod_type}")
|
||||
|
||||
# URL 설정 업데이트 (직접 필드)
|
||||
if url_rest:
|
||||
config['prod'] = url_rest
|
||||
logging.info(f"✅ 실전 REST URL 설정 완료")
|
||||
if url_rest_paper:
|
||||
config['vps'] = url_rest_paper
|
||||
logging.info(f"✅ 모의 REST URL 설정 완료")
|
||||
if url_ws:
|
||||
config['ops'] = url_ws
|
||||
logging.info(f"✅ 실전 WebSocket URL 설정 완료")
|
||||
if url_ws_paper:
|
||||
config['vops'] = url_ws_paper
|
||||
logging.info(f"✅ 모의 WebSocket URL 설정 완료")
|
||||
|
||||
# YAML로 다시 변환
|
||||
updated_content = yaml.dump(config, default_flow_style=False, allow_unicode=True, sort_keys=False)
|
||||
|
||||
except yaml.YAMLError as e:
|
||||
logging.error(f"❌ YAML 파싱 오류: {e}")
|
||||
logging.info("문자열 치환 방식으로 대체합니다...")
|
||||
# 실패 시 기존 문자열 치환 방식 사용
|
||||
updated_content = template_content
|
||||
if app_key:
|
||||
updated_content = updated_content.replace('my_app: "앱키"', f'my_app: "{app_key}"')
|
||||
if app_secret:
|
||||
updated_content = updated_content.replace('my_sec: "앱키 시크릿"', f'my_sec: "{app_secret}"')
|
||||
if hts_id:
|
||||
updated_content = updated_content.replace('my_htsid: "사용자 HTS ID"', f'my_htsid: "{hts_id}"')
|
||||
# ... 나머지 기존 로직
|
||||
|
||||
# 4. 수정된 설정을 파일로 저장 (원본 구조 보존)
|
||||
try:
|
||||
with open(kis_config_path, 'w', encoding='utf-8') as f:
|
||||
f.write(updated_content)
|
||||
|
||||
logging.info(f"✅ KIS 설정 파일이 생성되었습니다: {kis_config_path}")
|
||||
|
||||
# 설정 요약 출력
|
||||
logging.info("📋 KIS 설정 요약:")
|
||||
logging.info(f" - 실제 거래: {'✅' if app_key and app_secret else '❌'}")
|
||||
logging.info(f" - 모의 거래: {'✅' if paper_app_key and paper_app_secret else '❌'}")
|
||||
logging.info(f" - 계좌번호: {'✅' if any([acct_stock, acct_future, paper_stock, paper_future]) else '❌'}")
|
||||
logging.info(f" - URL 설정: {'✅' if any([url_rest, url_rest_paper, url_ws, url_ws_paper]) else '❌'}")
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"❌ KIS 설정 파일 생성 실패: {e}")
|
||||
return False
|
||||
1593
한국투자증권(API)/MCP/Kis Trading MCP/module/plugin/master_file.py
Normal file
1593
한국투자증권(API)/MCP/Kis Trading MCP/module/plugin/master_file.py
Normal file
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user