Files
waste_api/app/core/scheduler.py

119 lines
4.1 KiB
Python

import uuid
from datetime import datetime, timedelta
from pathlib import Path
import json
import logging
log = logging.getLogger(__name__)
NOTIFS_FILE = Path(__file__).resolve().parent.parent.parent / "data" / "notifications.json"
# ─── Source registry ───────────────────────────────────────────
# Registry of known sources, keyed by source identifier. Each entry carries
# a display "name" and a "params" list; every item in that list describes one
# parameter by its name, a type label, a required flag and an example value.
SOURCE_DEFS = {
    "ics": {
        "name": "Generic ICS / iCal URL",
        "params": [
            {
                "name": "url",
                "type": "string",
                "required": True,
                "example": "https://example.com/kalender.ics",
            },
        ],
    },
    "abfall_io": {
        "name": "Abfall.IO (Germany)",
        "params": [
            {
                "name": "key",
                "type": "string",
                "required": True,
                "example": "8215c62763967916979e0e8566b6172e",
            },
            {
                "name": "f_id_kommune",
                "type": "integer",
                "required": True,
                "example": 2999,
            },
            {
                "name": "f_id_strasse",
                "type": "integer",
                "required": True,
                "example": 1087,
            },
        ],
    },
    "awm_muenchen_de": {
        "name": "AWM München (Germany)",
        "params": [
            {
                "name": "street",
                "type": "string",
                "required": True,
                "example": "Bahnstr.",
            },
            {
                "name": "house_number",
                "type": "string",
                "required": True,
                "example": "11",
            },
        ],
    },
    "stuttgart_de": {
        "name": "Abfall Stuttgart (Germany)",
        "params": [
            {
                "name": "street",
                "type": "string",
                "required": True,
                "example": "Im Steinengarten",
            },
            {
                "name": "streetnr",
                "type": "integer",
                "required": True,
                "example": 7,
            },
        ],
    },
}
class NotificationScheduler:
    """Registry of collection reminders, mirrored to ``NOTIFS_FILE`` as JSON.

    Notifications are plain dicts held in memory and keyed by their ``"id"``.
    Every mutating call (``add``, ``delete``, ``mark_fired``) rewrites the
    whole file so the on-disk copy follows the in-memory state.
    """

    def __init__(self):
        self._notifications: dict[str, dict] = {}
        self._load()

    def _load(self):
        """Fill the registry from ``NOTIFS_FILE`` when the file exists.

        Loading is best effort: a missing file leaves the registry empty and
        any read, parse or shape problem is logged as a warning, not raised.
        """
        if not NOTIFS_FILE.exists():
            return
        try:
            # _save() emits ASCII-only JSON (json.dumps default), so decoding
            # as UTF-8 reads files written by earlier versions unchanged.
            data = json.loads(NOTIFS_FILE.read_text(encoding="utf-8"))
            self._notifications = {n["id"]: n for n in data.get("notifications", [])}
            log.info(
                "Loaded %d notifications from %s",
                len(self._notifications),
                NOTIFS_FILE,
            )
        except Exception as e:
            # Deliberately broad: start-up must not fail on a bad state file.
            log.warning("Failed to load notifications: %s", e)

    def _save(self):
        """Write all notifications to ``NOTIFS_FILE``, replacing it atomically.

        The JSON is written to a sibling temporary file which is then renamed
        over the target, so an interrupted write cannot leave a truncated
        ``NOTIFS_FILE`` behind (which ``_load`` would discard entirely).
        """
        NOTIFS_FILE.parent.mkdir(parents=True, exist_ok=True)
        payload = json.dumps(
            {"notifications": list(self._notifications.values())}, indent=2
        )
        # Same directory as the target so the rename stays on one filesystem.
        tmp_file = NOTIFS_FILE.with_name(NOTIFS_FILE.name + ".tmp")
        tmp_file.write_text(payload, encoding="utf-8")
        tmp_file.replace(NOTIFS_FILE)

    def add(
        self,
        source_id: str,
        collection: dict,
        notify_at_days_before: int,
        channels: list[str],
        chat_id: int,
    ) -> dict:
        """Register a new, unfired notification and persist the registry.

        Args:
            source_id: Stored as ``"source_id"`` on the entry.
            collection: Mapping read for ``"type"`` (default ``"Unknown"``)
                and ``"date"`` (default ``""``).
            notify_at_days_before: Number of days before the collection date
                on which ``due_today`` reports the entry.
            channels: Stored as ``"channels"`` on the entry.
            chat_id: Stored as ``"chat_id"`` on the entry.

        Returns:
            The stored entry dict, including its generated ``"id"``.
        """
        nid = f"notif_{uuid.uuid4().hex[:8]}"
        # Only 32 bits of the UUID are kept, so guard against the (unlikely)
        # collision that would otherwise overwrite an existing notification.
        while nid in self._notifications:
            nid = f"notif_{uuid.uuid4().hex[:8]}"
        entry = {
            "id": nid,
            "source_id": source_id,
            "collection_type": collection.get("type", "Unknown"),
            "collection_date": collection.get("date", ""),
            "notify_at_days_before": notify_at_days_before,
            "channels": channels,
            "chat_id": chat_id,
            "fired": False,
        }
        self._notifications[nid] = entry
        self._save()
        return entry

    def list_all(self) -> list[dict]:
        """Return every stored notification dict, fired or not."""
        return list(self._notifications.values())

    def delete(self, nid: str) -> bool:
        """Remove notification ``nid`` and persist; return whether it existed."""
        if nid not in self._notifications:
            return False
        del self._notifications[nid]
        self._save()
        return True

    def due_today(self) -> list[dict]:
        """Return unfired notifications whose reminder date is today.

        The reminder date is ``collection_date`` minus
        ``notify_at_days_before`` days (default 1), compared against the
        current local date. Entries whose date or offset cannot be
        interpreted are skipped with a warning instead of being dropped
        silently.
        """
        today = datetime.now().date()
        due = []
        for n in self._notifications.values():
            if n.get("fired"):
                continue
            try:
                coll = datetime.fromisoformat(n.get("collection_date", "")).date()
                notify_date = coll - timedelta(days=n.get("notify_at_days_before", 1))
            except (TypeError, ValueError, OverflowError) as e:
                log.warning(
                    "Skipping notification %s with unusable schedule: %s",
                    n.get("id"),
                    e,
                )
                continue
            if notify_date == today:
                due.append(n)
        return due

    def mark_fired(self, nid: str):
        """Flag notification ``nid`` as fired and persist; unknown ids are ignored."""
        if nid in self._notifications:
            self._notifications[nid]["fired"] = True
            self._save()