"""三国杀游戏状态管理""" import logging from typing import List, Dict, Optional, Tuple from dataclasses import dataclass, field import random import time from games.sgs_core import ( Player, Card, CardDeck, GeneralPool, Role, Phase, CardType, CardSuit, General ) logger = logging.getLogger(__name__) @dataclass class GameState: """游戏状态""" game_id: str # 游戏ID chat_id: int # 会话ID host_id: int # 房主ID players: List[Player] = field(default_factory=list) # 玩家列表 deck: CardDeck = field(default_factory=CardDeck) # 牌堆 current_player_idx: int = 0 # 当前玩家索引 current_phase: Phase = Phase.PREPARE # 当前阶段 round_number: int = 1 # 回合数 is_started: bool = False # 是否已开始 is_finished: bool = False # 是否已结束 winner_role: Optional[Role] = None # 获胜身份 created_at: int = field(default_factory=lambda: int(time.time())) # 游戏配置 max_players: int = 8 # 最大玩家数 min_players: int = 2 # 最小玩家数(简化版,标准是5人) # 临时状态 pending_action: Optional[Dict] = None # 等待的操作 action_queue: List[Dict] = field(default_factory=list) # 操作队列 @property def current_player(self) -> Optional[Player]: """获取当前玩家""" if 0 <= self.current_player_idx < len(self.players): return self.players[self.current_player_idx] return None @property def alive_players(self) -> List[Player]: """获取存活玩家""" return [p for p in self.players if p.is_alive] @property def lord_player(self) -> Optional[Player]: """获取主公""" for player in self.players: if player.role == Role.LORD: return player return None def get_player_by_id(self, user_id: int) -> Optional[Player]: """根据用户ID获取玩家""" for player in self.players: if player.user_id == user_id: return player return None def get_next_alive_player(self, from_idx: int = None) -> Optional[Player]: """获取下一个存活玩家""" if from_idx is None: from_idx = self.current_player_idx idx = (from_idx + 1) % len(self.players) checked = 0 while checked < len(self.players): if self.players[idx].is_alive: return self.players[idx] idx = (idx + 1) % len(self.players) checked += 1 return None def add_player(self, user_id: int, username: str) -> bool: """添加玩家 Returns: 是否成功 """ if self.is_started: return False if len(self.players) >= self.max_players: return False if self.get_player_by_id(user_id): return False player = Player(user_id=user_id, username=username) self.players.append(player) logger.info(f"玩家 {username} 加入游戏") return True def remove_player(self, user_id: int) -> bool: """移除玩家(仅限游戏未开始时)""" if self.is_started: return False player = self.get_player_by_id(user_id) if player: self.players.remove(player) logger.info(f"玩家 {player.username} 离开游戏") return True return False def start_game(self) -> Tuple[bool, str]: """开始游戏 Returns: (是否成功, 消息) """ if self.is_started: return False, "游戏已经开始" if len(self.players) < self.min_players: return False, f"人数不足,至少需要 {self.min_players} 人" if len(self.players) > self.max_players: return False, f"人数过多,最多 {self.max_players} 人" # 分配身份 self._assign_roles() # 选择武将 self._assign_generals() # 初始化体力 for player in self.players: if player.general: player.hp = player.general.max_hp # 主公额外+1体力上限 if player.role == Role.LORD: player.hp += 1 if player.general: player.general.max_hp += 1 # 发初始手牌 self._deal_initial_cards() # 确定起始玩家(主公先手) for idx, player in enumerate(self.players): if player.role == Role.LORD: self.current_player_idx = idx break self.is_started = True self.current_phase = Phase.PREPARE logger.info(f"游戏开始,共 {len(self.players)} 名玩家") return True, "游戏开始!" def _assign_roles(self): """分配身份""" player_count = len(self.players) # 根据人数分配身份 role_distribution = { 2: [Role.LORD, Role.REBEL], 3: [Role.LORD, Role.REBEL, Role.SPY], 4: [Role.LORD, Role.LOYAL, Role.REBEL, Role.SPY], 5: [Role.LORD, Role.LOYAL, Role.REBEL, Role.REBEL, Role.SPY], 6: [Role.LORD, Role.LOYAL, Role.REBEL, Role.REBEL, Role.REBEL, Role.SPY], 7: [Role.LORD, Role.LOYAL, Role.LOYAL, Role.REBEL, Role.REBEL, Role.REBEL, Role.SPY], 8: [Role.LORD, Role.LOYAL, Role.LOYAL, Role.REBEL, Role.REBEL, Role.REBEL, Role.REBEL, Role.SPY] } roles = role_distribution.get(player_count, [Role.LORD] + [Role.REBEL] * (player_count - 1)) random.shuffle(roles) for player, role in zip(self.players, roles): player.role = role logger.info(f"玩家 {player.username} 的身份是 {role.value}") def _assign_generals(self): """分配武将""" general_pool = GeneralPool() # 主公先选 lord = self.lord_player if lord: # 主公从3个武将中选择(这里简化为随机) lord_options = general_pool.get_random_generals(3) lord.general = lord_options[0] # 简化:直接选第一个 logger.info(f"主公 {lord.username} 选择了武将 {lord.general.name}") # 其他玩家随机分配 used_generals = [lord.general.name] if lord and lord.general else [] for player in self.players: if player.role != Role.LORD: generals = general_pool.get_random_generals(1, exclude=used_generals) if generals: player.general = generals[0] used_generals.append(player.general.name) logger.info(f"玩家 {player.username} 获得武将 {player.general.name}") def _deal_initial_cards(self): """发初始手牌""" for player in self.players: # 每人发4张手牌 cards = self.deck.draw(4) player.hand_cards.extend(cards) logger.info(f"玩家 {player.username} 获得 {len(cards)} 张初始手牌") def next_phase(self) -> Tuple[Phase, Optional[Player]]: """进入下一阶段 Returns: (新阶段, 当前玩家) """ current = self.current_player if not current: return self.current_phase, None # 阶段流转 phase_order = [ Phase.PREPARE, Phase.JUDGE, Phase.DRAW, Phase.PLAY, Phase.DISCARD, Phase.END ] current_idx = phase_order.index(self.current_phase) if current_idx < len(phase_order) - 1: # 进入下一阶段 self.current_phase = phase_order[current_idx + 1] else: # 回合结束,轮到下一个玩家 next_player = self.get_next_alive_player() if next_player: for idx, p in enumerate(self.players): if p.user_id == next_player.user_id: self.current_player_idx = idx break self.current_phase = Phase.PREPARE self.round_number += 1 return self.current_phase, self.current_player def check_game_over(self) -> Tuple[bool, Optional[Role]]: """检查游戏是否结束 Returns: (是否结束, 获胜身份) """ lord = self.lord_player # 主公死亡 if not lord or not lord.is_alive: # 反贼胜利 return True, Role.REBEL # 检查是否所有反贼和内奸都死亡 rebels_alive = any(p.is_alive and p.role == Role.REBEL for p in self.players) spies_alive = any(p.is_alive and p.role == Role.SPY for p in self.players) if not rebels_alive and not spies_alive: # 主公和忠臣胜利 return True, Role.LORD return False, None def to_dict(self) -> Dict: """转换为字典""" return { "game_id": self.game_id, "chat_id": self.chat_id, "host_id": self.host_id, "players": [p.to_dict() for p in self.players], "current_player_idx": self.current_player_idx, "current_phase": self.current_phase.value, "round_number": self.round_number, "is_started": self.is_started, "is_finished": self.is_finished, "winner_role": self.winner_role.value if self.winner_role else None, "created_at": self.created_at } class GameManager: """游戏管理器""" def __init__(self): """初始化""" self.games: Dict[int, GameState] = {} # chat_id -> GameState def create_game(self, chat_id: int, host_id: int, host_name: str) -> GameState: """创建游戏""" game_id = f"sgs_{chat_id}_{int(time.time())}" game = GameState( game_id=game_id, chat_id=chat_id, host_id=host_id ) game.add_player(host_id, host_name) self.games[chat_id] = game logger.info(f"创建游戏: {game_id}") return game def get_game(self, chat_id: int) -> Optional[GameState]: """获取游戏""" return self.games.get(chat_id) def remove_game(self, chat_id: int): """移除游戏""" if chat_id in self.games: del self.games[chat_id] logger.info(f"移除游戏: chat_id={chat_id}") def has_active_game(self, chat_id: int) -> bool: """是否有活跃游戏""" game = self.get_game(chat_id) return game is not None and not game.is_finished # 全局游戏管理器 _game_manager: Optional[GameManager] = None def get_game_manager() -> GameManager: """获取全局游戏管理器""" global _game_manager if _game_manager is None: _game_manager = GameManager() return _game_manager