Files
KisStock/backend/app/services/telegram_service.py

34 lines
1.0 KiB
Python

import httpx
import logging
from typing import Optional
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class TelegramService:
    """Thin async wrapper around the Telegram Bot API ``sendMessage`` call.

    The bot token and chat id are passed to :meth:`send_message` on every
    call; the instance itself stores only the API base URL.
    """

    def __init__(self):
        # Request URLs are built as ``<base_url><token>/<method>``.
        self.base_url = "https://api.telegram.org/bot"

    @staticmethod
    def _redact_token(message: str, token: str) -> str:
        """Return *message* with every occurrence of *token* masked as ``***``.

        Both the raw token and its percent-encoded form are masked, in case
        the HTTP client encoded the token when it built the URL.
        """
        from urllib.parse import quote

        if not token:
            # str.replace("", ...) would insert the mask between every char.
            return message
        masked = message.replace(token, "***")
        return masked.replace(quote(token, safe=""), "***")

    async def send_message(self, token: str, chat_id: str, text: str) -> bool:
        """Send *text* to *chat_id* using the bot identified by *token*.

        The message is posted as JSON with ``parse_mode`` set to ``"HTML"``
        and a 10 second timeout.

        Args:
            token: Bot token; it becomes part of the request URL.
            chat_id: Target chat identifier.
            text: Message body.

        Returns:
            ``True`` when the request completed and ``raise_for_status()``
            did not raise; ``False`` when *token* or *chat_id* is empty or
            when any ``Exception`` occurred while sending. Failures are
            logged rather than raised.
        """
        if not token or not chat_id:
            logger.warning("Telegram: Token or Chat ID is missing.")
            return False

        url = f"{self.base_url}{token}/sendMessage"
        payload = {
            "chat_id": chat_id,
            "text": text,
            "parse_mode": "HTML",
        }

        try:
            async with httpx.AsyncClient() as client:
                resp = await client.post(url, json=payload, timeout=10.0)
                resp.raise_for_status()
        except Exception as e:
            # Best-effort boundary: callers rely on a bool, not an exception.
            # The exception text can contain the request URL, and the URL
            # contains the secret bot token, so mask it before logging.
            # logger.exception is deliberately avoided: its traceback would
            # print the unredacted exception message.
            logger.error(
                "Telegram: Failed to send message: %s",
                self._redact_token(str(e), token),
            )
            return False
        else:
            logger.info("Telegram: Message sent successfully to %s", chat_id)
            return True
# Module-level instance, created when this module is imported
# (presumably the shared instance other modules import — confirm against callers).
telegram_service = TelegramService()