import time import threading import logging import requests import json from sqlalchemy.orm import Session from database import SessionLocal, News, Stock from config import get_kis_config, load_config logger = logging.getLogger("NEWS_AI") class NewsBot: def __init__(self): self.is_running = False self.thread = None self.config = load_config() self.naver_id = self.config.get('naver', {}).get('client_id', '') self.naver_secret = self.config.get('naver', {}).get('client_secret', '') self.google_key = self.config.get('google', {}).get('api_key', '') def start(self): if self.is_running: return self.is_running = True self.thread = threading.Thread(target=self._run_loop, daemon=True) self.thread.start() logger.info("News Bot Started") def stop(self): self.is_running = False if self.thread: self.thread.join() logger.info("News Bot Stopped") def _run_loop(self): while self.is_running: try: # Reload config to check current settings self.config = load_config() if self.config.get('preferences', {}).get('enable_news', False): self._fetch_and_analyze() else: logger.info("News collection is disabled.") except Exception as e: logger.error(f"Error in news loop: {e}") # Sleep 10 minutes (600 seconds) for _ in range(600): if not self.is_running: break time.sleep(1) def _fetch_and_analyze(self): logger.info("Fetching News...") if not self.naver_id or not self.naver_secret: logger.warning("Naver API Credentials missing.") return # 1. Fetch News (Naver) # Search for generic economy terms or specific watchlist query = "주식 시장" # General Stock Market url = "https://openapi.naver.com/v1/search/news.json" headers = { "X-Naver-Client-Id": self.naver_id, "X-Naver-Client-Secret": self.naver_secret } params = {"query": query, "display": 10, "sort": "date"} res = requests.get(url, headers=headers, params=params) if res.status_code != 200: logger.error(f"Naver News Failed: {res.text}") return items = res.json().get('items', []) db = SessionLocal() try: for item in items: title = item['title'] link = item['originallink'] or item['link'] pub_date = item['pubDate'] # Check duplication if db.query(News).filter(News.link == link).first(): continue # 2. AI Analysis (Google Gemini) analysis = self._analyze_with_ai(title, item['description']) # Save to DB news = News( title=title, link=link, pub_date=pub_date, analysis_result=analysis.get('summary', ''), impact_score=analysis.get('score', 0), related_sector=analysis.get('sector', '') ) db.add(news) db.commit() logger.info(f"Processed {len(items)} news items.") finally: db.close() def _analyze_with_ai(self, title, description): if not self.google_key: return {"summary": "No API Key", "score": 0, "sector": ""} logger.info(f"Analyzing: {title[:30]}...") # Prompt prompt = f""" Analyze the following news for stock market impact. Title: {title} Description: {description} Return JSON format: {{ "summary": "One line summary of impact", "score": Integer between -10 (Negative) to 10 (Positive), "sector": "Related Industry/Sector or 'None'" }} """ url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-pro:generateContent?key={self.google_key}" headers = {"Content-Type": "application/json"} body = { "contents": [{ "parts": [{"text": prompt}] }] } try: res = requests.post(url, headers=headers, data=json.dumps(body)) if res.status_code == 200: result = res.json() text = result['candidates'][0]['content']['parts'][0]['text'] # Clean markdown json if any text = text.replace("```json", "").replace("```", "").strip() return json.loads(text) else: logger.error(f"Gemini API Error: {res.text}") except Exception as e: logger.error(f"AI Analysis Exception: {e}") return {"summary": "Error", "score": 0, "sector": ""} news_bot = NewsBot()