"""五子棋游戏""" import time import re import logging from typing import Optional, Dict, Any from games.base import BaseGame from games import gomoku_logic as logic from utils.parser import CommandParser from config import GAME_CONFIG logger = logging.getLogger(__name__) class GomokuGame(BaseGame): """五子棋游戏""" def __init__(self): """初始化游戏""" super().__init__() self.config = GAME_CONFIG.get('gomoku', {}) self.max_concurrent_games = self.config.get('max_concurrent_games', 5) self.board_size = self.config.get('board_size', 15) async def handle(self, command: str, chat_id: int, user_id: int) -> str: """处理五子棋指令 Args: command: 指令,如 ".gomoku @对手" 或 ".gomoku A1" chat_id: 会话ID user_id: 用户ID Returns: 回复消息 """ try: # 提取参数 _, args = CommandParser.extract_command_args(command) args = args.strip() # 调试日志 logger.info(f"五子棋指令解析 - command: {command}") logger.info(f"五子棋指令解析 - args: {args}") # 没有参数,显示帮助 if not args: return self.get_help() # 解析参数 parts = args.split(maxsplit=1) action = parts[0].lower() logger.info(f"五子棋指令解析 - action: {action}") # 帮助 if action in ['help', '帮助']: return self.get_help() # 发起挑战 if action in ['challenge', 'start', '挑战', '开始']: return self._create_challenge(chat_id, user_id) # 接受挑战 if action in ['accept', 'join', '接受', '加入']: return self._accept_challenge(chat_id, user_id) # 取消挑战 if action in ['cancel', '取消']: return self._cancel_challenge(chat_id, user_id) # 列出所有对战 if action in ['list', '列表', '查看']: return self._list_games(chat_id) # 查看战绩 if action in ['stats', '战绩', '统计']: return self._get_stats(user_id) # 显示棋盘 if action in ['show', '显示', '棋盘']: return self._show_board(chat_id, user_id) # 认输 if action in ['resign', '认输', '投降']: return self._resign(chat_id, user_id) # 尝试解析为坐标(落子) coord = logic.parse_coord(action) if coord is not None: return self._make_move(chat_id, user_id, action) # 未识别的指令 logger.warning(f"五子棋未识别的指令 - args: {args}") return f"❌ 未识别的指令:{args}\n\n💡 提示:\n- 发起挑战:`.gomoku challenge`\n- 接受挑战:`.gomoku accept`\n- 落子:`.gomoku A1`\n- 查看帮助:`.gomoku help`" except Exception as e: logger.error(f"处理五子棋指令错误: {e}", exc_info=True) return f"❌ 处理指令出错: {str(e)}" def _get_game_pool(self, chat_id: int) -> Dict[str, Any]: """获取游戏池 Args: chat_id: 会话ID Returns: 游戏池数据 """ state = self.db.get_game_state(chat_id, 0, 'gomoku') if state: return state['state_data'] else: return { "games": [], "max_concurrent_games": self.max_concurrent_games } def _save_game_pool(self, chat_id: int, pool_data: Dict[str, Any]): """保存游戏池 Args: chat_id: 会话ID pool_data: 游戏池数据 """ self.db.save_game_state(chat_id, 0, 'gomoku', pool_data) def _find_user_game(self, chat_id: int, user_id: int) -> Optional[Dict[str, Any]]: """查找用户参与的游戏 Args: chat_id: 会话ID user_id: 用户ID Returns: 游戏数据或None """ pool = self._get_game_pool(chat_id) for game in pool.get("games", []): if game["status"] == "playing": if game["player_black"] == user_id or game["player_white"] == user_id: return game return None def _create_challenge(self, chat_id: int, user_id: int) -> str: """创建挑战 Args: chat_id: 会话ID user_id: 发起者ID Returns: 提示消息 """ # 获取游戏池 pool = self._get_game_pool(chat_id) games = pool.get("games", []) challenges = pool.get("challenges", []) # 检查用户是否已经在对战中 user_game = self._find_user_game(chat_id, user_id) if user_game: return "⚠️ 你已经在对战中!\n\n输入 `.gomoku show` 查看棋盘" # 检查用户是否已经发起了挑战 for challenge in challenges: if challenge["challenger_id"] == user_id: return "⚠️ 你已经发起了一个挑战!\n\n等待其他人接受,或输入 `.gomoku cancel` 取消挑战" # 创建挑战 current_time = int(time.time()) challenge = { "challenger_id": user_id, "created_at": current_time } challenges.append(challenge) pool["challenges"] = challenges self._save_game_pool(chat_id, pool) text = f"## 🎯 五子棋挑战\n\n" text += f" 发起了五子棋挑战!\n\n" text += f"💡 想要应战吗?输入 `.gomoku accept` 接受挑战" return text def _accept_challenge(self, chat_id: int, user_id: int) -> str: """接受挑战 Args: chat_id: 会话ID user_id: 接受者ID Returns: 提示消息 """ # 获取游戏池 pool = self._get_game_pool(chat_id) games = pool.get("games", []) challenges = pool.get("challenges", []) if not challenges: return "⚠️ 当前没有挑战可以接受\n\n输入 `.gomoku challenge` 发起挑战" # 检查用户是否已经在对战中 user_game = self._find_user_game(chat_id, user_id) if user_game: return "⚠️ 你已经在对战中!\n\n输入 `.gomoku show` 查看棋盘" # 获取最新的挑战 challenge = challenges[-1] challenger_id = challenge["challenger_id"] # 不能接受自己的挑战 if challenger_id == user_id: return "❌ 不能接受自己的挑战!" # 检查是否已达到最大并发数 active_games = [g for g in games if g["status"] == "playing"] if len(active_games) >= self.max_concurrent_games: return f"⚠️ 当前聊天已有 {len(active_games)} 局对战,已达到最大并发数限制" # 创建游戏 current_time = int(time.time()) game_id = f"p{challenger_id}_p{user_id}_{current_time}" new_game = { "game_id": game_id, "player_black": challenger_id, # 挑战者执黑(先手) "player_white": user_id, # 接受者执白(后手) "current_player": challenger_id, # 黑方先手 "board": logic.create_empty_board(), "status": "playing", "winner": None, "moves": [], "last_move": None, "created_at": current_time, "updated_at": current_time } games.append(new_game) # 移除已接受的挑战 challenges.remove(challenge) pool["games"] = games pool["challenges"] = challenges self._save_game_pool(chat_id, pool) text = f"## ⚫ 五子棋对战开始!\n\n" text += f"**黑方(先手)**: ⚫\n\n" text += f"**白方(后手)**: ⚪\n\n" text += f"**轮到**: ⚫\n\n" text += "💡 提示:\n" text += "- 黑方有禁手规则(三三、四四、长连禁手)\n" text += "- 输入 `.gomoku A1` 在A1位置落子\n" text += "- 输入 `.gomoku show` 查看棋盘" return text def _cancel_challenge(self, chat_id: int, user_id: int) -> str: """取消挑战 Args: chat_id: 会话ID user_id: 用户ID Returns: 提示消息 """ # 获取游戏池 pool = self._get_game_pool(chat_id) challenges = pool.get("challenges", []) # 查找用户的挑战 user_challenge = None for challenge in challenges: if challenge["challenger_id"] == user_id: user_challenge = challenge break if not user_challenge: return "⚠️ 你没有发起挑战" # 移除挑战 challenges.remove(user_challenge) pool["challenges"] = challenges self._save_game_pool(chat_id, pool) return "✅ 已取消挑战" def _make_move(self, chat_id: int, user_id: int, coord: str) -> str: """落子 Args: chat_id: 会话ID user_id: 用户ID coord: 坐标字符串 Returns: 结果消息 """ # 查找用户的游戏 game = self._find_user_game(chat_id, user_id) if not game: return "⚠️ 你当前没有进行中的对战\n\n输入 `.gomoku @对手` 开始新游戏" # 检查是否轮到该用户 if game["current_player"] != user_id: opponent_id = game["player_white"] if game["player_black"] == user_id else game["player_black"] return f"⚠️ 现在轮到 落子" # 解析坐标 position = logic.parse_coord(coord) if position is None: return f"❌ 无效的坐标:{coord}\n\n坐标格式如:A1, O15" row, col = position # 检查位置是否已有棋子 if game["board"][row][col] != 0: return f"❌ 位置 {coord.upper()} 已有棋子" # 确定当前玩家颜色 player = 1 if game["player_black"] == user_id else 2 player_name = "黑方" if player == 1 else "白方" player_emoji = "⚫" if player == 1 else "⚪" # 检查黑方禁手 if player == 1: is_forbidden, forbidden_type = logic.check_forbidden(game["board"], row, col) if is_forbidden: text = f"## ❌ {forbidden_type}!\n\n" text += f"位置 {coord.upper()} 触发禁手,黑方判负!\n\n" text += f"**获胜者**: ⚪ 白方\n\n" text += f"📊 战绩已更新" # 更新战绩 self.db.update_game_stats(game['player_white'], 'gomoku', win=True) self.db.update_game_stats(game['player_black'], 'gomoku', loss=True) # 移除游戏 pool = self._get_game_pool(chat_id) pool["games"] = [g for g in pool["games"] if g["game_id"] != game["game_id"]] self._save_game_pool(chat_id, pool) return text # 落子 game["board"][row][col] = player game["moves"].append((row, col, player)) game["last_move"] = (row, col) game["updated_at"] = int(time.time()) # 检查是否获胜 if logic.check_win(game["board"], row, col, player): text = f"## 🎉 五连珠!游戏结束!\n\n" text += f"**获胜者**: {player_emoji} {player_name}\n\n" # 渲染棋盘 board_str = logic.render_board(game["board"], game["last_move"]) text += f"```\n{board_str}\n```\n\n" text += f"📊 战绩已更新" # 更新战绩 opponent_id = game["player_white"] if player == 1 else game["player_black"] self.db.update_game_stats(user_id, 'gomoku', win=True) self.db.update_game_stats(opponent_id, 'gomoku', loss=True) # 移除游戏 pool = self._get_game_pool(chat_id) pool["games"] = [g for g in pool["games"] if g["game_id"] != game["game_id"]] self._save_game_pool(chat_id, pool) return text # 切换玩家 opponent_id = game["player_white"] if player == 1 else game["player_black"] game["current_player"] = opponent_id opponent_emoji = "⚪" if player == 1 else "⚫" opponent_name = "白方" if player == 1 else "黑方" # 更新游戏池 pool = self._get_game_pool(chat_id) for i, g in enumerate(pool["games"]): if g["game_id"] == game["game_id"]: pool["games"][i] = game break self._save_game_pool(chat_id, pool) # 渲染棋盘 board_str = logic.render_board(game["board"], game["last_move"]) text = f"## ✅ 落子成功!\n\n" text += f"**位置**:{coord.upper()} {player_emoji}\n\n" text += f"**轮到**: {opponent_emoji} {opponent_name}\n\n" text += f"```\n{board_str}\n```" return text def _show_board(self, chat_id: int, user_id: int) -> str: """显示棋盘 Args: chat_id: 会话ID user_id: 用户ID Returns: 棋盘显示 """ game = self._find_user_game(chat_id, user_id) if not game: return "⚠️ 你当前没有进行中的对战\n\n输入 `.gomoku @对手` 开始新游戏" # 渲染棋盘 board_str = logic.render_board(game["board"], game["last_move"]) # 获取当前玩家信息 current_id = game["current_player"] current_emoji = "⚫" if game["player_black"] == current_id else "⚪" current_name = "黑方" if game["player_black"] == current_id else "白方" text = f"## ⚫ 五子棋对战\n\n" text += f"**黑方**: ⚫\n\n" text += f"**白方**: ⚪\n\n" text += f"**轮到**: {current_emoji} {current_name}\n\n" text += f"**手数**:{len(game['moves'])}\n\n" text += f"```\n{board_str}\n```" return text def _resign(self, chat_id: int, user_id: int) -> str: """认输 Args: chat_id: 会话ID user_id: 用户ID Returns: 结果消息 """ game = self._find_user_game(chat_id, user_id) if not game: return "⚠️ 你当前没有进行中的对战" # 确定胜者 if game["player_black"] == user_id: winner_id = game["player_white"] loser_name = "黑方" winner_emoji = "⚪" else: winner_id = game["player_black"] loser_name = "白方" winner_emoji = "⚫" text = f"## 🏳️ 认输\n\n" text += f" {loser_name} 认输\n\n" text += f"**获胜者**: {winner_emoji}\n\n" text += f"📊 战绩已更新" # 更新战绩 self.db.update_game_stats(winner_id, 'gomoku', win=True) self.db.update_game_stats(user_id, 'gomoku', loss=True) # 移除游戏 pool = self._get_game_pool(chat_id) pool["games"] = [g for g in pool["games"] if g["game_id"] != game["game_id"]] self._save_game_pool(chat_id, pool) return text def _list_games(self, chat_id: int) -> str: """列出所有进行中的游戏 Args: chat_id: 会话ID Returns: 游戏列表 """ pool = self._get_game_pool(chat_id) active_games = [g for g in pool.get("games", []) if g["status"] == "playing"] if not active_games: return "📋 当前没有进行中的对战\n\n输入 `.gomoku @对手` 开始新游戏" text = f"## 📋 进行中的对战 ({len(active_games)}/{self.max_concurrent_games})\n\n" for idx, game in enumerate(active_games, 1): current_emoji = "⚫" if game["player_black"] == game["current_player"] else "⚪" text += f"### {idx}. 对战\n" text += f"- **黑方**: ⚫\n" text += f"- **白方**: ⚪\n" text += f"- **轮到**: {current_emoji}\n" text += f"- **手数**:{len(game['moves'])}\n\n" return text def _get_stats(self, user_id: int) -> str: """获取用户战绩 Args: user_id: 用户ID Returns: 战绩信息 """ stats = self.db.get_game_stats(user_id, 'gomoku') total = stats['total_plays'] if total == 0: return "📊 你还没有五子棋对战记录\n\n快来挑战吧!输入 `.gomoku @对手` 开始游戏" wins = stats['wins'] losses = stats['losses'] win_rate = (wins / total * 100) if total > 0 else 0 text = f"## 📊 五子棋战绩\n\n" text += f"**总局数**:{total} 局\n\n" text += f"**胜利**:{wins} 次 🎉\n\n" text += f"**失败**:{losses} 次\n\n" text += f"**胜率**:{win_rate:.1f}%" return text def get_help(self) -> str: """获取帮助信息""" return """## ⚫ 五子棋 ### 基础用法 - `.gomoku challenge` - 发起挑战 - `.gomoku accept` - 接受挑战 - `.gomoku A1` - 在A1位置落子 - `.gomoku show` - 显示当前棋盘 - `.gomoku resign` - 认输 ### 其他指令 - `.gomoku cancel` - 取消自己的挑战 - `.gomoku list` - 列出所有进行中的对战 - `.gomoku stats` - 查看个人战绩 ### 游戏规则 - 标准15×15棋盘,五子连珠获胜 - 黑方先手,但有禁手规则: - **三三禁手**:一手棋同时形成两个活三 - **四四禁手**:一手棋同时形成两个四(活四或冲四) - **长连禁手**:一手棋形成六子或以上连珠 - 触发禁手者判负 - 允许多轮对战同时进行(对战双方不同即可) ### 坐标系统 - 列:A-O(15列) - 行:1-15(15行) - 示例:A1(左上角)、O15(右下角)、H8(中心) ### 示例 ``` .gomoku challenge # 发起挑战 .gomoku accept # 接受挑战 .gomoku H8 # 在中心位置落子 .gomoku show # 查看棋盘 .gomoku resign # 认输 ``` 💡 提示:黑方虽然先手,但需要注意禁手规则 """