"""五子棋游戏"""
import time
import re
import logging
from typing import Optional, Dict, Any
from games.base import BaseGame
from games import gomoku_logic as logic
from utils.parser import CommandParser
from config import GAME_CONFIG
logger = logging.getLogger(__name__)
class GomokuGame(BaseGame):
"""五子棋游戏"""
def __init__(self):
"""初始化游戏"""
super().__init__()
self.config = GAME_CONFIG.get('gomoku', {})
self.max_concurrent_games = self.config.get('max_concurrent_games', 5)
self.board_size = self.config.get('board_size', 15)
async def handle(self, command: str, chat_id: int, user_id: int) -> str:
"""处理五子棋指令
Args:
command: 指令,如 ".gomoku @对手" 或 ".gomoku A1"
chat_id: 会话ID
user_id: 用户ID
Returns:
回复消息
"""
try:
# 提取参数
_, args = CommandParser.extract_command_args(command)
args = args.strip()
# 调试日志
logger.info(f"五子棋指令解析 - command: {command}")
logger.info(f"五子棋指令解析 - args: {args}")
# 没有参数,显示帮助
if not args:
return self.get_help()
# 解析参数
parts = args.split(maxsplit=1)
action = parts[0].lower()
logger.info(f"五子棋指令解析 - action: {action}")
# 帮助
if action in ['help', '帮助']:
return self.get_help()
# 发起挑战
if action in ['challenge', 'start', '挑战', '开始']:
return self._create_challenge(chat_id, user_id)
# 接受挑战
if action in ['accept', 'join', '接受', '加入']:
return self._accept_challenge(chat_id, user_id)
# 取消挑战
if action in ['cancel', '取消']:
return self._cancel_challenge(chat_id, user_id)
# 列出所有对战
if action in ['list', '列表', '查看']:
return self._list_games(chat_id)
# 查看战绩
if action in ['stats', '战绩', '统计']:
return self._get_stats(user_id)
# 显示棋盘
if action in ['show', '显示', '棋盘']:
return self._show_board(chat_id, user_id)
# 认输
if action in ['resign', '认输', '投降']:
return self._resign(chat_id, user_id)
# 尝试解析为坐标(落子)
coord = logic.parse_coord(action)
if coord is not None:
return self._make_move(chat_id, user_id, action)
# 未识别的指令
logger.warning(f"五子棋未识别的指令 - args: {args}")
return f"❌ 未识别的指令:{args}\n\n💡 提示:\n- 发起挑战:`.gomoku challenge`\n- 接受挑战:`.gomoku accept`\n- 落子:`.gomoku A1`\n- 查看帮助:`.gomoku help`"
except Exception as e:
logger.error(f"处理五子棋指令错误: {e}", exc_info=True)
return f"❌ 处理指令出错: {str(e)}"
def _get_game_pool(self, chat_id: int) -> Dict[str, Any]:
"""获取游戏池
Args:
chat_id: 会话ID
Returns:
游戏池数据
"""
state = self.db.get_game_state(chat_id, 0, 'gomoku')
if state:
return state['state_data']
else:
return {
"games": [],
"max_concurrent_games": self.max_concurrent_games
}
def _save_game_pool(self, chat_id: int, pool_data: Dict[str, Any]):
"""保存游戏池
Args:
chat_id: 会话ID
pool_data: 游戏池数据
"""
self.db.save_game_state(chat_id, 0, 'gomoku', pool_data)
def _find_user_game(self, chat_id: int, user_id: int) -> Optional[Dict[str, Any]]:
"""查找用户参与的游戏
Args:
chat_id: 会话ID
user_id: 用户ID
Returns:
游戏数据或None
"""
pool = self._get_game_pool(chat_id)
for game in pool.get("games", []):
if game["status"] == "playing":
if game["player_black"] == user_id or game["player_white"] == user_id:
return game
return None
def _create_challenge(self, chat_id: int, user_id: int) -> str:
"""创建挑战
Args:
chat_id: 会话ID
user_id: 发起者ID
Returns:
提示消息
"""
# 获取游戏池
pool = self._get_game_pool(chat_id)
games = pool.get("games", [])
challenges = pool.get("challenges", [])
# 检查用户是否已经在对战中
user_game = self._find_user_game(chat_id, user_id)
if user_game:
return "⚠️ 你已经在对战中!\n\n输入 `.gomoku show` 查看棋盘"
# 检查用户是否已经发起了挑战
for challenge in challenges:
if challenge["challenger_id"] == user_id:
return "⚠️ 你已经发起了一个挑战!\n\n等待其他人接受,或输入 `.gomoku cancel` 取消挑战"
# 创建挑战
current_time = int(time.time())
challenge = {
"challenger_id": user_id,
"created_at": current_time
}
challenges.append(challenge)
pool["challenges"] = challenges
self._save_game_pool(chat_id, pool)
text = f"## 🎯 五子棋挑战\n\n"
text += f" 发起了五子棋挑战!\n\n"
text += f"💡 想要应战吗?输入 `.gomoku accept` 接受挑战"
return text
def _accept_challenge(self, chat_id: int, user_id: int) -> str:
"""接受挑战
Args:
chat_id: 会话ID
user_id: 接受者ID
Returns:
提示消息
"""
# 获取游戏池
pool = self._get_game_pool(chat_id)
games = pool.get("games", [])
challenges = pool.get("challenges", [])
if not challenges:
return "⚠️ 当前没有挑战可以接受\n\n输入 `.gomoku challenge` 发起挑战"
# 检查用户是否已经在对战中
user_game = self._find_user_game(chat_id, user_id)
if user_game:
return "⚠️ 你已经在对战中!\n\n输入 `.gomoku show` 查看棋盘"
# 获取最新的挑战
challenge = challenges[-1]
challenger_id = challenge["challenger_id"]
# 不能接受自己的挑战
if challenger_id == user_id:
return "❌ 不能接受自己的挑战!"
# 检查是否已达到最大并发数
active_games = [g for g in games if g["status"] == "playing"]
if len(active_games) >= self.max_concurrent_games:
return f"⚠️ 当前聊天已有 {len(active_games)} 局对战,已达到最大并发数限制"
# 创建游戏
current_time = int(time.time())
game_id = f"p{challenger_id}_p{user_id}_{current_time}"
new_game = {
"game_id": game_id,
"player_black": challenger_id, # 挑战者执黑(先手)
"player_white": user_id, # 接受者执白(后手)
"current_player": challenger_id, # 黑方先手
"board": logic.create_empty_board(),
"status": "playing",
"winner": None,
"moves": [],
"last_move": None,
"created_at": current_time,
"updated_at": current_time
}
games.append(new_game)
# 移除已接受的挑战
challenges.remove(challenge)
pool["games"] = games
pool["challenges"] = challenges
self._save_game_pool(chat_id, pool)
text = f"## ⚫ 五子棋对战开始!\n\n"
text += f"**黑方(先手)**: ⚫\n\n"
text += f"**白方(后手)**: ⚪\n\n"
text += f"**轮到**: ⚫\n\n"
text += "💡 提示:\n"
text += "- 黑方有禁手规则(三三、四四、长连禁手)\n"
text += "- 输入 `.gomoku A1` 在A1位置落子\n"
text += "- 输入 `.gomoku show` 查看棋盘"
return text
def _cancel_challenge(self, chat_id: int, user_id: int) -> str:
"""取消挑战
Args:
chat_id: 会话ID
user_id: 用户ID
Returns:
提示消息
"""
# 获取游戏池
pool = self._get_game_pool(chat_id)
challenges = pool.get("challenges", [])
# 查找用户的挑战
user_challenge = None
for challenge in challenges:
if challenge["challenger_id"] == user_id:
user_challenge = challenge
break
if not user_challenge:
return "⚠️ 你没有发起挑战"
# 移除挑战
challenges.remove(user_challenge)
pool["challenges"] = challenges
self._save_game_pool(chat_id, pool)
return "✅ 已取消挑战"
def _make_move(self, chat_id: int, user_id: int, coord: str) -> str:
"""落子
Args:
chat_id: 会话ID
user_id: 用户ID
coord: 坐标字符串
Returns:
结果消息
"""
# 查找用户的游戏
game = self._find_user_game(chat_id, user_id)
if not game:
return "⚠️ 你当前没有进行中的对战\n\n输入 `.gomoku @对手` 开始新游戏"
# 检查是否轮到该用户
if game["current_player"] != user_id:
opponent_id = game["player_white"] if game["player_black"] == user_id else game["player_black"]
return f"⚠️ 现在轮到 落子"
# 解析坐标
position = logic.parse_coord(coord)
if position is None:
return f"❌ 无效的坐标:{coord}\n\n坐标格式如:A1, O15"
row, col = position
# 检查位置是否已有棋子
if game["board"][row][col] != 0:
return f"❌ 位置 {coord.upper()} 已有棋子"
# 确定当前玩家颜色
player = 1 if game["player_black"] == user_id else 2
player_name = "黑方" if player == 1 else "白方"
player_emoji = "⚫" if player == 1 else "⚪"
# 检查黑方禁手
if player == 1:
is_forbidden, forbidden_type = logic.check_forbidden(game["board"], row, col)
if is_forbidden:
text = f"## ❌ {forbidden_type}!\n\n"
text += f"位置 {coord.upper()} 触发禁手,黑方判负!\n\n"
text += f"**获胜者**: ⚪ 白方\n\n"
text += f"📊 战绩已更新"
# 更新战绩
self.db.update_game_stats(game['player_white'], 'gomoku', win=True)
self.db.update_game_stats(game['player_black'], 'gomoku', loss=True)
# 移除游戏
pool = self._get_game_pool(chat_id)
pool["games"] = [g for g in pool["games"] if g["game_id"] != game["game_id"]]
self._save_game_pool(chat_id, pool)
return text
# 落子
game["board"][row][col] = player
game["moves"].append((row, col, player))
game["last_move"] = (row, col)
game["updated_at"] = int(time.time())
# 检查是否获胜
if logic.check_win(game["board"], row, col, player):
text = f"## 🎉 五连珠!游戏结束!\n\n"
text += f"**获胜者**: {player_emoji} {player_name}\n\n"
# 渲染棋盘
board_str = logic.render_board(game["board"], game["last_move"])
text += f"```\n{board_str}\n```\n\n"
text += f"📊 战绩已更新"
# 更新战绩
opponent_id = game["player_white"] if player == 1 else game["player_black"]
self.db.update_game_stats(user_id, 'gomoku', win=True)
self.db.update_game_stats(opponent_id, 'gomoku', loss=True)
# 移除游戏
pool = self._get_game_pool(chat_id)
pool["games"] = [g for g in pool["games"] if g["game_id"] != game["game_id"]]
self._save_game_pool(chat_id, pool)
return text
# 切换玩家
opponent_id = game["player_white"] if player == 1 else game["player_black"]
game["current_player"] = opponent_id
opponent_emoji = "⚪" if player == 1 else "⚫"
opponent_name = "白方" if player == 1 else "黑方"
# 更新游戏池
pool = self._get_game_pool(chat_id)
for i, g in enumerate(pool["games"]):
if g["game_id"] == game["game_id"]:
pool["games"][i] = game
break
self._save_game_pool(chat_id, pool)
# 渲染棋盘
board_str = logic.render_board(game["board"], game["last_move"])
text = f"## ✅ 落子成功!\n\n"
text += f"**位置**:{coord.upper()} {player_emoji}\n\n"
text += f"**轮到**: {opponent_emoji} {opponent_name}\n\n"
text += f"```\n{board_str}\n```"
return text
def _show_board(self, chat_id: int, user_id: int) -> str:
"""显示棋盘
Args:
chat_id: 会话ID
user_id: 用户ID
Returns:
棋盘显示
"""
game = self._find_user_game(chat_id, user_id)
if not game:
return "⚠️ 你当前没有进行中的对战\n\n输入 `.gomoku @对手` 开始新游戏"
# 渲染棋盘
board_str = logic.render_board(game["board"], game["last_move"])
# 获取当前玩家信息
current_id = game["current_player"]
current_emoji = "⚫" if game["player_black"] == current_id else "⚪"
current_name = "黑方" if game["player_black"] == current_id else "白方"
text = f"## ⚫ 五子棋对战\n\n"
text += f"**黑方**: ⚫\n\n"
text += f"**白方**: ⚪\n\n"
text += f"**轮到**: {current_emoji} {current_name}\n\n"
text += f"**手数**:{len(game['moves'])}\n\n"
text += f"```\n{board_str}\n```"
return text
def _resign(self, chat_id: int, user_id: int) -> str:
"""认输
Args:
chat_id: 会话ID
user_id: 用户ID
Returns:
结果消息
"""
game = self._find_user_game(chat_id, user_id)
if not game:
return "⚠️ 你当前没有进行中的对战"
# 确定胜者
if game["player_black"] == user_id:
winner_id = game["player_white"]
loser_name = "黑方"
winner_emoji = "⚪"
else:
winner_id = game["player_black"]
loser_name = "白方"
winner_emoji = "⚫"
text = f"## 🏳️ 认输\n\n"
text += f" {loser_name} 认输\n\n"
text += f"**获胜者**: {winner_emoji}\n\n"
text += f"📊 战绩已更新"
# 更新战绩
self.db.update_game_stats(winner_id, 'gomoku', win=True)
self.db.update_game_stats(user_id, 'gomoku', loss=True)
# 移除游戏
pool = self._get_game_pool(chat_id)
pool["games"] = [g for g in pool["games"] if g["game_id"] != game["game_id"]]
self._save_game_pool(chat_id, pool)
return text
def _list_games(self, chat_id: int) -> str:
"""列出所有进行中的游戏
Args:
chat_id: 会话ID
Returns:
游戏列表
"""
pool = self._get_game_pool(chat_id)
active_games = [g for g in pool.get("games", []) if g["status"] == "playing"]
if not active_games:
return "📋 当前没有进行中的对战\n\n输入 `.gomoku @对手` 开始新游戏"
text = f"## 📋 进行中的对战 ({len(active_games)}/{self.max_concurrent_games})\n\n"
for idx, game in enumerate(active_games, 1):
current_emoji = "⚫" if game["player_black"] == game["current_player"] else "⚪"
text += f"### {idx}. 对战\n"
text += f"- **黑方**: ⚫\n"
text += f"- **白方**: ⚪\n"
text += f"- **轮到**: {current_emoji}\n"
text += f"- **手数**:{len(game['moves'])}\n\n"
return text
def _get_stats(self, user_id: int) -> str:
"""获取用户战绩
Args:
user_id: 用户ID
Returns:
战绩信息
"""
stats = self.db.get_game_stats(user_id, 'gomoku')
total = stats['total_plays']
if total == 0:
return "📊 你还没有五子棋对战记录\n\n快来挑战吧!输入 `.gomoku @对手` 开始游戏"
wins = stats['wins']
losses = stats['losses']
win_rate = (wins / total * 100) if total > 0 else 0
text = f"## 📊 五子棋战绩\n\n"
text += f"**总局数**:{total} 局\n\n"
text += f"**胜利**:{wins} 次 🎉\n\n"
text += f"**失败**:{losses} 次\n\n"
text += f"**胜率**:{win_rate:.1f}%"
return text
def get_help(self) -> str:
"""获取帮助信息"""
return """## ⚫ 五子棋
### 基础用法
- `.gomoku challenge` - 发起挑战
- `.gomoku accept` - 接受挑战
- `.gomoku A1` - 在A1位置落子
- `.gomoku show` - 显示当前棋盘
- `.gomoku resign` - 认输
### 其他指令
- `.gomoku cancel` - 取消自己的挑战
- `.gomoku list` - 列出所有进行中的对战
- `.gomoku stats` - 查看个人战绩
### 游戏规则
- 标准15×15棋盘,五子连珠获胜
- 黑方先手,但有禁手规则:
- **三三禁手**:一手棋同时形成两个活三
- **四四禁手**:一手棋同时形成两个四(活四或冲四)
- **长连禁手**:一手棋形成六子或以上连珠
- 触发禁手者判负
- 允许多轮对战同时进行(对战双方不同即可)
### 坐标系统
- 列:A-O(15列)
- 行:1-15(15行)
- 示例:A1(左上角)、O15(右下角)、H8(中心)
### 示例
```
.gomoku challenge # 发起挑战
.gomoku accept # 接受挑战
.gomoku H8 # 在中心位置落子
.gomoku show # 查看棋盘
.gomoku resign # 认输
```
💡 提示:黑方虽然先手,但需要注意禁手规则
"""