Files
backup_openclaw/projects/auto-trader/notify_telegram.py
2026-03-30 19:30:25 +09:00

95 lines
2.5 KiB
Python

import json
import subprocess
from pathlib import Path
# Root directory of the auto-trader project; every file below lives inside it.
BASE = Path('/home/arin/.openclaw/workspace/projects/auto-trader')
# JSON Lines alert log: load_alerts() parses it one JSON value per line.
ALERTS_PATH = BASE / 'alerts.jsonl'
# JSON state file holding the 'last_key' cursor that main() reads and rewrites.
NOTIFY_STATE_PATH = BASE / 'notify_state.json'
# JSON config read by main(); the code uses the keys 'enabled', 'channel',
# 'target' and (optionally) 'account'.
CONFIG_PATH = BASE / 'telegram_notify_config.json'
def load_json(path: Path, default):
    """Read and parse a UTF-8 JSON file, falling back to ``default``.

    The read is attempted directly instead of checking ``path.exists()``
    first, so a file that disappears between the check and the read cannot
    cause an unexpected error.

    Args:
        path: Location of the JSON file.
        default: Value returned when the file is missing or unreadable, is
            not valid UTF-8, or does not contain valid JSON.

    Returns:
        The decoded JSON value, or ``default`` on any of the failures above.
    """
    try:
        return json.loads(path.read_text(encoding='utf-8'))
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return default
def save_json(path: Path, data):
    """Serialize ``data`` as pretty-printed UTF-8 JSON and store it at ``path``.

    The text is first written to a sibling ``<name>.tmp`` file and then moved
    over ``path`` with ``Path.replace``, so a reader never observes a
    half-written file if the process is interrupted mid-write.

    Args:
        path: Destination file; its parent directory must be writable.
        data: Any JSON-serializable value.

    Raises:
        TypeError: If ``data`` is not JSON-serializable (``path`` is untouched).
        OSError: If the temporary file cannot be written or moved into place.
    """
    payload = json.dumps(data, ensure_ascii=False, indent=2)
    tmp_path = path.with_name(path.name + '.tmp')
    try:
        tmp_path.write_text(payload, encoding='utf-8')
        tmp_path.replace(path)
    except OSError:
        # Do not leave a stray temp file behind when the write/rename fails.
        try:
            tmp_path.unlink()
        except OSError:
            pass
        raise
def load_alerts():
    """Load every well-formed alert record from the JSON Lines alert log.

    Blank lines and lines that are not valid JSON are skipped. Lines that
    parse to something other than a JSON object are skipped as well, because
    the callers in this file access alerts with ``.get(...)`` and would fail
    on any non-dict value.

    Returns:
        A list of alert dicts in file order; an empty list when
        ``ALERTS_PATH`` does not exist.
    """
    try:
        text = ALERTS_PATH.read_text(encoding='utf-8')
    except FileNotFoundError:
        return []

    rows = []
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line:
            continue
        try:
            row = json.loads(line)
        except ValueError:
            # json.JSONDecodeError subclasses ValueError; a corrupt line is
            # ignored so it cannot block the remaining alerts.
            continue
        if isinstance(row, dict):
            rows.append(row)
    return rows
def format_msg(alert: dict) -> str:
    """Render one alert as the multi-line (Korean) notification text.

    Args:
        alert: Alert record. The keys read are ``time``, ``name``,
            ``symbol``, ``side``, ``price``, ``change_pct`` and ``reasons``;
            all are optional.

    Returns:
        The message text. ``side == 'buy'`` is labelled as a buy signal and
        any other value as a sell signal. A numeric ``price`` is rendered
        with thousands separators; a missing or non-numeric price is rendered
        with ``str()`` instead of raising. A missing or null ``reasons``
        yields an empty condition list.
    """
    side = '매수' if alert.get('side') == 'buy' else '매도'

    # `or []` also covers an explicit null; str() tolerates non-string items.
    reasons = ', '.join(str(reason) for reason in (alert.get('reasons') or []))

    price = alert.get('price')
    try:
        price_text = f"{price:,}"
    except (TypeError, ValueError):
        # None rejects the ',' format spec with TypeError, str with ValueError.
        price_text = str(price)

    return (
        f"[자동매매 알림]\n"
        f"시간: {alert.get('time')}\n"
        f"종목: {alert.get('name')}({alert.get('symbol')})\n"
        f"신호: {side}\n"
        f"현재가: {price_text}\n"
        f"등락률: {alert.get('change_pct')}%\n"
        f"조건: {reasons}"
    )
def send_message(cfg: dict, text: str):
    """Deliver ``text`` by invoking the ``openclaw message send`` command.

    Args:
        cfg: Notification settings. ``channel`` and ``target`` are required
            (a missing key raises ``KeyError``); ``account`` is appended to
            the command only when it is present and truthy.
        text: Message body passed via ``--message``.

    Raises:
        subprocess.CalledProcessError: If the command exits non-zero.
    """
    argv = ['openclaw', 'message', 'send']
    argv += ['--channel', cfg['channel']]
    argv += ['--target', cfg['target']]
    argv += ['--message', text]

    account = cfg.get('account')
    if account:
        argv += ['--account', account]

    # Argument list (no shell), so the message text needs no quoting.
    subprocess.run(argv, check=True)
def main():
    """Send every alert newer than the stored cursor and advance the cursor.

    Steps:
      1. Load the config; print ``{"status": "disabled"}`` and stop unless
         ``enabled`` is truthy.
      2. Load the alerts and the saved ``last_key`` cursor.
      3. Select the alerts after the first one whose ``key`` equals the
         cursor. With no cursor, or a cursor that matches no alert, only the
         most recent alert (if any) is selected.
      4. Send each selected alert, persisting the cursor after every send so
         an interrupted run does not resend already-delivered alerts.
      5. Print a JSON summary with the number sent and the final cursor.
    """
    config = load_json(CONFIG_PATH, {})
    if not config.get('enabled'):
        print(json.dumps({'status': 'disabled'}, ensure_ascii=False))
        return

    alerts = load_alerts()
    state = load_json(NOTIFY_STATE_PATH, {'last_key': None})
    cursor = state.get('last_key')

    if cursor is None:
        # First run: only the newest alert (empty slice when there are none).
        pending = alerts[-1:]
    else:
        alert_keys = [entry.get('key') for entry in alerts]
        try:
            pending = alerts[alert_keys.index(cursor) + 1:]
        except ValueError:
            # Cursor no longer present in the log: fall back to the newest.
            pending = alerts[-1:]

    for entry in pending:
        send_message(config, format_msg(entry))
        state['last_key'] = entry.get('key')
        save_json(NOTIFY_STATE_PATH, state)

    summary = {'sent': len(pending), 'last_key': state.get('last_key')}
    print(json.dumps(summary, ensure_ascii=False))


# Run the notifier only when executed as a script, not when imported.
if __name__ == '__main__':
    main()