from __future__ import annotations from datetime import timedelta from typing import Any from sqlalchemy import desc, func, select from sqlalchemy.orm import Session from app.core.security import utc_now from app.models import ( Announcement, Event, EventRsvp, FileAsset, Group, Member, Poll, PollOption, PollVote, RemoteCachedObject, RemoteServerConnection, Task, Thread, ) from app.services.auth import CurrentContext, get_members_for_context from app.services.serializers import ( action_dict, announcement_dict, event_dict, file_dict, poll_dict, remote_connection_dict, task_dict, thread_dict, ) PRIORITY_ORDER = {"urgent": 0, "high": 1, "normal": 2, "low": 3} def _db_now(): return utc_now().replace(tzinfo=None) def _member_rsvp_status(db: Session, event_id: str, member_id: str) -> str: rsvp = db.scalar(select(EventRsvp).where(EventRsvp.event_id == event_id, EventRsvp.member_id == member_id)) return rsvp.status if rsvp else "unknown" def _local_actions_for_member(db: Session, member: Member) -> list[dict[str, Any]]: group = db.get(Group, member.group_id) if not group: return [] now = _db_now() actions: list[dict[str, Any]] = [] events = db.scalars(select(Event).where(Event.group_id == group.id, Event.starts_at >= now - timedelta(days=1))).all() for event in events: status = _member_rsvp_status(db, event.id, member.id) if event.rsvp_required and status == "unknown": actions.append( action_dict( id=f"local:rsvp:{member.id}:{event.id}", source_type="local", source_server_origin=group.server_origin, source_group_id=group.id, source_group_name=group.name, type="rsvp_required", priority="urgent" if event.starts_at <= now + timedelta(days=2) else "high", title=f"RSVP: {event.title}", summary=event.location_name or "Let the group know if you can make it.", object_type="event", object_id=event.id, due_at=event.starts_at, ) ) if event.changed_at and (not member.last_seen_at or event.changed_at > member.last_seen_at): actions.append( action_dict( id=f"local:event_changed:{member.id}:{event.id}", source_type="local", source_server_origin=group.server_origin, source_group_id=group.id, source_group_name=group.name, type="event_changed", priority="high", title=f"Changed: {event.title}", summary="Time or location changed since your last visit.", object_type="event", object_id=event.id, due_at=event.starts_at, ) ) tasks = db.scalars(select(Task).where(Task.assigned_to_member_id == member.id, Task.status == "open")).all() for task in tasks: actions.append( action_dict( id=f"local:task:{member.id}:{task.id}", source_type="local", source_server_origin=group.server_origin, source_group_id=group.id, source_group_name=group.name, type="task_assigned", priority="high" if task.due_at and task.due_at <= now + timedelta(days=3) else "normal", title=task.title, summary=task.description or "Task assigned to you.", object_type="task", object_id=task.id, due_at=task.due_at, ) ) polls = db.scalars(select(Poll).where(Poll.group_id == group.id, Poll.status == "open")).all() for poll in polls: existing_vote = db.scalar(select(PollVote).where(PollVote.poll_id == poll.id, PollVote.member_id == member.id)) if not existing_vote: actions.append( action_dict( id=f"local:poll:{member.id}:{poll.id}", source_type="local", source_server_origin=group.server_origin, source_group_id=group.id, source_group_name=group.name, type="vote_required", priority="high" if poll.closes_at and poll.closes_at <= now + timedelta(days=2) else "normal", title=poll.title, summary=poll.description or "Add your vote.", object_type="poll", object_id=poll.id, due_at=poll.closes_at, ) ) files = db.scalars(select(FileAsset).where(FileAsset.group_id == group.id, FileAsset.requires_ack.is_(True))).all() for file in files: actions.append( action_dict( id=f"local:file_ack:{member.id}:{file.id}", source_type="local", source_server_origin=group.server_origin, source_group_id=group.id, source_group_name=group.name, type="file_ack", priority="normal", title=f"Read: {file.filename_original}", summary=file.description or "Please review this file.", object_type="file", object_id=file.id, due_at=None, ) ) announcements = db.scalars( select(Announcement).where(Announcement.group_id == group.id, Announcement.requires_ack.is_(True), Announcement.official.is_(True)) ).all() for announcement in announcements: actions.append( action_dict( id=f"local:announcement_ack:{member.id}:{announcement.id}", source_type="local", source_server_origin=group.server_origin, source_group_id=group.id, source_group_name=group.name, type="admin_request", priority="urgent" if announcement.priority == "urgent" else "normal", title=announcement.title, summary=announcement.body[:180], object_type="announcement", object_id=announcement.id, due_at=None, ) ) return actions def local_actions_for_context(db: Session, ctx: CurrentContext) -> list[dict[str, Any]]: actions: list[dict[str, Any]] = [] for member in get_members_for_context(db, ctx): actions.extend(_local_actions_for_member(db, member)) return sort_actions(actions) def sort_actions(actions: list[dict[str, Any]]) -> list[dict[str, Any]]: return sorted( actions, key=lambda item: ( PRIORITY_ORDER.get(item.get("priority", "normal"), 9), item.get("due_at") or "9999-12-31T00:00:00", item.get("title") or "", ), ) def remote_items_for_context(db: Session, ctx: CurrentContext, object_type: str | None = None) -> list[dict[str, Any]]: if not ctx.home_profile: return [] connections = db.scalars(select(RemoteServerConnection).where(RemoteServerConnection.home_profile_id == ctx.home_profile.id)).all() connection_ids = [connection.id for connection in connections] if not connection_ids: return [] query = select(RemoteCachedObject).where(RemoteCachedObject.remote_connection_id.in_(connection_ids)) if object_type: query = query.where(RemoteCachedObject.object_type == object_type) rows = db.scalars(query).all() connection_by_id = {connection.id: connection for connection in connections} items: list[dict[str, Any]] = [] for row in rows: payload = dict(row.payload_json or {}) connection = connection_by_id.get(row.remote_connection_id) payload.setdefault("source_type", "remote") payload.setdefault("source_server_origin", connection.server_origin if connection else "") payload.setdefault("source_group_id", row.group_remote_id) payload.setdefault("source_group_name", row.group_name) payload.setdefault("remote_connection_id", row.remote_connection_id) items.append(payload) return items def home_dashboard(db: Session, ctx: CurrentContext) -> dict[str, Any]: members = get_members_for_context(db, ctx) member_group_ids = [member.group_id for member in members] now = _db_now() local_actions = local_actions_for_context(db, ctx) remote_actions = remote_items_for_context(db, ctx, "action") needs_me = sort_actions(local_actions + remote_actions) today: list[dict[str, Any]] = [] if member_group_ids: events = db.scalars( select(Event) .where(Event.group_id.in_(member_group_ids), Event.starts_at >= now - timedelta(hours=6)) .order_by(Event.starts_at) .limit(50) ).all() member_by_group = {member.group_id: member for member in members} for event in events: group = db.get(Group, event.group_id) member = member_by_group.get(event.group_id) today.append(event_dict(event, group, _member_rsvp_status(db, event.id, member.id) if member else None)) today.extend(remote_items_for_context(db, ctx, "event")) today = sorted(today, key=lambda item: item.get("starts_at") or "9999-12-31T00:00:00")[:50] changed = [action for action in needs_me if action["type"] in {"event_changed", "admin_request"}][:20] official_updates: list[dict[str, Any]] = [] if member_group_ids: announcements = db.scalars( select(Announcement) .where(Announcement.group_id.in_(member_group_ids), Announcement.official.is_(True)) .order_by(desc(Announcement.created_at)) .limit(20) ).all() for announcement in announcements: official_updates.append(announcement_dict(announcement, db.get(Group, announcement.group_id))) official_updates.extend(remote_items_for_context(db, ctx, "announcement")) discussion_count = 0 if member_group_ids: discussion_count = db.scalar(select(func.count()).select_from(Thread).where(Thread.group_id.in_(member_group_ids), Thread.kind == "discussion")) or 0 catch_up = [ {"label": "official announcements", "count": len(official_updates)}, {"label": "changed events", "count": len([item for item in changed if item["type"] == "event_changed"])}, {"label": "open actions", "count": len(needs_me)}, {"label": "discussion threads", "count": discussion_count + len(remote_items_for_context(db, ctx, "thread"))}, ] connections = [] if ctx.home_profile: connections = [ remote_connection_dict(connection) for connection in db.scalars(select(RemoteServerConnection).where(RemoteServerConnection.home_profile_id == ctx.home_profile.id)).all() ] return { "sections": { "needs_me": needs_me, "today": today, "changed": changed, "official_updates": official_updates, "catch_up": catch_up, }, "connections": connections, } def calendar_items(db: Session, ctx: CurrentContext) -> list[dict[str, Any]]: items = home_dashboard(db, ctx)["sections"]["today"] return sorted(items, key=lambda item: item.get("starts_at") or "9999-12-31T00:00:00") def file_items(db: Session, ctx: CurrentContext) -> list[dict[str, Any]]: members = get_members_for_context(db, ctx) group_ids = [member.group_id for member in members] files: list[dict[str, Any]] = [] if group_ids: for file in db.scalars(select(FileAsset).where(FileAsset.group_id.in_(group_ids)).order_by(desc(FileAsset.created_at))).all(): files.append(file_dict(file, db.get(Group, file.group_id))) files.extend(remote_items_for_context(db, ctx, "file")) return files def group_dashboard(db: Session, group: Group, member: Member | None = None) -> dict[str, Any]: rsvp_member_id = member.id if member else "" announcements = [ announcement_dict(item, group) for item in db.scalars( select(Announcement).where(Announcement.group_id == group.id).order_by(desc(Announcement.created_at)).limit(20) ).all() ] events = [ event_dict(item, group, _member_rsvp_status(db, item.id, rsvp_member_id) if rsvp_member_id else None) for item in db.scalars(select(Event).where(Event.group_id == group.id).order_by(Event.starts_at).limit(30)).all() ] tasks = [task_dict(item, group) for item in db.scalars(select(Task).where(Task.group_id == group.id).order_by(Task.status, Task.due_at)).all()] files = [file_dict(item, group) for item in db.scalars(select(FileAsset).where(FileAsset.group_id == group.id).order_by(desc(FileAsset.created_at))).all()] polls = [] for poll in db.scalars(select(Poll).where(Poll.group_id == group.id).order_by(desc(Poll.created_at))).all(): options = db.scalars(select(PollOption).where(PollOption.poll_id == poll.id).order_by(PollOption.position)).all() votes = db.scalars(select(PollVote).where(PollVote.poll_id == poll.id)).all() polls.append(poll_dict(poll, list(options), list(votes), group)) threads = [ thread_dict(item, group=group) for item in db.scalars(select(Thread).where(Thread.group_id == group.id).order_by(desc(Thread.updated_at)).limit(20)).all() ] important_now = _local_actions_for_member(db, member) if member else [] return { "important_now": sort_actions(important_now)[:8], "upcoming": events[:8], "open_actions": sort_actions(important_now), "announcements": announcements, "tasks": tasks, "polls": polls, "files": files, "discussions": threads, }