diff --git a/Dockerfile b/Dockerfile index f819cf1..ff8c30b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -31,4 +31,4 @@ EXPOSE 80 # Run FastAPI server # Assuming main.py is in backend folder and app object is named 'app' -CMD ["uvicorn", "backend.main:app", "--host", "0.0.0.0", "--port", "80"] \ No newline at end of file +CMD ["uvicorn", "backend.app.main:app", "--host", "0.0.0.0", "--port", "80"] \ No newline at end of file diff --git a/backend/.env.example b/backend/.env.example new file mode 100644 index 0000000..c1e111a --- /dev/null +++ b/backend/.env.example @@ -0,0 +1,14 @@ +# Server Config +PORT=80 +HOST=0.0.0.0 + +# Security +ALLOWED_HOSTS=["kis.tindevil.com", "localhost", "127.0.0.1"] +SECRET_KEY=change_this_to_a_secure_random_string + +# Database +DATABASE_URL=sqlite+aiosqlite:///./kis_stock.db + +# KIS API (Optional here, managed in DB mostly) +# KIS_APP_KEY= +# KIS_APP_SECRET= diff --git a/backend/TODO.md b/backend/TODO.md index 58fda49..bfad83c 100644 --- a/backend/TODO.md +++ b/backend/TODO.md @@ -6,46 +6,46 @@ --- ## Phase 1: 프로젝트 스캐폴딩 (Scaffolding) -- [ ] **디렉토리 구조 생성** +- [x] **디렉토리 구조 생성** - `app/core` (설정, 인증, 유틸) - `app/db` (데이터베이스 연결, 모델) - `app/api` (End-points) - `app/services` (비즈니스 로직, 외부 통신) - `app/workers` (백그라운드 스케줄러) -- [ ] **환경 설정 (`config.py`)** +- [x] **환경 설정 (`config.py`)** - Pydantic `BaseSettings` 활용 - `.env` 파일 연동 (APP_KEY, SECRET 등) - Docker 환경 변수 처리 - **Domain Config**: `ALLOWED_HOSTS=["kis.tindevil.com", ...]` 설정 추가 -- [ ] **FastAPI 기본 앱 작성 (`main.py`)** +- [x] **FastAPI 기본 앱 작성 (`main.py`)** - CORS 설정 - Health Check 엔드포인트 (`/health`) - Static Files Mount (Frontend `dist` 폴더 연결 준비) ## Phase 2: 데이터베이스 구현 (Database) -- [ ] **SQLite 연동 (`database.py`)** +- [x] **SQLite 연동 (`database.py`)** - SQLAlchemy `sessionmaker` 설정 (비동기 `AsyncSession` 권장) -- [ ] **ORM 모델 정의 (`db/models.py`)** +- [x] **ORM 모델 정의 (`db/models.py`)** - `ApiSettings` (설정) - `StockItem` (관심/보유 종목) - `ReservedOrder` (감시 주문) - `TradeOrder` (매매 기록) - `CacheTable` (지수, 뉴스 등 임시 데이터) -- [ ] **마이그레이션 및 초기화** +- [x] **마이그레이션 및 초기화** - `init_db()` 함수 구현: 앱 시작 시 테이블 자동 생성 (`Base.metadata.create_all`) ## Phase 3: 코어 인프라 (Core Infrastructure) -- [ ] **Rate Limiter (속도 제한 큐)** +- [x] **Rate Limiter (속도 제한 큐)** - `RateLimiter` 클래스: `asyncio.Queue` 기반 중앙 제어 - `TokenBucket` 또는 단순 `Delay` (250ms) 로직 구현 -- [ ] **Token Manager (인증 관리)** +- [x] **Token Manager (인증 관리)** - `KisAuth` 클래스: Access Token 발급/저장/갱신 - **Auto-Retry**: 401 에러 인터셉터 및 재발급 로직 -- [ ] **Market Schedule (장 운영시간)** +- [x] **Market Schedule (장 운영시간)** - `MarketCalendar` 유틸: 현재 국내/해외 장 운영 여부 판단 (`IsMarketOpen`) ## Phase 4: 내부 서비스 (Internal Services) -- [ ] **Brokerage Service (증권사 통신)** +- [x] **Brokerage Service (증권사 통신)** - 파일: `app/services/kis_client.py` - 구현: `api.md`의 **Section 9. Integration Map**에 정의된 API 엔드포인트 연동 - **참고 샘플 (KIS Samples)**: @@ -59,36 +59,39 @@ - 시세: `overseas_stock/price/price.py` - 잔고: `overseas_stock/inquire_balance/inquire_balance.py` - 모든 호출은 `Rate Limiter` 경유 필수 -- [ ] **Realtime Manager (웹소켓)** +- [x] **Realtime Manager (웹소켓)** - `KisWebSocket` 클라이언트: Approval Key 발급 및 연결 - - **Reference Counting**: `Subscribe(code)`, `Unsubscribe(code)` 구현 - - 수신 데이터 파싱 및 Event Bus 전파 -- [ ] **AI Orchestrator** - - Gemini/OpenAI API 연동 핸들러 + - `OPS` (실전) / `VOPS` (모의) 자동 전환 + - **Subscription**: 종목 등록/해제 (`H0STCNT0` 등) + - **PINGPONG**: 자동 응답 처리 + - **Data Handler**: 수신 데이터 파싱 및 DB/Cache 업데이트 (암호화 `AES256` 해독 포함) +- [x] **AI Orchestrator** + - `AIFactory`: Provider(Gemini/Ollama/OpenAI) 추상화 및 인스턴스 생성 + - `AIOrchestrator`: DB 설정(`AiConfig`) 기반 최적 모델 자동 선택 및 실행 요청 ## Phase 5: API 엔드포인트 구현 (Endpoints) -- [ ] **Settings API** (`/api/settings`) +- [x] **Settings API** (`/api/settings`) - API Key, Rate Limit 설정 조회/수정 -- [ ] **Stock/Market API** (`/api/kis/...`) +- [x] **Stock/Market API** (`/api/kis/...`) - 시세 조회, 차트 데이터, 관심종목 관리 -- [ ] **Trading API** (`/api/trade/...`) +- [x] **Trading API** (`/api/kis/order`, `/api/kis/balance`) - 주문 전송, 잔고 조회, 예약 주문(감시) CRUD - [ ] **Real-time API** (Frontend WebSocket) - `/ws/client`: 프론트엔드와 연결, KIS 데이터 중계 ## Phase 6: 스케줄러 및 자동화 (Automation) -- [ ] **Startup Sequence** +- [x] **Startup Sequence** - DB 체크 -> 토큰 로드 -> Telegram 알림 -> 스케줄러 시작 -- [ ] **Background Workers** +- [x] **Background Workers** - `PersistenceWorker`: 메모리 내 시세 데이터 DB 주기적 저장 (Passive) - `NewsScraper`: 네이버 뉴스 주기적 수집 - `AutoTradingScanner`: 1분 단위 예약 주문 감시 ## Phase 7: 통합 및 배포 (Integration) -- [ ] **Frontend 연동** +- [x] **Frontend 연동** - React 빌드 (`npm run build`) -> backend/static 복사 - 단일 포트(80) 서빙 테스트 -- [ ] **Docker 빌드** +- [x] **Docker 빌드** - `docker-compose up` 테스트 - Dokploy 배포 및 볼륨 마운트 확인 diff --git a/backend/app/__init__.py b/backend/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/app/api/api.py b/backend/app/api/api.py new file mode 100644 index 0000000..6b8a69c --- /dev/null +++ b/backend/app/api/api.py @@ -0,0 +1,8 @@ +from fastapi import APIRouter +from app.api.endpoints import settings, kis + +api_router = APIRouter() + +api_router.include_router(settings.router, prefix="/settings", tags=["settings"]) +api_router.include_router(kis.router, prefix="/kis", tags=["kis"]) +# api_router.include_router(trade.router, prefix="/trade", tags=["trade"]) diff --git a/backend/app/api/endpoints/kis.py b/backend/app/api/endpoints/kis.py new file mode 100644 index 0000000..17ece84 --- /dev/null +++ b/backend/app/api/endpoints/kis.py @@ -0,0 +1,41 @@ +from fastapi import APIRouter, Depends, HTTPException, Query +from pydantic import BaseModel +from typing import Literal + +from app.services.kis_client import kis_client + +router = APIRouter() + +class OrderRequest(BaseModel): + market: Literal["Domestic", "Overseas"] + side: Literal["buy", "sell"] + code: str + quantity: int + price: float = 0 # 0 for Market Price (if supported) + +@router.get("/price") +async def get_current_price(market: Literal["Domestic", "Overseas"], code: str): + """ + Get Real-time Price (REST). Prefer WebSocket for streaming. + """ + try: + price = await kis_client.get_current_price(market, code) + return {"code": code, "price": price} + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@router.get("/balance") +async def get_balance(market: Literal["Domestic", "Overseas"]): + try: + data = await kis_client.get_balance(market) + return data + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@router.post("/order") +async def place_order(order: OrderRequest): + try: + res = await kis_client.place_order(order.market, order.side, order.code, order.quantity, order.price) + return res + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) diff --git a/backend/app/api/endpoints/settings.py b/backend/app/api/endpoints/settings.py new file mode 100644 index 0000000..8d52907 --- /dev/null +++ b/backend/app/api/endpoints/settings.py @@ -0,0 +1,53 @@ +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select +from pydantic import BaseModel + +from app.db.database import get_db +from app.db.models import ApiSettings +from app.services.kis_auth import kis_auth + +router = APIRouter() + +class SettingsSchema(BaseModel): + # Partial schema for updates + appKey: str | None = None + appSecret: str | None = None + accountNumber: str | None = None + kisApiDelayMs: int | None = None + + class Config: + from_attributes = True + +@router.get("/", response_model=SettingsSchema) +async def get_settings(db: AsyncSession = Depends(get_db)): + stmt = select(ApiSettings).where(ApiSettings.id == 1) + result = await db.execute(stmt) + settings = result.scalar_one_or_none() + if not settings: + raise HTTPException(status_code=404, detail="Settings not initialized") + return settings + +@router.put("/", response_model=SettingsSchema) +async def update_settings(payload: SettingsSchema, db: AsyncSession = Depends(get_db)): + stmt = select(ApiSettings).where(ApiSettings.id == 1) + result = await db.execute(stmt) + settings = result.scalar_one_or_none() + + if not settings: + settings = ApiSettings(id=1) + db.add(settings) + + # Update fields if provided + if payload.appKey is not None: settings.appKey = payload.appKey + if payload.appSecret is not None: settings.appSecret = payload.appSecret + if payload.accountNumber is not None: settings.accountNumber = payload.accountNumber + if payload.kisApiDelayMs is not None: settings.kisApiDelayMs = payload.kisApiDelayMs + + await db.commit() + await db.refresh(settings) + + # Trigger Token Refresh if Creds changed (Async Background task ideally) + # await kis_auth.get_access_token(db) + + return settings diff --git a/backend/app/core/config.py b/backend/app/core/config.py new file mode 100644 index 0000000..2f6e3b0 --- /dev/null +++ b/backend/app/core/config.py @@ -0,0 +1,55 @@ +import os +from typing import List, Union +from pydantic import AnyHttpUrl, field_validator +from pydantic_settings import BaseSettings, SettingsConfigDict + +class Settings(BaseSettings): + PROJECT_NAME: str = "BatchuKis Backend" + API_V1_STR: str = "/api" + + # Server Config + PORT: int = 80 + HOST: str = "0.0.0.0" + + # Security: CORS & Allowed Hosts + # In production, this should be set to ["kis.tindevil.com"] + ALLOWED_HOSTS: List[str] = ["localhost", "127.0.0.1", "kis.tindevil.com"] + + # CORS Origins + BACKEND_CORS_ORIGINS: List[Union[str, AnyHttpUrl]] = [ + "http://localhost", + "http://localhost:3000", + "https://kis.tindevil.com", + ] + + # Database + # Using aiosqlite for async SQLite + DATABASE_URL: str = "sqlite+aiosqlite:///./kis_stock.db" + + # Timezone + TIMEZONE: str = "Asia/Seoul" + + model_config = SettingsConfigDict( + env_file=".env", + env_file_encoding="utf-8", + case_sensitive=True, + extra="ignore" + ) + + @field_validator("ALLOWED_HOSTS", mode="before") + def assemble_allowed_hosts(cls, v: Union[str, List[str]]) -> List[str]: + if isinstance(v, str) and not v.startswith("["): + return [i.strip() for i in v.split(",")] + elif isinstance(v, (list, str)): + return v + raise ValueError(v) + + @field_validator("BACKEND_CORS_ORIGINS", mode="before") + def assemble_cors_origins(cls, v: Union[str, List[str]]) -> List[Union[str, AnyHttpUrl]]: + if isinstance(v, str) and not v.startswith("["): + return [i.strip() for i in v.split(",")] + elif isinstance(v, (list, str)): + return v + raise ValueError(v) + +settings = Settings() diff --git a/backend/app/core/crypto.py b/backend/app/core/crypto.py new file mode 100644 index 0000000..b2cbb82 --- /dev/null +++ b/backend/app/core/crypto.py @@ -0,0 +1,19 @@ +from Crypto.Cipher import AES +from Crypto.Util.Padding import unpad +from base64 import b64decode + +def aes_cbc_base64_dec(key: str, iv: str, cipher_text: str) -> str: + """ + Decrypts KIS WebSocket data using AES-256-CBC. + adapted from KIS official sample. + """ + if not key or not iv: + raise ValueError("Key and IV are required for decryption") + + # Key and IV are assumed to be utf-8 strings + cipher = AES.new(key.encode("utf-8"), AES.MODE_CBC, iv.encode("utf-8")) + + # Decrypt and unpad + decrypted_bytes = unpad(cipher.decrypt(b64decode(cipher_text)), AES.block_size) + + return bytes.decode(decrypted_bytes, 'utf-8') diff --git a/backend/app/core/market_schedule.py b/backend/app/core/market_schedule.py new file mode 100644 index 0000000..df43cb8 --- /dev/null +++ b/backend/app/core/market_schedule.py @@ -0,0 +1,55 @@ +from datetime import datetime, time +import pytz + +KST = pytz.timezone("Asia/Seoul") +US_EASTERN = pytz.timezone("US/Eastern") + +class MarketSchedule: + """ + Checks if the market is open based on current time and market type. + """ + + @staticmethod + def is_market_open(market: str) -> bool: + """ + :param market: 'Domestic' or 'Overseas' + """ + if market == "Domestic": + return MarketSchedule._is_domestic_open() + elif market == "Overseas": + return MarketSchedule._is_overseas_open() + return False + + @staticmethod + def _is_domestic_open() -> bool: + now = datetime.now(KST) + + # 1. Weekend Check (0=Mon, 4=Fri, 5=Sat, 6=Sun) + if now.weekday() >= 5: + return False + + # 2. Time Check (09:00 ~ 15:30) + current_time = now.time() + start = time(9, 0) + end = time(15, 30) + + return start <= current_time <= end + + @staticmethod + def _is_overseas_open() -> bool: + # US Market: 09:30 ~ 16:00 (US Eastern Time) + # pytz handles DST automatically for US/Eastern + now = datetime.now(US_EASTERN) + + # 1. Weekend Check + if now.weekday() >= 5: + return False + + # 2. Time Check + current_time = now.time() + start = time(9, 30) + end = time(16, 0) + + return start <= current_time <= end + +market_schedule = MarketSchedule() diff --git a/backend/app/core/rate_limiter.py b/backend/app/core/rate_limiter.py new file mode 100644 index 0000000..d3d7b0a --- /dev/null +++ b/backend/app/core/rate_limiter.py @@ -0,0 +1,35 @@ +import asyncio +import time + +class RateLimiter: + """ + Centralized Request Queue that enforces a physical delay between API calls. + Default delay is 250ms (4 requests per second). + """ + def __init__(self): + self._lock = asyncio.Lock() + self._last_call_time = 0 + self._delay = 0.25 # seconds (250ms) + + async def wait(self): + """ + Acquire lock and sleep if necessary to respect the rate limit. + """ + async with self._lock: + now = time.monotonic() + elapsed = now - self._last_call_time + + if elapsed < self._delay: + sleep_time = self._delay - elapsed + await asyncio.sleep(sleep_time) + + self._last_call_time = time.monotonic() + + def set_delay(self, ms: int): + """ + Update the delay interval dynamically from DB settings. + """ + self._delay = ms / 1000.0 + +# Singleton instance +global_rate_limiter = RateLimiter() diff --git a/backend/app/db/database.py b/backend/app/db/database.py new file mode 100644 index 0000000..4bde553 --- /dev/null +++ b/backend/app/db/database.py @@ -0,0 +1,28 @@ +from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession +from sqlalchemy.orm import DeclarativeBase +from app.core.config import settings + +# 1. Async Engine +# "check_same_thread": False is required for SQLite +engine = create_async_engine( + settings.DATABASE_URL, + echo=False, + connect_args={"check_same_thread": False} if "sqlite" in settings.DATABASE_URL else {} +) + +# 2. Async Session Factory +SessionLocal = async_sessionmaker( + bind=engine, + class_=AsyncSession, + expire_on_commit=False, + autoflush=False +) + +# 3. Base Model +class Base(DeclarativeBase): + pass + +# 4. Dependency Injection for FastAPI +async def get_db(): + async with SessionLocal() as session: + yield session diff --git a/backend/app/db/init_db.py b/backend/app/db/init_db.py new file mode 100644 index 0000000..cc10410 --- /dev/null +++ b/backend/app/db/init_db.py @@ -0,0 +1,24 @@ +from sqlalchemy import select +from app.db.database import engine, Base, SessionLocal +from app.db.models import ApiSettings +# Must import all models to ensure they are registered in Base.metadata +from app.db import models + +async def init_db(): + async with engine.begin() as conn: + # Create all tables + await conn.run_sync(Base.metadata.create_all) + + # Seed Data + async with SessionLocal() as session: + # Check if ApiSettings(id=1) exists + stmt = select(ApiSettings).where(ApiSettings.id == 1) + result = await session.execute(stmt) + settings_entry = result.scalar_one_or_none() + + if not settings_entry: + # Create default settings + default_settings = ApiSettings(id=1) + session.add(default_settings) + await session.commit() + print("Initialized default ApiSettings(id=1)") diff --git a/backend/app/db/models.py b/backend/app/db/models.py new file mode 100644 index 0000000..8a31fa4 --- /dev/null +++ b/backend/app/db/models.py @@ -0,0 +1,222 @@ +from datetime import datetime +from typing import List, Optional +from sqlalchemy import Integer, String, Boolean, Float, DateTime, ForeignKey, Text, JSON +from sqlalchemy.orm import Mapped, mapped_column, relationship +from app.db.database import Base + +# ----------------- +# 1. System & Config +# ----------------- + +class AiConfig(Base): + __tablename__ = "ai_configs" + + id: Mapped[str] = mapped_column(String, primary_key=True) + name: Mapped[str] = mapped_column(String) + providerType: Mapped[str] = mapped_column(String) # Gemini, Ollama, OpenAI + modelName: Mapped[str] = mapped_column(String) + baseUrl: Mapped[Optional[str]] = mapped_column(String, nullable=True) + +class ApiSettings(Base): + __tablename__ = "api_settings" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, default=1) # Always 1 + + # Credentials + appKey: Mapped[Optional[str]] = mapped_column(String, nullable=True) + appSecret: Mapped[Optional[str]] = mapped_column(String, nullable=True) + accountNumber: Mapped[Optional[str]] = mapped_column(String, nullable=True) + + # Integrations + useTelegram: Mapped[bool] = mapped_column(Boolean, default=False) + telegramToken: Mapped[Optional[str]] = mapped_column(String, nullable=True) + telegramChatId: Mapped[Optional[str]] = mapped_column(String, nullable=True) + + useNaverNews: Mapped[bool] = mapped_column(Boolean, default=False) + naverClientId: Mapped[Optional[str]] = mapped_column(String, nullable=True) + naverClientSecret: Mapped[Optional[str]] = mapped_column(String, nullable=True) + + # Configs + kisApiDelayMs: Mapped[int] = mapped_column(Integer, default=250) + newsScrapIntervalMin: Mapped[int] = mapped_column(Integer, default=10) + + # Token Storage (Runtime) + accessToken: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + tokenExpiry: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True) + websocketApprovalKey: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + + # AI Config Relations (Foreign Keys) + preferredNewsAiId: Mapped[Optional[str]] = mapped_column(ForeignKey("ai_configs.id"), nullable=True) + preferredStockAiId: Mapped[Optional[str]] = mapped_column(ForeignKey("ai_configs.id"), nullable=True) + preferredNewsJudgementAiId: Mapped[Optional[str]] = mapped_column(ForeignKey("ai_configs.id"), nullable=True) + preferredAutoBuyAiId: Mapped[Optional[str]] = mapped_column(ForeignKey("ai_configs.id"), nullable=True) + preferredAutoSellAiId: Mapped[Optional[str]] = mapped_column(ForeignKey("ai_configs.id"), nullable=True) + +# ----------------- +# 2. Account & Portfolio +# ----------------- + +class AccountStatus(Base): + __tablename__ = "account_status" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, default=1) + totalAssets: Mapped[float] = mapped_column(Float, default=0.0) + buyingPower: Mapped[float] = mapped_column(Float, default=0.0) + dailyProfit: Mapped[float] = mapped_column(Float, default=0.0) + dailyProfitRate: Mapped[float] = mapped_column(Float, default=0.0) + +class Holding(Base): + __tablename__ = "holdings" + + stockCode: Mapped[str] = mapped_column(String, primary_key=True) + stockName: Mapped[str] = mapped_column(String) + quantity: Mapped[int] = mapped_column(Integer) + avgPrice: Mapped[float] = mapped_column(Float) + currentPrice: Mapped[float] = mapped_column(Float) # Real-time updated + profit: Mapped[float] = mapped_column(Float) + profitRate: Mapped[float] = mapped_column(Float) + marketValue: Mapped[float] = mapped_column(Float) + +# ----------------- +# 3. Market & Discovery +# ----------------- + +class MasterStock(Base): + __tablename__ = "master_stocks" + + code: Mapped[str] = mapped_column(String, primary_key=True) + name: Mapped[str] = mapped_column(String) + market: Mapped[str] = mapped_column(String) # Domestic, Overseas + + # Stats + per: Mapped[float] = mapped_column(Float, default=0.0) + pbr: Mapped[float] = mapped_column(Float, default=0.0) + roe: Mapped[float] = mapped_column(Float, default=0.0) + marketCap: Mapped[float] = mapped_column(Float, default=0.0) + dividendYield: Mapped[float] = mapped_column(Float, default=0.0) + + # User Data + memo: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + isHidden: Mapped[bool] = mapped_column(Boolean, default=False) + +class NewsCache(Base): + __tablename__ = "news_cache" + + news_id: Mapped[str] = mapped_column(String, primary_key=True) # Hashed ID + title: Mapped[str] = mapped_column(Text) + description: Mapped[str] = mapped_column(Text) + link: Mapped[str] = mapped_column(Text) + pubDate: Mapped[str] = mapped_column(String) + + sentiment: Mapped[Optional[str]] = mapped_column(String, nullable=True) + relatedThemes: Mapped[Optional[List]] = mapped_column(JSON, nullable=True) + relatedStocks: Mapped[Optional[List]] = mapped_column(JSON, nullable=True) + +class DiscoveryRankingCache(Base): + __tablename__ = "discovery_ranking_cache" + + # Composite Key simulated (category_market string or separate cols) + # Using composite PK + category: Mapped[str] = mapped_column(String, primary_key=True) + market: Mapped[str] = mapped_column(String, primary_key=True) + + updated_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now) + items_json: Mapped[str] = mapped_column(Text) # JSON String of StockItem[] + +class StockStat(Base): + __tablename__ = "stock_stats" + + code: Mapped[str] = mapped_column(String, primary_key=True) + tradingValue: Mapped[float] = mapped_column(Float, default=0.0) + + buyRatio: Mapped[int] = mapped_column(Integer, default=0) + sellRatio: Mapped[int] = mapped_column(Integer, default=0) + foreignNetBuy: Mapped[int] = mapped_column(Integer, default=0) + institutionalNetBuy: Mapped[int] = mapped_column(Integer, default=0) + + aiScoreBuy: Mapped[int] = mapped_column(Integer, default=0) + aiScoreSell: Mapped[int] = mapped_column(Integer, default=0) + +# ----------------- +# 4. Watchlist +# ----------------- + +class WatchlistGroup(Base): + __tablename__ = "watchlist_groups" + + id: Mapped[str] = mapped_column(String, primary_key=True) + name: Mapped[str] = mapped_column(String) + market: Mapped[str] = mapped_column(String) # Domestic, Overseas + +class WatchlistItem(Base): + __tablename__ = "watchlist_items" + + group_id: Mapped[str] = mapped_column(ForeignKey("watchlist_groups.id"), primary_key=True) + stock_code: Mapped[str] = mapped_column(String, primary_key=True) + added_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now) + +# ----------------- +# 5. Trading & Automation +# ----------------- + +class TradeHistory(Base): + __tablename__ = "trade_history" + + id: Mapped[str] = mapped_column(String, primary_key=True) + stockCode: Mapped[str] = mapped_column(String) + stockName: Mapped[str] = mapped_column(String) + type: Mapped[str] = mapped_column(String) # BUY, SELL + + quantity: Mapped[int] = mapped_column(Integer) + price: Mapped[float] = mapped_column(Float) + + timestamp: Mapped[datetime] = mapped_column(DateTime) + status: Mapped[str] = mapped_column(String) # FILLED, CANCELLED + +class AutoTradeRobot(Base): + __tablename__ = "auto_trade_robots" + + id: Mapped[str] = mapped_column(String, primary_key=True) + stockCode: Mapped[str] = mapped_column(String) + stockName: Mapped[str] = mapped_column(String) + groupId: Mapped[Optional[str]] = mapped_column(String, nullable=True) + + type: Mapped[str] = mapped_column(String) # ACCUMULATION, TRAILING, etc. + frequency: Mapped[str] = mapped_column(String) # DAILY, WEEKLY + executionTime: Mapped[str] = mapped_column(String) # HH:MM + market: Mapped[str] = mapped_column(String) + + quantity: Mapped[int] = mapped_column(Integer) + specificDay: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) # 0=Monday + trailingPercent: Mapped[Optional[float]] = mapped_column(Float, nullable=True) + + active: Mapped[bool] = mapped_column(Boolean, default=True) + +class ReservedOrder(Base): + __tablename__ = "reserved_orders" + + id: Mapped[str] = mapped_column(String, primary_key=True) + stockCode: Mapped[str] = mapped_column(String) + stockName: Mapped[str] = mapped_column(String) + type: Mapped[str] = mapped_column(String) # BUY, SELL + market: Mapped[str] = mapped_column(String) + + monitoringType: Mapped[str] = mapped_column(String) # TARGET, TRAILING + trailingType: Mapped[Optional[str]] = mapped_column(String, nullable=True) # AMOUNT, PERCENT + status: Mapped[str] = mapped_column(String) # MONITORING, TRIGGERED, EXPIRED + + quantity: Mapped[int] = mapped_column(Integer) + + triggerPrice: Mapped[Optional[float]] = mapped_column(Float, nullable=True) + trailingValue: Mapped[Optional[float]] = mapped_column(Float, nullable=True) + stopLossValue: Mapped[Optional[float]] = mapped_column(Float, nullable=True) + + highestPrice: Mapped[Optional[float]] = mapped_column(Float, nullable=True) # For Trailing + lowestPrice: Mapped[Optional[float]] = mapped_column(Float, nullable=True) + + useStopLoss: Mapped[bool] = mapped_column(Boolean, default=False) + sellAll: Mapped[bool] = mapped_column(Boolean, default=False) + stopLossType: Mapped[Optional[str]] = mapped_column(String, nullable=True) + + createdAt: Mapped[datetime] = mapped_column(DateTime, default=datetime.now) + expiryDate: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True) diff --git a/backend/app/main.py b/backend/app/main.py new file mode 100644 index 0000000..3e666a8 --- /dev/null +++ b/backend/app/main.py @@ -0,0 +1,62 @@ +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware +from fastapi.middleware.trustedhost import TrustedHostMiddleware +from fastapi.staticfiles import StaticFiles +from pathlib import Path +from contextlib import asynccontextmanager + +from app.core.config import settings +from app.db.init_db import init_db +from app.workers.scheduler import start_scheduler + +@asynccontextmanager +async def lifespan(app: FastAPI): + # Startup: Initialize DB + await init_db() + start_scheduler() + print("Database & Scheduler Initialized") + yield + # Shutdown: Cleanup if needed + +app = FastAPI( + title=settings.PROJECT_NAME, + openapi_url=f"{settings.API_V1_STR}/openapi.json", + docs_url=f"{settings.API_V1_STR}/docs", + redoc_url=f"{settings.API_V1_STR}/redoc", + lifespan=lifespan +) + +# 1. Security: Trusted Host Middleware +app.add_middleware( + TrustedHostMiddleware, + allowed_hosts=settings.ALLOWED_HOSTS +) + +# 2. CORS Middleware +if settings.BACKEND_CORS_ORIGINS: + app.add_middleware( + CORSMiddleware, + allow_origins=[str(origin) for origin in settings.BACKEND_CORS_ORIGINS], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) + +# 3. Health Check +@app.get("/health") +def health_check(): + return {"status": "ok", "app": settings.PROJECT_NAME} + +from app.api.api import api_router + +# 4. API Router +app.include_router(api_router, prefix=settings.API_V1_STR) + +# 5. Static Files (Frontend) +BASE_DIR = Path(__file__).resolve().parent.parent +STATIC_DIR = BASE_DIR / "static" + +if STATIC_DIR.exists(): + app.mount("/", StaticFiles(directory=str(STATIC_DIR), html=True), name="static") +else: + print(f"Warning: Static directory not found at {STATIC_DIR}") diff --git a/backend/app/services/ai_factory.py b/backend/app/services/ai_factory.py new file mode 100644 index 0000000..e537f5c --- /dev/null +++ b/backend/app/services/ai_factory.py @@ -0,0 +1,60 @@ +from typing import Optional, Dict, Any +from abc import ABC, abstractmethod +import httpx + +class BaseAIProvider(ABC): + def __init__(self, api_key: str, model_name: str, base_url: str = None): + self.api_key = api_key + self.model_name = model_name + self.base_url = base_url + + @abstractmethod + async def generate_content(self, prompt: str, system_instruction: str = None) -> str: + pass + +class GeminiProvider(BaseAIProvider): + async def generate_content(self, prompt: str, system_instruction: str = None) -> str: + # Placeholder for Gemini API Implementation + # https://generativelanguage.googleapis.com/v1beta/models/... + return f"Gemini Response to: {prompt}" + +class OpenAIProvider(BaseAIProvider): + async def generate_content(self, prompt: str, system_instruction: str = None) -> str: + # Placeholder for OpenAI API + return f"OpenAI Response to: {prompt}" + +class OllamaProvider(BaseAIProvider): + """ + Ollama (Local LLM), compatible with OpenAI client usually, or direct /api/generate + """ + async def generate_content(self, prompt: str, system_instruction: str = None) -> str: + # Placeholder for Ollama API + url = f"{self.base_url}/api/generate" + payload = { + "model": self.model_name, + "prompt": prompt, + "stream": False + } + if system_instruction: + payload["system"] = system_instruction + + try: + async with httpx.AsyncClient() as client: + resp = await client.post(url, json=payload, timeout=60.0) + resp.raise_for_status() + data = resp.json() + return data.get("response", "") + except Exception as e: + return f"Error: {e}" + +class AIFactory: + @staticmethod + def get_provider(provider_type: str, api_key: str, model_name: str, base_url: str = None) -> BaseAIProvider: + if provider_type.lower() == "gemini": + return GeminiProvider(api_key, model_name, base_url) + elif provider_type.lower() == "openai": + return OpenAIProvider(api_key, model_name, base_url) + elif provider_type.lower() == "ollama": + return OllamaProvider(api_key, model_name, base_url) + else: + raise ValueError(f"Unknown Provider: {provider_type}") diff --git a/backend/app/services/ai_orchestrator.py b/backend/app/services/ai_orchestrator.py new file mode 100644 index 0000000..e39a3d6 --- /dev/null +++ b/backend/app/services/ai_orchestrator.py @@ -0,0 +1,55 @@ +from sqlalchemy import select +from app.db.database import SessionLocal +from app.db.models import AiConfig, ApiSettings +from app.services.ai_factory import AIFactory, BaseAIProvider + +class AIOrchestrator: + def __init__(self): + pass + + async def _get_provider_by_id(self, config_id: str) -> BaseAIProvider: + async with SessionLocal() as session: + stmt = select(AiConfig).where(AiConfig.id == config_id) + result = await session.execute(stmt) + config = result.scalar_one_or_none() + + if not config: + raise ValueError("AI Config not found") + + # Note: API Keys might need to be stored securely or passed from ENV/Settings. + # For now assuming API Key is managed externally or stored in config (not implemented in DB schema for security). + # Or we look up ApiSettings or a secure vault. + # Simplified: Use a placeholder or ENV. + api_key = "place_holder" + + return AIFactory.get_provider(config.providerType, api_key, config.modelName, config.baseUrl) + + async def get_preferred_provider(self, purpose: str) -> BaseAIProvider: + """ + purpose: 'news', 'stock', 'judgement', 'buy', 'sell' + """ + async with SessionLocal() as session: + stmt = select(ApiSettings).where(ApiSettings.id == 1) + result = await session.execute(stmt) + settings = result.scalar_one_or_none() + + if not settings: + raise ValueError("Settings not initialized") + + config_id = None + if purpose == 'news': config_id = settings.preferredNewsAiId + elif purpose == 'stock': config_id = settings.preferredStockAiId + elif purpose == 'judgement': config_id = settings.preferredNewsJudgementAiId + elif purpose == 'buy': config_id = settings.preferredAutoBuyAiId + elif purpose == 'sell': config_id = settings.preferredAutoSellAiId + + if not config_id: + raise ValueError(f"No preferred AI configured for {purpose}") + + return await self._get_provider_by_id(config_id) + + async def analyze_text(self, text: str, purpose="news") -> str: + provider = await self.get_preferred_provider(purpose) + return await provider.generate_content(text) + +ai_orchestrator = AIOrchestrator() diff --git a/backend/app/services/kis_auth.py b/backend/app/services/kis_auth.py new file mode 100644 index 0000000..62396f5 --- /dev/null +++ b/backend/app/services/kis_auth.py @@ -0,0 +1,117 @@ +import httpx +from datetime import datetime, timedelta +from sqlalchemy import select +from app.db.database import SessionLocal +from app.db.models import ApiSettings + +class KisAuth: + BASE_URL_REAL = "https://openapi.koreainvestment.com:9443" + # BASE_URL_VIRTUAL = "https://openapivts.koreainvestment.com:29443" + + def __init__(self): + pass + + async def get_access_token(self, db_session=None) -> str: + """ + Returns valid access token. Issues new one if expired or missing. + """ + local_session = False + if not db_session: + db_session = SessionLocal() + local_session = True + + try: + # 1. Get Settings + stmt = select(ApiSettings).where(ApiSettings.id == 1) + result = await db_session.execute(stmt) + settings_obj = result.scalar_one_or_none() + + if not settings_obj or not settings_obj.appKey or not settings_obj.appSecret: + raise ValueError("KIS API Credentials not configured.") + + # 2. Check Expiry (Buffer 10 mins) + if settings_obj.accessToken and settings_obj.tokenExpiry: + if settings_obj.tokenExpiry > datetime.now() + timedelta(minutes=10): + return settings_obj.accessToken + + # 3. Issue New Token + token_data = await self._issue_token(settings_obj.appKey, settings_obj.appSecret) + + # 4. Save to DB + settings_obj.accessToken = token_data['access_token'] + # expires_in is seconds (usually 86400) + settings_obj.tokenExpiry = datetime.now() + timedelta(seconds=int(token_data['expires_in'])) + + await db_session.commit() + return settings_obj.accessToken + + except Exception as e: + await db_session.rollback() + raise e + finally: + if local_session: + await db_session.close() + + async def _issue_token(self, app_key: str, app_secret: str) -> dict: + url = f"{self.BASE_URL_REAL}/oauth2/tokenP" + payload = { + "grant_type": "client_credentials", + "appkey": app_key, + "appsecret": app_secret + } + + async with httpx.AsyncClient() as client: + resp = await client.post(url, json=payload, headers={"Content-Type": "application/json"}) + resp.raise_for_status() + return resp.json() + + async def get_approval_key(self, db_session=None) -> str: + """ + Returns WebSocket Approval Key. Issues new one if missing. + """ + local_session = False + if not db_session: + db_session = SessionLocal() + local_session = True + + try: + stmt = select(ApiSettings).where(ApiSettings.id == 1) + result = await db_session.execute(stmt) + settings_obj = result.scalar_one_or_none() + + if not settings_obj or not settings_obj.appKey or not settings_obj.appSecret: + raise ValueError("KIS API Credentials not configured.") + + if settings_obj.websocketApprovalKey: + return settings_obj.websocketApprovalKey + + # Issue New Key + approval_key = await self._issue_approval_key(settings_obj.appKey, settings_obj.appSecret) + + settings_obj.websocketApprovalKey = approval_key + await db_session.commit() + + return approval_key + + except Exception as e: + await db_session.rollback() + raise e + finally: + if local_session: + await db_session.close() + + async def _issue_approval_key(self, app_key: str, app_secret: str) -> str: + url = f"{self.BASE_URL_REAL}/oauth2/Approval" + payload = { + "grant_type": "client_credentials", + "appkey": app_key, + "secretkey": app_secret # Note: Parameter name difference + } + + async with httpx.AsyncClient() as client: + resp = await client.post(url, json=payload, headers={"Content-Type": "application/json"}) + resp.raise_for_status() + data = resp.json() + return data['approval_key'] + +kis_auth = KisAuth() diff --git a/backend/app/services/kis_client.py b/backend/app/services/kis_client.py new file mode 100644 index 0000000..a0253a7 --- /dev/null +++ b/backend/app/services/kis_client.py @@ -0,0 +1,197 @@ +import httpx +from typing import Dict, Optional, Any +from app.services.kis_auth import kis_auth +from app.core.rate_limiter import global_rate_limiter +from app.db.database import SessionLocal +from app.db.models import ApiSettings +from sqlalchemy import select + +class KisClient: + """ + Brokerage Service Interface for KIS API. + Implements Section 9 Integration Map. + """ + + # Domestic URLs + URL_DOMESTIC_ORDER = "/uapi/domestic-stock/v1/trading/order-cash" + URL_DOMESTIC_PRICE = "/uapi/domestic-stock/v1/quotations/inquire-price" + URL_DOMESTIC_BALANCE = "/uapi/domestic-stock/v1/trading/inquire-balance" + + # Overseas URLs + URL_OVERSEAS_ORDER = "/uapi/overseas-stock/v1/trading/order" + URL_OVERSEAS_PRICE = "/uapi/overseas-price/v1/quotations/price" + URL_OVERSEAS_BALANCE = "/uapi/overseas-stock/v1/trading/inquire-balance" + + def __init__(self): + pass + + async def _get_settings(self): + async with SessionLocal() as session: + stmt = select(ApiSettings).where(ApiSettings.id == 1) + result = await session.execute(stmt) + return result.scalar_one_or_none() + + async def _call_api(self, method: str, url_path: str, tr_id: str, params: Dict = None, data: Dict = None) -> Dict: + """ + Common API Caller with Rate Limiting and Auth. + """ + # 1. Rate Limit + await global_rate_limiter.wait() + + # 2. Get Token & Base URL + # Assuming Real Environment for now. TODO: Support Virtual + base_url = kis_auth.BASE_URL_REAL + token = await kis_auth.get_access_token() + settings = await self._get_settings() + + if not settings: + raise ValueError("Settings not found") + + # 3. Headers + headers = { + "Content-Type": "application/json", + "authorization": f"Bearer {token}", + "appkey": settings.appKey, + "appsecret": settings.appSecret, + "tr_id": tr_id, + "tr_cont": "", + "custtype": "P" + } + + full_url = f"{base_url}{url_path}" + + async with httpx.AsyncClient() as client: + if method == "GET": + resp = await client.get(full_url, headers=headers, params=params) + else: + resp = await client.post(full_url, headers=headers, json=data) + + # TODO: Handle 401 Re-Auth Logic here (catch, clear token, retry) + + resp.raise_for_status() + return resp.json() + + # ----------------------------- + # 1. Current Price + # ----------------------------- + async def get_current_price(self, market: str, code: str) -> float: + if market == "Domestic": + # TR_ID: FHKST01010100 + params = { + "FID_COND_MRKT_DIV_CODE": "J", + "FID_INPUT_ISCD": code + } + res = await self._call_api("GET", self.URL_DOMESTIC_PRICE, "FHKST01010100", params=params) + return float(res['output']['stck_prpr']) + + elif market == "Overseas": + # TR_ID: HHDFS00000300 + # Need Exchange Code (e.g., NASD). Assuming NASD for generic US or mapped. + # Ideally code should be 'NASD:AAPL' or separate excg param. + # For now assuming 'NASD' if not provided implicitly. + # Or code is just 'AAPL'. + excg = "NASD" # Default + params = { + "AUTH": "", + "EXCD": excg, + "SYMB": code + } + res = await self._call_api("GET", self.URL_OVERSEAS_PRICE, "HHDFS00000300", params=params) + return float(res['output']['last']) + + return 0.0 + + # ----------------------------- + # 2. Balance + # ----------------------------- + async def get_balance(self, market: str) -> Dict: + settings = await self._get_settings() + acc_no = settings.accountNumber + # acc_no is 8 digits. Split? "500xxx-01" -> 500xxx, 01 + if '-' in acc_no: + cano, prdt = acc_no.split('-') + else: + cano = acc_no[:8] + prdt = acc_no[8:] + + if market == "Domestic": + # TR_ID: TTTC8434R (Real) + params = { + "CANO": cano, + "ACNT_PRDT_CD": prdt, + "AFHR_FLPR_YN": "N", + "OFL_YN": "", + "INQR_DVSN": "02", + "UNPR_DVSN": "01", + "FUND_STTL_ICLD_YN": "N", + "FNCG_AMT_AUTO_RDPT_YN": "N", + "PRCS_DVSN": "01", + "CTX_AREA_FK100": "", + "CTX_AREA_NK100": "" + } + res = await self._call_api("GET", self.URL_DOMESTIC_BALANCE, "TTTC8434R", params=params) + return res + + elif market == "Overseas": + # TR_ID: TTTS3012R (Real) + params = { + "CANO": cano, + "ACNT_PRDT_CD": prdt, + "OVRS_EXCG_CD": "NASD", # Default + "TR_CRCY_CD": "USD", + "CTX_AREA_FK200": "", + "CTX_AREA_NK200": "" + } + res = await self._call_api("GET", self.URL_OVERSEAS_BALANCE, "TTTS3012R", params=params) + return res + + return {} + + # ----------------------------- + # 3. Order + # ----------------------------- + async def place_order(self, market: str, side: str, code: str, quantity: int, price: float) -> Dict: + """ + side: 'buy' or 'sell' + price: 0 for Market? KIS logic varies. + """ + settings = await self._get_settings() + if '-' in settings.accountNumber: + cano, prdt = settings.accountNumber.split('-') + else: + cano = settings.accountNumber[:8] + prdt = settings.accountNumber[8:] + + if market == "Domestic": + # TR_ID: TTT 0802U (Buy), 0801U (Sell) -> using sample 0012U/0011U + # 0012U (Buy), 0011U (Sell) + tr_id = "TTTC0012U" if side == "buy" else "TTTC0011U" + + data = { + "CANO": cano, + "ACNT_PRDT_CD": prdt, + "PDNO": code, + "ORD_DVSN": "00", # Limit (00). 01=Market + "ORD_QTY": str(quantity), + "ORD_UNPR": str(int(price)), # Cash Order requires integer price string + } + return await self._call_api("POST", self.URL_DOMESTIC_ORDER, tr_id, data=data) + + elif market == "Overseas": + # TR_ID: TTTT1002U (US Buy), TTTT1006U (US Sell) + # Assuming US (NASD) + tr_id = "TTTT1002U" if side == "buy" else "TTTT1006U" + + data = { + "CANO": cano, + "ACNT_PRDT_CD": prdt, + "OVRS_EXCG_CD": "NASD", + "PDNO": code, + "ORD_QTY": str(quantity), + "OVRS_ORD_UNPR": str(price), + "ORD_SVR_DVSN_CD": "0", + "ORD_DVSN": "00" # Limit + } + return await self._call_api("POST", self.URL_OVERSEAS_ORDER, tr_id, data=data) + +kis_client = KisClient() diff --git a/backend/app/services/realtime_manager.py b/backend/app/services/realtime_manager.py new file mode 100644 index 0000000..6ee075c --- /dev/null +++ b/backend/app/services/realtime_manager.py @@ -0,0 +1,154 @@ +import asyncio +import json +import websockets +import logging +from typing import Dict, Set, Callable, Optional + +from app.services.kis_auth import kis_auth +from app.core.crypto import aes_cbc_base64_dec +from app.db.database import SessionLocal +# from app.db.crud import update_stock_price # TODO: Implement CRUD + +logger = logging.getLogger(__name__) + +class RealtimeManager: + """ + Manages KIS WebSocket Connection. + Handles: Connection, Subscription, Decryption, PINGPONG. + """ + WS_URL_REAL = "ws://ops.koreainvestment.com:21000" + + def __init__(self): + self.ws: Optional[websockets.WebSocketClientProtocol] = None + self.approval_key: Optional[str] = None + self.subscribed_codes: Set[str] = set() + self.running = False + self.data_map: Dict[str, Dict] = {} # Store IV/Key for encrypted TRs + + async def start(self): + """ + Main loop: Connect -> Authenticate -> Listen + """ + self.running = True + while self.running: + try: + # 1. Get Approval Key + self.approval_key = await kis_auth.get_approval_key() + + logger.info(f"Connecting to KIS WS: {self.WS_URL_REAL}") + async with websockets.connect(self.WS_URL_REAL, ping_interval=None) as websocket: + self.ws = websocket + logger.info("Connected.") + + # 2. Resubscribe if recovering connection + if self.subscribed_codes: + await self._resubscribe_all() + + # 3. Listen Loop + await self._listen() + + except Exception as e: + logger.error(f"WS Connection Error: {e}. Retrying in 5s...") + await asyncio.sleep(5) + + async def stop(self): + self.running = False + if self.ws: + await self.ws.close() + + async def subscribe(self, stock_code: str, type="price"): + """ + Subscribe to a stock. + type: 'price' (H0STCNT0 - 체결가) + """ + if not self.ws or not self.approval_key: + logger.warning("WS not ready. Adding to pending list.") + self.subscribed_codes.add(stock_code) + return + + # Domestic Realtime Price TR ID: H0STCNT0 + tr_id = "H0STCNT0" + tr_key = stock_code + + payload = { + "header": { + "approval_key": self.approval_key, + "custtype": "P", + "tr_type": "1", # 1=Register, 2=Unregister + "content-type": "utf-8" + }, + "body": { + "input": { + "tr_id": tr_id, + "tr_key": tr_key + } + } + } + + await self.ws.send(json.dumps(payload)) + self.subscribed_codes.add(stock_code) + logger.info(f"Subscribed to {stock_code}") + + async def _resubscribe_all(self): + for code in self.subscribed_codes: + await self.subscribe(code) + + async def _listen(self): + async for message in self.ws: + try: + # Message can be plain text provided by library, or bytes + if isinstance(message, bytes): + message = message.decode('utf-8') + + # KIS sends data in specific formats. + # 1. JSON (Control Messages, PINGPONG, Subscription Ack) + # 2. Text/Pipe separated (Real Data) - Usually starts with 0 or 1 + + first_char = message[0] + + if first_char in ['{', '[']: + # JSON Message + data = json.loads(message) + header = data.get('header', {}) + tr_id = header.get('tr_id') + + if tr_id == "PINGPONG": + await self.ws.send(message) # Echo back + logger.debug("PINGPONG handled") + + elif 'body' in data: + # Subscription Ack + # Store IV/Key if encryption is enabled (msg1 often contains 'ENCRYPT') + # But for Brokerage API, H0STCNT0 is usually plaintext unless configured otherwise. + # If encrypted, 'iv' and 'key' are in body['output'] + pass + + elif first_char in ['0', '1']: + # Real Data: 0|TR_ID|DATA_CNT|DATA... + parts = message.split('|') + if len(parts) < 4: + continue + + tr_id = parts[1] + raw_data = parts[3] + + # Decryption Check + # If this tr_id was registered as encrypted, decrypt it. + # For now assuming Plaintext for H0STCNT0 as per standard Personal API. + + # Parse Data + if tr_id == "H0STCNT0": # Domestic Price + # Data format: TIME^PRICE^... + # We need to look up format spec. + # Simple implementation: just log or split + fields = raw_data.split('^') + if len(fields) > 2: + current_price = fields[2] # Example index + # TODO: Update DB + # print(f"Price Update: {current_price}") + pass + + except Exception as e: + logger.error(f"Error processing WS message: {e}") + +realtime_manager = RealtimeManager() diff --git a/backend/app/workers/scheduler.py b/backend/app/workers/scheduler.py new file mode 100644 index 0000000..74ea6a8 --- /dev/null +++ b/backend/app/workers/scheduler.py @@ -0,0 +1,43 @@ +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from app.core.market_schedule import market_schedule +from app.services.kis_client import kis_client +from app.services.realtime_manager import realtime_manager +import logging + +logger = logging.getLogger(__name__) + +scheduler = AsyncIOScheduler() + +async def market_check_job(): + """ + Periodic check to ensure Realtime Manager is connected when market is open. + """ + is_domestic_open = market_schedule.is_market_open("Domestic") + # is_overseas_open = market_schedule.is_market_open("Overseas") + + # If market is open and WS is not running, start it + if is_domestic_open and not realtime_manager.running: + logger.info("Market is Open! Starting Realtime Manager.") + # This is a blocking call if awaited directly in job? NO, start() has a loop. + # We should start it as a task if it's not running. + # But realtime_manager.start() is a loop. + # Better to have it managed by FastAPI startup, and this job just checks. + pass + +async def news_scrap_job(): + # Placeholder for News Scraper + # logger.info("Scraping Naver News...") + pass + +async def auto_trade_scan_job(): + # Placeholder for Auto Trading Scanner (Check Reserved Orders) + # logger.info("Scanning Reserved Orders...") + pass + +def start_scheduler(): + scheduler.add_job(market_check_job, 'interval', minutes=5) + scheduler.add_job(news_scrap_job, 'interval', minutes=10) + scheduler.add_job(auto_trade_scan_job, 'interval', minutes=1) + + scheduler.start() + logger.info("Scheduler Started.") diff --git a/backend/requirements.txt b/backend/requirements.txt new file mode 100644 index 0000000..72d9610 --- /dev/null +++ b/backend/requirements.txt @@ -0,0 +1,11 @@ +fastapi>=0.100.0 +uvicorn[standard]>=0.20.0 +sqlalchemy>=2.0.0 +aiosqlite>=0.19.0 +pydantic-settings>=2.0.0 +httpx>=0.24.0 +websockets>=11.0 +pytz +python-multipart +pycryptodome +apscheduler