75 lines
2.1 KiB
Python
75 lines
2.1 KiB
Python
"""限流控制"""
|
|
import time
|
|
import logging
|
|
from collections import deque
|
|
from typing import Dict
|
|
from config import MESSAGE_RATE_LIMIT
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class RateLimiter:
|
|
"""令牌桶限流器"""
|
|
|
|
def __init__(self, max_requests: int = MESSAGE_RATE_LIMIT, window: int = 60):
|
|
"""初始化限流器
|
|
|
|
Args:
|
|
max_requests: 时间窗口内最大请求数
|
|
window: 时间窗口(秒)
|
|
"""
|
|
self.max_requests = max_requests
|
|
self.window = window
|
|
# 使用deque存储时间戳
|
|
self.requests: deque = deque()
|
|
logger.info(f"限流器已启用: {max_requests}条/{window}秒")
|
|
|
|
def is_allowed(self) -> bool:
|
|
"""检查是否允许请求
|
|
|
|
Returns:
|
|
是否允许
|
|
"""
|
|
current_time = time.time()
|
|
|
|
# 清理过期的请求记录
|
|
while self.requests and self.requests[0] < current_time - self.window:
|
|
self.requests.popleft()
|
|
|
|
# 检查是否超过限制
|
|
if len(self.requests) < self.max_requests:
|
|
self.requests.append(current_time)
|
|
return True
|
|
else:
|
|
logger.warning(f"触发限流: 已达到 {self.max_requests}条/{self.window}秒")
|
|
return False
|
|
|
|
def get_remaining(self) -> int:
|
|
"""获取剩余可用次数"""
|
|
current_time = time.time()
|
|
|
|
# 清理过期的请求记录
|
|
while self.requests and self.requests[0] < current_time - self.window:
|
|
self.requests.popleft()
|
|
|
|
return max(0, self.max_requests - len(self.requests))
|
|
|
|
def get_reset_time(self) -> float:
|
|
"""获取重置时间(秒)"""
|
|
if not self.requests:
|
|
return 0
|
|
|
|
oldest_request = self.requests[0]
|
|
reset_time = oldest_request + self.window - time.time()
|
|
return max(0, reset_time)
|
|
|
|
|
|
# 全局限流器实例(单例)
|
|
_rate_limiter: RateLimiter = RateLimiter()
|
|
|
|
|
|
def get_rate_limiter() -> RateLimiter:
|
|
"""获取全局限流器实例"""
|
|
return _rate_limiter
|
|
|