43 lines
1.3 KiB
Python
43 lines
1.3 KiB
Python
"""
|
|
Send notification via Telegram.
|
|
Requires bot token and chat_id configured in environment or .env file.
|
|
"""
|
|
import os
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
from dotenv import load_dotenv
|
|
|
|
# Module-level logger; handler and level configuration is left to the
# application that imports this module.
log = logging.getLogger(__name__)

# Load variables from a .env file (if one is found) into the process
# environment before the settings below are read.
load_dotenv()

# SECURITY: the bot token is a credential, so it must come from the
# environment / .env file only and never from a hard-coded fallback in
# source control. Any token that was ever committed to this file should be
# treated as leaked and revoked with @BotFather.
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "")
if not TELEGRAM_BOT_TOKEN:
    log.warning("TELEGRAM_BOT_TOKEN is not set; Telegram notifications will fail")

# Default recipient chat. int() raises ValueError at import time when the
# environment value is not numeric, surfacing misconfiguration early.
TELEGRAM_CHAT_ID = int(os.getenv("TELEGRAM_CHAT_ID", "1320170074"))
|
|
|
|
|
|
def send_telegram(text: str, chat_id: int = TELEGRAM_CHAT_ID) -> bool:
    """Send *text* to a Telegram chat via the Bot API ``sendMessage`` method.

    This is a best-effort call: every failure is logged and reported through
    the return value instead of being raised.

    Args:
        text: Message body. The request carries only ``chat_id`` and
            ``text`` (no ``parse_mode``), so markup characters in the text
            are not interpreted by this call.
        chat_id: Target chat. Defaults to ``TELEGRAM_CHAT_ID`` as it was
            evaluated when this module was imported.

    Returns:
        True when the HTTP request completed with a non-error status,
        False on any exception (network error, HTTP error status, ...).
    """
    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    try:
        resp = httpx.post(url, json={"chat_id": chat_id, "text": text}, timeout=10)
        resp.raise_for_status()
        # Only a short preview of the message goes into the log.
        log.info("Telegram sent to %s: %s", chat_id, text[:50])
        return True
    except Exception as e:
        # httpx error messages can embed the request URL, and the URL
        # contains the bot token; scrub it so the credential never reaches
        # the logs. The emptiness guard matters: str.replace("", ...) would
        # insert the marker between every character.
        detail = str(e)
        if TELEGRAM_BOT_TOKEN:
            detail = detail.replace(TELEGRAM_BOT_TOKEN, "<redacted>")
        log.error("Telegram failed: %s", detail)
        return False
|
|
|
|
|
|
def format_collection_reminder(collection: dict, source_id: str) -> str:
    """Build the four-line reminder message for one waste collection.

    Args:
        collection: Mapping read with ``.get`` for the keys ``"date"``,
            ``"type"``, ``"daysUntil"`` and ``"icon"``. A missing key falls
            back to ``"?"`` (or to the bin emoji for ``"icon"``).
        source_id: Identifier shown on the final "from:" line.

    Returns:
        The message text, lines separated by ``"\\n"`` with no trailing
        newline.
    """
    when = collection.get("date", "?")
    waste_kind = collection.get("type", "?")
    remaining = collection.get("daysUntil", "?")
    emoji = collection.get("icon", "🗑️")

    # Singular only when the value compares equal to 1; anything else,
    # including the "?" placeholder, is pluralized.
    if remaining != 1:
        unit = "days"
    else:
        unit = "day"

    lines = [
        f"{emoji} *Waste Collection Reminder*",
        f"📅 {when} (in {remaining} {unit})",
        f"🗑️ {waste_kind}",
        f"🔔 from: {source_id}",
    ]
    return "\n".join(lines)
|