5 Commits
main ... three

Author SHA1 Message Date
借我清欢与鹤梦
3679d60e0c fix:jion未注册加入问题 2025-10-31 10:25:38 +08:00
借我清欢与鹤梦
96513c6e60 fix:语法 2025-10-30 16:49:46 +08:00
借我清欢与鹤梦
9625fa7808 fix:占位符问题以及缩进问题 2025-10-30 16:33:25 +08:00
借我清欢与鹤梦
131fabeec2 配置文件 2025-10-30 16:24:35 +08:00
借我清欢与鹤梦
86f87b440e 新增三国杀系统 2025-10-30 16:19:02 +08:00
15 changed files with 1486 additions and 0 deletions

8
.idea/.gitignore generated vendored Normal file
View File

@@ -0,0 +1,8 @@
# 默认忽略的文件
/shelf/
/workspace.xml
# 基于编辑器的 HTTP 客户端请求
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

6
.idea/MarsCodeWorkspaceAppSettings.xml generated Normal file
View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="com.codeverse.userSettings.MarscodeWorkspaceAppSettingsState">
<option name="progress" value="0.93" />
</component>
</project>

9
.idea/WPSBot.iml generated Normal file
View File

@@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.12 (pythonProject)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

7
.idea/codeStyles/Project.xml generated Normal file
View File

@@ -0,0 +1,7 @@
<component name="ProjectCodeStyleConfiguration">
<code_scheme name="Project" version="173">
<ScalaCodeStyleSettings>
<option name="MULTILINE_STRING_CLOSING_QUOTES_ON_NEW_LINE" value="true" />
</ScalaCodeStyleSettings>
</code_scheme>
</component>

5
.idea/codeStyles/codeStyleConfig.xml generated Normal file
View File

@@ -0,0 +1,5 @@
<component name="ProjectCodeStyleConfiguration">
<state>
<option name="PREFERRED_PROJECT_CODE_STYLE" value="Default" />
</state>
</component>

View File

@@ -0,0 +1,12 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyUnresolvedReferencesInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredIdentifiers">
<list>
<option value="list.*" />
</list>
</option>
</inspection_tool>
</profile>
</component>

7
.idea/misc.xml generated Normal file
View File

@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.12 (pythonProject)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.12 (pythonProject)" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml generated Normal file
View File

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/WPSBot.iml" filepath="$PROJECT_DIR$/.idea/WPSBot.iml" />
</modules>
</component>
</project>

7
.idea/vcs.xml generated Normal file
View File

@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
<mapping directory="$PROJECT_DIR$/Convention" vcs="Git" />
</component>
</project>

View File

@@ -82,6 +82,13 @@ def get_help_message() -> str:
- `.gomoku list` - 列出所有对战 - `.gomoku list` - 列出所有对战
- `.gomoku stats` - 查看战绩 - `.gomoku stats` - 查看战绩
### ⚔️ 三国杀
- `.sgs create` - 创建游戏
- `.sgs join` - 加入游戏
- `.sgs start` - 开始游戏
- `.sgs status` - 查看状态
- `.sgs help` - 查看帮助
### 💎 积分系统 ### 💎 积分系统
- `.points` - 查看个人积分 - `.points` - 查看个人积分
- `.积分` - 查看个人积分 - `.积分` - 查看个人积分

534
games/sanguosha.py Normal file
View File

@@ -0,0 +1,534 @@
"""三国杀游戏主控制器"""
import logging
from typing import Optional, Tuple
from games.base import BaseGame
from games.sgs_game import GameState, get_game_manager
from games.sgs_core import Phase, Role
from utils.parser import CommandParser
from core.database import get_db
logger = logging.getLogger(__name__)
class SanguoshaGame(BaseGame):
"""三国杀游戏"""
def __init__(self):
"""初始化游戏"""
super().__init__()
self.manager = get_game_manager()
async def handle(self, command: str, chat_id: int, user_id: int) -> str:
"""处理游戏指令
Args:
command: 指令
chat_id: 会话ID
user_id: 用户ID
Returns:
回复消息
"""
try:
# 提取指令和参数
cmd, args = CommandParser.extract_command_args(command)
args = args.strip().lower()
# 获取用户信息
user = self.db.get_or_create_user(user_id)
username = user.get('username') or f'用户{user_id}'
# 路由指令
if not args or args == 'help':
return self.get_help()
elif args == 'create' or args == '创建':
return await self._handle_create(chat_id, user_id, username)
elif args == 'join' or args == '加入':
return await self._handle_join(chat_id, user_id, username)
elif args == 'leave' or args == '离开':
return await self._handle_leave(chat_id, user_id)
elif args == 'start' or args == '开始':
return await self._handle_start(chat_id, user_id)
elif args == 'status' or args == '状态':
return await self._handle_status(chat_id, user_id)
elif args == 'players' or args == '玩家':
return await self._handle_players(chat_id)
elif args == 'hand' or args == '手牌':
return await self._handle_hand(chat_id, user_id)
elif args == 'next' or args == '下一阶段':
return await self._handle_next_phase(chat_id, user_id)
elif args.startswith('play ') or args.startswith('出牌 '):
card_name = args.split(maxsplit=1)[1]
return await self._handle_play_card(chat_id, user_id, card_name)
elif args.startswith('use ') or args.startswith('使用 '):
card_name = args.split(maxsplit=1)[1]
return await self._handle_use_card(chat_id, user_id, card_name)
elif args == 'draw' or args == '摸牌':
return await self._handle_draw(chat_id, user_id)
elif args == 'discard' or args == '弃牌':
return await self._handle_discard(chat_id, user_id)
elif args == 'cancel' or args == '取消':
return await self._handle_cancel(chat_id, user_id)
elif args == 'stats' or args == '战绩':
return await self._handle_stats(user_id)
else:
return f"❌ 未知指令: {args}\n\n使用 `.sgs help` 查看帮助"
except Exception as e:
logger.error(f"处理三国杀指令错误: {e}", exc_info=True)
return f"❌ 处理指令出错: {str(e)}"
async def _handle_create(self, chat_id: int, user_id: int, username: str) -> str:
"""创建游戏"""
# 检查是否已有游戏
if self.manager.has_active_game(chat_id):
return "❌ 当前已有进行中的游戏"
# 创建游戏
game = self.manager.create_game(chat_id, user_id, username)
return f"""✅ 三国杀游戏已创建!
**房主**: {username}
**游戏ID**: {game.game_id}
📝 其他玩家使用 `.sgs join` 加入游戏
📝 人数达到 {game.min_players}-{game.max_players} 人后,房主使用 `.sgs start` 开始游戏
当前玩家: 1/{game.max_players}"""
async def _handle_join(self, chat_id: int, user_id: int, username: str) -> str:
"""加入游戏"""
game = self.manager.get_game(chat_id)
if not game:
return "❌ 当前没有游戏,使用 `.sgs create` 创建游戏"
if game.is_started:
return "❌ 游戏已经开始,无法加入"
if game.get_player_by_id(user_id):
return "❌ 你已经在游戏中了"
if not game.add_player(user_id, username):
return f"❌ 加入失败,游戏已满({game.max_players}人)"
return f"""{username} 加入游戏!
当前玩家: {len(game.players)}/{game.max_players}
玩家列表: {', '.join(p.username for p in game.players)}
{f'📝 人数已满,房主可以使用 `.sgs start` 开始游戏' if len(game.players) >= game.min_players else f'📝 还需要 {game.min_players - len(game.players)} 人才能开始'}"""
async def _handle_leave(self, chat_id: int, user_id: int) -> str:
"""离开游戏"""
game = self.manager.get_game(chat_id)
if not game:
return "❌ 当前没有游戏"
if game.is_started:
return "❌ 游戏已经开始,无法离开"
player = game.get_player_by_id(user_id)
if not player:
return "❌ 你不在游戏中"
if not game.remove_player(user_id):
return "❌ 离开失败"
# 如果房主离开且还有其他玩家,转移房主
if user_id == game.host_id and game.players:
game.host_id = game.players[0].user_id
return f"{player.username} 离开游戏,房主已转移给 {game.players[0].username}"
# 如果没有玩家了,删除游戏
if not game.players:
self.manager.remove_game(chat_id)
return f"{player.username} 离开游戏,游戏已解散"
return f"{player.username} 离开游戏\n\n当前玩家: {len(game.players)}/{game.max_players}"
async def _handle_start(self, chat_id: int, user_id: int) -> str:
"""开始游戏"""
game = self.manager.get_game(chat_id)
if not game:
return "❌ 当前没有游戏"
if user_id != game.host_id:
return "❌ 只有房主可以开始游戏"
success, message = game.start_game()
if not success:
return f"{message}"
# 构建游戏开始消息
result = "## 🎮 三国杀游戏开始!\n\n"
# 显示主公信息
lord = game.lord_player
if lord:
result += f"### 👑 主公\n"
result += f"**{lord.username}** - {lord.general.name}{lord.general.kingdom}\n"
result += f"体力: {lord.hp}/{lord.general.max_hp}\n"
result += f"技能: {', '.join(s.name for s in lord.general.skills)}\n\n"
# 显示其他玩家信息(不显示身份)
result += f"### 👥 玩家列表\n"
for idx, player in enumerate(game.players, 1):
if player.role != Role.LORD:
result += f"{idx}. **{player.username}** - {player.general.name}{player.general.kingdom}\n"
result += f" 体力: {player.hp}/{player.general.max_hp}\n"
result += f"\n### 📋 游戏信息\n"
result += f"- 玩家数: {len(game.players)}\n"
result += f"- 当前回合: {game.current_player.username}\n"
result += f"- 当前阶段: {game.current_phase.value}\n\n"
result += "💡 使用 `.sgs status` 查看游戏状态\n"
result += "💡 使用 `.sgs hand` 查看手牌\n"
result += "💡 使用 `.sgs next` 进入下一阶段"
return result
async def _handle_status(self, chat_id: int, user_id: int) -> str:
"""查看游戏状态"""
game = self.manager.get_game(chat_id)
if not game:
return "❌ 当前没有游戏"
if not game.is_started:
return f"""## 📋 游戏状态 (准备中)
**房主**: {game.players[0].username if game.players else ''}
**玩家数**: {len(game.players)}/{game.max_players}
**玩家列表**: {', '.join(p.username for p in game.players)}
📝 使用 `.sgs start` 开始游戏"""
# 游戏进行中
result = f"## 📋 游戏状态\n\n"
result += f"### 🕒 当前回合\n"
result += f"- 玩家: **{game.current_player.username}**\n"
result += f"- 阶段: **{game.current_phase.value}**\n"
result += f"- 回合数: {game.round_number}\n\n"
result += f"### 👥 玩家状态\n"
for idx, player in enumerate(game.players, 1):
status = "💀" if not player.is_alive else ""
role_display = player.role.value if player.role == Role.LORD or not player.is_alive else "???"
result += f"{idx}. {status} **{player.username}** ({role_display})\n"
result += f" 武将: {player.general.name}{player.general.kingdom}\n"
result += f" 体力: {'❤️' * player.hp}{'🖤' * (player.general.max_hp - player.hp)} {player.hp}/{player.general.max_hp}\n"
result += f" 手牌: {player.hand_count}\n"
if player.equipment:
equip_list = ', '.join(f"{k}:{v.name}" for k, v in player.equipment.items())
result += f" 装备: {equip_list}\n"
result += "\n"
return result
async def _handle_players(self, chat_id: int) -> str:
"""查看玩家列表"""
game = self.manager.get_game(chat_id)
if not game:
return "❌ 当前没有游戏"
result = f"## 👥 玩家列表\n\n"
for idx, player in enumerate(game.players, 1):
result += f"{idx}. **{player.username}**"
if game.is_started:
result += f" - {player.general.name}"
if player.role == Role.LORD or not player.is_alive:
result += f"{player.role.value}"
result += "\n"
return result
async def _handle_hand(self, chat_id: int, user_id: int) -> str:
"""查看手牌"""
game = self.manager.get_game(chat_id)
if not game:
return "❌ 当前没有游戏"
if not game.is_started:
return "❌ 游戏还未开始"
player = game.get_player_by_id(user_id)
if not player:
return "❌ 你不在游戏中"
if not player.is_alive:
return "❌ 你已经阵亡"
if not player.hand_cards:
return "📋 你的手牌为空"
result = f"## 🃏 你的手牌({len(player.hand_cards)}张)\n\n"
# 按类型分组
basic_cards = [c for c in player.hand_cards if c.card_type.value == "基本牌"]
trick_cards = [c for c in player.hand_cards if c.card_type.value == "锦囊牌"]
equip_cards = [c for c in player.hand_cards if c.card_type.value == "装备牌"]
if basic_cards:
result += "### 基本牌\n"
for idx, card in enumerate(basic_cards, 1):
result += f"{idx}. {card}\n"
result += "\n"
if trick_cards:
result += "### 锦囊牌\n"
for idx, card in enumerate(trick_cards, 1):
result += f"{idx}. {card}\n"
result += "\n"
if equip_cards:
result += "### 装备牌\n"
for idx, card in enumerate(equip_cards, 1):
result += f"{idx}. {card}\n"
result += "\n💡 使用 `.sgs play 卡牌名` 出牌"
return result
async def _handle_next_phase(self, chat_id: int, user_id: int) -> str:
"""进入下一阶段"""
game = self.manager.get_game(chat_id)
if not game:
return "❌ 当前没有游戏"
if not game.is_started:
return "❌ 游戏还未开始"
if game.current_player.user_id != user_id:
return "❌ 不是你的回合"
# 执行阶段转换
new_phase, current_player = game.next_phase()
# 检查游戏是否结束
is_over, winner_role = game.check_game_over()
if is_over:
game.is_finished = True
game.winner_role = winner_role
return await self._handle_game_over(game)
result = f"✅ 进入下一阶段\n\n"
result += f"**当前玩家**: {current_player.username}\n"
result += f"**当前阶段**: {new_phase.value}\n\n"
# 阶段提示
if new_phase == Phase.DRAW:
result += "💡 摸牌阶段,使用 `.sgs draw` 摸牌"
elif new_phase == Phase.PLAY:
result += "💡 出牌阶段,使用 `.sgs play 卡牌名` 出牌"
elif new_phase == Phase.DISCARD:
result += "💡 弃牌阶段,使用 `.sgs discard` 弃牌"
else:
result += "请使用正确的语法"
return result
async def _handle_draw(self, chat_id: int, user_id: int) -> str:
"""摸牌"""
game = self.manager.get_game(chat_id)
if not game:
return "❌ 当前没有游戏"
if game.current_player.user_id != user_id:
return "❌ 不是你的回合"
if game.current_phase != Phase.DRAW:
return f"❌ 当前不是摸牌阶段(当前: {game.current_phase.value}"
# 摸2张牌
cards = game.deck.draw(2)
game.current_player.hand_cards.extend(cards)
result = f"✅ 摸牌成功!\n\n"
result += f"摸到: {', '.join(str(c) for c in cards)}\n"
result += f"当前手牌数: {game.current_player.hand_count}\n\n"
result += "💡 使用 `.sgs next` 进入出牌阶段"
return result
async def _handle_play_card(self, chat_id: int, user_id: int, card_name: str) -> str:
"""出牌"""
game = self.manager.get_game(chat_id)
if not game:
return "❌ 当前没有游戏"
if game.current_player.user_id != user_id:
return "❌ 不是你的回合"
if game.current_phase != Phase.PLAY:
return f"❌ 当前不是出牌阶段(当前: {game.current_phase.value}"
player = game.current_player
# 查找卡牌
card = None
for c in player.hand_cards:
if c.name == card_name:
card = c
break
if not card:
return f"❌ 你没有【{card_name}】这张牌"
# 简化处理:直接打出(实际游戏需要选择目标等)
player.remove_card(card)
game.deck.discard([card])
return f"✅ 使用了【{card}\n\n💡 继续出牌或使用 `.sgs next` 进入弃牌阶段"
async def _handle_use_card(self, chat_id: int, user_id: int, card_name: str) -> str:
"""使用卡牌同play_card"""
return await self._handle_play_card(chat_id, user_id, card_name)
async def _handle_discard(self, chat_id: int, user_id: int) -> str:
"""弃牌"""
game = self.manager.get_game(chat_id)
if not game:
return "❌ 当前没有游戏"
if game.current_player.user_id != user_id:
return "❌ 不是你的回合"
if game.current_phase != Phase.DISCARD:
return f"❌ 当前不是弃牌阶段(当前: {game.current_phase.value}"
player = game.current_player
# 检查是否需要弃牌
max_hand = player.hp
if player.hand_count <= max_hand:
return f"✅ 手牌数({player.hand_count})未超过体力值({max_hand}),无需弃牌\n\n💡 使用 `.sgs next` 结束回合"
# 简化处理:自动弃掉多余的牌
discard_count = player.hand_count - max_hand
discarded = player.hand_cards[:discard_count]
player.hand_cards = player.hand_cards[discard_count:]
game.deck.discard(discarded)
return f"✅ 弃置了 {discard_count} 张牌\n\n💡 使用 `.sgs next` 结束回合"
async def _handle_cancel(self, chat_id: int, user_id: int) -> str:
"""取消游戏"""
game = self.manager.get_game(chat_id)
if not game:
return "❌ 当前没有游戏"
if user_id != game.host_id:
return "❌ 只有房主可以取消游戏"
self.manager.remove_game(chat_id)
return "✅ 游戏已取消"
async def _handle_stats(self, user_id: int) -> str:
"""查看战绩"""
stats = self.db.get_game_stats(user_id, 'sanguosha')
if stats['total_plays'] == 0:
return "三国杀游戏记录"
win_rate = (stats['wins'] / stats['total_plays'] * 100) if stats['total_plays'] > 0 else 0
result ="战绩\n\n"
result += f"- 总局数: {stats['total_plays']}\n"
result += f"- 胜利: {stats['wins']}\n"
result += f"- 失败: {stats['losses']}\n"
result += f"- 胜率: {win_rate:.1f}%\n"
return result
async def _handle_game_over(self, game: GameState) -> str:
"""处理游戏结束"""
result = "## 🎉 游戏结束!\n\n"
if game.winner_role == Role.LORD:
result += "## 🎉公和忠臣获胜!\n\n"
winners = [p for p in game.players if p.role in [Role.LORD, Role.LOYAL]]
losers = [p for p in game.players if p.role in [Role.REBEL, Role.SPY]]
else:
result += "### ⚔️ 反贼获胜!\n\n"
winners = [p for p in game.players if p.role == Role.REBEL]
losers = [p for p in game.players if p.role in [Role.LORD, Role.LOYAL, Role.SPY]]
result += "**获胜方**:\n"
for player in winners:
result += f"- {player.username} ({player.role.value}) - {player.general.name}\n"
# 更新战绩
self.db.update_game_stats(player.user_id, 'sanguosha', win=True)
result += "\n**失败方**:\n"
for player in losers:
result += f"- {player.username} ({player.role.value}) - {player.general.name}\n"
# 更新战绩
self.db.update_game_stats(player.user_id, 'sanguosha', loss=True)
result += f"\n游戏时长: {game.round_number} 回合"
# 清理游戏
self.manager.remove_game(game.chat_id)
return result
def get_help(self) -> str:
"""获取帮助信息"""
return """## 🎮 三国杀游戏帮助
### 游戏准备
- `.sgs create` - 创建游戏房间
- `.sgs join` - 加入游戏
- `.sgs leave` - 离开游戏
- `.sgs start` - 开始游戏(房主)
- `.sgs cancel` - 取消游戏(房主)
### 游戏中
- `.sgs status` - 查看游戏状态
- `.sgs players` - 查看玩家列表
- `.sgs hand` - 查看手牌
- `.sgs next` - 进入下一阶段
- `.sgs draw` - 摸牌(摸牌阶段)
- `.sgs play 卡牌名` - 出牌(出牌阶段)
- `.sgs discard` - 弃牌(弃牌阶段)
### 其他
- `.sgs stats` - 查看个人战绩
- `.sgs help` - 显示帮助
### 游戏规则
1. **身份**: 主公、忠臣、反贼、内奸
2. **胜利条件**:
- 主公+忠臣: 消灭所有反贼和内奸
- 反贼: 击杀主公
- 内奸: 成为最后存活的人
3. **回合流程**:
- 准备阶段 → 判定阶段 → 摸牌阶段 → 出牌阶段 → 弃牌阶段 → 结束阶段
### 卡牌类型
- **基本牌**: 杀、闪、桃
- **锦囊牌**: 决斗、过河拆桥、顺手牵羊、南蛮入侵、万箭齐发等
- **装备牌**: 武器、防具、马
---
💡 提示:游戏支持 2-8 人,建议 5-8 人游戏体验最佳
"""

529
games/sgs_core.py Normal file
View File

@@ -0,0 +1,529 @@
"""三国杀游戏核心逻辑模块"""
import logging
from typing import List, Dict, Optional, Set
from enum import Enum
from dataclasses import dataclass, field
import random
logger = logging.getLogger(__name__)
class CardType(Enum):
"""卡牌类型"""
BASIC = "基本牌"
TRICK = "锦囊牌"
EQUIPMENT = "装备牌"
class CardSuit(Enum):
"""卡牌花色"""
SPADE = "" # 黑桃
HEART = "" # 红桃
CLUB = "" # 梅花
DIAMOND = "" # 方块
class CardColor(Enum):
"""卡牌颜色"""
RED = "红色"
BLACK = "黑色"
class Role(Enum):
"""角色身份"""
LORD = "主公"
LOYAL = "忠臣"
REBEL = "反贼"
SPY = "内奸"
class Phase(Enum):
"""回合阶段"""
PREPARE = "准备阶段"
JUDGE = "判定阶段"
DRAW = "摸牌阶段"
PLAY = "出牌阶段"
DISCARD = "弃牌阶段"
END = "结束阶段"
@dataclass
class Card:
"""卡牌"""
name: str # 卡牌名称
card_type: CardType # 卡牌类型
suit: CardSuit # 花色
number: int # 点数 (1-13)
description: str = "" # 描述
@property
def color(self) -> CardColor:
"""获取卡牌颜色"""
if self.suit in [CardSuit.HEART, CardSuit.DIAMOND]:
return CardColor.RED
return CardColor.BLACK
def __str__(self) -> str:
"""字符串表示"""
return f"{self.suit.value}{self.number} {self.name}"
def to_dict(self) -> Dict:
"""转换为字典"""
return {
"name": self.name,
"type": self.card_type.value,
"suit": self.suit.value,
"number": self.number,
"color": self.color.value,
"description": self.description
}
@dataclass
class Skill:
"""技能"""
name: str # 技能名称
description: str # 技能描述
skill_type: str = "主动" # 技能类型: 主动/锁定/限定/觉醒
def to_dict(self) -> Dict:
"""转换为字典"""
return {
"name": self.name,
"description": self.description,
"type": self.skill_type
}
@dataclass
class General:
"""武将"""
name: str # 武将名称
max_hp: int # 体力上限
skills: List[Skill] # 技能列表
kingdom: str = "" # 势力
def to_dict(self) -> Dict:
"""转换为字典"""
return {
"name": self.name,
"max_hp": self.max_hp,
"kingdom": self.kingdom,
"skills": [skill.to_dict() for skill in self.skills]
}
@dataclass
class Player:
"""玩家"""
user_id: int # 用户ID
username: str # 用户名
general: Optional[General] = None # 武将
role: Optional[Role] = None # 身份
hp: int = 0 # 当前体力
hand_cards: List[Card] = field(default_factory=list) # 手牌
equipment: Dict[str, Card] = field(default_factory=dict) # 装备区
judge_area: List[Card] = field(default_factory=list) # 判定区
is_alive: bool = True # 是否存活
is_chained: bool = False # 是否横置
def __post_init__(self):
"""初始化后处理"""
if self.general and self.hp == 0:
self.hp = self.general.max_hp
@property
def hand_count(self) -> int:
"""手牌数量"""
return len(self.hand_cards)
@property
def attack_range(self) -> int:
"""攻击距离"""
weapon = self.equipment.get("武器")
if weapon:
# 根据武器名称返回距离
weapon_ranges = {
"诸葛连弩": 1,
"青釭剑": 2,
"雌雄双股剑": 2,
"青龙偃月刀": 3,
"丈八蛇矛": 3,
"方天画戟": 4,
"麒麟弓": 5
}
return weapon_ranges.get(weapon.name, 1)
return 1
def add_card(self, card: Card):
"""添加手牌"""
self.hand_cards.append(card)
def remove_card(self, card: Card) -> bool:
"""移除手牌"""
if card in self.hand_cards:
self.hand_cards.remove(card)
return True
return False
def take_damage(self, damage: int) -> bool:
"""受到伤害
Returns:
是否死亡
"""
self.hp -= damage
if self.hp <= 0:
self.is_alive = False
return True
return False
def recover(self, amount: int):
"""回复体力"""
if self.general:
self.hp = min(self.hp + amount, self.general.max_hp)
def to_dict(self) -> Dict:
"""转换为字典"""
return {
"user_id": self.user_id,
"username": self.username,
"general": self.general.to_dict() if self.general else None,
"role": self.role.value if self.role else None,
"hp": self.hp,
"max_hp": self.general.max_hp if self.general else 0,
"hand_count": self.hand_count,
"equipment": {k: v.to_dict() for k, v in self.equipment.items()},
"is_alive": self.is_alive,
"is_chained": self.is_chained
}
class CardDeck:
"""牌堆"""
def __init__(self):
"""初始化牌堆"""
self.cards: List[Card] = []
self.discard_pile: List[Card] = []
self._init_standard_deck()
def _init_standard_deck(self):
"""初始化标准牌堆(简化版)"""
# 杀 (30张)
for suit, numbers in [
(CardSuit.SPADE, [7, 8, 8, 9, 9, 10, 10]),
(CardSuit.CLUB, [2, 3, 4, 5, 6, 7, 8, 8, 9, 9, 10, 10, 11]),
(CardSuit.HEART, [10, 10, 11]),
(CardSuit.DIAMOND, [6, 7, 8, 9, 10, 13])
]:
for num in numbers:
self.cards.append(Card("", CardType.BASIC, suit, num, "对攻击范围内的一名角色造成1点伤害"))
# 闪 (15张)
for suit, numbers in [
(CardSuit.HEART, [2, 2, 13]),
(CardSuit.DIAMOND, [2, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 11])
]:
for num in numbers:
self.cards.append(Card("", CardType.BASIC, suit, num, "抵消一张【杀】的效果"))
# 桃 (8张)
for suit, numbers in [
(CardSuit.HEART, [3, 4, 6, 7, 8, 9, 12]),
(CardSuit.DIAMOND, [12])
]:
for num in numbers:
self.cards.append(Card("", CardType.BASIC, suit, num, "回复1点体力"))
# 锦囊牌
# 无懈可击 (3张)
for suit, num in [(CardSuit.SPADE, 11), (CardSuit.CLUB, 12), (CardSuit.CLUB, 13)]:
self.cards.append(Card("无懈可击", CardType.TRICK, suit, num, "抵消一张锦囊牌的效果"))
# 决斗 (3张)
for suit, num in [(CardSuit.SPADE, 1), (CardSuit.CLUB, 1), (CardSuit.DIAMOND, 1)]:
self.cards.append(Card("决斗", CardType.TRICK, suit, num, "与目标角色拼点失败者受到1点伤害"))
# 过河拆桥 (6张)
for suit, numbers in [
(CardSuit.SPADE, [3, 4, 12]),
(CardSuit.CLUB, [3, 4]),
(CardSuit.HEART, [12])
]:
for num in numbers:
self.cards.append(Card("过河拆桥", CardType.TRICK, suit, num, "弃置目标角色的一张牌"))
# 顺手牵羊 (5张)
for suit, numbers in [
(CardSuit.SPADE, [3, 4, 11]),
(CardSuit.DIAMOND, [3, 4])
]:
for num in numbers:
self.cards.append(Card("顺手牵羊", CardType.TRICK, suit, num, "获得目标角色的一张牌"))
# 南蛮入侵 (3张)
for suit, num in [(CardSuit.SPADE, 7), (CardSuit.SPADE, 13), (CardSuit.CLUB, 7)]:
self.cards.append(Card("南蛮入侵", CardType.TRICK, suit, num, "所有其他角色需打出【杀】否则受到1点伤害"))
# 万箭齐发 (1张)
self.cards.append(Card("万箭齐发", CardType.TRICK, CardSuit.HEART, 1, "所有其他角色需打出【闪】否则受到1点伤害"))
# 桃园结义 (1张)
self.cards.append(Card("桃园结义", CardType.TRICK, CardSuit.HEART, 1, "所有角色回复1点体力"))
# 五谷丰登 (2张)
for num in [3, 4]:
self.cards.append(Card("五谷丰登", CardType.TRICK, CardSuit.HEART, num, "所有角色依次获得一张牌"))
# 装备牌(简化版,只添加几种)
# 诸葛连弩
self.cards.append(Card("诸葛连弩", CardType.EQUIPMENT, CardSuit.CLUB, 1, "武器攻击范围1出牌阶段可以使用任意张【杀】"))
self.cards.append(Card("诸葛连弩", CardType.EQUIPMENT, CardSuit.DIAMOND, 1, "武器攻击范围1出牌阶段可以使用任意张【杀】"))
# 青釭剑
self.cards.append(Card("青釭剑", CardType.EQUIPMENT, CardSuit.SPADE, 6, "武器攻击范围2无视目标防具"))
# 八卦阵
self.cards.append(Card("八卦阵", CardType.EQUIPMENT, CardSuit.SPADE, 2, "防具,判定为红色时视为使用了【闪】"))
self.cards.append(Card("八卦阵", CardType.EQUIPMENT, CardSuit.CLUB, 2, "防具,判定为红色时视为使用了【闪】"))
# 的卢
self.cards.append(Card("的卢", CardType.EQUIPMENT, CardSuit.CLUB, 5, "+1马其他角色计算与你的距离+1"))
# 赤兔
self.cards.append(Card("赤兔", CardType.EQUIPMENT, CardSuit.HEART, 5, "-1马你计算与其他角色的距离-1"))
# 洗牌
self.shuffle()
def shuffle(self):
"""洗牌"""
random.shuffle(self.cards)
logger.info(f"牌堆已洗牌,共 {len(self.cards)} 张牌")
def draw(self, count: int = 1) -> List[Card]:
"""摸牌
Args:
count: 摸牌数量
Returns:
摸到的牌列表
"""
if len(self.cards) < count:
# 牌不够,将弃牌堆洗入牌堆
self.cards.extend(self.discard_pile)
self.discard_pile.clear()
self.shuffle()
drawn = self.cards[:count]
self.cards = self.cards[count:]
return drawn
def discard(self, cards: List[Card]):
"""弃牌"""
self.discard_pile.extend(cards)
class GeneralPool:
"""武将池"""
def __init__(self):
"""初始化武将池"""
self.generals: List[General] = []
self._init_standard_generals()
def _init_standard_generals(self):
"""初始化标准武将(简化版)"""
# 刘备
self.generals.append(General(
name="刘备",
max_hp=4,
kingdom="",
skills=[
Skill("仁德", "出牌阶段你可以将任意张手牌交给其他角色若你给出的牌达到两张或更多你回复1点体力", "主动"),
Skill("激将", "主公技,当你需要使用或打出【杀】时,你可以令其他蜀势力角色打出一张【杀】(视为由你使用或打出)", "主动")
]
))
# 关羽
self.generals.append(General(
name="关羽",
max_hp=4,
kingdom="",
skills=[
Skill("武圣", "你可以将一张红色牌当【杀】使用或打出", "主动")
]
))
# 张飞
self.generals.append(General(
name="张飞",
max_hp=4,
kingdom="",
skills=[
Skill("咆哮", "锁定技,出牌阶段,你使用【杀】无次数限制", "锁定")
]
))
# 诸葛亮
self.generals.append(General(
name="诸葛亮",
max_hp=3,
kingdom="",
skills=[
Skill("观星", "准备阶段你可以观看牌堆顶的X张牌X为存活角色数且至多为5将任意数量的牌置于牌堆顶其余的牌置于牌堆底", "主动"),
Skill("空城", "锁定技,当你没有手牌时,你不能成为【杀】或【决斗】的目标", "锁定")
]
))
# 赵云
self.generals.append(General(
name="赵云",
max_hp=4,
kingdom="",
skills=[
Skill("龙胆", "你可以将【杀】当【闪】、【闪】当【杀】使用或打出", "主动")
]
))
# 曹操
self.generals.append(General(
name="曹操",
max_hp=4,
kingdom="",
skills=[
Skill("奸雄", "当你受到伤害后,你可以获得对你造成伤害的牌", "主动"),
Skill("护驾", "主公技,当你需要使用或打出【闪】时,你可以令其他魏势力角色打出一张【闪】(视为由你使用或打出)", "主动")
]
))
# 司马懿
self.generals.append(General(
name="司马懿",
max_hp=3,
kingdom="",
skills=[
Skill("反馈", "当你受到1点伤害后你可以获得伤害来源的一张牌", "主动"),
Skill("鬼才", "在任意角色的判定牌生效前,你可以打出一张手牌代替之", "主动")
]
))
# 夏侯惇
self.generals.append(General(
name="夏侯惇",
max_hp=4,
kingdom="",
skills=[
Skill("刚烈", "当你受到伤害后你可以进行判定若结果不为♥则伤害来源选择一项弃置两张手牌或受到你造成的1点伤害", "主动")
]
))
# 甄姬
self.generals.append(General(
name="甄姬",
max_hp=3,
kingdom="",
skills=[
Skill("洛神", "准备阶段,你可以进行判定:当黑色判定牌生效后,你获得之。若结果为黑色,你可以重复此流程", "主动"),
Skill("倾国", "你可以将一张黑色手牌当【闪】使用或打出", "主动")
]
))
# 孙权
self.generals.append(General(
name="孙权",
max_hp=4,
kingdom="",
skills=[
Skill("制衡", "出牌阶段限一次,你可以弃置任意张牌,然后摸等量的牌", "主动"),
Skill("救援", "主公技,锁定技,其他吴势力角色对你使用【桃】时,该角色摸一张牌", "锁定")
]
))
# 周瑜
self.generals.append(General(
name="周瑜",
max_hp=3,
kingdom="",
skills=[
Skill("英姿", "摸牌阶段,你可以额外摸一张牌", "主动"),
Skill("反间", "出牌阶段限一次你可以令一名其他角色选择一种花色然后该角色获得你的一张手牌并展示之若此牌与所选花色不同你对其造成1点伤害", "主动")
]
))
# 吕蒙
self.generals.append(General(
name="吕蒙",
max_hp=4,
kingdom="",
skills=[
Skill("克己", "若你于出牌阶段内没有使用或打出过【杀】,你可以跳过此回合的弃牌阶段", "主动")
]
))
# 黄盖
self.generals.append(General(
name="黄盖",
max_hp=4,
kingdom="",
skills=[
Skill("苦肉", "出牌阶段你可以失去1点体力然后摸两张牌", "主动")
]
))
# 吕布
self.generals.append(General(
name="吕布",
max_hp=4,
kingdom="",
skills=[
Skill("无双", "锁定技,当你使用【杀】指定一个目标后,该角色需依次使用两张【闪】才能抵消此【杀】;当你使用【决斗】指定一个目标后,该角色每次响应此【决斗】需依次打出两张【杀】", "锁定")
]
))
# 貂蝉
self.generals.append(General(
name="貂蝉",
max_hp=3,
kingdom="",
skills=[
Skill("离间", "出牌阶段限一次,你可以弃置一张牌并选择两名男性角色,后选择的角色视为对先选择的角色使用一张【决斗】(此【决斗】不能被【无懈可击】响应)", "主动"),
Skill("闭月", "结束阶段,你可以摸一张牌", "主动")
]
))
# 华佗
self.generals.append(General(
name="华佗",
max_hp=3,
kingdom="",
skills=[
Skill("急救", "你的回合外,你可以将一张红色牌当【桃】使用", "主动"),
Skill("青囊", "出牌阶段限一次你可以弃置一张手牌并令一名角色回复1点体力", "主动")
]
))
def get_random_generals(self, count: int, exclude: List[str] = None) -> List[General]:
"""随机获取武将
Args:
count: 数量
exclude: 排除的武将名称列表
Returns:
武将列表
"""
available = [g for g in self.generals if not exclude or g.name not in exclude]
if len(available) < count:
return available
return random.sample(available, count)
def get_general_by_name(self, name: str) -> Optional[General]:
"""根据名称获取武将"""
for general in self.generals:
if general.name == name:
return general
return None

337
games/sgs_game.py Normal file
View File

@@ -0,0 +1,337 @@
"""三国杀游戏状态管理"""
import logging
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass, field
import random
import time
from games.sgs_core import (
Player, Card, CardDeck, GeneralPool, Role, Phase,
CardType, CardSuit, General
)
logger = logging.getLogger(__name__)
@dataclass
class GameState:
"""游戏状态"""
game_id: str # 游戏ID
chat_id: int # 会话ID
host_id: int # 房主ID
players: List[Player] = field(default_factory=list) # 玩家列表
deck: CardDeck = field(default_factory=CardDeck) # 牌堆
current_player_idx: int = 0 # 当前玩家索引
current_phase: Phase = Phase.PREPARE # 当前阶段
round_number: int = 1 # 回合数
is_started: bool = False # 是否已开始
is_finished: bool = False # 是否已结束
winner_role: Optional[Role] = None # 获胜身份
created_at: int = field(default_factory=lambda: int(time.time()))
# 游戏配置
max_players: int = 8 # 最大玩家数
min_players: int = 2 # 最小玩家数简化版标准是5人
# 临时状态
pending_action: Optional[Dict] = None # 等待的操作
action_queue: List[Dict] = field(default_factory=list) # 操作队列
@property
def current_player(self) -> Optional[Player]:
"""获取当前玩家"""
if 0 <= self.current_player_idx < len(self.players):
return self.players[self.current_player_idx]
return None
@property
def alive_players(self) -> List[Player]:
"""获取存活玩家"""
return [p for p in self.players if p.is_alive]
@property
def lord_player(self) -> Optional[Player]:
"""获取主公"""
for player in self.players:
if player.role == Role.LORD:
return player
return None
def get_player_by_id(self, user_id: int) -> Optional[Player]:
"""根据用户ID获取玩家"""
for player in self.players:
if player.user_id == user_id:
return player
return None
def get_next_alive_player(self, from_idx: int = None) -> Optional[Player]:
"""获取下一个存活玩家"""
if from_idx is None:
from_idx = self.current_player_idx
idx = (from_idx + 1) % len(self.players)
checked = 0
while checked < len(self.players):
if self.players[idx].is_alive:
return self.players[idx]
idx = (idx + 1) % len(self.players)
checked += 1
return None
def add_player(self, user_id: int, username: str) -> bool:
"""添加玩家
Returns:
是否成功
"""
if self.is_started:
return False
if len(self.players) >= self.max_players:
return False
if self.get_player_by_id(user_id):
return False
player = Player(user_id=user_id, username=username)
self.players.append(player)
logger.info(f"玩家 {username} 加入游戏")
return True
def remove_player(self, user_id: int) -> bool:
"""移除玩家(仅限游戏未开始时)"""
if self.is_started:
return False
player = self.get_player_by_id(user_id)
if player:
self.players.remove(player)
logger.info(f"玩家 {player.username} 离开游戏")
return True
return False
def start_game(self) -> Tuple[bool, str]:
"""开始游戏
Returns:
(是否成功, 消息)
"""
if self.is_started:
return False, "游戏已经开始"
if len(self.players) < self.min_players:
return False, f"人数不足,至少需要 {self.min_players}"
if len(self.players) > self.max_players:
return False, f"人数过多,最多 {self.max_players}"
# 分配身份
self._assign_roles()
# 选择武将
self._assign_generals()
# 初始化体力
for player in self.players:
if player.general:
player.hp = player.general.max_hp
# 主公额外+1体力上限
if player.role == Role.LORD:
player.hp += 1
if player.general:
player.general.max_hp += 1
# 发初始手牌
self._deal_initial_cards()
# 确定起始玩家(主公先手)
for idx, player in enumerate(self.players):
if player.role == Role.LORD:
self.current_player_idx = idx
break
self.is_started = True
self.current_phase = Phase.PREPARE
logger.info(f"游戏开始,共 {len(self.players)} 名玩家")
return True, "游戏开始!"
def _assign_roles(self):
"""分配身份"""
player_count = len(self.players)
# 根据人数分配身份
role_distribution = {
2: [Role.LORD, Role.REBEL],
3: [Role.LORD, Role.REBEL, Role.SPY],
4: [Role.LORD, Role.LOYAL, Role.REBEL, Role.SPY],
5: [Role.LORD, Role.LOYAL, Role.REBEL, Role.REBEL, Role.SPY],
6: [Role.LORD, Role.LOYAL, Role.REBEL, Role.REBEL, Role.REBEL, Role.SPY],
7: [Role.LORD, Role.LOYAL, Role.LOYAL, Role.REBEL, Role.REBEL, Role.REBEL, Role.SPY],
8: [Role.LORD, Role.LOYAL, Role.LOYAL, Role.REBEL, Role.REBEL, Role.REBEL, Role.REBEL, Role.SPY]
}
roles = role_distribution.get(player_count, [Role.LORD] + [Role.REBEL] * (player_count - 1))
random.shuffle(roles)
for player, role in zip(self.players, roles):
player.role = role
logger.info(f"玩家 {player.username} 的身份是 {role.value}")
def _assign_generals(self):
"""分配武将"""
general_pool = GeneralPool()
# 主公先选
lord = self.lord_player
if lord:
# 主公从3个武将中选择这里简化为随机
lord_options = general_pool.get_random_generals(3)
lord.general = lord_options[0] # 简化:直接选第一个
logger.info(f"主公 {lord.username} 选择了武将 {lord.general.name}")
# 其他玩家随机分配
used_generals = [lord.general.name] if lord and lord.general else []
for player in self.players:
if player.role != Role.LORD:
generals = general_pool.get_random_generals(1, exclude=used_generals)
if generals:
player.general = generals[0]
used_generals.append(player.general.name)
logger.info(f"玩家 {player.username} 获得武将 {player.general.name}")
def _deal_initial_cards(self):
"""发初始手牌"""
for player in self.players:
# 每人发4张手牌
cards = self.deck.draw(4)
player.hand_cards.extend(cards)
logger.info(f"玩家 {player.username} 获得 {len(cards)} 张初始手牌")
def next_phase(self) -> Tuple[Phase, Optional[Player]]:
"""进入下一阶段
Returns:
(新阶段, 当前玩家)
"""
current = self.current_player
if not current:
return self.current_phase, None
# 阶段流转
phase_order = [
Phase.PREPARE,
Phase.JUDGE,
Phase.DRAW,
Phase.PLAY,
Phase.DISCARD,
Phase.END
]
current_idx = phase_order.index(self.current_phase)
if current_idx < len(phase_order) - 1:
# 进入下一阶段
self.current_phase = phase_order[current_idx + 1]
else:
# 回合结束,轮到下一个玩家
next_player = self.get_next_alive_player()
if next_player:
for idx, p in enumerate(self.players):
if p.user_id == next_player.user_id:
self.current_player_idx = idx
break
self.current_phase = Phase.PREPARE
self.round_number += 1
return self.current_phase, self.current_player
def check_game_over(self) -> Tuple[bool, Optional[Role]]:
"""检查游戏是否结束
Returns:
(是否结束, 获胜身份)
"""
lord = self.lord_player
# 主公死亡
if not lord or not lord.is_alive:
# 反贼胜利
return True, Role.REBEL
# 检查是否所有反贼和内奸都死亡
rebels_alive = any(p.is_alive and p.role == Role.REBEL for p in self.players)
spies_alive = any(p.is_alive and p.role == Role.SPY for p in self.players)
if not rebels_alive and not spies_alive:
# 主公和忠臣胜利
return True, Role.LORD
return False, None
def to_dict(self) -> Dict:
"""转换为字典"""
return {
"game_id": self.game_id,
"chat_id": self.chat_id,
"host_id": self.host_id,
"players": [p.to_dict() for p in self.players],
"current_player_idx": self.current_player_idx,
"current_phase": self.current_phase.value,
"round_number": self.round_number,
"is_started": self.is_started,
"is_finished": self.is_finished,
"winner_role": self.winner_role.value if self.winner_role else None,
"created_at": self.created_at
}
class GameManager:
"""游戏管理器"""
def __init__(self):
"""初始化"""
self.games: Dict[int, GameState] = {} # chat_id -> GameState
def create_game(self, chat_id: int, host_id: int, host_name: str) -> GameState:
"""创建游戏"""
game_id = f"sgs_{chat_id}_{int(time.time())}"
game = GameState(
game_id=game_id,
chat_id=chat_id,
host_id=host_id
)
game.add_player(host_id, host_name)
self.games[chat_id] = game
logger.info(f"创建游戏: {game_id}")
return game
def get_game(self, chat_id: int) -> Optional[GameState]:
"""获取游戏"""
return self.games.get(chat_id)
def remove_game(self, chat_id: int):
"""移除游戏"""
if chat_id in self.games:
del self.games[chat_id]
logger.info(f"移除游戏: chat_id={chat_id}")
def has_active_game(self, chat_id: int) -> bool:
"""是否有活跃游戏"""
game = self.get_game(chat_id)
return game is not None and not game.is_finished
# 全局游戏管理器
_game_manager: Optional[GameManager] = None
def get_game_manager() -> GameManager:
"""获取全局游戏管理器"""
global _game_manager
if _game_manager is None:
_game_manager = GameManager()
return _game_manager

View File

@@ -168,6 +168,12 @@ async def handle_command(game_type: str, command: str,
from games.gomoku import GomokuGame from games.gomoku import GomokuGame
game = GomokuGame() game = GomokuGame()
return await game.handle(command, chat_id, user_id) return await game.handle(command, chat_id, user_id)
# 三国杀
if game_type == 'sanguosha':
from games.sanguosha import SanguoshaGame
game = SanguoshaGame()
return await game.handle(command, chat_id, user_id)
# 积分系统 # 积分系统
if game_type == 'points': if game_type == 'points':

View File

@@ -43,6 +43,10 @@ class CommandParser:
'.gomoku': 'gomoku', '.gomoku': 'gomoku',
'.五子棋': 'gomoku', '.五子棋': 'gomoku',
'.gobang': 'gomoku', '.gobang': 'gomoku',
# 三国杀
'.sgs': 'sanguosha',
'.三国杀': 'sanguosha',
# 积分系统 # 积分系统
'.points': 'points', '.points': 'points',