# 215 lines · 8.1 KiB · Python  (file-viewer header captured along with the source)
import time
|
|
import threading
|
|
import logging
|
|
from sqlalchemy.orm import Session
|
|
from database import SessionLocal, TradeSetting, Order, Stock, AccountBalance, Holding
|
|
import datetime
|
|
from kis_api import kis
|
|
from telegram_notifier import notifier
|
|
|
|
|
|
# Module-wide logger; every TradingBot log line in this file is emitted under the "TRADER" name.
logger = logging.getLogger("TRADER")
|
|
|
|
class TradingBot:
    """Poll prices for active trade settings and sell on stop-loss / target hits.

    ``start`` syncs account assets once and then runs ``_run_loop`` on a daemon
    thread. Each loop iteration loads the active ``TradeSetting`` rows and, for
    every code that is present in the in-memory ``holdings`` cache, compares
    the current price against the setting's stop-loss and target prices,
    submitting a market sell of the whole cached quantity when either is hit.
    """

    def __init__(self):
        self.is_running = False
        self.thread = None
        # Local cache for holdings: {code: {'qty': int, 'price': float}}.
        # Only ever replaced wholesale (see _sync_domestic), never mutated in place.
        self.holdings = {}

        self.last_chart_update = 0

    @staticmethod
    def _to_float(value, default=0.0):
        """Return ``value`` as a float, or ``default`` when it is None or blank.

        Non-blank values that are not numeric still raise ``ValueError`` so
        malformed payloads are not silently turned into zeros.
        """
        if value is None:
            return default
        if isinstance(value, str) and not value.strip():
            return default
        return float(value)

    @staticmethod
    def _notify(message):
        """Send a notifier message; a delivery failure is logged, never raised."""
        try:
            notifier.send_message(message)
        except Exception as e:
            # Notification is best-effort: it must not abort order bookkeeping
            # or the asset refresh that follows it.
            logger.exception("Failed to send notification: %s", e)

    def refresh_assets(self):
        """
        Fetch Balance and Holdings from KIS and save to DB

        Domestic and overseas (NASD) data are written in one transaction: any
        exception rolls the DB changes back and is logged, not raised. The
        in-memory ``holdings`` cache is replaced as soon as the domestic
        holdings have been parsed, independent of the later commit.
        """
        logger.info("Syncing Assets to Database...")
        db = SessionLocal()
        try:
            self._sync_domestic(db)
            self._sync_overseas(db)

            db.commit()
            logger.info("Assets Synced Successfully.")

        except Exception as e:
            logger.exception("Failed to sync assets: %s", e)
            db.rollback()
        finally:
            db.close()

    def _sync_domestic(self, db):
        """Stage the domestic account summary and holdings; rebuild the cache."""
        # 1. Domestic Balance
        balance = kis.get_balance(source="Automated_Sync")

        if balance and 'output2' in balance and balance['output2']:
            summary = balance['output2'][0]
            # Upsert the single AccountBalance row.
            account = db.query(AccountBalance).first()
            if not account:
                account = AccountBalance()
                db.add(account)

            account.total_eval = float(summary['tot_evlu_amt'])
            account.deposit = float(summary['dnca_tot_amt'])
            account.total_profit = float(summary['evlu_pfls_smtl_amt'])
            account.updated_at = datetime.datetime.now()

        # 2. Holdings (Domestic)
        if not (balance and 'output1' in balance):
            # No holdings list in the response: keep the stored rows and the
            # cache rather than wiping them on a failed or partial API call.
            logger.warning("No domestic holdings in balance response; keeping stored DOMESTIC holdings.")
            return

        # Replace the stored DOMESTIC rows with the fresh snapshot.
        db.query(Holding).filter(Holding.market == 'DOMESTIC').delete()

        fresh_cache = {}
        for item in balance['output1']:
            code = item['pdno']
            qty = int(item['hldg_qty'])
            if qty <= 0:
                continue

            buy_price = float(item['pchs_avg_pric'])

            # Save to DB
            db.add(Holding(
                code=code,
                name=item['prdt_name'],
                quantity=qty,
                price=buy_price,
                current_price=float(item['prpr']),
                profit_rate=float(item['evlu_pfls_rt']),
                market="DOMESTIC",
            ))

            # Memory Cache for Trade Logic
            fresh_cache[code] = {'qty': qty, 'price': buy_price}

        # Single assignment so the trading thread never observes a
        # half-populated cache.
        self.holdings = fresh_cache

    def _sync_overseas(self, db):
        """Stage the overseas (NASD) holdings rows."""
        # 3. Overseas Balance (NASD default)
        # TODO: Multi-market support if needed
        overseas = kis.get_overseas_balance(exchange="NASD")

        if not (overseas and 'output1' in overseas):
            # Same rule as the domestic side: no data means no delete.
            logger.warning("No overseas holdings in balance response; keeping stored NASD holdings.")
            return

        # Replace the stored NASD rows with the fresh snapshot.
        db.query(Holding).filter(Holding.market == 'NASD').delete()

        for item in overseas['output1']:
            # Parsed as float and truncated to int for storage.
            # NOTE(review): confirm whether KIS can report fractional quantities.
            qty = float(item['ovrs_cblc_qty'])
            if qty <= 0:
                continue

            code = item['ovrs_pdno']
            db.add(Holding(
                code=code,
                name=item.get('ovrs_item_name', code),
                quantity=int(qty),
                # NOTE(review): 'frcr_pchs_amt1' may be the total purchase
                # amount rather than the average price ('pchs_avg_pric'?) —
                # confirm against the KIS overseas-balance API reference.
                price=self._to_float(item.get('frcr_pchs_amt1')),
                current_price=self._to_float(item.get('now_pric2')),
                profit_rate=self._to_float(item.get('evlu_pfls_rt')),
                market="NASD",
            ))

    def start(self):
        """Sync assets once and start the polling thread; no-op if already running."""
        if self.is_running:
            return
        self.is_running = True
        self.refresh_assets()  # Fetch on start
        self.thread = threading.Thread(target=self._run_loop, daemon=True)
        self.thread.start()
        logger.info("Trading Bot Started")

    def stop(self):
        """Ask the polling loop to exit and wait for its thread to finish."""
        self.is_running = False
        worker = self.thread
        # Joining the current thread raises RuntimeError, so skip the join
        # when stop() is invoked from inside the polling thread itself.
        if worker and worker is not threading.current_thread():
            worker.join()
        logger.info("Trading Bot Stopped")

    def _run_loop(self):
        """Run ``_process_cycle`` once per second until ``is_running`` is cleared."""
        while self.is_running:
            try:
                self._process_cycle()
            except Exception as e:
                # Keep the loop alive; log with traceback for diagnosis.
                logger.exception("Error in trading loop: %s", e)

            # Sleep 1 second to avoid hammering
            time.sleep(1)

    def _process_cycle(self):
        """Evaluate every active ``TradeSetting`` once using a fresh DB session."""
        db = SessionLocal()
        try:
            # `== True` is a SQLAlchemy column expression, not a Python comparison.
            settings = db.query(TradeSetting).filter(TradeSetting.is_active == True).all()  # noqa: E712

            for setting in settings:
                self._check_and_trade(db, setting)

        finally:
            db.close()

    def _check_and_trade(self, db: "Session", setting: "TradeSetting"):
        """Sell the cached holding for ``setting.code`` if a price trigger is hit.

        Nothing happens when the code is not in the holdings cache, the cached
        quantity is not positive, or no usable current price is returned.
        Stop-loss is checked before the target price; either one submits a
        market sell (price 0) for the full cached quantity.
        """
        code = setting.code

        # Check holdings from Cache first: only sells are implemented, so a
        # code that is not held needs no price request at all.
        holding = self.holdings.get(code)
        if not holding:
            return

        holding_qty = holding['qty']
        if holding_qty <= 0:
            return

        # Get Current Price
        # Optimization: Ideally read from a shared cache from WebSocket
        price_data = kis.get_current_price(code)
        if not price_data:
            return

        current_price = self._to_float(price_data.get('stck_prpr'))
        if current_price == 0:
            return

        # NOTE(review): nothing here throttles repeats. If an order is rejected,
        # or accepted but still unfilled when refresh_assets re-reads the
        # balance, the holding stays cached and the next cycle submits the
        # same sell again — confirm whether that is acceptable.

        # Stop Loss
        if setting.stop_loss_price and current_price <= setting.stop_loss_price:
            logger.info(f"Stop Loss Triggered for {code}. Price: {current_price}, SL: {setting.stop_loss_price}")
            self._place_order(db, code, 'sell', holding_qty, 0)  # 0 means Market Price
            return

        # Target Profit
        if setting.target_price and current_price >= setting.target_price:
            logger.info(f"Target Price Triggered for {code}. Price: {current_price}, TP: {setting.target_price}")
            self._place_order(db, code, 'sell', holding_qty, 0)
            return

    def _place_order(self, db: "Session", code: str, type: str, qty: int, price: int):
        """Submit an order through KIS, record it, notify, then resync assets.

        Args:
            db: Session used to store the ``Order`` row.
            code: Stock code to trade.
            type: Order side as passed to ``kis.place_order`` (e.g. ``'sell'``);
                stored upper-cased. The name shadows the builtin but is kept
                for compatibility with existing callers.
            qty: Quantity to trade.
            price: Limit price; 0 is treated as a market order.

        The ``Order`` row is written before notifying or refreshing so that a
        failure in those later steps cannot lose the record of a live order.
        """
        logger.info(f"Placing Order: {code} {type} {qty} @ {price}")
        res = kis.place_order(code, type, qty, price)

        accepted = bool(res) and res.get('rt_cd') == '0'
        if accepted:
            status = "PENDING"
            # 'output' may be present but None, hence `or {}`.
            order_id = (res.get('output') or {}).get('ODNO', '')
            logger.info(f"Order Success: {order_id}")
        else:
            status = "FAILED"
            order_id = ""
            logger.error(f"Order Failed: {res}")

        # Record to DB
        try:
            db.add(Order(
                code=code,
                order_id=order_id,
                type=type.upper(),
                price=price,
                quantity=qty,
                status=status,
            ))
            db.commit()
        except Exception as e:
            # Roll back so the caller's session stays usable for the
            # remaining settings in this cycle.
            db.rollback()
            logger.exception("Failed to record order for %s: %s", code, e)

        if accepted:
            self._notify(f"🔔 주문 전송 완료\n[{type.upper()}] {code}\n수량: {qty}\n가격: {price if price > 0 else '시장가'}")

            # Give the broker a moment to execute, then resync holdings so the
            # cache reflects the post-order position.
            time.sleep(1)  # Wait for execution
            self.refresh_assets()
        else:
            self._notify(f"⚠️ 주문 실패\n[{type.upper()}] {code}\n이유: {res}")
|
|
|
|
# Shared module-level bot instance. Construction only initialises state; the
# polling thread is not created until start() is called.
trader = TradingBot()
|