Files
2026-06-08 15:57:11 +02:00

58 lines
2.0 KiB
Python

from __future__ import annotations
import time
from dataclasses import dataclass
from redis import Redis
from redis.exceptions import RedisError
from app.settings import settings
@dataclass(frozen=True, slots=True)
class RateLimitDecision:
key: str
messages_per_minute: int
gap_seconds: float
waited_seconds: float
def _redis_client() -> Redis:
return Redis.from_url(settings.redis_url, decode_responses=True)
def wait_for_rate_limit(*, key: str, messages_per_minute: int, enabled: bool = True) -> RateLimitDecision:
"""Throttle sends across worker processes using Redis when available.
The implementation stores the next allowed send timestamp per key. A Redis
lock keeps multiple Celery processes from reading/updating the timestamp at
the same time. If Redis is unavailable, it falls back to no distributed wait;
the per-container Celery concurrency still protects local development.
"""
messages_per_minute = max(1, int(messages_per_minute or 1))
gap = 60.0 / messages_per_minute
if not enabled:
return RateLimitDecision(key=key, messages_per_minute=messages_per_minute, gap_seconds=gap, waited_seconds=0.0)
redis_key = f"multimailer:ratelimit:{key}:next_allowed"
lock_key = f"multimailer:ratelimit:{key}:lock"
waited = 0.0
try:
client = _redis_client()
with client.lock(lock_key, timeout=30, blocking_timeout=30):
now = time.time()
raw_next = client.get(redis_key)
next_allowed = float(raw_next) if raw_next else now
if next_allowed > now:
waited = next_allowed - now
time.sleep(waited)
now = time.time()
client.set(redis_key, now + gap, ex=max(60, int(gap * 10)))
except (RedisError, TimeoutError, ValueError):
# Development fallback: do not fail sending because Redis is absent.
waited = 0.0
return RateLimitDecision(key=key, messages_per_minute=messages_per_minute, gap_seconds=gap, waited_seconds=waited)