initial commit

This commit is contained in:
2026-02-04 00:16:34 +09:00
commit ae11528dd9
867 changed files with 209640 additions and 0 deletions

View File

@@ -0,0 +1,77 @@
# Git 관련
.git
.gitignore
# Python 관련
__pycache__
*.pyc
*.pyo
*.pyd
.Python
env
pip-log.txt
pip-delete-this-directory.txt
.tox
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.log
.git
.mypy_cache
.pytest_cache
.hypothesis
# 개발 환경
.venv
venv/
ENV/
env/
.env
.env.*
!.env.live
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
# OS 관련
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db
# 프로젝트 특정
tmp/
logs/
*.tmp
*.log
# Docker 관련
Dockerfile
.dockerignore
docker-compose*.yml
# 문서
*.md
!README.md
# 테스트
test_*.py
*_test.py
tests/
# 기타
.env.example
env.example
*.csv
*.tmp

View File

@@ -0,0 +1,4 @@
MCP_TYPE=sse
MCP_HOST=0.0.0.0
MCP_PORT=3000
MCP_PATH=/sse

View File

@@ -0,0 +1,23 @@
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
.idea
.vscode
*.log
*_log/
*_logs/
tmp/
*.csv
!standalone_util/*.csv
*.tmp
*.db

View File

@@ -0,0 +1 @@
3.13

View File

@@ -0,0 +1,63 @@
# Python 3.13 slim 이미지 사용
FROM python:3.13-slim
# 작업 디렉토리 설정
WORKDIR /app
# 시스템 패키지 업데이트 및 필요한 패키지 설치
RUN apt-get update && apt-get install -y \
gcc \
g++ \
&& rm -rf /var/lib/apt/lists/*
# Python 의존성 설치를 위한 uv 설치
RUN pip install uv
# pyproject.toml 복사 (uv.lock이 없을 수 있으므로)
COPY pyproject.toml ./
# uv.lock이 있으면 복사, 없으면 의존성만 설치
COPY uv.lock* ./
# 의존성 설치 (uv.lock이 있으면 frozen, 없으면 일반 설치)
RUN if [ -f uv.lock ]; then uv sync --frozen; else uv sync; fi
# 애플리케이션 코드 복사
COPY . .
# 환경변수 설정
ENV ENV=live
ENV PYTHONPATH=/app
# 포트 노출 (HTTP 서버용)
EXPOSE 3000
# 환경변수 정의 (런타임에 설정됨)
ENV KIS_APP_KEY=""
ENV KIS_APP_SECRET=""
ENV KIS_PAPER_APP_KEY=""
ENV KIS_PAPER_APP_SECRET=""
ENV KIS_HTS_ID=""
ENV KIS_ACCT_STOCK=""
ENV KIS_ACCT_FUTURE=""
ENV KIS_PAPER_STOCK=""
ENV KIS_PAPER_FUTURE=""
ENV KIS_PROD_TYPE=""
ENV KIS_URL_REST=""
ENV KIS_URL_REST_PAPER=""
ENV KIS_URL_WS=""
ENV KIS_URL_WS_PAPER=""
# 시작 스크립트 생성
RUN echo '#!/bin/bash\n\
set -e\n\
\n\
echo "Starting KIS Trade MCP Server..."\n\
echo "Environment: $ENV"\n\
\n\
# MCP 서버 시작 (HTTP 모드)\n\
exec uv run python server.py\n\
' > /app/start.sh && chmod +x /app/start.sh
# 시작 스크립트 실행
CMD ["/app/start.sh"]

View File

@@ -0,0 +1,391 @@
# 중요 : MCP에 대한 내용을 완전히 숙지하신 뒤 사용해 주십시오.
# 이 프로그램을 실행하여 발생한 모든 책임은 사용자 본인에게 있습니다.
# 한국투자증권 OPEN API MCP 서버 - Docker 설치 가이드
한국투자증권의 다양한 금융 API를 Docker를 통해 Claude Desktop에서 쉽게 사용할 수 있도록 하는 설치 가이드입니다.
## 🚀 주요 기능
### 지원하는 API 카테고리
| 카테고리 | 개수 | 주요 기능 |
|---------|------|----------|
| 국내주식 | 74개 | 현재가, 호가, 차트, 잔고, 주문, 순위분석, 시세분석, 종목정보, 실시간시세 등 |
| 해외주식 | 34개 | 미국/아시아 주식 시세, 잔고, 주문, 체결내역, 거래량순위, 권리종합 등 |
| 국내선물옵션 | 20개 | 선물옵션 시세, 호가, 차트, 잔고, 주문, 야간거래, 실시간체결 등 |
| 해외선물옵션 | 19개 | 해외선물 시세, 주문내역, 증거금, 체결추이, 옵션호가 등 |
| 국내채권 | 14개 | 채권 시세, 호가, 발행정보, 잔고조회, 주문체결내역 등 |
| ETF/ETN | 2개 | NAV 비교추이, 현재가 등 |
| ELW | 1개 | ELW 거래량순위 |
**전체 API 총합계: 166개**
### 핵심 특징
- 🐳 **Docker 컨테이너화**: 완전 격리된 환경에서 안전한 실행
-**동적 코드 실행**: GitHub에서 실시간으로 API 코드를 다운로드하여 실행
- 🔧 **설정 기반**: JSON 파일로 API 설정 및 파라미터 관리
- 🛡️ **안전한 실행**: 격리된 임시 환경에서 코드 실행
- 🔍 **검증 기능**: API 상세 정보 조회로 파라미터 확인
- 🌍 **환경 지원**: 실전/모의 환경 구분 지원
- 🔐 **자동 설정**: 서버 시작 시 KIS 인증 설정 자동 생성
- 🖥️ **크로스 플랫폼**: Windows, macOS, Linux 모두 지원
## 📦 Docker 설치 및 설정
### 📋 Docker 설치
#### 🚀 빠른 설치 (권장)
**공식 Docker Desktop을 사용하세요:**
- [Docker Desktop for Mac](https://www.docker.com/products/docker-desktop/)
- [Docker Desktop for Windows](https://www.docker.com/products/docker-desktop/)
- [Docker Engine for Linux](https://docs.docker.com/engine/install/)
#### 📋 OS별 간단 가이드
##### 🍎 **macOS**
```bash
# Homebrew 사용 (권장)
brew install --cask docker
# 또는 공식 인스톨러 다운로드
# https://www.docker.com/products/docker-desktop/
```
##### 🐧 **Linux (Ubuntu/Debian)**
```bash
# 공식 스크립트 사용 (권장)
curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh
# 사용자를 docker 그룹에 추가
sudo usermod -aG docker $USER
```
##### 🪟 **Windows**
**⚠️ Windows는 추가 설정이 필요합니다:**
1. **시스템 요구사항 확인**
- Windows 10/11 Pro, Enterprise, Education
- WSL2 또는 Hyper-V 지원
2. **Docker Desktop 설치**
- [공식 사이트](https://www.docker.com/products/docker-desktop/)에서 다운로드
- 설치 중 "Use WSL 2" 옵션 선택 권장
3. **설치 후 확인**
```cmd
docker --version
docker run hello-world
```
**Windows 상세 설치 가이드**: [Docker 공식 문서](https://docs.docker.com/desktop/install/windows-install/) 참조
### 요구사항
- Docker 20.10+
- 한국투자증권 OPEN API 계정
### 📋 설치 및 설정 단계
#### **1단계: 프로젝트 클론**
```bash
# 프로젝트 클론
git clone https://github.com/koreainvestment/open-trading-api.git
cd "open-trading-api/MCP/Kis Trading MCP"
```
#### **2단계: 한국투자증권 API 정보 준비**
한국투자증권 개발자 센터에서 발급받은 정보를 준비하세요:
**필수 정보:**
- App Key (실전용)
- App Secret (실전용)
- 계좌 정보들
**선택 정보:**
- App Key (모의용)
- App Secret (모의용)
#### **3단계: Docker 이미지 빌드**
```bash
# Docker 이미지 빌드
docker build -t kis-trade-mcp .
# 또는 태그와 함께 빌드
docker build -t kis-trade-mcp:latest .
```
#### **4단계: Docker 컨테이너 실행**
**기본 실행:**
```bash
docker run -d \
--name kis-trade-mcp \
-p 3000:3000 \
-e KIS_APP_KEY="your_app_key" \
-e KIS_APP_SECRET="your_app_secret" \
-e KIS_PAPER_APP_KEY="your_paper_app_key" \
-e KIS_PAPER_APP_SECRET="your_paper_app_secret" \
-e KIS_HTS_ID="your_hts_id" \
-e KIS_ACCT_STOCK="12345678" \
-e KIS_ACCT_FUTURE="87654321" \
-e KIS_PAPER_STOCK="11111111" \
-e KIS_PAPER_FUTURE="22222222" \
-e KIS_PROD_TYPE="01" \
kis-trade-mcp
```
#### **5단계: 컨테이너 상태 확인**
```bash
# 컨테이너 상태 확인
docker ps
# 컨테이너 로그 확인
docker logs kis-trade-mcp
# 실시간 로그 확인
docker logs -f kis-trade-mcp
# HTTP 서버 접근 확인
curl http://localhost:3000/sse
```
#### **6단계: HTTP 서버 접근 확인**
컨테이너가 정상적으로 실행되면 HTTP 서버에 접근할 수 있습니다:
```bash
# 서버 상태 확인
curl http://localhost:3000/sse
# 또는 브라우저에서 접근
# http://localhost:3000/sse
```
### 🔗 Claude Desktop 연동 및 설정
#### 📝 Claude Desktop 설정
Claude Desktop 설정 파일에 MCP 서버를 등록하세요.
**설정 파일 위치:**
- **Linux/Mac**: `~/.claude_desktop_config.json`
- **Windows**: `%APPDATA%\Claude\claude_desktop_config.json`
#### 🐧 Linux/Mac 설정
```json
{
"mcpServers": {
"kis-trade-mcp": {
"command": "npx",
"args": ["-y", "mcp-remote", "http://localhost:3000/sse"]
}
}
}
```
#### 🪟 Windows 설정
```json
{
"mcpServers": {
"kis-trade-mcp": {
"command": "npx",
"args": ["-y", "mcp-remote", "http://localhost:3000/sse"]
}
}
}
```
## 💬 사용법 및 질문 예시
### 기본 사용 패턴
1. **종목 검색**: 먼저 종목 코드를 찾습니다
2. **API 확인**: 사용할 API의 파라미터를 확인합니다
3. **API 호출**: 필요한 파라미터와 함께 API를 호출합니다
### 질문 예시
**주식 시세 조회:**
- "삼성전자(005930) 현재가 시세 조회해줘"
- "애플(AAPL) 해외주식 현재 체결가 알려줘"
- "삼성전자 종목코드 찾아줘"
**잔고 및 계좌:**
- "국내주식 잔고 조회해줘"
- "해외주식 잔고 확인해줘"
**채권 및 기타:**
- "국고채 3년물 호가 정보 조회하는 방법"
- "KODEX 200 ETF(069500) NAV 비교추이 확인해줘"
**모의투자:**
- "모의투자로 삼성전자 현재가 조회해줘"
- "데모 환경에서 애플 주식 시세 알려줘"
## 🔧 컨테이너 관리
### 컨테이너 제어
```bash
# 컨테이너 시작
docker start kis-trade-mcp
# 컨테이너 중지
docker stop kis-trade-mcp
# 컨테이너 재시작
docker restart kis-trade-mcp
# 컨테이너 제거
docker stop kis-trade-mcp
docker rm kis-trade-mcp
```
### 컨테이너 내부 접근
```bash
# 컨테이너 내부 bash 실행
docker exec -it kis-trade-mcp /bin/bash
# 환경변수 확인
docker exec kis-trade-mcp env | grep KIS
# 로그 실시간 확인
docker logs -f kis-trade-mcp
```
## 💡 사용 팁
1. **환경변수 관리**: 민감한 정보는 환경변수로 안전하게 관리
2. **로그 모니터링**: `docker logs -f`로 실시간 로그 확인
3. **리소스 모니터링**: `docker stats`로 컨테이너 리소스 사용량 확인
4. **백업 전략**: 중요한 설정 파일은 정기적으로 백업
5. **보안 관리**: 컨테이너 내부에서만 민감한 정보 처리
## 📝 로깅 및 모니터링
### 로그 확인
```bash
# 전체 로그
docker logs kis-trade-mcp
# 최근 100줄
docker logs --tail 100 kis-trade-mcp
# 실시간 로그
docker logs -f kis-trade-mcp
# 특정 시간대 로그
docker logs --since "2024-01-01T00:00:00" kis-trade-mcp
```
### 성능 모니터링
```bash
# 컨테이너 리소스 사용량
docker stats kis-trade-mcp
# 컨테이너 상세 정보
docker inspect kis-trade-mcp
# 프로세스 확인
docker exec kis-trade-mcp ps aux
```
## 🛠️ 문제 해결
### 일반적인 문제들
**1. 컨테이너가 시작되지 않는 경우**
```bash
# 로그 확인
docker logs kis-trade-mcp
# 환경변수 확인
docker exec kis-trade-mcp env | grep KIS
```
**2. 환경변수 누락**
```bash
# 컨테이너 재시작
docker restart kis-trade-mcp
# 환경변수 다시 설정하여 실행
docker run -d --name kis-trade-mcp -e KIS_APP_KEY="..." ...
```
**3. 메모리 부족**
```bash
# 메모리 사용량 확인
docker stats kis-trade-mcp
# 컨테이너 리소스 제한 설정
docker run -d --name kis-trade-mcp --memory="2g" --cpus="2" ...
```
**4. 네트워크 연결 문제**
```bash
# 포트 확인
docker port kis-trade-mcp
# 네트워크 연결 테스트
curl http://localhost:3000/sse
```
### 디버깅 명령어
```bash
# 컨테이너 내부 bash 접근
docker exec -it kis-trade-mcp /bin/bash
# Python 환경 확인
docker exec kis-trade-mcp uv run python -c "import sys; print(sys.path)"
# 의존성 확인
docker exec kis-trade-mcp uv pip list
# 네트워크 연결 확인
docker exec kis-trade-mcp ping github.com
```
## 🔒 보안 고려사항
- **컨테이너 격리**: 호스트 시스템과 완전히 분리된 환경에서 실행
- **환경변수 보안**: 민감한 정보는 환경변수로 전달, 코드에 하드코딩 금지
- **임시 파일 정리**: 각 API 호출 후 임시 파일 자동 삭제
- **네트워크 격리**: 필요한 경우 Docker 네트워크를 통한 추가 격리
## ⚠️ 제한사항 및 성능
### API 호출 제한
- 한국투자증권 API의 호출 제한을 준수해야 합니다
- 분당 호출 횟수 제한이 있을 수 있습니다
- 실전 환경에서는 더욱 신중한 사용이 필요합니다
### Docker 성능 고려사항
- **컨테이너 오버헤드**: Docker 컨테이너 실행으로 인한 약간의 성능 오버헤드
- **메모리 사용량**: SQLAlchemy와 pandas가 메모리를 많이 사용할 수 있음
- **네트워크 지연**: GitHub 다운로드 시 네트워크 지연 발생
### 다단계 타임아웃 설정
- 파일 다운로드: 30초 (GitHub 응답 대기)
- 코드 실행: 15초 (API 호출 및 결과 처리)
- 컨테이너 시작: 60초 (의존성 설치 및 초기화)
## 🔗 관련 링크
- [한국투자증권 개발자 센터](https://apiportal.koreainvestment.com/)
- [한국투자증권 OPEN API GitHub](https://github.com/koreainvestment/open-trading-api)
- [MCP (Model Context Protocol) 공식 문서](https://modelcontextprotocol.io/)
- [Docker 공식 문서](https://docs.docker.com/)
---
**주의**: 이 프로젝트는 한국투자증권 OPEN API를 사용합니다. 사용 전 반드시 [한국투자증권 개발자 센터](https://apiportal.koreainvestment.com/)에서 API 이용약관을 확인하시기 바랍니다.
## ⚠️ 투자 책임 고지
**본 MCP 서버는 한국투자증권 OPEN API를 활용한 도구일 뿐이며, 투자 조언이나 권유를 제공하지 않습니다.**
- 📈 **투자 결정 책임**: 모든 투자 결정과 그에 따른 손익은 전적으로 투자자 본인의 책임입니다
- 💰 **손실 위험**: 주식, 선물, 옵션 등 모든 금융상품 투자에는 원금 손실 위험이 있습니다
- 🔍 **정보 검증**: API를 통해 제공되는 정보의 정확성은 한국투자증권에 의존하며, 투자 전 반드시 정보를 검증하시기 바랍니다
- 🧠 **신중한 판단**: 충분한 조사와 신중한 판단 없이 투자하지 마시기 바랍니다
- 🎯 **모의투자 권장**: 실전 투자 전 반드시 모의투자를 통해 충분히 연습하시기 바랍니다
**투자는 본인의 판단과 책임 하에 이루어져야 하며, 본 도구 사용으로 인한 어떠한 손실에 대해서도 개발자는 책임지지 않습니다.**

View File

@@ -0,0 +1,105 @@
{
"tool_info": {
"introduce": "한국투자증권의 auth OPEN API를 활용합니다.",
"introduce_append": "",
"examples": [
{
"api_type": "auth_token",
"params": {
"grant_type": "client_credentials",
"env_dv": "real"
}
},
{
"api_type": "auth_ws_token",
"params": {
"grant_type": "client_credentials",
"env_dv": "real"
}
}
]
},
"apis": {
"auth_token": {
"category": "OAuth인증",
"name": "접근토큰발급(P)",
"github_url": "https://github.com/koreainvestment/open-trading-api/tree/main/examples_llm/auth/auth_token",
"method": "auth_token",
"api_path": "/oauth2/tokenP",
"params": {
"grant_type": {
"name": "grant_type",
"type": "str",
"required": true,
"default_value": null,
"description": "[필수] 권한부여 Type (client_credentials)"
},
"appkey": {
"name": "appkey",
"type": "str",
"required": true,
"default_value": null,
"description": "[필수] 앱키 (한국투자증권 홈페이지에서 발급받은 appkey)"
},
"appsecret": {
"name": "appsecret",
"type": "str",
"required": true,
"default_value": null,
"description": "[필수] 앱시크릿키 (한국투자증권 홈페이지에서 발급받은 appsecret)"
},
"env_dv": {
"name": "env_dv",
"type": "str",
"required": true,
"default_value": null,
"description": "[필수] 환경구분 (real: 실전, demo: 모의)"
}
}
},
"auth_ws_token": {
"category": "OAuth인증",
"name": "실시간 (웹소켓) 접속키 발급",
"github_url": "https://github.com/koreainvestment/open-trading-api/tree/main/examples_llm/auth/auth_ws_token",
"method": "auth_ws_token",
"api_path": "/oauth2/Approval",
"params": {
"grant_type": {
"name": "grant_type",
"type": "str",
"required": true,
"default_value": null,
"description": "[필수] 권한부여 Type (client_credentials)"
},
"appkey": {
"name": "appkey",
"type": "str",
"required": true,
"default_value": null,
"description": "[필수] 고객 앱Key (한국투자증권 홈페이지에서 발급받은 appkey)"
},
"appsecret": {
"name": "appsecret",
"type": "str",
"required": true,
"default_value": null,
"description": "[필수] 고객 앱Secret (한국투자증권 홈페이지에서 발급받은 appsecret)"
},
"env_dv": {
"name": "env_dv",
"type": "str",
"required": true,
"default_value": null,
"description": "[필수] 환경구분 (real: 실전, demo: 모의)"
},
"token": {
"name": "token",
"type": "str",
"required": false,
"default_value": "",
"description": "접근토큰 (OAuth 토큰이 필요한 API 경우 발급한 Access token)"
}
}
}
}
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,172 @@
{
"tool_info": {
"introduce": "한국투자증권의 ELW OPEN API를 활용합니다.",
"introduce_append": "이 도구는 ELW 관련 시세 정보를 제공합니다.",
"examples": [
{
"api_type": "volume_rank",
"params": {
"fid_cond_mrkt_div_code": "W",
"fid_cond_scr_div_code": "20278",
"fid_unas_input_iscd": "000000",
"fid_input_iscd": "00000",
"fid_input_rmnn_dynu_1": "",
"fid_div_cls_code": "0",
"fid_input_price_1": "1000",
"fid_input_price_2": "5000",
"fid_input_vol_1": "100",
"fid_input_vol_2": "1000",
"fid_input_date_1": "20230101",
"fid_rank_sort_cls_code": "0",
"fid_blng_cls_code": "0",
"fid_input_iscd_2": "0000",
"fid_input_date_2": ""
}
}
]
},
"apis": {
"volume_rank": {
"category": "[국내주식] ELW시세",
"name": "ELW 거래량순위",
"github_url": "https://github.com/koreainvestment/open-trading-api/tree/main/examples_llm/elw/volume_rank",
"method": "volume_rank",
"api_path": "/uapi/elw/v1/ranking/volume-rank",
"params": {
"fid_cond_mrkt_div_code": {
"name": "fid_cond_mrkt_div_code",
"type": "str",
"required": true,
"default_value": null,
"description": "조건시장분류코드"
},
"fid_cond_scr_div_code": {
"name": "fid_cond_scr_div_code",
"type": "str",
"required": true,
"default_value": null,
"description": "조건화면분류코드"
},
"fid_unas_input_iscd": {
"name": "fid_unas_input_iscd",
"type": "str",
"required": true,
"default_value": null,
"description": "기초자산입력종목코드"
},
"fid_input_iscd": {
"name": "fid_input_iscd",
"type": "str",
"required": true,
"default_value": null,
"description": "발행사"
},
"fid_input_rmnn_dynu_1": {
"name": "fid_input_rmnn_dynu_1",
"type": "str",
"required": true,
"default_value": null,
"description": "입력잔존일수"
},
"fid_div_cls_code": {
"name": "fid_div_cls_code",
"type": "str",
"required": true,
"default_value": null,
"description": "콜풋구분코드"
},
"fid_input_price_1": {
"name": "fid_input_price_1",
"type": "str",
"required": true,
"default_value": null,
"description": "가격(이상)"
},
"fid_input_price_2": {
"name": "fid_input_price_2",
"type": "str",
"required": true,
"default_value": null,
"description": "가격(이하)"
},
"fid_input_vol_1": {
"name": "fid_input_vol_1",
"type": "str",
"required": true,
"default_value": null,
"description": "거래량(이상)"
},
"fid_input_vol_2": {
"name": "fid_input_vol_2",
"type": "str",
"required": true,
"default_value": null,
"description": "거래량(이하)"
},
"fid_input_date_1": {
"name": "fid_input_date_1",
"type": "str",
"required": true,
"default_value": null,
"description": "조회기준일"
},
"fid_rank_sort_cls_code": {
"name": "fid_rank_sort_cls_code",
"type": "str",
"required": true,
"default_value": null,
"description": "순위정렬구분코드"
},
"fid_blng_cls_code": {
"name": "fid_blng_cls_code",
"type": "str",
"required": true,
"default_value": null,
"description": "소속구분코드"
},
"fid_input_iscd_2": {
"name": "fid_input_iscd_2",
"type": "str",
"required": true,
"default_value": null,
"description": "LP발행사"
},
"fid_input_date_2": {
"name": "fid_input_date_2",
"type": "str",
"required": true,
"default_value": null,
"description": "만기일-최종거래일조회"
},
"tr_cont": {
"name": "tr_cont",
"type": "str",
"required": true,
"default_value": "",
"description": "연속 거래 여부"
},
"dataframe": {
"name": "dataframe",
"type": "pd.DataFrame",
"required": false,
"default_value": null,
"description": "누적 데이터프레임"
},
"depth": {
"name": "depth",
"type": "int",
"required": true,
"default_value": 0,
"description": "현재 재귀 깊이"
},
"max_depth": {
"name": "max_depth",
"type": "int",
"required": true,
"default_value": 10,
"description": "최대 재귀 깊이 (기본값: 10)"
}
}
}
}
}

View File

@@ -0,0 +1,70 @@
{
"tool_info": {
"introduce": "한국투자증권의 ETF/ETN OPEN API를 활용합니다.",
"introduce_append": "",
"examples": [
{
"api_type": "inquire_price",
"params": {
"fid_cond_mrkt_div_code": "J",
"fid_input_iscd": "123456"
}
},
{
"api_type": "nav_comparison_trend",
"params": {
"fid_cond_mrkt_div_code": "J",
"fid_input_iscd": "069500"
}
}
]
},
"apis": {
"inquire_price": {
"category": "[국내주식] 기본시세",
"name": "ETF/ETN 현재가",
"github_url": "https://github.com/koreainvestment/open-trading-api/tree/main/examples_llm/etfetn/inquire_price",
"method": "inquire_price",
"api_path": "/uapi/etfetn/v1/quotations/inquire-price",
"params": {
"fid_cond_mrkt_div_code": {
"name": "fid_cond_mrkt_div_code",
"type": "str",
"required": true,
"default_value": null,
"description": "[필수] 조건 시장 분류 코드 (ex. J:KRX, NX:NXT, UN:통합)"
},
"fid_input_iscd": {
"name": "fid_input_iscd",
"type": "str",
"required": true,
"default_value": null,
"description": "[필수] 입력 종목코드 (ex. 123456)"
}
}
},
"nav_comparison_trend": {
"category": "[국내주식] 기본시세",
"name": "NAV 비교추이(종목)",
"github_url": "https://github.com/koreainvestment/open-trading-api/tree/main/examples_llm/etfetn/nav_comparison_trend",
"method": "nav_comparison_trend",
"api_path": "/uapi/etfetn/v1/quotations/nav-comparison-trend",
"params": {
"fid_cond_mrkt_div_code": {
"name": "fid_cond_mrkt_div_code",
"type": "str",
"required": true,
"default_value": null,
"description": "[필수] 조건 시장 분류 코드 (ex. J)"
},
"fid_input_iscd": {
"name": "fid_input_iscd",
"type": "str",
"required": true,
"default_value": null,
"description": "[필수] 입력 종목코드"
}
}
}
}
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,29 @@
from .base import Base
from .updated import Updated
# 툴별 마스터 모델들
from .domestic_stock import DomesticStockMaster
from .overseas_stock import OverseasStockMaster
from .domestic_futureoption import DomesticFutureoptionMaster
from .overseas_futureoption import OverseasFutureoptionMaster
from .domestic_bond import DomesticBondMaster
from .etfetn import EtfetnMaster
from .elw import ElwMaster
from .auth import AuthMaster
# 모든 모델들을 리스트로 제공
ALL_MODELS = [
# 툴별 마스터 모델들
DomesticStockMaster,
OverseasStockMaster,
DomesticFutureoptionMaster,
OverseasFutureoptionMaster,
DomesticBondMaster,
EtfetnMaster,
ElwMaster,
AuthMaster,
# 업데이트 상태 추적
Updated
]

View File

@@ -0,0 +1,11 @@
from sqlalchemy import Column, Integer, String
from .base import Base
class AuthMaster(Base):
"""인증 마스터"""
__tablename__ = 'auth_master'
id = Column(Integer, primary_key=True)
name = Column(String(50), index=True) # 종목명
code = Column(String(50), index=True) # 종목코드

View File

@@ -0,0 +1,5 @@
from sqlalchemy.orm import declarative_base
# SQLAlchemy Base 클래스
Base = declarative_base()

View File

@@ -0,0 +1,12 @@
from sqlalchemy import Column, Integer, String
from .base import Base
class DomesticBondMaster(Base):
"""국내채권 마스터"""
__tablename__ = 'domestic_bond_master'
id = Column(Integer, primary_key=True)
name = Column(String(50), index=True) # 종목명
code = Column(String(50), index=True) # 종목코드
ex = Column(String(30), index=True) # 거래소 코드

View File

@@ -0,0 +1,12 @@
from sqlalchemy import Column, Integer, String
from .base import Base
class DomesticFutureoptionMaster(Base):
"""국내선물옵션 마스터"""
__tablename__ = 'domestic_futureoption_master'
id = Column(Integer, primary_key=True)
name = Column(String(50), index=True) # 종목명
code = Column(String(50), index=True) # 종목코드
ex = Column(String(30), index=True) # 거래소 코드

View File

@@ -0,0 +1,12 @@
from sqlalchemy import Column, Integer, String
from .base import Base
class DomesticStockMaster(Base):
"""국내주식 마스터"""
__tablename__ = 'domestic_stock_master'
id = Column(Integer, primary_key=True)
name = Column(String(50), index=True) # 종목명
code = Column(String(50), index=True) # 종목코드
ex = Column(String(30), index=True) # 거래소 코드

View File

@@ -0,0 +1,12 @@
from sqlalchemy import Column, Integer, String
from .base import Base
class ElwMaster(Base):
"""ELW 마스터"""
__tablename__ = 'elw_master'
id = Column(Integer, primary_key=True)
name = Column(String(50), index=True) # 종목명
code = Column(String(50), index=True) # 종목코드
ex = Column(String(30), index=True) # 거래소 코드

View File

@@ -0,0 +1,12 @@
from sqlalchemy import Column, Integer, String
from .base import Base
class EtfetnMaster(Base):
"""ETF/ETN 마스터"""
__tablename__ = 'etfetn_master'
id = Column(Integer, primary_key=True)
name = Column(String(50), index=True) # 종목명
code = Column(String(50), index=True) # 종목코드
ex = Column(String(30), index=True) # 거래소 코드

View File

@@ -0,0 +1,12 @@
from sqlalchemy import Column, Integer, String
from .base import Base
class OverseasFutureoptionMaster(Base):
"""해외선물옵션 마스터"""
__tablename__ = 'overseas_futureoption_master'
id = Column(Integer, primary_key=True)
name = Column(String(50), index=True) # 종목명
code = Column(String(50), index=True) # 종목코드
ex = Column(String(30), index=True) # 거래소 코드

View File

@@ -0,0 +1,12 @@
from sqlalchemy import Column, Integer, String
from .base import Base
class OverseasStockMaster(Base):
"""해외주식 마스터"""
__tablename__ = 'overseas_stock_master'
id = Column(Integer, primary_key=True)
name = Column(String(50), index=True) # 종목명
code = Column(String(50), index=True) # 종목코드
ex = Column(String(30), index=True) # 거래소 코드

View File

@@ -0,0 +1,15 @@
from sqlalchemy import Column, Integer, String, DateTime
from .base import Base
class Updated(Base):
"""마스터파일 업데이트 상태 추적 테이블"""
__tablename__ = 'updated'
id = Column(Integer, primary_key=True)
tool_name = Column(String(50), nullable=False, unique=True, index=True) # 툴명 (예: domestic_stock, overseas_stock)
updated_at = Column(DateTime, nullable=False) # 마지막 업데이트 시간
def __repr__(self):
return f"<Updated(tool_name='{self.tool_name}', updated_at='{self.updated_at}')>"

View File

@@ -0,0 +1,3 @@
from .decorator import singleton
from .plugin import setup_environment, EnvironmentConfig, setup_kis_config, MasterFileManager
from .middleware import EnvironmentMiddleware

View File

@@ -0,0 +1,32 @@
import threading
def singleton(cls):
"""
클래스 형태를 유지하는 싱글톤 데코레이터.
- 여러 번 호출해도 동일 인스턴스 반환
- __init__은 최초 1회만 실행
- 스레드-세이프
"""
cls.__singleton_lock__ = getattr(cls, "__singleton_lock__", threading.Lock())
cls.__singleton_instance__ = getattr(cls, "__singleton_instance__", None)
orig_init = cls.__init__
def __init__(self, *args, **kwargs):
# 최초 1회만 실제 __init__ 수행
if getattr(self, "__initialized__", False):
return
orig_init(self, *args, **kwargs)
setattr(self, "__initialized__", True)
def __new__(inner_cls, *args, **kwargs):
if inner_cls.__singleton_instance__ is None:
with inner_cls.__singleton_lock__:
if inner_cls.__singleton_instance__ is None:
inner_cls.__singleton_instance__ = object.__new__(inner_cls)
return inner_cls.__singleton_instance__
cls.__init__ = __init__
cls.__new__ = staticmethod(__new__)
return cls

View File

@@ -0,0 +1,6 @@
# 기존 컨텍스트 상수들
CONTEXT_REQUEST_ID = "context_request_id"
CONTEXT_ENVIRONMENT = "context_environment"
CONTEXT_STARTED_AT = "context_started_at"
CONTEXT_ENDED_AT = "context_ended_at"
CONTEXT_ELAPSED_SECONDS = "context_elapsed_seconds"

View File

@@ -0,0 +1,46 @@
import uuid
from datetime import datetime
import time
from fastmcp.server.middleware import Middleware, MiddlewareContext
import module.factory as factory
# 기본 미들웨어
class EnvironmentMiddleware(Middleware):
def __init__(self, environment):
self.environment = environment
async def on_call_tool(self, context: MiddlewareContext, call_next):
ctx = context.fastmcp_context
# time counter start
t0 = time.perf_counter()
# started_at
started_dt = datetime.now()
ctx.set_state(factory.CONTEXT_STARTED_AT, started_dt.strftime("%Y-%m-%d %H:%M:%S"))
# request id
request_id = uuid.uuid4().hex
ctx.set_state(factory.CONTEXT_REQUEST_ID, request_id)
# context setup
ctx.set_state(factory.CONTEXT_ENVIRONMENT, self.environment)
try:
result = await call_next(context)
return result
except Exception as e:
raise e
finally:
# ended at
ended_at = datetime.now()
ctx.set_state(factory.CONTEXT_ENDED_AT, ended_at.strftime("%Y-%m-%d %H:%M:%S"))
# time counter end
elapsed_sec = time.perf_counter() - t0
ctx.set_state(factory.CONTEXT_ELAPSED_SECONDS, round(elapsed_sec, 2))

View File

@@ -0,0 +1,4 @@
from .kis import setup_kis_config
from .environment import setup_environment, EnvironmentConfig
from .master_file import MasterFileManager
from .database import DatabaseEngine, Database

View File

@@ -0,0 +1,540 @@
from typing import Any, Dict, List, Optional, Type, Union
from sqlalchemy import create_engine, Engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.exc import SQLAlchemyError
import logging
import os
from datetime import datetime
logger = logging.getLogger(__name__)
class DatabaseEngine:
"""1 SQLite 파일 : 1 엔진을 관리하는 클래스"""
def __init__(self, db_path: str, models: List[Type]):
"""
Args:
db_path: SQLite 파일 경로
models: 해당 데이터베이스에 포함될 모델 클래스들의 리스트
"""
self.db_path = db_path
self.models = models
self.engine: Optional[Engine] = None
self.SessionLocal: Optional[sessionmaker] = None
self._initialize_engine()
def _initialize_engine(self):
"""데이터베이스 엔진 초기화"""
try:
# SQLite 연결 문자열 생성
db_url = f"sqlite:///{self.db_path}"
# 엔진 생성
self.engine = create_engine(
db_url,
echo=False, # SQL 로그 출력 여부
pool_pre_ping=True, # 연결 상태 확인
connect_args={"check_same_thread": False} # SQLite 멀티스레드 지원
)
# 세션 팩토리 생성
self.SessionLocal = sessionmaker(
autocommit=False,
autoflush=False,
bind=self.engine
)
# 테이블 생성
self._create_tables()
logger.info(f"Database engine initialized: {self.db_path}")
except Exception as e:
logger.error(f"Failed to initialize database engine {self.db_path}: {e}")
raise
def _create_tables(self):
"""모든 모델의 테이블 생성"""
try:
from model.base import Base
Base.metadata.create_all(bind=self.engine)
logger.info(f"Tables created for {self.db_path}")
except Exception as e:
logger.error(f"Failed to create tables for {self.db_path}: {e}")
raise
def get_session(self) -> Session:
"""새로운 데이터베이스 세션 반환"""
if not self.SessionLocal:
raise RuntimeError("Database engine not initialized")
return self.SessionLocal()
def insert(self, model_instance: Any) -> Any:
"""
모델 인스턴스를 데이터베이스에 삽입
Args:
model_instance: 삽입할 모델 인스턴스
Returns:
삽입된 모델 인스턴스 (ID 포함)
"""
session = self.get_session()
try:
session.add(model_instance)
session.commit()
session.refresh(model_instance)
logger.info(f"Inserted record: {type(model_instance).__name__}")
return model_instance
except SQLAlchemyError as e:
session.rollback()
logger.error(f"Failed to insert record: {e}")
raise
finally:
session.close()
def update(self, model_class: Type, record_id: int, update_data: Dict[str, Any]) -> Optional[Any]:
"""
ID로 레코드 업데이트
Args:
model_class: 업데이트할 모델 클래스
record_id: 업데이트할 레코드의 ID
update_data: 업데이트할 필드와 값의 딕셔너리
Returns:
업데이트된 모델 인스턴스 또는 None
"""
session = self.get_session()
try:
# 레코드 조회
record = session.query(model_class).filter(model_class.id == record_id).first()
if not record:
logger.warning(f"Record not found: {model_class.__name__} ID {record_id}")
return None
# 필드 업데이트
for field, value in update_data.items():
if hasattr(record, field):
setattr(record, field, value)
else:
logger.warning(f"Field '{field}' not found in {model_class.__name__}")
session.commit()
session.refresh(record)
logger.info(f"Updated record: {model_class.__name__} ID {record_id}")
return record
except SQLAlchemyError as e:
session.rollback()
logger.error(f"Failed to update record: {e}")
raise
finally:
session.close()
def delete(self, model_class: Type, record_id: int) -> bool:
"""
ID로 레코드 삭제
Args:
model_class: 삭제할 모델 클래스
record_id: 삭제할 레코드의 ID
Returns:
삭제 성공 여부
"""
session = self.get_session()
try:
# 레코드 조회
record = session.query(model_class).filter(model_class.id == record_id).first()
if not record:
logger.warning(f"Record not found: {model_class.__name__} ID {record_id}")
return False
session.delete(record)
session.commit()
logger.info(f"Deleted record: {model_class.__name__} ID {record_id}")
return True
except SQLAlchemyError as e:
session.rollback()
logger.error(f"Failed to delete record: {e}")
raise
finally:
session.close()
def list(self, model_class: Type, filters: Optional[Dict[str, Any]] = None,
limit: Optional[int] = None, offset: Optional[int] = None) -> List[Any]:
"""
조건에 맞는 레코드 목록 조회
Args:
model_class: 조회할 모델 클래스
filters: 필터 조건 딕셔너리 {field: value}
limit: 조회할 최대 개수
offset: 건너뛸 개수
Returns:
조회된 레코드 리스트
"""
session = self.get_session()
try:
query = session.query(model_class)
# 필터 적용
if filters:
for field, value in filters.items():
if hasattr(model_class, field):
query = query.filter(getattr(model_class, field) == value)
else:
logger.warning(f"Field '{field}' not found in {model_class.__name__}")
# 페이징 적용
if offset:
query = query.offset(offset)
if limit:
query = query.limit(limit)
results = query.all()
logger.info(f"Listed {len(results)} records: {model_class.__name__}")
return results
except SQLAlchemyError as e:
logger.error(f"Failed to list records: {e}")
raise
finally:
session.close()
def get(self, model_class: Type, filters: Dict[str, Any]) -> Optional[Any]:
"""
조건에 맞는 첫 번째 레코드 조회
Args:
model_class: 조회할 모델 클래스
filters: 필터 조건 딕셔너리 {field: value}
Returns:
조회된 레코드 또는 None
"""
session = self.get_session()
try:
query = session.query(model_class)
# 필터 적용
for field, value in filters.items():
if hasattr(model_class, field):
query = query.filter(getattr(model_class, field) == value)
else:
logger.warning(f"Field '{field}' not found in {model_class.__name__}")
result = query.first()
if result:
logger.info(f"Found record: {model_class.__name__}")
else:
logger.info(f"No record found: {model_class.__name__}")
return result
except SQLAlchemyError as e:
logger.error(f"Failed to get record: {e}")
raise
finally:
session.close()
def count(self, model_class: Type, filters: Optional[Dict[str, Any]] = None) -> int:
"""
조건에 맞는 레코드 개수 조회
Args:
model_class: 조회할 모델 클래스
filters: 필터 조건 딕셔너리 {field: value}
Returns:
레코드 개수
"""
session = self.get_session()
try:
query = session.query(model_class)
# 필터 적용
if filters:
for field, value in filters.items():
if hasattr(model_class, field):
query = query.filter(getattr(model_class, field) == value)
else:
logger.warning(f"Field '{field}' not found in {model_class.__name__}")
count = query.count()
logger.info(f"Counted {count} records: {model_class.__name__}")
return count
except SQLAlchemyError as e:
logger.error(f"Failed to count records: {e}")
raise
finally:
session.close()
def bulk_replace_master_data(self, model_class: Type, data_list: List[Dict], master_name: str) -> int:
"""
마스터 데이터 추가 (INSERT만) - 카테고리 레벨에서 이미 삭제됨
Args:
model_class: 마스터 데이터 모델 클래스
data_list: 삽입할 데이터 리스트 (딕셔너리 리스트)
master_name: 마스터파일명 (로깅용)
Returns:
삽입된 레코드 수
"""
session = self.get_session()
try:
# 새 데이터 배치 삽입 (카테고리 레벨에서 이미 삭제되었으므로 INSERT만)
if data_list:
# 배치 크기 설정 (메모리 효율성을 위해 1000개씩)
batch_size = 1000
total_inserted = 0
for i in range(0, len(data_list), batch_size):
batch = data_list[i:i + batch_size]
batch_objects = []
for data in batch:
# 모든 값을 문자열로 강제 변환 (SQLAlchemy 타입 추론 방지)
str_data = {key: str(value) if value is not None else None for key, value in data.items()}
# 딕셔너리를 모델 인스턴스로 변환
obj = model_class(**str_data)
batch_objects.append(obj)
# 배치 삽입
session.bulk_save_objects(batch_objects)
total_inserted += len(batch_objects)
# 중간 커밋 (메모리 절약)
if i + batch_size < len(data_list):
session.commit()
logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch_objects)} records")
# 최종 커밋
session.commit()
logger.info(f"Bulk replace completed: {total_inserted} records inserted into {model_class.__name__}")
return total_inserted
else:
logger.warning(f"No data to insert for {master_name}")
return 0
except SQLAlchemyError as e:
session.rollback()
logger.error(f"Failed to bulk replace master data for {master_name}: {e}")
raise
finally:
session.close()
def update_master_timestamp(self, tool_name: str, record_count: int = None) -> bool:
"""
마스터파일 업데이트 시간 기록
Args:
tool_name: 툴명 (예: domestic_stock, overseas_stock)
record_count: 레코드 수 (선택사항)
Returns:
업데이트 성공 여부
"""
from model.updated import Updated
session = self.get_session()
try:
# 기존 레코드 조회
existing_record = session.query(Updated).filter(Updated.tool_name == tool_name).first()
if existing_record:
# 기존 레코드 업데이트
existing_record.updated_at = datetime.now()
logger.info(f"Updated timestamp for {tool_name}")
else:
# 새 레코드 생성
new_record = Updated(
tool_name=tool_name,
updated_at=datetime.now()
)
session.add(new_record)
logger.info(f"Created new timestamp record for {tool_name}")
session.commit()
return True
except SQLAlchemyError as e:
session.rollback()
logger.error(f"Failed to update master timestamp for {tool_name}: {e}")
return False
finally:
session.close()
def get_master_update_time(self, tool_name: str) -> Optional[datetime]:
"""
마스터파일 마지막 업데이트 시간 조회
Args:
tool_name: 툴명 (예: domestic_stock, overseas_stock)
Returns:
마지막 업데이트 시간 또는 None
"""
from model.updated import Updated
session = self.get_session()
try:
record = session.query(Updated).filter(Updated.tool_name == tool_name).first()
if record:
logger.info(f"Found update time for {tool_name}: {record.updated_at}")
return record.updated_at
else:
logger.info(f"No update record found for {tool_name}")
return None
except SQLAlchemyError as e:
logger.error(f"Failed to get master update time for {tool_name}: {e}")
return None
finally:
session.close()
def is_master_data_available(self, model_class: Type) -> bool:
"""
마스터 데이터 존재 여부 확인
Args:
model_class: 마스터 데이터 모델 클래스
Returns:
데이터 존재 여부
"""
session = self.get_session()
try:
count = session.query(model_class).count()
available = count > 0
logger.info(f"Master data availability check for {model_class.__name__}: {available} ({count} records)")
return available
except SQLAlchemyError as e:
logger.error(f"Failed to check master data availability for {model_class.__name__}: {e}")
return False
finally:
session.close()
def close(self):
"""데이터베이스 연결 종료"""
if self.engine:
self.engine.dispose()
logger.info(f"Database engine closed: {self.db_path}")
def __repr__(self):
return f"DatabaseEngine(db_path='{self.db_path}', models={len(self.models)})"
class Database:
"""데이터베이스 엔진들을 관리하는 Singleton 클래스"""
_instance: Optional['Database'] = None
_initialized: bool = False
def __new__(cls) -> 'Database':
"""Singleton 패턴 구현"""
if cls._instance is None:
cls._instance = super(Database, cls).__new__(cls)
return cls._instance
def __init__(self):
"""초기화 (한 번만 실행)"""
if not self._initialized:
self.dbs: Dict[str, DatabaseEngine] = {}
self._initialized = True
logger.info("Database singleton instance created")
def new(self, db_dir: str = "configs/master") -> None:
"""
마스터 데이터베이스 엔진 초기화
Args:
db_dir: 데이터베이스 파일이 저장될 디렉토리
"""
try:
# 데이터베이스 디렉토리 생성
os.makedirs(db_dir, exist_ok=True)
# 하나의 통합 마스터 데이터베이스 엔진 생성
self._create_master_engine(db_dir)
logger.info(f"Master database engine initialized: '{db_dir}/master.db'")
logger.info(f"Available databases: {list(self.dbs.keys())}")
except Exception as e:
logger.error(f"Failed to initialize master database: {e}")
raise
def _create_master_engine(self, db_dir: str):
"""통합 마스터 데이터베이스 엔진 생성"""
from model import ALL_MODELS
db_path = os.path.join(db_dir, "master.db")
self.dbs["master"] = DatabaseEngine(db_path, ALL_MODELS)
logger.info("Created master database engine with all models")
def get_by_name(self, name: str) -> DatabaseEngine:
"""
이름으로 데이터베이스 엔진 조회
Args:
name: 데이터베이스 이름
Returns:
DatabaseEngine 인스턴스
Raises:
KeyError: 해당 이름의 데이터베이스가 없는 경우
"""
if name not in self.dbs:
available_dbs = list(self.dbs.keys())
raise KeyError(f"Database '{name}' not found. Available databases: {available_dbs}")
return self.dbs[name]
def get_available_databases(self) -> List[str]:
"""사용 가능한 데이터베이스 이름 목록 반환"""
return list(self.dbs.keys())
def is_initialized(self) -> bool:
"""데이터베이스가 초기화되었는지 확인"""
return len(self.dbs) > 0
def ensure_initialized(self, db_dir: str = "configs/master") -> bool:
"""데이터베이스가 초기화되지 않은 경우에만 초기화"""
if not self.is_initialized():
try:
self.new(db_dir)
logger.info("Database initialized on demand")
return True
except Exception as e:
logger.error(f"Failed to initialize database on demand: {e}")
return False
return True
def close_all(self):
"""모든 데이터베이스 연결 종료"""
for name, engine in self.dbs.items():
try:
engine.close()
logger.info(f"Closed database: {name}")
except Exception as e:
logger.error(f"Failed to close database {name}: {e}")
self.dbs.clear()
logger.info("All database connections closed")
def __repr__(self):
return f"Database(engines={len(self.dbs)}, names={list(self.dbs.keys())})"
def __del__(self):
"""소멸자 - 모든 연결 정리"""
if hasattr(self, 'dbs') and self.dbs:
self.close_all()

View File

@@ -0,0 +1,43 @@
import logging
import os
from collections import namedtuple
from dotenv import load_dotenv
# Environment 설정을 위한 namedtuple 정의
EnvironmentConfig = namedtuple('EnvironmentConfig', [
'mcp_type', 'mcp_host', 'mcp_port', 'mcp_path'
])
def setup_environment(env: str) -> EnvironmentConfig:
# get api env
if not env:
logging.error("Environment variable ENV not defined")
exit(1)
# load .env
dotenv_path = os.path.join(os.getcwd(), f".env.{env}")
if not os.path.isfile(dotenv_path):
logging.error(f"Environment variable file .env.{env} not found")
exit(1)
load_dotenv(dotenv_path=dotenv_path)
# return environment
# MCP_TYPE 검증 및 기본값 설정
mcp_type = os.getenv("MCP_TYPE", "stdio")
if mcp_type not in ['stdio', 'sse', 'streamable-http']:
logging.warning(f"Invalid MCP_TYPE: {mcp_type}, using default: stdio")
mcp_type = "stdio"
# MCP_PORT가 빈 문자열이면 기본값 사용
mcp_port_str = os.getenv("MCP_PORT", "8000")
mcp_port = int(mcp_port_str) if mcp_port_str.strip() else 8000
return EnvironmentConfig(
mcp_type=mcp_type,
mcp_host=os.getenv("MCP_HOST", "localhost"),
mcp_port=mcp_port,
mcp_path=os.getenv("MCP_PATH", "/mcp")
)

View File

@@ -0,0 +1,163 @@
import logging
import os
import requests
import yaml
def setup_kis_config(force_update=False):
"""KIS 설정 파일 자동 생성 (템플릿 다운로드 + 환경변수로 값 덮어쓰기)
Args:
force_update (bool): True면 기존 파일이 있어도 강제로 덮어쓰기
"""
# kis_auth.py와 동일한 경로 생성 방식 사용
kis_config_dir = os.path.join(os.path.expanduser("~"), "KIS", "config")
# KIS 설정 디렉토리 생성
os.makedirs(kis_config_dir, exist_ok=True)
# 설정 파일 경로
kis_config_path = os.path.join(kis_config_dir, "kis_devlp.yaml")
# 기존 파일 존재 확인
if os.path.exists(kis_config_path) and not force_update:
logging.info(f"✅ KIS 설정 파일이 이미 존재합니다: {kis_config_path}")
logging.info("기존 파일을 사용합니다. 강제 업데이트가 필요한 경우 force_update=True 옵션을 사용하세요.")
return True
# 1. kis_devlp.yaml 템플릿 다운로드
template_url = "https://raw.githubusercontent.com/koreainvestment/open-trading-api/refs/heads/main/kis_devlp.yaml"
try:
logging.info("KIS 설정 템플릿을 다운로드 중...")
response = requests.get(template_url, timeout=30)
response.raise_for_status()
# 원본 템플릿 텍스트 보존
template_content = response.text
logging.info("✅ KIS 설정 템플릿 다운로드 완료")
except Exception as e:
logging.error(f"❌ KIS 설정 템플릿 다운로드 실패: {e}")
return False
# 2. 환경변수로 민감한 정보 덮어쓰기
# 필수값 (누락 시 경고)
app_key = os.getenv("KIS_APP_KEY")
app_secret = os.getenv("KIS_APP_SECRET")
if not app_key or not app_secret:
logging.warning("⚠️ 필수 환경변수가 설정되지 않았습니다:")
if not app_key:
logging.warning(" - KIS_APP_KEY")
if not app_secret:
logging.warning(" - KIS_APP_SECRET")
logging.warning("실제 거래 API 사용이 불가능할 수 있습니다.")
# 선택적 값들 (누락 시 빈값 또는 기본값)
paper_app_key = os.getenv("KIS_PAPER_APP_KEY", "")
paper_app_secret = os.getenv("KIS_PAPER_APP_SECRET", "")
hts_id = os.getenv("KIS_HTS_ID", "")
acct_stock = os.getenv("KIS_ACCT_STOCK", "")
acct_future = os.getenv("KIS_ACCT_FUTURE", "")
paper_stock = os.getenv("KIS_PAPER_STOCK", "")
paper_future = os.getenv("KIS_PAPER_FUTURE", "")
prod_type = os.getenv("KIS_PROD_TYPE", "01") # 기본값: 종합계좌
url_rest = os.getenv("KIS_URL_REST", "")
url_rest_paper = os.getenv("KIS_URL_REST_PAPER", "")
url_ws = os.getenv("KIS_URL_WS", "")
url_ws_paper = os.getenv("KIS_URL_WS_PAPER", "")
# 3. YAML 파싱하여 값 업데이트
try:
# YAML 파싱 (주석 보존을 위해 ruamel.yaml 사용하거나, 간단히 pyyaml 사용)
config = yaml.safe_load(template_content)
# 환경변수 값이 있으면 해당 필드만 업데이트
if app_key:
config['my_app'] = app_key
logging.info(f"✅ 실전 App Key 설정 완료")
if app_secret:
config['my_sec'] = app_secret
logging.info(f"✅ 실전 App Secret 설정 완료")
if paper_app_key:
config['paper_app'] = paper_app_key
logging.info(f"✅ 모의 App Key 설정 완료")
if paper_app_secret:
config['paper_sec'] = paper_app_secret
logging.info(f"✅ 모의 App Secret 설정 완료")
if hts_id:
config['my_htsid'] = hts_id
logging.info(f"✅ HTS ID 설정 완료: {hts_id}")
else:
logging.warning("⚠️ KIS_HTS_ID 환경변수가 설정되지 않았습니다.")
if acct_stock:
config['my_acct_stock'] = acct_stock
logging.info(f"✅ 증권계좌 설정 완료")
if acct_future:
config['my_acct_future'] = acct_future
logging.info(f"✅ 선물옵션계좌 설정 완료")
if paper_stock:
config['my_paper_stock'] = paper_stock
logging.info(f"✅ 모의 증권계좌 설정 완료")
if paper_future:
config['my_paper_future'] = paper_future
logging.info(f"✅ 모의 선물옵션계좌 설정 완료")
if prod_type != "01": # 기본값이 아닌 경우만 업데이트
config['my_prod'] = prod_type
logging.info(f"✅ 계좌상품코드 설정 완료: {prod_type}")
# URL 설정 업데이트 (직접 필드)
if url_rest:
config['prod'] = url_rest
logging.info(f"✅ 실전 REST URL 설정 완료")
if url_rest_paper:
config['vps'] = url_rest_paper
logging.info(f"✅ 모의 REST URL 설정 완료")
if url_ws:
config['ops'] = url_ws
logging.info(f"✅ 실전 WebSocket URL 설정 완료")
if url_ws_paper:
config['vops'] = url_ws_paper
logging.info(f"✅ 모의 WebSocket URL 설정 완료")
# YAML로 다시 변환
updated_content = yaml.dump(config, default_flow_style=False, allow_unicode=True, sort_keys=False)
except yaml.YAMLError as e:
logging.error(f"❌ YAML 파싱 오류: {e}")
logging.info("문자열 치환 방식으로 대체합니다...")
# 실패 시 기존 문자열 치환 방식 사용
updated_content = template_content
if app_key:
updated_content = updated_content.replace('my_app: "앱키"', f'my_app: "{app_key}"')
if app_secret:
updated_content = updated_content.replace('my_sec: "앱키 시크릿"', f'my_sec: "{app_secret}"')
if hts_id:
updated_content = updated_content.replace('my_htsid: "사용자 HTS ID"', f'my_htsid: "{hts_id}"')
# ... 나머지 기존 로직
# 4. 수정된 설정을 파일로 저장 (원본 구조 보존)
try:
with open(kis_config_path, 'w', encoding='utf-8') as f:
f.write(updated_content)
logging.info(f"✅ KIS 설정 파일이 생성되었습니다: {kis_config_path}")
# 설정 요약 출력
logging.info("📋 KIS 설정 요약:")
logging.info(f" - 실제 거래: {'' if app_key and app_secret else ''}")
logging.info(f" - 모의 거래: {'' if paper_app_key and paper_app_secret else ''}")
logging.info(f" - 계좌번호: {'' if any([acct_stock, acct_future, paper_stock, paper_future]) else ''}")
logging.info(f" - URL 설정: {'' if any([url_rest, url_rest_paper, url_ws, url_ws_paper]) else ''}")
return True
except Exception as e:
logging.error(f"❌ KIS 설정 파일 생성 실패: {e}")
return False

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,17 @@
[project]
name = "korea-investment-api-mcp"
version = "0.1.0"
description = "한국투자증권 OPEN API MCP 서버 - LLM이 쉽게 사용할 수 있는 금융 API 래퍼"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"fastmcp>=2.11.2",
"pandas>=2.3.1",
"pycryptodome>=3.23.0",
"pydantic>=2.11.7",
"python-dotenv>=1.1.1",
"requests>=2.32.4",
"websockets>=15.0.1",
"PyYAML>=6.0.1",
"sqlalchemy>=2.0.43",
]

View File

@@ -0,0 +1,105 @@
import logging
import os
import platform
import sys
from fastmcp import FastMCP
from module import setup_environment, EnvironmentMiddleware, EnvironmentConfig, setup_kis_config
from module.plugin import Database
from tools import *
logging.basicConfig(
level=logging.DEBUG, # DEBUG 이상 (DEBUG, INFO, WARNING...) 모두 출력
format='%(asctime)s [%(levelname)s] %(message)s',
datefmt='%H:%M:%S'
)
def main():
env = os.getenv("ENV", None)
# 환경 설정
logging.info("setup environment ...")
env_config = setup_environment(env=env)
# KIS 설정 자동 생성 (템플릿 다운로드 + 값 덮어쓰기)
logging.info("setup KIS configuration ...")
if not setup_kis_config(force_update=env == "live"):
logging.warning("KIS 설정 파일 생성에 실패했습니다. 수동으로 설정해주세요.")
# 데이터베이스 초기화
logging.info("setup database ...")
db = None
db_exists = False
try:
db = Database()
db_exists = os.path.exists(os.path.join("configs/master", "master.db"))
db.new(db_dir="configs/master")
logging.info(f"📁 Available databases: {db.get_available_databases()}")
except Exception as e:
logging.error(f"❌ Database initialization failed: {e}")
sys.exit(1)
# MCP 서버 설정
mcp_server = FastMCP(
name="My Awesome MCP Server",
instructions="This is a server for a specific project.",
version="1.0.0",
stateless_http=False,
)
# middleware
mcp_server.add_middleware(EnvironmentMiddleware(environment=env_config))
# tools 등록
DomesticStockTool().register(mcp_server=mcp_server)
DomesticFutureOptionTool().register(mcp_server=mcp_server)
DomesticBondTool().register(mcp_server=mcp_server)
OverseasStockTool().register(mcp_server=mcp_server)
OverseasFutureOptionTool().register(mcp_server=mcp_server)
ElwTool().register(mcp_server=mcp_server)
EtfEtnTool().register(mcp_server=mcp_server)
AuthTool().register(mcp_server=mcp_server)
# MCP 서버 실행 방식 결정
logging.info(f"🚀 MCP 서버를 {env_config.mcp_type} 모드로 시작합니다...")
if env_config.mcp_type == "stdio":
# stdio 모드 (기본값)
logging.info("📝 stdio 모드로 MCP 서버를 시작합니다.")
mcp_server.run(
transport="stdio"
)
elif env_config.mcp_type == "sse":
# HTTP 모드로 실행
logging.info(f"🌐 Server Sent Event 모드로 MCP 서버를 시작합니다: {env_config.mcp_host}:{env_config.mcp_port}")
mcp_server.run(
transport="sse",
host=env_config.mcp_host,
port=env_config.mcp_port,
path=env_config.mcp_path,
)
elif env_config.mcp_type == "streamable-http":
# HTTP 모드로 실행
logging.info(f"🌐 HTTP 모드로 MCP 서버를 시작합니다: {env_config.mcp_host}:{env_config.mcp_port}")
mcp_server.run(
transport="streamable-http",
host=env_config.mcp_host,
port=env_config.mcp_port,
path=env_config.mcp_path,
)
else:
logging.error(f"❌ 지원하지 않는 MCP_TYPE: {env_config.mcp_type}")
sys.exit(1)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
logging.info("🛑 Application interrupted by user (Ctrl+C)")

View File

@@ -0,0 +1,8 @@
from .domestic_bond import DomesticBondTool
from .domestic_futureoption import DomesticFutureOptionTool
from .domestic_stock import DomesticStockTool
from .elw import ElwTool
from .etfetn import EtfEtnTool
from .overseas_futureoption import OverseasFutureOptionTool
from .overseas_stock import OverseasStockTool
from .auth import AuthTool

View File

@@ -0,0 +1,9 @@
from .base import BaseTool
from module import singleton
@singleton
class AuthTool(BaseTool):
@property
def tool_name(self) -> str:
return "auth"

View File

@@ -0,0 +1,818 @@
from abc import ABC, abstractmethod
from typing import Dict, Any, List
import json
import os
import time
import shutil
import subprocess
import requests
from fastmcp import FastMCP, Context
from module.plugin import MasterFileManager
from module.plugin.database import Database
import module.factory as factory
class ApiExecutor:
"""API 실행 클래스 - GitHub에서 코드를 다운로드하고 실행"""
def __init__(self, tool_name: str):
"""초기화"""
self.tool_name = tool_name
self.temp_base_dir = "./tmp"
# 절대 경로로 venv python 설정
self.venv_python = os.path.join(os.getcwd(), ".venv", "bin", "python")
# temp 디렉토리 생성
os.makedirs(self.temp_base_dir, exist_ok=True)
def _create_temp_directory(self, request_id: str) -> str:
"""임시 디렉토리 생성"""
timestamp = int(time.time() * 1_000_000) # 나노초 단위
temp_dir = os.path.join(self.temp_base_dir, f"{timestamp}_{request_id}")
os.makedirs(temp_dir, exist_ok=True)
return temp_dir
@classmethod
def _download_file(cls, url: str, file_path: str) -> bool:
"""파일 다운로드"""
try:
response = requests.get(url, timeout=30)
response.raise_for_status()
with open(file_path, 'w', encoding='utf-8') as f:
f.write(response.text)
return True
except Exception as e:
print(f"파일 다운로드 실패: {url}, 오류: {str(e)}")
return False
def _download_kis_auth(self, temp_dir: str) -> bool:
"""kis_auth.py 다운로드"""
kis_auth_url = "https://raw.githubusercontent.com/koreainvestment/open-trading-api/main/examples_llm/kis_auth.py"
kis_auth_path = os.path.join(temp_dir, "kis_auth.py")
return self._download_file(kis_auth_url, kis_auth_path)
def _download_api_code(self, github_url: str, temp_dir: str, api_type: str) -> str:
"""API 코드 다운로드"""
# GitHub URL을 raw URL로 변환하고 api_type/api_type.py를 붙여서 실제 파일 경로 생성
raw_url = github_url.replace('/tree/', '/').replace('github.com', 'raw.githubusercontent.com')
full_url = f"{raw_url}/{api_type}.py"
api_code_path = os.path.join(temp_dir, "api_code.py")
if self._download_file(full_url, api_code_path):
return api_code_path
else:
raise Exception(f"API 코드 다운로드 실패: {full_url}")
@classmethod
def _extract_trenv_params_from_example(cls, api_code_content: str) -> Dict[str, str]:
"""예제 파일에서 trenv 사용 패턴 완전 추출"""
import re
# 🎯 완전 자동화: param_name=xxx.my_attr 패턴 찾기 (변수명 무관)
trenv_mapping_pattern = r'(\w+)=\w*\.(my_\w+)'
matches = re.findall(trenv_mapping_pattern, api_code_content)
dynamic_mappings = {}
discovered_mappings = []
for param_name, trenv_attr in matches:
# 발견된 매핑을 그대로 사용 (완전 자동화!)
trenv_value = f'ka._TRENV.{trenv_attr}'
# 소문자 버전 (함수 파라미터)
dynamic_mappings[param_name] = trenv_value
# 대문자 버전 (API 파라미터)
dynamic_mappings[param_name.upper()] = trenv_value
discovered_mappings.append(f"{param_name}=xxx.{trenv_attr}")
if discovered_mappings:
print(f"[🎯자동발견] {len(discovered_mappings)}개 매핑: {', '.join(discovered_mappings)}")
print(f"[🎯자동생성] {len(dynamic_mappings)}개 파라미터: {list(dynamic_mappings.keys())}")
else:
print("[🎯자동발견] .my_xxx 패턴 없음 - 조회성 API로 추정")
return dynamic_mappings
@classmethod
def _modify_api_code(cls, api_code_path: str, params: Dict[str, Any], api_type: str) -> str:
"""API 코드 수정 (파라미터 적용)"""
try:
import re
with open(api_code_path, 'r', encoding='utf-8') as f:
code = f.read()
# 1. sys.path.extend 관련 코드 제거
code = re.sub(r"sys\.path\.extend\(\[.*?\]\)", "", code, flags=re.DOTALL)
code = re.sub(r"import sys\n", "", code) # import sys도 제거
# 2. 코드에서 함수명과 시그니처 추출
function_match = re.search(r'def\s+(\w+)\s*\((.*?)\):', code, re.DOTALL)
if not function_match:
raise Exception("코드에서 함수를 찾을 수 없습니다.")
function_name = function_match.group(1)
function_params = function_match.group(2)
# 3. 함수가 max_depth 파라미터를 받는지 확인
has_max_depth = 'max_depth' in function_params
# 4. 파라미터 조정
adjusted_params = params.copy()
# max_depth 파라미터 처리
if has_max_depth:
# 함수가 max_depth를 받는 경우에만 처리
if 'max_depth' not in adjusted_params:
adjusted_params['max_depth'] = 1
print(f"[기본값] {function_name} 함수에 max_depth=1 설정")
else:
print(f"[사용자 설정] {function_name} 함수에 max_depth={adjusted_params['max_depth']} 사용")
else:
# 함수가 max_depth를 받지 않는 경우 제거
if 'max_depth' in adjusted_params:
del adjusted_params['max_depth']
print(f"[제거] {function_name} 함수는 max_depth 파라미터를 지원하지 않아 제거함")
# 🆕 동적으로 trenv 패턴 추출
dynamic_mappings = cls._extract_trenv_params_from_example(code)
# 기본 매핑과 동적 매핑 결합
account_mappings = {
'cano': 'ka._TRENV.my_acct', # 종합계좌번호 (변수 접근)
'acnt_prdt_cd': 'ka._TRENV.my_prod', # 계좌상품코드 (변수 접근)
'my_htsid': 'ka._TRENV.my_htsid', # HTS ID (변수 접근)
'user_id': 'ka._TRENV.my_htsid', # domestic_stock에서 발견된 변형
**dynamic_mappings # 동적으로 발견된 매핑 추가
}
for param_name, correct_value in account_mappings.items():
if param_name in function_params:
if param_name in adjusted_params:
original_value = adjusted_params[param_name]
adjusted_params[param_name] = correct_value
print(f"[보안강제] {function_name} 함수의 {param_name}='{original_value}'{correct_value} (LLM값 무시)")
else:
adjusted_params[param_name] = correct_value
print(f"[자동설정] {function_name} 함수에 {param_name}={correct_value} 설정")
# 거래소ID구분코드 처리 (API 타입 기반 추론)
if 'excg_id_dvsn_cd' in function_params and 'excg_id_dvsn_cd' not in adjusted_params:
if api_type.startswith('domestic'):
adjusted_params['excg_id_dvsn_cd'] = '"KRX"'
print(f"[추론] 국내 API({api_type})로 판단하여 excg_id_dvsn_cd='KRX' 설정")
else:
print(f"[경고] {api_type} API에서 excg_id_dvsn_cd 파라미터가 필요합니다. (예: NASD, NYSE, KRX)")
# overseas_stock 등은 사용자가 명시적으로 제공해야 함
# 5. 함수 호출 코드 생성 (ka.auth() - env_dv에 따라 분기)
# env_dv 값에 따른 인증 방식 결정
env_dv = params.get('env_dv', 'real')
if env_dv == 'demo':
auth_code = 'ka.auth("vps")'
print(f"[모의투자] {function_name} 함수에 ka.auth(\"vps\") 적용")
else:
auth_code = 'ka.auth()'
print(f"[실전투자] {function_name} 함수에 ka.auth() 적용")
call_code = f"""
# API 함수 호출
if __name__ == "__main__":
try:
# 인증 초기화 (env_dv={env_dv})
{auth_code}
result = {function_name}({", ".join([f"{k}={v if isinstance(v, str) and v.startswith('ka._TRENV.') else repr(v)}" for k, v in adjusted_params.items()])})
except TypeError as e:
# 🚨 핵심 오류 메시지만 출력
print(f"❌ TypeError: {{str(e)}}")
print()
# 파라미터 오류 처리 - LLM 교육용 메시지
if 'stock_name' in {repr(list(params.keys()))}:
print("💡 해결방법: find_stock_code로 종목을 검색하세요.")
else:
print("💡 해결방법: find_api_detail로 API 상세 정보를 확인하세요")
import sys
sys.exit(1)
try:
# N개 튜플 반환 함수 처리 (예: inquire_balance는 (df1, df2) 반환)
if isinstance(result, tuple):
# 튜플인 경우 - N개의 DataFrame 처리
output = {{}}
for i, item in enumerate(result):
if hasattr(item, 'to_dict'):
# DataFrame인 경우
output[f"output{{i+1}}"] = item.to_dict('records') if not item.empty else []
else:
# 일반 객체인 경우
output[f"output{{i+1}}"] = str(item)
import json
print(json.dumps(output, ensure_ascii=False, indent=2))
elif hasattr(result, 'empty') and not result.empty:
print(result.to_json(orient='records', force_ascii=False))
elif isinstance(result, dict):
import json
print(json.dumps(result, ensure_ascii=False))
elif isinstance(result, (list, tuple)):
import json
print(json.dumps(result, ensure_ascii=False))
else:
print(str(result))
except Exception as e:
print(f"오류 발생: {{str(e)}}")
"""
# 6. 코드 끝에 함수 호출 추가
modified_code = code + call_code
# 7. 수정된 코드 저장
with open(api_code_path, 'w', encoding='utf-8') as f:
f.write(modified_code)
return api_code_path
except Exception as e:
raise Exception(f"코드 수정 실패: {str(e)}")
def _execute_code(self, temp_dir: str, timeout: int = 15) -> Dict[str, Any]:
"""코드 실행"""
try:
# 실행할 파일 경로 (상대 경로로 변경)
api_code_path = "api_code.py"
# subprocess로 코드 실행
result = subprocess.run(
[self.venv_python, api_code_path],
cwd=temp_dir,
capture_output=True,
text=True,
timeout=timeout
)
if result.returncode == 0:
# 성공 시 stdout을 결과로 반환
return {
"success": True,
"output": result.stdout,
"error": result.stderr
}
else:
# 실패 시 stderr와 stdout 모두 확인
error_message = result.stderr if result.stderr else result.stdout
return {
"success": False,
"output": result.stdout,
"error": error_message
}
except subprocess.TimeoutExpired:
return {
"success": False,
"error": f"실행 시간 초과 ({timeout}초)"
}
except Exception as e:
return {
"success": False,
"error": f"실행 중 오류: {str(e)}"
}
def _cleanup_temp_directory(self, temp_dir: str):
"""임시 디렉토리 정리"""
try:
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir, ignore_errors=True)
except Exception as e:
print(f"임시 디렉토리 정리 실패: {temp_dir}, 오류: {str(e)}")
async def execute_api(self, ctx: Context, api_type: str, params: Dict[str, Any], github_url: str) -> Dict[str, Any]:
"""API 실행 메인 함수"""
temp_dir = None
start_time = time.time()
try:
await ctx.info(f"API 실행 시작: {api_type}")
# 1. 임시 디렉토리 생성
# FastMCP Context에서 request_id 안전하게 가져오기
try:
request_id = ctx.get_state(factory.CONTEXT_REQUEST_ID)
except:
request_id = "unknown"
temp_dir = self._create_temp_directory(request_id)
# 2. kis_auth.py 다운로드
if not self._download_kis_auth(temp_dir):
raise Exception("kis_auth.py 다운로드 실패")
# 3. API 코드 다운로드
api_code_path = self._download_api_code(github_url, temp_dir, api_type)
# 4. 코드 수정
self._modify_api_code(api_code_path, params, api_type)
# 5. 코드 실행
execution_result = self._execute_code(temp_dir)
# 6. 실행 시간 계산
execution_time = time.time() - start_time
# 7. 결과 반환
result = {
"success": execution_result["success"],
"api_type": api_type,
"params": params,
"message": f"{self.tool_name} API 호출 완료",
"execution_time": f"{execution_time:.2f}s",
"temp_dir": temp_dir,
"venv_used": True,
"cleanup_success": True
}
if execution_result["success"]:
result["data"] = execution_result["output"]
else:
result["error"] = execution_result["error"]
return result
except Exception as e:
await ctx.error(f"API 실행 중 오류: {str(e)}")
return {
"success": False,
"api_type": api_type,
"params": params,
"error": str(e),
"execution_time": f"{time.time() - start_time:.2f}s",
"temp_dir": temp_dir,
"venv_used": True,
"cleanup_success": False
}
finally:
# 8. 임시 디렉토리 정리
if temp_dir:
self._cleanup_temp_directory(temp_dir)
class BaseTool(ABC):
"""MCP 도구 기본 클래스"""
def __init__(self):
"""도구 초기화"""
self._load_config()
self.api_executor = ApiExecutor(self.tool_name)
self.master_file_manager = MasterFileManager(self.tool_name)
self.db = Database()
# ========== Abstract Properties ==========
@property
@abstractmethod
def tool_name(self) -> str:
"""도구 이름 (하위 클래스에서 구현 필수)"""
pass
# ========== Public Properties ==========
@property
def description(self) -> str:
"""도구 설명 (분류.json에서 동적 생성)"""
return self._generate_description()
@property
def config_file(self) -> str:
"""JSON 설정 파일 경로 (tool_name 기반 자동 생성)"""
return f"./configs/{self.tool_name}.json"
# ========== Public Methods ==========
def register(self, mcp_server: FastMCP) -> None:
"""MCP 서버에 도구 등록"""
mcp_server.tool(
self._run,
name=self.tool_name,
description=self.description,
)
# ========== Protected Methods ==========
def _load_config(self) -> None:
"""JSON 설정 파일 로드"""
try:
with open(self.config_file, 'r', encoding='utf-8') as f:
self.config = json.load(f)
except FileNotFoundError:
# 임시로 빈 설정으로 초기화
self.config = {"apis": {}}
def _generate_description(self) -> str:
"""분류.json에서 도구 설명 동적 생성"""
try:
config_json_path = f"./configs/{self.tool_name}.json"
with open(config_json_path, 'r', encoding='utf-8') as f:
config_data = json.load(f)
tool_info = config_data.get("tool_info")
apis = config_data.get("apis", {})
if not tool_info:
return f"{self.tool_name} 도구의 tool_info가 없습니다."
# description 문자열 구성
lines = [tool_info.get("introduce", "")]
# introduce_append가 있으면 추가
introduce_append = tool_info.get("introduce_append", "").strip()
if introduce_append:
lines.append(introduce_append)
lines.append("") # 빈 줄
lines.append("[지원 기능]")
# API 목록 추가
for api_type, api_info in apis.items():
lines.append(f"- {api_info['name']} (api_type: \"{api_type}\")")
lines.append("") # 빈 줄
# 개선된 구조 적용
lines.append("📋 사용 방법:")
lines.append("1. find_api_detail로 API 상세 정보를 확인하세요")
lines.append("2. api_type을 선택하고 params에 필요한 파라미터를 입력하세요")
lines.append("3. 종목명으로 검색할 경우: stock_name='종목명' 파라미터를 사용하세요")
lines.append("4. 모의투자 시에는 env_dv='demo'를 추가하세요")
lines.append("")
lines.append("🔧 특별한 api_type 및 예시:")
lines.append(f"- find_stock_code (종목번호 검색) : {self.tool_name}({{ \"api_type\": \"find_stock_code\", \"params\": {{ \"stock_name\": \"삼성전자\" }} }})")
lines.append(f"- find_api_detail (API 정보 조회) : {self.tool_name}({{ \"api_type\": \"find_api_detail\", \"params\": {{ \"api_type\": \"inquire_price\" }} }})")
lines.append("")
lines.append("🔍 종목명 사용: stock_name=\"삼성전자\" → 자동으로 종목번호 변환하여 실행")
lines.append(f"{self.tool_name}({{ \"api_type\": \"inquire_price\", \"params\": {{ \"stock_name\": \"삼성전자\" }} }})")
lines.append("")
lines.append("💡 주요 파라미터:")
if self.tool_name.startswith('domestic'):
lines.append("- 시장코드(fid_cond_mrkt_div_code)='J'(KRX)/'NX'(넥스트레이드)/'UN'(통합)")
lines.append("- 매매구분(ord_dv)='buy'(매수)/'sell'(매도)")
lines.append("- 실전모의구분(env_dv)='real'(실전)/'demo'(모의)")
lines.append("")
lines.append("⚠️ 중요: API 호출 시 필수 주의사항")
lines.append("**API 실행 전 반드시 API 상세 문서의 파라미터를 확인하세요. Request Query Params와 Request Body 입력 시 추측이나 과거 실행 값 사용 금지, 확인된 API 상세 문서의 값을 사용하세요.**")
lines.append("**파라미터 description에 '공란'이 있는 경우 기본적으로 빈값으로 처리하되, 아닌 경우에는 값을 넣어도 됩니다.**")
lines.append("**🎯 모의투자 관련: 사용자가 '모의', '모의투자', '데모', '테스트' 등의 용어를 언급하거나 모의투자 관련 요청을 할 경우, 반드시 env_dv 파라미터를 'demo'로 설정하여 API를 호출해야 합니다. env_dv 파라미터가 있는 모든 API에서 모의투자 시에는 env_dv='demo', 실전투자 시에는 env_dv='real'을 사용합니다. 기본값은 'real'이므로 모의투자 요청 시 반드시 env_dv='demo'를 명시적으로 설정해주세요.**")
lines.append("")
lines.append("🔒 자동 처리되는 파라미터 (제공하지 마세요):")
lines.append("• cano (계좌번호), acnt_prdt_cd (계좌상품코드), my_htsid (HTS ID) - 시스템 자동 설정")
if self.tool_name.startswith('domestic'):
lines.append("• excg_id_dvsn_cd (거래소구분) - 국내 API는 자동으로 KRX 설정")
lines.append("")
# 예시 호출 추가
examples = tool_info.get("examples", [])
if examples:
lines.append("💻 예시 호출:")
for example in examples:
params_str = json.dumps(example.get('params', {}), ensure_ascii=False)
lines.append(
f"{self.tool_name}({{ \"api_type\": \"{example['api_type']}\",\"params\": {params_str} }})")
return "\n".join(lines)
except Exception as e:
return f"{self.tool_name} 도구 설명 생성 중 오류: {str(e)}"
async def _run(self, ctx: Context, api_type: str, params: dict) -> Dict[str, Any]:
"""공통 실행 로직"""
try:
await ctx.info(f"{self.tool_name} running with api_type: {api_type}")
# 1) 인자 구조 검증(가볍게)
if not api_type or not isinstance(params, dict):
return {
"ok": False,
"error": "MISSING_OR_INVALID_ARGS",
"missing": [k for k in ("api_type", "params") if
not (api_type if k == "api_type" else isinstance(params, dict))],
"invalid": [] if isinstance(params, dict) else [{"field": "params", "expected": "object"}],
}
# 2. 특별한 api_type 처리
if api_type == "find_stock_code":
return await self._handle_find_stock_code(ctx, params)
elif api_type == "find_api_detail":
return await self._handle_find_api_detail(ctx, params)
# 3. API 설정 조회
if api_type not in self.config['apis']:
return {"ok": False, "error": f"지원하지 않는 API 타입: {api_type}"}
# 4. 종목명 자동 처리 (stock_name이 있으면 자동으로 pdno 변환)
params = await self._process_stock_name(ctx, params)
# 5. 실제 실행 (래핑 함수 선택 → OPEN API 호출)
data = await self._run_api(ctx, api_type, params)
return {"ok": True, "data": data}
except Exception as e:
await ctx.error(f"실행 중 오류: {str(e)}")
return {"ok": False, "error": str(e)}
async def _run_api(self, ctx: Context, api_type: str, params: Dict[str, Any]) -> Any:
"""API 실행 - ApiExecutor 사용"""
try:
api_info = self.config['apis'][api_type]
github_url = api_info.get('github_url')
if not github_url:
return {"error": f"GitHub URL이 없습니다: {api_type}"}
# ApiExecutor를 사용하여 API 실행
result = await self.api_executor.execute_api(
ctx=ctx,
api_type=api_type,
params=params,
github_url=github_url
)
return result
except Exception as e:
return {"error": f"API 실행 중 오류: {str(e)}"}
async def _process_stock_name(self, ctx: Context, params: Dict[str, Any]) -> Dict[str, Any]:
"""종목명/종목코드 자동 처리 (stock_name이 있으면 자동으로 pdno 변환)"""
try:
# 종목명으로 찾을 수 있는 파라미터들
stock_name_params = ["stock_name", "stock_name_kr", "korean_name", "company_name"]
# 파라미터에서 종목명/종목코드 찾기
search_value = None
for param_name in stock_name_params:
if param_name in params and params[param_name]:
search_value = params[param_name]
break
# 검색할 값이 없으면 그대로 반환
if not search_value:
return params
await ctx.info(f"검색값 발견: {search_value}, 자동 검색 시작")
# 종목명 또는 종목코드로 검색
result = await self._find_stock_by_name_or_code(ctx, search_value)
if result["found"]:
params["pdno"] = result["code"]
await ctx.info(f"종목번호 자동 찾기 성공: {search_value}{result['code']}")
# 원본 검색값 보존
params["_original_search_value"] = search_value
params["_resolved_stock_code"] = result["code"]
else:
await ctx.warning(f"종목을 찾을 수 없음: {search_value}")
# 종목을 찾지 못해도 원본 파라미터 유지
return params
except Exception as e:
await ctx.error(f"종목명 자동 처리 실패: {str(e)}")
return params
async def _find_stock_by_name_or_code(self, ctx: Context, search_value: str) -> Dict[str, Any]:
"""종목명 또는 종목코드로 종목번호 찾기"""
try:
# 검색어에서 띄어쓰기 제거
search_term = search_value.replace(" ", "")
# 데이터베이스 연결 확인
if not self.db.ensure_initialized():
return {"found": False, "message": "데이터베이스 초기화 실패"}
# 마스터 파일 업데이트 확인 (force_update=False로 필요시에만 업데이트)
try:
from module.plugin import MasterFileManager
master_file_manager = MasterFileManager(self.tool_name)
await master_file_manager.ensure_master_file_updated(ctx, force_update=False)
except Exception as e:
await ctx.warning(f"마스터 파일 업데이트 확인 중 오류: {str(e)}")
# DB 엔진
db_engine = self.db.get_by_name("master")
master_models = MasterFileManager.get_master_models_for_tool(self.tool_name)
if not master_models:
return {"found": False, "message": f"지원하지 않는 툴: {self.tool_name}"}
# 각 모델에서 우선순위별 검색
for model_class in master_models:
try:
# 1순위: 종목코드로 완전 매칭
code_results = db_engine.list(
model_class,
filters={"code": search_term},
limit=1
)
if code_results:
result = code_results[0]
return {
"found": True,
"code": result.code,
"name": result.name,
"ex": result.ex if hasattr(result, 'ex') else None,
"match_type": "code_exact"
}
# 2순위: 종목명으로 완전 매칭
name_results = db_engine.list(
model_class,
filters={"name": search_term},
limit=1
)
if name_results:
result = name_results[0]
return {
"found": True,
"code": result.code,
"name": result.name,
"ex": result.ex if hasattr(result, 'ex') else None,
"match_type": "name_exact"
}
# 3순위: 종목명으로 앞글자 매칭
prefix_results = db_engine.list(
model_class,
filters={"name": f"{search_term}%"},
limit=1
)
if prefix_results:
result = prefix_results[0]
return {
"found": True,
"code": result.code,
"name": result.name,
"ex": result.ex if hasattr(result, 'ex') else None,
"match_type": "name_prefix"
}
# 4순위: 종목명으로 중간 매칭
contains_results = db_engine.list(
model_class,
filters={"name": f"%{search_term}%"},
limit=1
)
if contains_results:
result = contains_results[0]
return {
"found": True,
"code": result.code,
"name": result.name,
"ex": result.ex if hasattr(result, 'ex') else None,
"match_type": "name_contains"
}
except Exception as e:
continue
return {"found": False, "message": f"종목을 찾을 수 없음: {search_value}"}
except Exception as e:
return {"found": False, "message": f"종목 검색 오류: {str(e)}"}
def get_api_info(self, api_type: str) -> Dict[str, Any]:
"""API 정보 조회 (리소스 기능 통합)"""
try:
# API 설정 조회
if api_type not in self.config['apis']:
return {
"error": f"지원하지 않는 API 타입: {api_type}",
"available_apis": list(self.config['apis'].keys()),
"api_type": api_type
}
# API 정보 반환
api_info = self.config['apis'][api_type]
# 파라미터 정보 정리
params = api_info.get("params", {})
param_details = {}
for param_name, param_info in params.items():
param_details[param_name] = {
"name": param_info.get("name", param_name),
"type": param_info.get("type", "str"),
"required": param_info.get("required", False),
"default_value": param_info.get("default_value"),
"description": param_info.get("description", "")
}
result = {
"tool_name": self.tool_name,
"api_type": api_type,
"name": api_info.get("name", ""),
"category_detail": api_info.get("category", ""),
"method": api_info.get("method", ""),
"api_path": api_info.get("api_path", ""),
"github_url": api_info.get("github_url", ""),
"params": param_details
}
return result
except Exception as e:
return {
"error": f"API 정보 조회 중 오류 발생: {str(e)}",
"tool_name": self.tool_name,
"api_type": api_type
}
async def _handle_find_stock_code(self, ctx: Context, params: Dict[str, Any]) -> Dict[str, Any]:
"""종목 검색 처리"""
try:
await ctx.info(f"종목 검색 요청: {self.tool_name}")
# stock_name 파라미터 확인
search_value = params.get("stock_name")
if not search_value:
return {
"ok": False,
"error": "MISSING_OR_INVALID_ARGS",
"missing": ["stock_name"],
"message": "stock_name 파라미터가 필요합니다. (종목명 또는 종목코드 입력 가능)"
}
# 종목 검색 실행 (종목명 또는 종목코드)
result = await self._find_stock_by_name_or_code(ctx, search_value)
if result["found"]:
return {
"ok": True,
"data": {
"tool_name": self.tool_name,
"search_value": search_value,
"found": True,
"stock_code": result["code"],
"stock_name_found": result["name"],
"ex": result.get("ex"),
"match_type": result.get("match_type"),
"message": f"'{search_value}' 종목을 찾았습니다. 종목번호: {result['code']}",
"usage_guide": f"find_api_detail로 API상세정보를 확인하고 종목코드 '{result['code']}'를 해당 API의 종목코드 필드에 입력하여 실행하세요.",
"next_step": f"{self.tool_name} 툴에서 find_api_detail로 확인한 종목코드 필드에 '{result['code']}'를 입력하세요."
}
}
else:
return {
"ok": False,
"error": "STOCK_NOT_FOUND",
"message": f"'{search_value}' 종목을 찾을 수 없습니다.",
"suggestions": [
"종목명의 철자가 정확한지 확인",
"종목코드가 정확한지 확인",
"띄어쓰기나 특수문자가 있는지 확인",
"다른 검색어로 시도 (예: '삼성전자' 대신 '삼성' 또는 '005930')"
]
}
except Exception as e:
await ctx.error(f"종목 검색 처리 중 오류: {str(e)}")
return {"ok": False, "error": str(e)}
async def _handle_find_api_detail(self, ctx: Context, params: Dict[str, Any]) -> Dict[str, Any]:
"""API 상세 정보 조회 처리"""
try:
await ctx.info(f"API 상세 정보 조회 요청: {self.tool_name}")
# api_type 파라미터 확인
target_api_type = params.get("api_type")
if not target_api_type:
return {
"ok": False,
"error": "MISSING_OR_INVALID_ARGS",
"missing": ["api_type"],
"message": "api_type 파라미터가 필요합니다.",
"available_apis": list(self.config['apis'].keys())
}
# API 정보 조회
api_info = self.get_api_info(target_api_type)
if "error" in api_info:
return {
"ok": False,
"error": api_info["error"],
"available_apis": api_info.get("available_apis", [])
}
return {
"ok": True,
"data": api_info
}
except Exception as e:
await ctx.error(f"API 상세 정보 조회 처리 중 오류: {str(e)}")
return {"ok": False, "error": str(e)}

View File

@@ -0,0 +1,10 @@
from .base import BaseTool
from module import singleton
@singleton
class DomesticBondTool(BaseTool):
@property
def tool_name(self) -> str:
return "domestic_bond"

View File

@@ -0,0 +1,15 @@
from .base import BaseTool
from module import singleton
import pandas as pd
import urllib.request
import ssl
import zipfile
import os
@singleton
class DomesticFutureOptionTool(BaseTool):
@property
def tool_name(self) -> str:
return "domestic_futureoption"

View File

@@ -0,0 +1,9 @@
from .base import BaseTool
from module import singleton
@singleton
class DomesticStockTool(BaseTool):
@property
def tool_name(self) -> str:
return "domestic_stock"

View File

@@ -0,0 +1,9 @@
from .base import BaseTool
from module import singleton
@singleton
class ElwTool(BaseTool):
@property
def tool_name(self) -> str:
return "elw"

View File

@@ -0,0 +1,9 @@
from .base import BaseTool
from module import singleton
@singleton
class EtfEtnTool(BaseTool):
@property
def tool_name(self) -> str:
return "etfetn"

View File

@@ -0,0 +1,9 @@
from .base import BaseTool
from module import singleton
@singleton
class OverseasFutureOptionTool(BaseTool):
@property
def tool_name(self) -> str:
return "overseas_futureoption"

View File

@@ -0,0 +1,9 @@
from .base import BaseTool
from module import singleton
@singleton
class OverseasStockTool(BaseTool):
@property
def tool_name(self) -> str:
return "overseas_stock"