Files
WPSBot/games/sgs_game.py
借我清欢与鹤梦 86f87b440e 新增三国杀系统
2025-10-30 16:19:02 +08:00

338 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""三国杀游戏状态管理"""
import logging
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass, field
import random
import time
from games.sgs_core import (
Player, Card, CardDeck, GeneralPool, Role, Phase,
CardType, CardSuit, General
)
logger = logging.getLogger(__name__)
@dataclass
class GameState:
"""游戏状态"""
game_id: str # 游戏ID
chat_id: int # 会话ID
host_id: int # 房主ID
players: List[Player] = field(default_factory=list) # 玩家列表
deck: CardDeck = field(default_factory=CardDeck) # 牌堆
current_player_idx: int = 0 # 当前玩家索引
current_phase: Phase = Phase.PREPARE # 当前阶段
round_number: int = 1 # 回合数
is_started: bool = False # 是否已开始
is_finished: bool = False # 是否已结束
winner_role: Optional[Role] = None # 获胜身份
created_at: int = field(default_factory=lambda: int(time.time()))
# 游戏配置
max_players: int = 8 # 最大玩家数
min_players: int = 2 # 最小玩家数简化版标准是5人
# 临时状态
pending_action: Optional[Dict] = None # 等待的操作
action_queue: List[Dict] = field(default_factory=list) # 操作队列
@property
def current_player(self) -> Optional[Player]:
"""获取当前玩家"""
if 0 <= self.current_player_idx < len(self.players):
return self.players[self.current_player_idx]
return None
@property
def alive_players(self) -> List[Player]:
"""获取存活玩家"""
return [p for p in self.players if p.is_alive]
@property
def lord_player(self) -> Optional[Player]:
"""获取主公"""
for player in self.players:
if player.role == Role.LORD:
return player
return None
def get_player_by_id(self, user_id: int) -> Optional[Player]:
"""根据用户ID获取玩家"""
for player in self.players:
if player.user_id == user_id:
return player
return None
def get_next_alive_player(self, from_idx: int = None) -> Optional[Player]:
"""获取下一个存活玩家"""
if from_idx is None:
from_idx = self.current_player_idx
idx = (from_idx + 1) % len(self.players)
checked = 0
while checked < len(self.players):
if self.players[idx].is_alive:
return self.players[idx]
idx = (idx + 1) % len(self.players)
checked += 1
return None
def add_player(self, user_id: int, username: str) -> bool:
"""添加玩家
Returns:
是否成功
"""
if self.is_started:
return False
if len(self.players) >= self.max_players:
return False
if self.get_player_by_id(user_id):
return False
player = Player(user_id=user_id, username=username)
self.players.append(player)
logger.info(f"玩家 {username} 加入游戏")
return True
def remove_player(self, user_id: int) -> bool:
"""移除玩家(仅限游戏未开始时)"""
if self.is_started:
return False
player = self.get_player_by_id(user_id)
if player:
self.players.remove(player)
logger.info(f"玩家 {player.username} 离开游戏")
return True
return False
def start_game(self) -> Tuple[bool, str]:
"""开始游戏
Returns:
(是否成功, 消息)
"""
if self.is_started:
return False, "游戏已经开始"
if len(self.players) < self.min_players:
return False, f"人数不足,至少需要 {self.min_players}"
if len(self.players) > self.max_players:
return False, f"人数过多,最多 {self.max_players}"
# 分配身份
self._assign_roles()
# 选择武将
self._assign_generals()
# 初始化体力
for player in self.players:
if player.general:
player.hp = player.general.max_hp
# 主公额外+1体力上限
if player.role == Role.LORD:
player.hp += 1
if player.general:
player.general.max_hp += 1
# 发初始手牌
self._deal_initial_cards()
# 确定起始玩家(主公先手)
for idx, player in enumerate(self.players):
if player.role == Role.LORD:
self.current_player_idx = idx
break
self.is_started = True
self.current_phase = Phase.PREPARE
logger.info(f"游戏开始,共 {len(self.players)} 名玩家")
return True, "游戏开始!"
def _assign_roles(self):
"""分配身份"""
player_count = len(self.players)
# 根据人数分配身份
role_distribution = {
2: [Role.LORD, Role.REBEL],
3: [Role.LORD, Role.REBEL, Role.SPY],
4: [Role.LORD, Role.LOYAL, Role.REBEL, Role.SPY],
5: [Role.LORD, Role.LOYAL, Role.REBEL, Role.REBEL, Role.SPY],
6: [Role.LORD, Role.LOYAL, Role.REBEL, Role.REBEL, Role.REBEL, Role.SPY],
7: [Role.LORD, Role.LOYAL, Role.LOYAL, Role.REBEL, Role.REBEL, Role.REBEL, Role.SPY],
8: [Role.LORD, Role.LOYAL, Role.LOYAL, Role.REBEL, Role.REBEL, Role.REBEL, Role.REBEL, Role.SPY]
}
roles = role_distribution.get(player_count, [Role.LORD] + [Role.REBEL] * (player_count - 1))
random.shuffle(roles)
for player, role in zip(self.players, roles):
player.role = role
logger.info(f"玩家 {player.username} 的身份是 {role.value}")
def _assign_generals(self):
"""分配武将"""
general_pool = GeneralPool()
# 主公先选
lord = self.lord_player
if lord:
# 主公从3个武将中选择这里简化为随机
lord_options = general_pool.get_random_generals(3)
lord.general = lord_options[0] # 简化:直接选第一个
logger.info(f"主公 {lord.username} 选择了武将 {lord.general.name}")
# 其他玩家随机分配
used_generals = [lord.general.name] if lord and lord.general else []
for player in self.players:
if player.role != Role.LORD:
generals = general_pool.get_random_generals(1, exclude=used_generals)
if generals:
player.general = generals[0]
used_generals.append(player.general.name)
logger.info(f"玩家 {player.username} 获得武将 {player.general.name}")
def _deal_initial_cards(self):
"""发初始手牌"""
for player in self.players:
# 每人发4张手牌
cards = self.deck.draw(4)
player.hand_cards.extend(cards)
logger.info(f"玩家 {player.username} 获得 {len(cards)} 张初始手牌")
def next_phase(self) -> Tuple[Phase, Optional[Player]]:
"""进入下一阶段
Returns:
(新阶段, 当前玩家)
"""
current = self.current_player
if not current:
return self.current_phase, None
# 阶段流转
phase_order = [
Phase.PREPARE,
Phase.JUDGE,
Phase.DRAW,
Phase.PLAY,
Phase.DISCARD,
Phase.END
]
current_idx = phase_order.index(self.current_phase)
if current_idx < len(phase_order) - 1:
# 进入下一阶段
self.current_phase = phase_order[current_idx + 1]
else:
# 回合结束,轮到下一个玩家
next_player = self.get_next_alive_player()
if next_player:
for idx, p in enumerate(self.players):
if p.user_id == next_player.user_id:
self.current_player_idx = idx
break
self.current_phase = Phase.PREPARE
self.round_number += 1
return self.current_phase, self.current_player
def check_game_over(self) -> Tuple[bool, Optional[Role]]:
"""检查游戏是否结束
Returns:
(是否结束, 获胜身份)
"""
lord = self.lord_player
# 主公死亡
if not lord or not lord.is_alive:
# 反贼胜利
return True, Role.REBEL
# 检查是否所有反贼和内奸都死亡
rebels_alive = any(p.is_alive and p.role == Role.REBEL for p in self.players)
spies_alive = any(p.is_alive and p.role == Role.SPY for p in self.players)
if not rebels_alive and not spies_alive:
# 主公和忠臣胜利
return True, Role.LORD
return False, None
def to_dict(self) -> Dict:
"""转换为字典"""
return {
"game_id": self.game_id,
"chat_id": self.chat_id,
"host_id": self.host_id,
"players": [p.to_dict() for p in self.players],
"current_player_idx": self.current_player_idx,
"current_phase": self.current_phase.value,
"round_number": self.round_number,
"is_started": self.is_started,
"is_finished": self.is_finished,
"winner_role": self.winner_role.value if self.winner_role else None,
"created_at": self.created_at
}
class GameManager:
"""游戏管理器"""
def __init__(self):
"""初始化"""
self.games: Dict[int, GameState] = {} # chat_id -> GameState
def create_game(self, chat_id: int, host_id: int, host_name: str) -> GameState:
"""创建游戏"""
game_id = f"sgs_{chat_id}_{int(time.time())}"
game = GameState(
game_id=game_id,
chat_id=chat_id,
host_id=host_id
)
game.add_player(host_id, host_name)
self.games[chat_id] = game
logger.info(f"创建游戏: {game_id}")
return game
def get_game(self, chat_id: int) -> Optional[GameState]:
"""获取游戏"""
return self.games.get(chat_id)
def remove_game(self, chat_id: int):
"""移除游戏"""
if chat_id in self.games:
del self.games[chat_id]
logger.info(f"移除游戏: chat_id={chat_id}")
def has_active_game(self, chat_id: int) -> bool:
"""是否有活跃游戏"""
game = self.get_game(chat_id)
return game is not None and not game.is_finished
# 全局游戏管理器
_game_manager: Optional[GameManager] = None
def get_game_manager() -> GameManager:
"""获取全局游戏管理器"""
global _game_manager
if _game_manager is None:
_game_manager = GameManager()
return _game_manager