220 lines
9.4 KiB
Python
220 lines
9.4 KiB
Python
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from app.core.market_schedule import market_schedule
|
|
from app.services.kis_client import kis_client
|
|
from app.services.realtime_manager import realtime_manager
|
|
from app.db.database import SessionLocal
|
|
from app.core.startup import check_kis_connectivity, check_telegram_connectivity
|
|
from app.db.models import StockItem, ReservedOrder, ApiSettings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
scheduler = AsyncIOScheduler()
|
|
|
|
class ServiceState:
|
|
last_keys_hash: str = ""
|
|
telegram_active: bool = False
|
|
naver_active: bool = False
|
|
|
|
@classmethod
|
|
def get_keys_hash(cls, settings: ApiSettings):
|
|
return f"{settings.appKey}{settings.appSecret}{settings.accountNumber}"
|
|
|
|
async def service_watchdog_job():
|
|
"""
|
|
Periodically checks settings and initiates services if they were newly enabled or changed.
|
|
"""
|
|
async with SessionLocal() as db_session:
|
|
stmt = select(ApiSettings).where(ApiSettings.id == 1)
|
|
result = await db_session.execute(stmt)
|
|
settings_obj = result.scalar_one_or_none()
|
|
|
|
if not settings_obj: return
|
|
|
|
# 1. KIS Connectivity Watchdog
|
|
current_hash = ServiceState.get_keys_hash(settings_obj)
|
|
if current_hash != ServiceState.last_keys_hash and settings_obj.appKey:
|
|
logger.info("Watchdog: KIS Credentials Changed or Initialized. Re-authenticating...")
|
|
success = await check_kis_connectivity(db_session, settings_obj)
|
|
if success:
|
|
ServiceState.last_keys_hash = current_hash
|
|
|
|
# 2. Telegram Watchdog
|
|
if settings_obj.useTelegram and not ServiceState.telegram_active:
|
|
if settings_obj.telegramToken and settings_obj.telegramChatId:
|
|
logger.info("Watchdog: Telegram newly enabled. Sending activation message.")
|
|
from app.services.telegram_service import telegram_service
|
|
msg = "🔔 <b>알림 서비스가 활성화되었습니다.</b>\n이제부터 주요 거래 알림을 이곳으로 보내드립니다."
|
|
await telegram_service.send_message(settings_obj.telegramToken, settings_obj.telegramChatId, msg)
|
|
ServiceState.telegram_active = True
|
|
elif not settings_obj.useTelegram:
|
|
ServiceState.telegram_active = False
|
|
|
|
# 3. Naver News Watchdog
|
|
if settings_obj.useNaverNews and not ServiceState.naver_active:
|
|
if settings_obj.naverClientId and settings_obj.naverClientSecret:
|
|
logger.info("Watchdog: Naver News newly enabled. (Logic placeholder)")
|
|
ServiceState.naver_active = True
|
|
elif not settings_obj.useNaverNews:
|
|
ServiceState.naver_active = False
|
|
|
|
async def market_check_job():
|
|
"""
|
|
Periodic check to ensure Realtime Manager is connected when market is open.
|
|
"""
|
|
logger.info("Scheduler: Market Check Job Running...")
|
|
is_domestic_open = market_schedule.is_market_open("Domestic")
|
|
# is_overseas_open = market_schedule.is_market_open("Overseas")
|
|
|
|
# If market is open and WS is not running, start it
|
|
if is_domestic_open and not realtime_manager.running:
|
|
logger.info("Market is Open! Starting Realtime Manager.")
|
|
else:
|
|
logger.info(f"Market Status: Domestic={is_domestic_open}, RealtimeManager={realtime_manager.running}")
|
|
|
|
async def persist_market_data_job():
|
|
"""
|
|
Flushes realtime_manager.price_cache to DB (StockItem).
|
|
"""
|
|
if not realtime_manager.price_cache:
|
|
return
|
|
|
|
# Snapshot and clear (thread-safe ish in async single loop)
|
|
# Or just iterate. Python dict iteration is safe if not modifying keys.
|
|
# We'll take keys copy.
|
|
codes_to_process = list(realtime_manager.price_cache.keys())
|
|
|
|
if not codes_to_process:
|
|
return
|
|
|
|
async with SessionLocal() as session:
|
|
count = 0
|
|
for code in codes_to_process:
|
|
data = realtime_manager.price_cache.get(code)
|
|
if not data: continue
|
|
|
|
# Upsert StockItem?
|
|
# Or just update if exists. StockItem generally exists if synced.
|
|
# If not exists, we might create it.
|
|
|
|
price = float(data['price'])
|
|
change = float(data['change'])
|
|
rate = float(data['rate'])
|
|
|
|
# Simple Update
|
|
stmt = update(StockItem).where(StockItem.code == code).values(
|
|
price=price,
|
|
change=change,
|
|
changePercent=rate
|
|
)
|
|
await session.execute(stmt)
|
|
count += 1
|
|
|
|
await session.commit()
|
|
|
|
# logger.debug(f"Persisted {count} stock prices to DB.")
|
|
|
|
async def news_scrap_job():
|
|
# Placeholder for News Scraper
|
|
logger.info("Scheduler: Scraping Naver News (Placeholder)...")
|
|
pass
|
|
|
|
async def auto_trade_scan_job():
|
|
"""
|
|
Scans ReservedOrders and triggers actions if conditions are met.
|
|
"""
|
|
async with SessionLocal() as session:
|
|
# Fetch Monitoring Orders
|
|
stmt = select(ReservedOrder).where(ReservedOrder.status == "MONITORING")
|
|
result = await session.execute(stmt)
|
|
orders = result.scalars().all()
|
|
|
|
for order in orders:
|
|
# Check Price
|
|
current_price = 0
|
|
|
|
# 1. Try Realtime Cache
|
|
if order.stockCode in realtime_manager.price_cache:
|
|
current_price = float(realtime_manager.price_cache[order.stockCode]['price'])
|
|
else:
|
|
# 2. Try DB (StockItem)
|
|
# ... (omitted for speed, usually Cache covers if monitored)
|
|
pass
|
|
|
|
if current_price <= 0: continue
|
|
|
|
# Trigger Logic (Simple)
|
|
triggered = False
|
|
if order.monitoringType == "PRICE_TRIGGER":
|
|
# Buy if Price <= Trigger (Dip Buy) ?? OR Buy if Price >= Trigger (Breakout)?
|
|
# Usually define "Condition". Assuming Buy Lower for now or User defined.
|
|
# Let's assume TriggerPrice is "Target".
|
|
# If BUY -> Price <= Trigger?
|
|
# If SELL -> Price >= Trigger?
|
|
|
|
if order.type == "BUY" and current_price <= order.triggerPrice:
|
|
triggered = True
|
|
elif order.type == "SELL" and current_price >= order.triggerPrice:
|
|
triggered = True
|
|
|
|
elif order.monitoringType == "TRAILING_STOP":
|
|
if order.type == "SELL":
|
|
# Trailing Sell (High Water Mark)
|
|
if not order.highestPrice or current_price > order.highestPrice:
|
|
order.highestPrice = current_price
|
|
|
|
if order.highestPrice > 0 and order.trailingValue:
|
|
drop = order.highestPrice - current_price
|
|
triggered = False
|
|
|
|
if order.trailingType == "PERCENT":
|
|
drop_rate = (drop / order.highestPrice) * 100
|
|
if drop_rate >= order.trailingValue: triggered = True
|
|
else: # AMOUNT
|
|
if drop >= order.trailingValue: triggered = True
|
|
|
|
if triggered:
|
|
logger.info(f"TS SELL Triggered: High={order.highestPrice}, Curr={current_price}")
|
|
|
|
elif order.type == "BUY":
|
|
# Trailing Buy (Low Water Mark / Rebound)
|
|
# Initialize lowest if 0 (assuming stock price never 0, or use 99999999 default)
|
|
if not order.lowestPrice or order.lowestPrice == 0:
|
|
order.lowestPrice = current_price
|
|
|
|
if current_price < order.lowestPrice:
|
|
order.lowestPrice = current_price
|
|
|
|
if order.lowestPrice > 0 and order.trailingValue:
|
|
rise = current_price - order.lowestPrice
|
|
triggered = False
|
|
|
|
if order.trailingType == "PERCENT":
|
|
rise_rate = (rise / order.lowestPrice) * 100
|
|
if rise_rate >= order.trailingValue: triggered = True
|
|
else:
|
|
if rise >= order.trailingValue: triggered = True
|
|
|
|
if triggered:
|
|
logger.info(f"TS BUY Triggered: Low={order.lowestPrice}, Curr={current_price}")
|
|
|
|
if triggered:
|
|
logger.info(f"Order TRIGGERED! {order.stockName} {order.type} @ {current_price} (Target: {order.triggerPrice})")
|
|
|
|
# Execute Order
|
|
# res = await kis_client.place_order(...)
|
|
# Update Status
|
|
# order.status = "TRIGGERED" (or "COMPLETED" after exec)
|
|
pass
|
|
|
|
await session.commit()
|
|
|
|
def start_scheduler():
|
|
scheduler.add_job(market_check_job, 'interval', minutes=5)
|
|
scheduler.add_job(news_scrap_job, 'interval', minutes=10)
|
|
scheduler.add_job(auto_trade_scan_job, 'interval', minutes=1)
|
|
scheduler.add_job(service_watchdog_job, 'interval', minutes=1) # Check settings every 1 min
|
|
scheduler.add_job(persist_market_data_job, 'interval', seconds=10) # Persist every 10s
|
|
|
|
scheduler.start()
|
|
logger.info("Scheduler Started.")
|