39 Commits

Author SHA1 Message Date
借我清欢与鹤梦
3679d60e0c fix:jion未注册加入问题 2025-10-31 10:25:38 +08:00
借我清欢与鹤梦
96513c6e60 fix:语法 2025-10-30 16:49:46 +08:00
借我清欢与鹤梦
9625fa7808 fix:占位符问题以及缩进问题 2025-10-30 16:33:25 +08:00
借我清欢与鹤梦
131fabeec2 配置文件 2025-10-30 16:24:35 +08:00
借我清欢与鹤梦
86f87b440e 新增三国杀系统 2025-10-30 16:19:02 +08:00
4bddd4339f 发现的加分覆盖签到与长时间任务被清理的错误, 已经在数据库修复中尝试解决 2025-10-30 15:03:00 +08:00
927c16e1fc 新增say指令 2025-10-30 14:27:38 +08:00
8ffd261bb0 1.修改ai返回格式为markdown2.新增冒险放弃指令 2025-10-30 11:57:46 +08:00
3d4f754a0a Merge branch 'main' of http://www.liubai.site:3000/ninemine/WPSBot 2025-10-30 11:05:27 +08:00
0a60a7fc4b 修复指令解析错误 2025-10-30 11:05:08 +08:00
82dc616495 Merge feature: AI瀵硅瘽鍔熻兘瀹炵幇瀹屾垚 2025-10-30 01:11:29 +08:00
e26936acee feat: 瀹屾垚AI瀵硅瘽鍔熻兘骞跺疄鐜扮郴缁熸彁绀鸿瘝鎸佷箙鍖?
- 瀹炵幇鍩轰簬llama_index + Ollama鐨凙I瀵硅瘽鍔熻兘
- 娣诲姞.ai鍜?aiconfig鎸囦护鏀寔
- 瀹炵幇鍥哄畾10绉掔瓑寰呯獥鍙g殑寤惰繜鍥炵瓟鏈哄埗
- 鏀寔澶氱敤鎴峰璇濆拰鐢ㄦ埛瑙掕壊鏄犲皠
- 瀹炵幇闀夸笂涓嬫枃绠$悊锛?0+杞璇濓級
- 绯荤粺鎻愮ず璇嶆寔涔呭寲鍒伴厤缃枃浠?- 浼樺寲閿欒澶勭悊鍜岃皟璇曚俊鎭?- 娣诲姞NPS绔彛杞彂鏀寔璇存槑
2025-10-30 01:10:59 +08:00
643c516baf 提示词增强 2025-10-30 01:05:03 +08:00
e2e039b9b1 修复一些参数错误并提升一些提示内容 2025-10-30 00:53:37 +08:00
a76de359d5 修改配置并取消每次接受消息后的自动回复 2025-10-30 00:35:52 +08:00
b247c57bbe 新增AI会话系统 2025-10-29 23:56:57 +08:00
a893e54166 1.修改命令的判断条件(从startwith转为判定相等)2.修改冒险系统开发任务文件 2025-10-29 17:52:15 +08:00
8eb9f26cfd 新增冒险系统(成功, 编写任务文件) 2025-10-29 17:38:20 +08:00
01248b9092 新增冒险系统 2025-10-29 17:28:02 +08:00
9504c57aaf 新增冒险系统 2025-10-29 17:26:57 +08:00
5742adc2ad 新增冒险系统 2025-10-29 17:23:24 +08:00
9718eb0614 1.存在匹配上的问题, 后期再进行修复, 目前治标 2025-10-29 16:03:08 +08:00
27aee22f62 新增注册系统 2025-10-29 15:49:39 +08:00
37fccb3021 1.数值转整形2.炼金期望调整为1.1左右 2025-10-29 15:30:45 +08:00
218e1f1045 炼金期望调整 2025-10-29 15:15:34 +08:00
b57f1e08ef 修复了积分系统中指令分析的错误 2025-10-29 15:07:49 +08:00
581a516610 尝试修复积分系统 2025-10-29 12:56:57 +08:00
8e883fe5e1 恢复误删的内容 2025-10-29 12:41:56 +08:00
5e2afca960 尝试修复积分系统 2025-10-29 12:36:20 +08:00
0c2638e948 1.尝试修复积分系统2.大幅度简化积分系统 2025-10-29 12:29:18 +08:00
7b72ed9f34 Save 2025-10-29 12:19:25 +08:00
08cc4870ca 修复webhook url指定 2025-10-29 12:16:12 +08:00
c80fb67165 修复webhook url指定 2025-10-29 12:14:50 +08:00
d36bb2de83 修复webhook url指定无效的问题 2025-10-29 12:11:28 +08:00
38e81dbfe6 新增webhook url参数 2025-10-29 12:06:18 +08:00
003ff9a94e 1.尝试修复签到不加分的错误2.移除注册时间显示 2025-10-29 11:51:15 +08:00
7483a00a99 积分系统 2025-10-29 11:32:43 +08:00
c05a8b3578 更新README 2025-10-29 11:04:04 +08:00
1439523253 Merge pull request '合并task/gomoku_2025-10-28_1五子棋游戏' (#1) from task/gomoku_2025-10-28_1 into main
Reviewed-on: #1
2025-10-29 09:32:13 +08:00
42 changed files with 5521 additions and 322 deletions

3
.gitignore vendored
View File

@@ -181,3 +181,6 @@ cython_debug/
.cursorindexingignore
# IDE
.vscode/
# Database
data/bot.db

8
.idea/.gitignore generated vendored Normal file
View File

@@ -0,0 +1,8 @@
# 默认忽略的文件
/shelf/
/workspace.xml
# 基于编辑器的 HTTP 客户端请求
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

6
.idea/MarsCodeWorkspaceAppSettings.xml generated Normal file
View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="com.codeverse.userSettings.MarscodeWorkspaceAppSettingsState">
<option name="progress" value="0.93" />
</component>
</project>

9
.idea/WPSBot.iml generated Normal file
View File

@@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.12 (pythonProject)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

7
.idea/codeStyles/Project.xml generated Normal file
View File

@@ -0,0 +1,7 @@
<component name="ProjectCodeStyleConfiguration">
<code_scheme name="Project" version="173">
<ScalaCodeStyleSettings>
<option name="MULTILINE_STRING_CLOSING_QUOTES_ON_NEW_LINE" value="true" />
</ScalaCodeStyleSettings>
</code_scheme>
</component>

5
.idea/codeStyles/codeStyleConfig.xml generated Normal file
View File

@@ -0,0 +1,5 @@
<component name="ProjectCodeStyleConfiguration">
<state>
<option name="PREFERRED_PROJECT_CODE_STYLE" value="Default" />
</state>
</component>

View File

@@ -0,0 +1,12 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyUnresolvedReferencesInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredIdentifiers">
<list>
<option value="list.*" />
</list>
</option>
</inspection_tool>
</profile>
</component>

7
.idea/misc.xml generated Normal file
View File

@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.12 (pythonProject)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.12 (pythonProject)" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml generated Normal file
View File

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/WPSBot.iml" filepath="$PROJECT_DIR$/.idea/WPSBot.iml" />
</modules>
</component>
</project>

7
.idea/vcs.xml generated Normal file
View File

@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
<mapping directory="$PROJECT_DIR$/Convention" vcs="Git" />
</component>
</project>

View File

@@ -0,0 +1,218 @@
# 背景
文件名2025-10-29_1_add-user-register.md
创建于2025-10-29_15:42:51
创建者admin
主分支main
任务分支main
Yolo模式Off
# 任务描述
在WPS Bot Game项目中添加用户注册系统让用户可以通过 `.register name` 命令将用户ID与名称绑定。
## 核心需求
1. 用户可以通过 `.register name` 命令注册或更新自己的名称
2. 这个名称是全局的,所有游戏和功能都可以使用
3. 积分赠送功能需要支持通过用户名查找用户而不仅仅是用户ID
4. 需要一个全局的用户名称解析机制
## 问题背景
目前消息传递中的用户ID和用户名不一致使用起来非常困难。比如赠送积分时指定用户ID太麻烦了。添加注册系统后可以使用 `.gift username points` 而不是 `.gift 123456 points`
# 项目概览
## 项目结构
```
WPSBotGame/
├── app.py # FastAPI主应用
├── config.py # 配置管理
├── core/
│ ├── database.py # SQLite数据库操作
│ ├── middleware.py # 中间件
│ └── models.py # 数据模型
├── routers/
│ ├── callback.py # Callback路由处理
│ └── health.py # 健康检查
├── games/ # 游戏模块
│ ├── base.py # 游戏基类
│ ├── gift.py # 积分赠送系统
│ └── ... # 其他游戏
└── utils/
├── parser.py # 指令解析
└── message.py # 消息发送
```
# 分析
## 当前状态
1. `users` 表已经有 `username` 字段,但没有被充分利用
2. `get_or_create_user()` 可以接收 username 参数,但实际调用时没有传
3. `gift.py` 目前只能通过用户ID进行赠送
4. 缺少通过用户名查找用户的功能
## 关键技术点
1. **数据库层**: 需要添加根据 username 查找用户的方
2. 需要添加更新用户名的功能
3. **指令解析层**: 需要添加 `.register` 指令的识别
4. **路由层**: 需要添加 register 类型的处理逻辑
5. **应用层**: 需要实现注册逻辑,并修改 gift 功能支持用户名
# 提议的解决方案
## 方案概述
1.`database.py` 中添加两个方法:
- `get_user_by_name(username: str)` - 根据用户名查找用户
- `update_user_name(user_id: int, username: str)` - 更新用户名称
2.`parser.py` 中添加 `.register` 指令映射
3.`callback.py` 中添加 register 类型的处理逻辑
4. 修改 `gift.py` 支持通过用户名赠送积分
5. 创建注册处理逻辑(可以单独文件或集成到 callback
## 设计决策
- 用户名作为额外的查找方式,但不替代 user_id保持数据库主键稳定
- 用户名不强制唯一(允许相同昵称)
- 注册功能独立于游戏模块,放在顶层处理
# 当前执行步骤:"3. 等待用户确认"
# 详细实施计划
## 文件1: core/database.py
### 添加方法1: get_user_by_name()
```python
def get_user_by_name(self, username: str) -> Optional[Dict]:
"""根据用户名查找用户
Args:
username: 用户名
Returns:
用户信息字典如果不存在返回None
"""
```
### 添加方法2: update_user_name()
```python
def update_user_name(self, user_id: int, username: str) -> bool:
"""更新用户名称
Args:
user_id: 用户ID
username: 新用户名
Returns:
是否成功
"""
```
### 添加数据库索引
`init_tables()` 方法中添加:
```sql
CREATE INDEX IF NOT EXISTS idx_username ON users(username)
```
## 文件2: utils/parser.py
### 修改 COMMAND_MAP
在现有的 COMMAND_MAP 中添加:
```python
'.register': 'register',
'.注册': 'register',
```
## 文件3: routers/callback.py
### 在 handle_command() 中添加处理
在现有游戏类型判断后添加:
```python
# 注册系统
if game_type == 'register':
return await handle_register_command(command, chat_id, user_id)
```
### 添加处理函数
```python
async def handle_register_command(command: str, chat_id: int, user_id: int) -> str:
"""处理注册命令
Args:
command: 完整指令 ".register name"
chat_id: 会话ID
user_id: 用户ID
Returns:
注册结果消息
"""
```
## 文件4: games/gift.py
### 修改 _process_gift_command()
修改参数解析逻辑支持用户名和用户ID两种格式
```python
def _process_gift_command(self, args: str, sender_id: int) -> str:
# 尝试解析为用户名或用户ID
parts = args.split(maxsplit=2)
if len(parts) < 2:
return "❌ 指令格式错误..."
# 检查第一部分是用户名还是ID
receiver_input = parts[0]
if receiver_input.isdigit():
# 是数字作为用户ID处理
receiver_id = int(receiver_input)
else:
# 是用户名,通过数据库查找
user = self.db.get_user_by_name(receiver_input)
if not user:
return f"❌ 未找到用户: {receiver_input}"
receiver_id = user['user_id']
# 继续原有的赠送逻辑...
```
### 更新帮助信息
在 _get_gift_help() 中更新示例:
```
".gift username 50 生日快乐"
".gift 123456 50" (仍支持用户ID)
```
# 任务进度
## 2025-10-29_15:42:51
- 已修改:.tasks/2025-10-29_1_add-user-register.md
- 更改:创建任务文件
- 原因:开始实施用户注册系统
- 阻碍因素:无
- 状态:未确认
## 2025-10-29_15:46:38
- 已修改:
- core/database.py添加 get_user_by_name() 和 update_user_name() 方法,添加 username 索引)
- utils/parser.py添加 .register 和 .注册 命令映射)
- routers/callback.py添加 register 命令处理和 handle_register_command() 函数)
- games/gift.py修改 _process_gift_command() 支持用户名查找,更新帮助信息)
- 更改:
- 数据库层:添加用户名称查询和更新功能
- 命令解析:注册 .register 命令映射
- 路由处理:实现用户注册逻辑
- 赠送系统支持通过用户名或ID进行积分赠送
- 原因实现用户注册系统让用户可以使用名称替代用户ID
- 阻碍因素:无
- 状态:未确认
## 2025-10-29_16:00:48
- 已修改utils/parser.py
- 更改:
- 调整 COMMAND_MAP 顺序,将 .register 放在 .r 之前(避免前缀匹配冲突)
- 修复 AT_PATTERN 正则表达式:从 `@\s*\S+\s+(.+)` 改为 `@[^\s]+\s+(.+)`(正确提取@后的完整指令
- 原因:修复 .register 被错误识别为 dice 的问题,以及@前缀处理不完整的问题
- 阻碍因素:无
- 状态:未确认
# 最终审查
[等待实施]

View File

@@ -0,0 +1,241 @@
# 背景
文件名2025-10-29_2_complete-adventure-game.md
创建于2025-10-29_17:31:02
创建者admin
主分支main
任务分支main
Yolo模式Off
# 任务描述
完善冒险游戏的 `_perform_adventure` 函数,实现冒险系统的时间管理和奖励发放功能。完成后将冒险系统关联到炼金游戏检测中,并将该游戏注册到指令系统中。
## 关联影响
本次更新同时也修改了炼金系统(`games/alchemy.py`),添加了冒险状态检测功能,实现游戏互斥机制。由于炼金系统开发时没有创建独立的任务文件,本次修改记录在本任务文件中。如需追踪炼金系统的完整变更历史,可参考本任务文件的"炼金游戏集成"部分。
## 核心需求
1. 完善 `_perform_adventure` 函数,实现三种状态处理:
- 检查用户是否已有未完成的冒险
- 如果冒险已完成,发放奖励并清除状态
- 如果没有冒险状态,开始新的冒险
2. 修复 `_draw_prize` 函数,支持三元组奖品池结构
3. 在炼金游戏中添加冒险状态检测,阻止冒险期间进行炼金
4. 在指令解析器中注册冒险指令(`.adventure``.冒险`
5. 在路由处理器中注册冒险游戏
6. 在帮助系统中添加冒险游戏的帮助信息
# 项目概览
## 项目结构
```
WPSBotGame/
├── games/
│ ├── adventure.py # 冒险系统游戏(本次修改)
│ ├── alchemy.py # 炼金系统游戏(添加冒险检测)
│ └── base.py # 游戏基类(添加帮助信息)
├── routers/
│ └── callback.py # Callback路由处理注册冒险游戏
└── utils/
└── parser.py # 指令解析(注册冒险指令)
```
# 分析
## 当前状态
1. `games/adventure.py``_perform_adventure` 函数只有框架,所有逻辑都是 `if False: pass`
2. `_draw_prize` 函数期望4元组奖品池但实际奖品池是3元组 `(权重, 倍率, 描述)`
3. 炼金游戏中没有检测用户是否在冒险中
4. 指令系统未注册冒险游戏
5. 帮助系统未包含冒险游戏说明
## 关键技术点
1. **状态管理**:使用 `game_states` 表存储冒险状态,使用 `chat_id=0` 作为用户级标识
2. **时间计算**:使用 Unix 时间戳计算冒险开始和结束时间,以分钟为单位
3. **奖品池结构**:修复为支持三元组格式 `(权重, 倍率, 描述)`
4. **游戏互斥**:冒险期间禁止炼金操作
5. **指令注册**:完整集成到指令解析和路由系统
# 提议的解决方案
## 方案概述
1. **完善冒险核心逻辑**
- 使用 `game_states` 表存储状态:`{'start_time': timestamp, 'cost_time': minutes}`
- 实现三种情况的状态判断和处理
- 时间计算:`end_time = start_time + cost_time * 60`
2. **修复奖品池处理**
- 修改 `_draw_prize` 支持三元组:`(权重, 倍率, 描述)`
- 返回格式:`{'value': 倍率, 'description': 描述}`
3. **炼金游戏集成**
-`_perform_alchemy` 开始时检查冒险状态
- 冒险进行中时阻止操作并显示剩余时间
- 冒险已完成时自动清理状态
4. **系统注册**
-`parser.py` 中添加指令映射
-`callback.py` 中添加游戏处理器
-`base.py` 中添加帮助信息
# 任务进度
## 2025-10-29_17:31:02
- 已修改:
- games/adventure.py完善 `_perform_adventure` 函数,修复 `_draw_prize` 函数,添加 time 导入)
- games/alchemy.py添加冒险状态检测添加 time 导入)
- utils/parser.py添加 `.adventure``.冒险` 指令映射)
- routers/callback.py添加冒险游戏处理分支
- games/base.py在帮助系统中添加冒险游戏说明
- 更改:
1. **冒险系统核心功能**
- 添加 `import time` 模块
- 修改 `handle` 方法传递 `chat_id` 参数
- 完善 `_perform_adventure` 方法,实现完整的状态管理逻辑:
* 参数验证:确保 `cost_time >= 1`
* 状态查询:使用 `chat_id=0` 查询用户级冒险状态
* 未完成冒险:计算并显示剩余时间(分钟和秒)
* 已完成冒险:发放奖励(倍率 × 消耗时间),清除状态
* 新冒险:创建状态并保存,显示预计完成时间
- 修复 `_draw_prize` 方法支持三元组奖品池
2. **炼金游戏集成**
- 添加 `import time` 模块
-`_perform_alchemy` 方法开始处添加冒险状态检测
- 冒险进行中时返回错误提示并显示剩余时间
- 冒险已完成时自动清理状态,允许继续炼金
3. **指令系统注册**
-`utils/parser.py``COMMAND_MAP` 中添加 `.adventure``.冒险` 映射
-`routers/callback.py``handle_command` 函数中添加冒险游戏处理分支
4. **帮助系统更新**
-`games/base.py``get_help_message` 函数中添加冒险系统帮助信息
- 原因:实现冒险系统完整功能,包括时间管理、奖励发放、游戏互斥和系统集成
- 阻碍因素:无
- 状态:成功
## 2025-10-30_00:00:00
- 已修改:
- games/adventure.py新增放弃指令分支新增 `_abandon_adventure`,更新帮助)
- games/base.py在全局帮助中添加放弃用法
- 更改:
1. 新增冒险放弃功能:
- 支持指令:`.adventure abandon``.adventure 放弃`
- 结算规则:最低倍率 × 已冒险分钟向下取整至少1分钟
- 发放奖励后删除状态
2. 帮助信息更新:
- 本地帮助与全局帮助均加入放弃用法说明
- 原因:允许用户在冒险过程中主动放弃并按最低倍率获得奖励
- 阻碍因素:无
- 状态:成功
# 详细实施记录
## 文件修改清单
### 1. games/adventure.py
- **添加导入**`import time`
- **修改方法签名**
- `handle`:传递 `chat_id``_perform_adventure`
- `_perform_adventure`:添加 `chat_id` 参数,改为 `async`
- **完善 `_perform_adventure` 逻辑**
- 参数验证:`cost_time >= 1`
- 使用 `get_game_state(0, user_id, 'adventure')` 查询状态
- 实现三种状态处理:
1. 未完成计算剩余时间格式化显示X分Y秒
2. 已完成:执行抽奖,发放奖励(倍率 × 消耗时间),删除状态
3. 新冒险:创建状态数据,保存到数据库,计算预计完成时间
- 异常处理:捕获状态数据异常,自动清理损坏状态
- **修复 `_draw_prize` 方法**
- 修改循环:`for weight, multiplier, description in prize_pool:`
- 返回值:`{'value': multiplier, 'description': description}`
- 兜底返回:使用 `prize_pool[0][1]``prize_pool[0][2]`
### 2. games/alchemy.py
- **添加导入**`import time`
- **在 `_perform_alchemy` 中添加冒险检测**
- 使用 `get_game_state(0, user_id, 'adventure')` 查询状态
- 如果存在状态:
* 计算剩余时间
* 如果已完成:自动删除状态,允许继续
* 如果未完成返回错误消息显示剩余时间X分Y秒
- 异常处理:捕获状态数据异常,自动清理损坏状态
### 3. utils/parser.py
- **在 `COMMAND_MAP` 中添加**
```python
# 冒险系统
'.adventure': 'adventure',
'.冒险': 'adventure',
```
### 4. routers/callback.py
- **在 `handle_command` 函数中添加**
```python
# 冒险系统
if game_type == 'adventure':
from games.adventure import AdventureGame
game = AdventureGame()
return await game.handle(command, chat_id, user_id)
```
### 5. games/base.py
- **在 `get_help_message` 函数中添加**
```markdown
### ⚡️ 冒险系统
- `.adventure` - 消耗1分钟进行冒险
- `.冒险` - 消耗1分钟进行冒险
- `.adventure 5` - 消耗5分钟进行冒险
- `.adventure help` - 查看冒险帮助
```
## 关键实现细节
### 状态数据结构
```python
state_data = {
'start_time': int(time.time()), # Unix时间戳
'cost_time': 5 # 消耗时间(分钟)
}
```
### 时间计算逻辑
- 开始时间:`start_time = int(time.time())`
- 结束时间:`end_time = start_time + cost_time * 60`
- 剩余时间:`remaining_seconds = end_time - current_time`
- 剩余时间显示:`remaining_minutes = remaining_seconds // 60``remaining_secs = remaining_seconds % 60`
### 奖励计算
- 抽奖获取倍率:`reward = self._draw_prize(prize_pool)`
- 奖励积分:`reward_points = int(reward['value'] * cost_time)`
- 发放奖励:`self.db.add_points(user_id, reward_points, "adventure", "冒险奖励")`
### 游戏互斥机制
- 炼金前检查:查询冒险状态
- 如果冒险进行中:返回错误,显示剩余时间
- 如果冒险已完成:自动清理状态,允许炼金
- 状态异常:自动清理,允许继续操作
# 最终审查
## 功能验证
- ✅ 冒险开始:用户可以指定时间(分钟)开始冒险
- ✅ 冒险进行中:显示剩余时间,阻止重复开始
- ✅ 冒险完成:自动发放奖励,清除状态
- ✅ 时间计算:正确计算剩余时间和完成时间
- ✅ 奖励发放:根据倍率和消耗时间计算奖励积分
- ✅ 游戏互斥:冒险期间阻止炼金操作
- ✅ 指令注册:`.adventure` 和 `.冒险` 指令正常工作
- ✅ 帮助信息:显示在全局帮助中
- ✅ 冒险放弃:`.adventure abandon` / `.adventure 放弃` 按最低倍率结算已冒险分钟并清理状态
## 代码质量
- ✅ 所有语法检查通过
- ✅ 错误处理完善(参数验证、状态异常处理)
- ✅ 日志记录完整
- ✅ 代码风格一致
- ✅ 不了解释清晰
## 集成完成
- ✅ 指令解析器:已注册指令映射
- ✅ 路由处理器:已添加游戏处理分支
- ✅ 帮助系统:已添加帮助信息
- ✅ 游戏互斥:已集成到炼金系统
**实施与计划完全匹配**
所有功能已按计划完成冒险系统已完整集成到WPS Bot Game系统中。

View File

@@ -0,0 +1,428 @@
# 背景
文件名2025-10-29_3_ai_chat.md
创建于2025-10-29_23:32:40
创建者user
主分支main
任务分支task/ai_chat_2025-10-29_3
Yolo模式Off
# 任务描述
在本项目中新增一个AI对话功能使用llama_index构建服务商为本地部署的ollama。
这个智能体将拥有足够长的上下文超过30轮对话历史能够同时与不同的用户展开交流。例如用户A提问后用户B进行补充智能体将通过时间间隔判断机制来决定何时进行回答。
## 核心需求
1. **显式指令触发**:使用 `.ai <问题>` 指令触发AI对话
2. **配置指令**:使用 `.aiconfig` 指令配置Ollama服务地址、端口和模型名称
3. **时间间隔判断**智能体通过时间间隔判断是否需要回答固定10秒等待窗口
4. **长上下文管理**保留超过30轮对话历史
5. **多用户对话支持**同一chat_id下不同用户的消息能够被正确识别和处理
## 技术方案决策(已确定)
1. **延迟任务机制**:使用 asyncio 的延迟任务(方案三)
- 每个 chat_id 维护独立的延迟任务句柄
- 使用全局字典存储任务映射
- 收到新消息时取消旧任务并创建新任务
2. **上下文管理**:使用 llama_index 的 ChatMemoryBuffer策略A
- 设置足够的 token_limit 确保保留30+轮对话
- 按 chat_id 独立维护 ChatEngine 实例
3. **多用户识别**:消息角色映射 + 系统提示方案C
- 将不同用户映射为不同角色(如"用户1"、"用户2"
- 在系统提示中明确告知存在多用户场景
- ChatMemoryBuffer 中使用角色区分不同用户
4. **等待窗口**固定10秒窗口变体1
- 收到消息后等待10秒
- 等待期间有新消息则重新计时
5. **配置管理**使用单独的JSON文件
- 配置存储在 `data/ai_config.json`
- 全局单一配置服务器级别非chat级别
- 通过 `.aiconfig` 指令修改配置并保存到文件
# 项目概览
## 项目结构
```
WPSBot/
├── app.py # FastAPI主应用
├── config.py # 配置管理
├── core/ # 核心模块
│ ├── database.py # 数据库操作
│ ├── models.py # 数据模型
│ └── middleware.py # 中间件
├── routers/ # 路由模块
│ ├── callback.py # 回调处理
│ └── health.py # 健康检查
├── games/ # 游戏模块
│ ├── base.py # 游戏基类
│ └── ... # 其他游戏
├── utils/ # 工具模块
│ ├── message.py # 消息发送
│ ├── parser.py # 指令解析
│ └── rate_limit.py # 限流控制
└── data/ # 数据文件
```
## 技术栈
- FastAPIWeb框架
- SQLite数据存储
- llama-index-coreAI对话框架核心
- llama-index-llms-ollamaOllama LLM集成
- Ollama本地LLM服务
# 分析
## 现有架构
1. **指令处理流程**
- 消息通过 `/api/callback` 接收
- `CommandParser` 解析指令,只处理以 `.` 开头的命令
- 非指令消息会被忽略
- 指令分发到对应的游戏处理器
2. **状态管理**
- 游戏状态存储在 `game_states`
- 使用 `(chat_id, user_id, game_type)` 作为联合主键
- 对于群组共享状态,使用 `user_id=0`(如成语接龙)
3. **异步任务**
- 已有 `periodic_cleanup` 后台清理任务示例
- 使用 `asyncio.create_task``asyncio.sleep` 实现
## 关键技术挑战
### 1. 延迟回答机制
需要实现一个基于时间间隔的判断机制:
- 收到 `.ai` 指令时,将消息加入等待队列
- 设置一个等待窗口例如5-10秒
- 如果在等待窗口内有新消息,重新计时
- 等待窗口结束后,如果没有新消息,生成回答
- 需要在 `chat_id` 级别维护等待队列和延迟任务
### 2. 长上下文管理
- 使用 llama_index 的 `ChatMemoryBuffer` 管理对话历史
- 确保超过30轮对话历史能够被保留
- 对话历史需要按 `chat_id` 独立存储
- 对话历史中需要包含用户ID信息以便区分不同用户
### 3. Ollama配置管理
- 使用全局单一配置(服务器级别)
- 配置存储在 `data/ai_config.json` 文件中
- 配置包括:服务地址、端口、模型名称
- 通过 `.aiconfig` 指令修改配置并持久化到文件
- 配置需要有默认值localhost:11434默认模型需指定
### 4. 多用户对话识别
- 对话历史中需要记录每条消息的发送者user_id
- 生成回复时,需要识别上下文中的不同用户
- 回复格式可以考虑使用 @用户 的方式
### 5. 依赖管理
- 需要添加 llama-index-core 和相关依赖
- 需要确保与现有代码库的兼容性
- 考虑资源占用内存、CPU
## 数据结构设计
### AI对话状态数据结构
对话状态由 llama_index 的 ChatMemoryBuffer 管理,存储在内存中。
需要存储的额外信息:
```python
# 存储在 game_states 表中的 state_data
{
"user_mapping": { # 用户ID到角色名称的映射
"123456": "用户1",
"789012": "用户2",
...
},
"user_count": 2 # 当前对话中的用户数量
}
```
### 配置数据结构(存储在 data/ai_config.json
```json
{
"host": "localhost",
"port": 11434,
"model": "llama3.1"
}
```
### 数据库扩展
使用 `game_states` 表存储用户映射信息:
- `chat_id`: 会话ID
- `user_id`: 0表示群组级别
- `game_type`: "ai_chat"
- `state_data`: JSON格式的用户映射信息
注意:对话历史由 ChatMemoryBuffer 在内存中管理,不持久化到数据库。
# 提议的解决方案
## 方案概述
1. 创建一个新的游戏模块 `games/ai_chat.py`,继承 `BaseGame`
2. 使用 `game_states` 表存储用户映射信息用户ID到角色名称的映射
3. 使用全局字典维护每个 `chat_id` 的延迟任务句柄
4. 使用全局字典维护每个 `chat_id` 的 ChatEngine 实例和待处理消息队列
5. 使用 `data/ai_config.json` 存储 Ollama 全局配置
6. 使用 llama_index 的 ChatMemoryBuffer 管理对话上下文(内存中)
## 实现细节
### 1. 指令注册
`utils/parser.py` 中添加:
- `.ai`: 触发AI对话
- `.aiconfig`: 配置Ollama参数
### 2. AI对话模块 (`games/ai_chat.py`)
- `handle()`: 主处理函数,处理 `.ai``.aiconfig` 指令
- `_handle_ai()`: 处理AI对话请求
- 将消息加入等待队列
- 取消旧的延迟任务(如果存在)
- 创建新的延迟任务10秒后执行
- `_handle_config()`: 处理配置请求
- 解析配置参数host, port, model
- 更新 `data/ai_config.json` 文件
- 返回配置确认消息
- `_add_to_queue()`: 将消息加入等待队列(按 chat_id 组织)
- `_delayed_response()`: 延迟回答任务(内部异步函数)
- 等待10秒后执行
- 检查队列并生成回答
- 处理任务取消异常
- `_generate_response()`: 使用LLM生成回答
- 获取或创建 ChatEngine 实例
- 获取用户角色映射
- 将队列中的消息按用户角色格式化
- 调用 ChatEngine.chat() 生成回答
- 更新 ChatMemoryBuffer
- `_get_chat_engine()`: 获取或创建ChatEngine实例
- 检查全局字典中是否已存在
- 不存在则创建新的 ChatEngine配置 ChatMemoryBuffer
- 设置系统提示(告知多用户场景)
- `_get_user_role()`: 获取用户角色名称(创建或获取映射)
- `_load_config()`: 从 JSON 文件加载配置
- `_save_config()`: 保存配置到 JSON 文件
### 3. 延迟任务管理
- 使用全局字典 `_pending_tasks` 存储每个 `chat_id` 的延迟任务句柄
- 使用全局字典 `_message_queues` 存储每个 `chat_id` 的待处理消息队列
- 使用全局字典 `_chat_engines` 存储每个 `chat_id` 的 ChatEngine 实例
- 新消息到达时,取消旧任务(调用 task.cancel())并创建新任务
- 使用 `asyncio.create_task``asyncio.sleep(10)` 实现固定10秒延迟
- 处理 `asyncio.CancelledError` 异常,避免任务取消时的错误日志
### 4. 用户角色映射机制
- 为每个 chat_id 维护用户ID到角色名称的映射如"用户1"、"用户2"
- 映射信息存储在 `game_states` 表中chat_id, user_id=0, game_type='ai_chat'
- 首次出现的用户自动分配角色名称(按出现顺序)
- 在将消息添加到 ChatMemoryBuffer 时使用角色名称作为消息角色
- 系统提示中包含:"这是一个多用户对话场景,不同用户的发言会用不同的角色标识。你需要理解不同用户的发言内容,并根据上下文给出合适的回复。"
### 4. 依赖添加
`requirements.txt` 中添加:
```
llama-index-core>=0.10.0
llama-index-llms-ollama>=0.1.0
```
### 5. 路由注册
`routers/callback.py``handle_command()` 中添加AI对话处理分支
### 6. 帮助信息更新
`games/base.py``get_help_message()` 中添加AI对话帮助
## 时间间隔判断逻辑固定10秒窗口
1. **默认等待窗口**10秒固定
2. **收到 `.ai` 指令时**
- 提取消息内容(去除 `.ai` 前缀)
- 获取用户ID和chat_id
- 将消息用户ID + 内容)加入该 `chat_id` 的等待队列
- 如果有待处理的延迟任务(检查 `_pending_tasks[chat_id]`),取消它
- 创建新的延迟任务(`asyncio.create_task(_delayed_response(chat_id))`
- 将任务句柄存储到 `_pending_tasks[chat_id]`
3. **在等待窗口内收到新消息**(无论是否是指令):
- 如果新消息也是 `.ai` 指令:
- 将新消息加入队列
- 取消当前延迟任务(`task.cancel()`
- 创建新的延迟任务重新计时10秒
- 如果新消息不是指令但chat_id在等待队列中
- 可以考虑忽略,或也加入队列(根据需求决定)
4. **等待窗口结束(延迟任务执行)**
- 检查队列中是否有消息
- 如果有,获取该 chat_id 的 ChatEngine 和用户映射
- 将队列中的消息按用户角色格式化后添加到 ChatMemoryBuffer
- 调用 ChatEngine.chat() 生成回答
- 清空队列
-`_pending_tasks` 中移除任务句柄
## 配置文件管理data/ai_config.json
- 文件结构:
```json
{
"host": "localhost",
"port": 11434,
"model": "llama3.1"
}
```
- 首次加载时如果文件不存在,创建默认配置
- 通过 `.aiconfig` 指令修改配置时,实时保存到文件
- ChatEngine 创建时从配置文件加载配置
# 当前执行步骤:"4. 执行模式 - 代码实施完成并测试通过"
# 任务进度
## [2025-10-29_23:55:08] 执行阶段完成
- 已修改:
- requirements.txt添加 llama-index-core 和 llama-index-llms-ollama 依赖
- data/ai_config.json创建默认配置文件
- utils/parser.py添加 .ai 和 .aiconfig 指令映射和解析逻辑
- games/ai_chat.py创建完整的 AI 对话模块实现
- routers/callback.py添加 ai_chat 处理分支
- games/base.py添加 AI 对话帮助信息
- 更改:
- 实现了基于 llama_index 和 Ollama 的 AI 对话功能
- 实现了固定10秒等待窗口的延迟回答机制
- 实现了用户角色映射和长上下文管理
- 实现了配置文件的 JSON 存储和管理
- 原因:按照计划实施 AI 对话功能的所有核心组件
- 阻碍因素:无
- 状态:成功
## [2025-10-30_00:56:44] 功能优化和问题修复
- 已修改:
- games/ai_chat.py优化错误处理和用户体验
1. 移除收到消息后的确认回复(静默处理)
2. 修复转义字符警告SyntaxWarning
3. 改进错误处理,提供详细的调试信息和排查步骤
4. 添加超时设置120秒
5. 针对NPS端口转发的特殊错误提示
- 更改:
- 优化了错误提示信息,包含当前配置、测试命令和详细排查步骤
- 专门针对NPS端口转发场景添加了Ollama监听地址配置说明
- 改进了连接错误的诊断能力
- 原因:根据实际使用中发现的问题进行优化
- 阻碍因素:无
- 状态:成功
## [2025-10-30_01:10:05] 系统提示词持久化和功能完善
- 已修改:
- games/ai_chat.py
1. 实现系统提示词的持久化存储(保存到配置文件)
2. 添加 `_get_default_system_prompt()` 方法定义默认系统提示词
3. 添加 `_get_system_prompt()` 方法从配置文件加载系统提示词
4. 更新系统提示词内容明确AI身份和职责
5. 在系统提示词中包含完整的机器人功能列表和指引
- 更改:
- 系统提示词现在会保存到 `data/ai_config.json` 文件中
- 服务重启后系统提示词会自动从配置文件加载,保持长期记忆
- AI助手能够了解自己的身份和所有机器人功能可以主动指引用户
- 系统提示词包含了完整的13个功能模块介绍和回复指南
- 原因实现系统提示词的长期记忆让AI能够始终记住自己的身份和职责
- 阻碍因素:无
- 状态:成功
# 最终审查
## 实施总结
✅ 所有计划功能已成功实施并通过测试
### 核心功能实现
1. ✅ AI对话系统基于 llama_index + Ollama 构建
2. ✅ 显式指令触发(`.ai <问题>`
3. ✅ 配置指令(`.aiconfig`支持动态配置Ollama服务
4. ✅ 固定10秒等待窗口的延迟回答机制
5. ✅ 用户角色映射和长上下文管理30+轮对话)
6. ✅ 配置文件持久化存储
7. ✅ 系统提示词持久化存储(新增)
8. ✅ 完善的错误处理和调试信息
### 文件修改清单
- ✅ requirements.txt - 添加依赖
- ✅ data/ai_config.json - 配置文件(包含系统提示词)
- ✅ utils/parser.py - 指令解析
- ✅ games/ai_chat.py - AI对话模块完整实现
- ✅ routers/callback.py - 路由注册
- ✅ games/base.py - 帮助信息更新
### 技术特性
- ✅ 多用户对话支持
- ✅ 延迟任务管理asyncio
- ✅ ChatMemoryBuffer长上下文管理
- ✅ JSON配置文件管理
- ✅ NPS端口转发支持
- ✅ 详细的错误诊断和排查指南
### 测试状态
- ✅ 功能测试通过
- ✅ Ollama服务连接测试通过
- ✅ NPS端口转发配置测试通过
- ✅ 系统提示词持久化测试通过
## 实施与计划匹配度
实施与计划完全匹配 ✅
## 补充分析Markdown 渲染与发送通道2025-10-30
### 现状观察
- `routers/callback.py` 中仅当 `response_text.startswith('#')` 时才通过 `send_markdown()` 发送,否则使用 `send_text()`。这意味着即使 AI 返回了合法的 Markdown但不以 `#` 开头例如代码块、列表、表格、普通段落等也会被按纯文本通道发送导致下游WPS 侧)不进行 Markdown 渲染。
- `games/ai_chat.py` 的 `_generate_response()` 直接返回 `str(response)`,未对内容类型进行标注或判定,上层仅依赖首字符为 `#` 的启发式判断来选择发送通道。
- `utils/message.py` 已具备 `send_markdown()` 与 `send_text()` 两种发送方式,对应 `{"msgtype":"markdown"}` 与 `{"msgtype":"text"}` 消息结构;当前缺少自动识别 Markdown 的逻辑。
### 影响
- 当 AI 返回包含 Markdown 元素但非标题(不以 `#` 开头)的内容时,用户端看到的是未渲染的原始 Markdown 文本,表现为“格式不能成功排版”。
### 待确认问题(不含解决方案,需产品/实现口径)
1. 目标平台WPS 机器人)对 Markdown 的要求是否仅需 `msgtype=markdown` 即可渲染?是否存在必须以标题开头的限制?
2. 期望策略:
- 是否希望“.ai 的所有回复”统一走 Markdown 通道?
- 还是需要基于 Markdown 特征进行判定(如代码块、列表、链接、表格、行内格式等)?
3. 兼容性:若统一改为 Markdown 通道,是否会影响既有纯文本展示(例如换行、转义、表情)?
4. 其他指令模块是否也可能返回 Markdown若有是否一并纳入同一策略
### 相关代码参照点(路径)
- `routers/callback.py`:回复通道选择逻辑(基于 `startswith('#')`
- `games/ai_chat.py`AI 回复内容生成与返回(直接返回字符串)
- `utils/message.py``send_markdown()` 与 `send_text()` 的消息结构
### 决策结论与范围2025-10-30
- 分支策略:不创建新分支,继续在当前任务上下文内推进。
- 发送策略:`.ai` 产生的回复统一按 Markdown 发送。
- 影响范围:仅限 AI 对话功能(`.ai`/`ai_chat`),不扩展到其他指令模块。
# 任务进度(补充)
## [2025-10-30_??:??:??] 标注 Markdown 渲染问题(记录现状与待确认项)
- 已修改:
- `.tasks/2025-10-29_3_ai_chat.md`补充“Markdown 渲染与发送通道”分析与待确认清单(仅问题陈述,无解决方案)。
- 更改:
- 明确当前仅以标题开头触发 Markdown 发送的启发式导致部分 Markdown 未被渲染。
- 原因:
- 用户反馈“AI 返回内容支持 Markdown但当前直接当作文本返回导致无法正确排版”。
- 阻碍因素:
- 目标平台的 Markdown 渲染细节与统一策略选择待确认。
- 状态:
- 未确认(等待策略口径与平台渲染规范确认)。
## [2025-10-30_11:40:31] 执行AI 回复统一按 Markdown 发送(仅限 AI
- 已修改:
- `routers/callback.py`:在 `callback_receive()` 的发送阶段,当 `game_type == 'ai_chat'` 且存在 `response_text` 时,无条件调用 `send_markdown(response_text)`;若发送异常,记录日志并回退到 `send_text(response_text)`;其他指令模块继续沿用 `startswith('#')` 的启发式逻辑。
- 更改:
- 使 `.ai` 产生的回复在 WPS 端稳定触发 Markdown 渲染,不再依赖以 `#` 开头。
- 原因:
- 对齐“统一按 Markdown 发送(仅限 AI”的决策解决 Markdown 文本被当作纯文本发送导致的排版问题。
- 阻碍因素:
- 暂无。
- 状态:
- 成功。

175
README.md
View File

@@ -29,6 +29,41 @@
- 3次答题机会
- 关键词智能匹配
### 🀄 成语接龙
- 支持多音字和谐音接龙
- 全局黑名单系统
- 实时状态显示
- 参与者统计排行
- 支持指定下一位接龙者
### ⚫ 五子棋
- 标准15×15棋盘
- 黑方禁手规则(三三、四四、长连)
- 多轮对战同时进行
- 完整的战绩统计
- 实时棋盘显示
### 💎 积分系统
- 每日签到获得固定积分10分
- 运势占卜随机获得积分1-20分30%概率)
- 个人积分查询和记录
- 积分排行榜
- 完整的积分变动记录
### ⚗️ 炼金系统
- 消耗积分进行抽奖10/20/50积分
- 奖品池数学期望略高于消耗积分,对玩家友好
- 包含大奖(巨额积分)和负面奖励(额外扣分)
- 完整的炼金记录和统计
- 支持多种消耗档位的抽奖
### 🎁 积分赠送系统
- 用户间积分赠送功能
- 支持附赠个性化消息
- 完整的赠送和接收记录
- 赠送统计和记录查询
- 单次最多赠送1000积分
## 🚀 快速开始
### 环境要求
@@ -50,26 +85,62 @@ cd WPSBotGame
# 使用conda环境
conda activate liubai
pip install -r requirements.txt
# 注意成语接龙游戏需要pypinyin库进行拼音处理
```
3. **配置环境变量**
3. **配置Webhook**
有三种方式配置Webhook URL
#### 方式1命令行参数推荐
```bash
# Linux/Mac
python app.py --webhook-url "https://xz.wps.cn/api/v1/webhook/send?key=your_key"
# Windows
python app.py --webhook-url "https://xz.wps.cn/api/v1/webhook/send?key=your_key"
```
#### 方式2使用启动脚本
```bash
# Linux/Mac
./start.sh -w "https://xz.wps.cn/api/v1/webhook/send?key=your_key"
# Windows
start.bat -w "https://xz.wps.cn/api/v1/webhook/send?key=your_key"
```
#### 方式3环境变量
```bash
# 复制配置文件模板
cp env.example .env
cp config.env.example config.env
# 编辑配置文件填入你的Webhook URL
nano .env
nano config.env
```
4. **运行应用**
```bash
# 开发模式
# 基本启动
python app.py
# 自定义参数启动
python app.py --webhook-url "your_webhook_url" --port 8080 --log-level debug
# 使用启动脚本
./start.sh -w "your_webhook_url" -p 8080 -l debug
# 生产模式使用uvicorn
uvicorn app:app --host 0.0.0.0 --port 8000 --workers 1
```
### 命令行参数说明
- `--webhook-url, -w`: WPS Webhook URL
- `--host, -H`: 服务器主机地址 (默认: 0.0.0.0)
- `--port, -p`: 服务器端口 (默认: 11000)
- `--workers`: 工作进程数 (默认: 1)
- `--log-level`: 日志级别 (默认: info)
## 📝 配置说明
### 环境变量
@@ -90,6 +161,13 @@ MESSAGE_RATE_LIMIT=20
# 日志配置
LOG_LEVEL=INFO
# 游戏配置(可选,使用默认值)
# 成语接龙最大历史显示数量
IDIOM_MAX_HISTORY_DISPLAY=10
# 五子棋最大并发游戏数
GOMOKU_MAX_CONCURRENT_GAMES=5
```
### WPS机器人配置
@@ -148,6 +226,65 @@ LOG_LEVEL=INFO
.quiz 答案 # 回答问题
```
### 成语接龙
```
.idiom start [成语] # 开始游戏(可指定起始成语)
.idiom [成语] # 接龙
.idiom [成语] @某人 # 接龙并指定下一位
.idiom stop # 结束游戏
.idiom status # 查看游戏状态
.idiom reject [词语] # 拒绝词语加入黑名单(仅发起人)
.idiom blacklist # 查看黑名单
.idiom next @某人 # 指定下一位(仅最后接龙者)
```
### 五子棋
```
.gomoku challenge # 发起挑战
.gomoku accept # 接受挑战
.gomoku A1 # 在A1位置落子
.gomoku show # 显示当前棋盘
.gomoku resign # 认输
.gomoku cancel # 取消自己的挑战
.gomoku list # 列出所有进行中的对战
.gomoku stats # 查看个人战绩
```
### 积分系统
```
.points # 查看个人积分
.积分 # 查看个人积分
.checkin # 每日签到
.签到 # 每日签到
.打卡 # 每日签到
.points leaderboard # 积分排行榜
```
### 炼金系统
```
.alchemy # 消耗10积分进行炼金
.炼金 # 消耗10积分进行炼金
.alchemy 20 # 消耗20积分进行炼金
.alchemy 50 # 消耗50积分进行炼金
.alchemy stats # 查看炼金统计
.alchemy records # 查看炼金记录
```
### 积分赠送系统
```
.gift 123 50 生日快乐 # 赠送50积分给用户123附赠消息
.赠送 456 100 感谢帮助 # 赠送100积分给用户456附赠消息
.送 789 200 # 赠送200积分给用户789
.gift stats # 查看赠送统计
.gift sent # 查看发送记录
.gift received # 查看接收记录
```
## 🏗️ 项目结构
```
@@ -155,6 +292,7 @@ WPSBotGame/
├── app.py # FastAPI主应用
├── config.py # 配置管理
├── requirements.txt # Python依赖
├── env.example # 环境变量模板
├── core/ # 核心模块
│ ├── database.py # SQLite数据库
│ ├── models.py # 数据模型
@@ -167,15 +305,31 @@ WPSBotGame/
│ ├── parser.py # 指令解析
│ └── rate_limit.py # 限流控制
├── games/ # 游戏模块
│ ├── base.py # 游戏基类
│ ├── dice.py # 骰娘系统
│ ├── rps.py # 石头剪刀布
│ ├── fortune.py # 运势占卜
│ ├── guess.py # 猜数字
── quiz.py # 问答游戏
└── data/ # 数据文件
├── bot.db # SQLite数据库
├── fortunes.json # 运势数据
── quiz.json # 问答题库
── quiz.py # 问答游戏
│ ├── idiom.py # 成语接龙
├── gomoku.py # 五子棋
├── gomoku_logic.py # 五子棋逻辑
── points.py # 积分系统
│ ├── alchemy.py # 炼金系统
│ └── gift.py # 积分赠送系统
├── data/ # 数据文件
│ ├── bot.db # SQLite数据库
│ ├── fortunes.json # 运势数据
│ ├── quiz.json # 问答题库
│ └── idiom_blacklist.json # 成语黑名单
├── deploy/ # 部署配置
│ ├── install.sh # 安装脚本
│ ├── manage.sh # 管理脚本
│ ├── nginx/ # Nginx配置
│ └── systemd/ # systemd服务配置
└── Convention/ # 通用工具库
├── Runtime/ # 运行时工具
└── Image/ # 图像处理工具
```
## 🔧 部署
@@ -256,11 +410,12 @@ curl http://localhost:8000/stats
## 📈 性能指标
- **内存占用**150-250MB
- **内存占用**150-300MB(包含拼音处理库)
- **响应时间**<500ms
- **并发支持**5-10个同时请求
- **用户规模**50-100个活跃用户
- **消息限制**20条/分钟WPS限制
- **支持游戏**10种游戏类型骰子石头剪刀布运势猜数字问答成语接龙五子棋积分系统炼金系统积分赠送系统
## 🤝 贡献

243
STARTUP_GUIDE.md Normal file
View File

@@ -0,0 +1,243 @@
# WPS Bot Game 启动方式说明
## 🚀 多种启动方式
### 1. 命令行参数启动(推荐)
#### 基本启动
```bash
python app.py --webhook-url "https://xz.wps.cn/api/v1/webhook/send?key=your_key"
```
#### 自定义参数启动
```bash
python app.py \
--webhook-url "https://xz.wps.cn/api/v1/webhook/send?key=your_key" \
--host 0.0.0.0 \
--port 8080 \
--workers 1 \
--log-level debug
```
#### 短参数形式
```bash
python app.py -w "https://xz.wps.cn/api/v1/webhook/send?key=your_key" -p 8080 -l debug
```
### 2. 使用启动脚本
#### Linux/Mac
```bash
# 基本启动
./start.sh -w "https://xz.wps.cn/api/v1/webhook/send?key=your_key"
# 自定义参数
./start.sh -w "your_webhook_url" -p 8080 -l debug
# 查看帮助
./start.sh --help
```
#### Windows
```cmd
REM 基本启动
start.bat -w "https://xz.wps.cn/api/v1/webhook/send?key=your_key"
REM 自定义参数
start.bat -w "your_webhook_url" -p 8080 -l debug
REM 查看帮助
start.bat --help
```
### 3. 环境变量启动
#### 设置环境变量
```bash
# Linux/Mac
export WEBHOOK_URL="https://xz.wps.cn/api/v1/webhook/send?key=your_key"
export HOST="0.0.0.0"
export PORT="8080"
export LOG_LEVEL="debug"
# Windows
set WEBHOOK_URL=https://xz.wps.cn/api/v1/webhook/send?key=your_key
set HOST=0.0.0.0
set PORT=8080
set LOG_LEVEL=debug
```
#### 启动应用
```bash
python app.py
```
### 4. 配置文件启动
#### 创建配置文件
```bash
# 复制配置模板
cp config.env.example config.env
# 编辑配置文件
nano config.env
```
#### 配置文件内容
```env
WEBHOOK_URL=https://xz.wps.cn/api/v1/webhook/send?key=your_key
HOST=0.0.0.0
PORT=8080
WORKERS=1
LOG_LEVEL=debug
```
#### 启动应用
```bash
python app.py
```
## 📋 参数说明
| 参数 | 短参数 | 默认值 | 说明 |
|------|--------|--------|------|
| `--webhook-url` | `-w` | 无 | WPS Webhook URL |
| `--host` | `-H` | `0.0.0.0` | 服务器主机地址 |
| `--port` | `-p` | `11000` | 服务器端口 |
| `--workers` | 无 | `1` | 工作进程数 |
| `--log-level` | `-l` | `info` | 日志级别 |
## 🔧 生产环境部署
### 使用systemd服务
```bash
# 编辑服务文件
sudo nano /etc/systemd/system/wps-bot.service
# 服务文件内容
[Unit]
Description=WPS Bot Game
After=network.target
[Service]
Type=simple
User=your_user
WorkingDirectory=/path/to/WPSBotGame
ExecStart=/usr/bin/python3 app.py --webhook-url "your_webhook_url" --port 11000
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
```
### 使用Docker
```dockerfile
FROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "app.py", "--webhook-url", "your_webhook_url"]
```
### 使用Nginx反向代理
```nginx
server {
listen 80;
server_name your-domain.com;
location / {
proxy_pass http://127.0.0.1:11000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
```
## 🐛 故障排除
### 1. 端口被占用
```bash
# 查看端口占用
netstat -tlnp | grep :11000
# 杀死占用进程
sudo kill -9 PID
# 或使用其他端口
python app.py --port 8080
```
### 2. 权限问题
```bash
# 给启动脚本执行权限
chmod +x start.sh
# 检查文件权限
ls -la start.sh
```
### 3. 依赖问题
```bash
# 检查Python版本
python --version
# 安装依赖
pip install -r requirements.txt
# 检查依赖
python -c "import fastapi, uvicorn"
```
### 4. 网络问题
```bash
# 测试Webhook URL
curl -X POST "your_webhook_url" \
-H "Content-Type: application/json" \
-d '{"msg_type": "text", "content": {"text": "test"}}'
# 检查防火墙
sudo ufw status
sudo ufw allow 11000
```
## 📊 监控和日志
### 查看日志
```bash
# 实时查看日志
tail -f /var/log/wps-bot.log
# 查看系统日志
journalctl -u wps-bot -f
# 查看错误日志
grep ERROR /var/log/wps-bot.log
```
### 健康检查
```bash
# 检查服务状态
curl http://localhost:11000/health
# 检查统计信息
curl http://localhost:11000/stats
```
### 性能监控
```bash
# 查看进程状态
ps aux | grep python
# 查看内存使用
free -h
# 查看磁盘使用
df -h
```

43
app.py
View File

@@ -1,11 +1,13 @@
"""WPS Bot Game - FastAPI主应用"""
import logging
import argparse
import os
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from contextlib import asynccontextmanager
import asyncio
from config import APP_CONFIG, SESSION_TIMEOUT
from config import APP_CONFIG, SESSION_TIMEOUT, SetWebhookURL, GetWebhookURL
from core.middleware import ConcurrencyLimitMiddleware
from core.database import get_db
from routers import callback, health
@@ -94,14 +96,45 @@ async def global_exception_handler(request, exc):
)
def parse_args():
"""解析命令行参数"""
parser = argparse.ArgumentParser(description='WPS Bot Game')
parser.add_argument('--webhook-url', '-w',
help='WPS Webhook URL')
parser.add_argument('--host', '-H',
default='0.0.0.0',
help='服务器主机地址 (默认: 0.0.0.0)')
parser.add_argument('--port', '-p',
type=int, default=11000,
help='服务器端口 (默认: 11000)')
parser.add_argument('--workers',
type=int, default=1,
help='工作进程数 (默认: 1)')
parser.add_argument('--log-level',
default='info',
choices=['debug', 'info', 'warning', 'error'],
help='日志级别 (默认: info)')
return parser.parse_args()
if __name__ == "__main__":
import uvicorn
# 解析命令行参数
args = parse_args()
# 如果提供了webhook URL设置环境变量
if args.webhook_url:
SetWebhookURL(args.webhook_url)
logger.info(f"设置Webhook URL: {args.webhook_url}")
# 启动服务器
uvicorn.run(
"app:app",
host="0.0.0.0",
port=11000,
workers=1,
host=args.host,
port=args.port,
workers=args.workers,
limit_concurrency=5,
log_level="info"
log_level=args.log_level
)

21
config.env.example Normal file
View File

@@ -0,0 +1,21 @@
# WPS Bot Game 配置文件示例
# 复制此文件为 config.env 并修改相应配置
# WPS Webhook URL (必需)
# 格式: https://xz.wps.cn/api/v1/webhook/send?key=your_key
WEBHOOK_URL=https://xz.wps.cn/api/v1/webhook/send?key=your_key_here
# 服务器配置
HOST=0.0.0.0
PORT=11000
WORKERS=1
# 日志配置
LOG_LEVEL=info
# 数据库配置
DATABASE_URL=sqlite:///data/bot.db
# 其他配置
SESSION_TIMEOUT=300
CONCURRENCY_LIMIT=5

View File

@@ -9,11 +9,20 @@ load_dotenv()
# 项目根目录
BASE_DIR = Path(__file__).resolve().parent
# WPS Webhook配置
# WPS Webhook配置 - 使用函数动态获取
WEBHOOK_URL = os.getenv(
"WEBHOOK_URL",
"https://xz.wps.cn/api/v1/webhook/send?key=da86927e491f2aef4b964223687c2c80"
)
"WEBHOOK_URL",
"https://xz.wps.cn/api/v1/webhook/send?key=da86927e491f2aef4b964223687c2c80"
)
def SetWebhookURL(url: str):
"""设置Webhook URL"""
global WEBHOOK_URL
WEBHOOK_URL = url
def GetWebhookURL() -> str:
"""获取Webhook URL"""
return WEBHOOK_URL
# 数据库配置
DATABASE_PATH = os.getenv("DATABASE_PATH", str(BASE_DIR / "data" / "bot.db"))
@@ -72,5 +81,4 @@ GAME_CONFIG = {
"max_concurrent_games": 5, # 每个聊天最多同时进行的游戏数
"board_size": 15, # 棋盘大小
},
}
}

View File

@@ -33,12 +33,25 @@ class Database:
def conn(self) -> sqlite3.Connection:
"""获取数据库连接(懒加载)"""
if self._conn is None:
self._conn = sqlite3.connect(
self.db_path,
check_same_thread=False, # 允许多线程访问
isolation_level=None # 自动提交
)
self._conn.row_factory = sqlite3.Row # 支持字典式访问
try:
self._conn = sqlite3.connect(
self.db_path,
check_same_thread=False, # 允许多线程访问
isolation_level=None, # 自动提交
timeout=30.0 # 增加超时时间
)
self._conn.row_factory = sqlite3.Row # 支持字典式访问
# 启用WAL模式以提高并发性能
self._conn.execute("PRAGMA journal_mode=WAL")
self._conn.execute("PRAGMA synchronous=NORMAL")
self._conn.execute("PRAGMA cache_size=1000")
self._conn.execute("PRAGMA temp_store=MEMORY")
logger.info(f"数据库连接成功: {self.db_path}")
except Exception as e:
logger.error(f"数据库连接失败: {e}", exc_info=True)
raise
return self._conn
def init_tables(self):
@@ -55,7 +68,7 @@ class Database:
)
""")
# 游戏状态表
# 游戏状态表(新增 confirmed 列用于标记可回收状态)
cursor.execute("""
CREATE TABLE IF NOT EXISTS game_states (
id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -65,9 +78,18 @@ class Database:
state_data TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
confirmed INTEGER DEFAULT 0,
UNIQUE(chat_id, user_id, game_type)
)
""")
# 幂等迁移:为已有表增加 confirmed 列
try:
cursor.execute("ALTER TABLE game_states ADD COLUMN confirmed INTEGER DEFAULT 0")
logger.info("为 game_states 增加 confirmed 列")
except Exception:
# 列已存在或不可变更时忽略
pass
# 创建索引
cursor.execute("""
@@ -75,6 +97,12 @@ class Database:
ON game_states(chat_id, user_id)
""")
# 创建用户名索引
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_username
ON users(username)
""")
# 游戏统计表
cursor.execute("""
CREATE TABLE IF NOT EXISTS game_stats (
@@ -89,6 +117,18 @@ class Database:
)
""")
# 用户积分表 - 简化版本,只保留必要字段
cursor.execute("""
CREATE TABLE IF NOT EXISTS user_points (
user_id INTEGER PRIMARY KEY,
points INTEGER DEFAULT 0,
last_checkin_date TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
FOREIGN KEY (user_id) REFERENCES users (user_id)
)
""")
logger.info("数据库表初始化完成")
# ===== 用户相关操作 =====
@@ -133,6 +173,53 @@ class Database:
'last_active': current_time
}
def get_user_by_name(self, username: str) -> Optional[Dict]:
"""根据用户名查找用户
Args:
username: 用户名
Returns:
用户信息字典如果不存在返回None
"""
cursor = self.conn.cursor()
cursor.execute(
"SELECT * FROM users WHERE username = ?",
(username,)
)
row = cursor.fetchone()
if row:
return dict(row)
return None
def update_user_name(self, user_id: int, username: str) -> bool:
"""更新用户名称
Args:
user_id: 用户ID
username: 新用户名
Returns:
是否成功
"""
try:
# 确保用户存在
self.get_or_create_user(user_id)
cursor = self.conn.cursor()
cursor.execute(
"UPDATE users SET username = ? WHERE user_id = ?",
(username, user_id)
)
logger.info(f"用户 {user_id} 更新名称为: {username}")
return True
except Exception as e:
logger.error(f"更新用户名失败: user_id={user_id}, username={username}, error={e}", exc_info=True)
return False
# ===== 游戏状态相关操作 =====
def get_game_state(self, chat_id: int, user_id: int, game_type: str) -> Optional[Dict]:
@@ -175,14 +262,31 @@ class Database:
state_json = json.dumps(state_data, ensure_ascii=False)
cursor.execute("""
INSERT INTO game_states (chat_id, user_id, game_type, state_data, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
INSERT INTO game_states (chat_id, user_id, game_type, state_data, created_at, updated_at, confirmed)
VALUES (?, ?, ?, ?, ?, ?, 0)
ON CONFLICT(chat_id, user_id, game_type)
DO UPDATE SET state_data = ?, updated_at = ?
""", (chat_id, user_id, game_type, state_json, current_time, current_time,
state_json, current_time))
DO UPDATE SET state_data = excluded.state_data,
updated_at = excluded.updated_at,
confirmed = 0
""", (chat_id, user_id, game_type, state_json, current_time, current_time))
logger.debug(f"保存游戏状态: chat_id={chat_id}, user_id={user_id}, game_type={game_type}")
def confirm_game_state(self, chat_id: int, user_id: int, game_type: str):
"""标记游戏状态为已确认(可由清理器删除)
Args:
chat_id: 会话ID
user_id: 用户ID
game_type: 游戏类型
"""
cursor = self.conn.cursor()
current_time = int(time.time())
cursor.execute(
"UPDATE game_states SET confirmed = 1, updated_at = ? WHERE chat_id = ? AND user_id = ? AND game_type = ?",
(current_time, chat_id, user_id, game_type)
)
logger.debug(f"确认游戏状态: chat_id={chat_id}, user_id={user_id}, game_type={game_type}")
def delete_game_state(self, chat_id: int, user_id: int, game_type: str):
"""删除游戏状态
@@ -208,8 +312,9 @@ class Database:
cursor = self.conn.cursor()
cutoff_time = int(time.time()) - timeout
# 仅删除已确认confirmed=1的旧会话避免误删进行中的长时任务
cursor.execute(
"DELETE FROM game_states WHERE updated_at < ?",
"DELETE FROM game_states WHERE confirmed = 1 AND updated_at < ?",
(cutoff_time,)
)
deleted = cursor.rowcount
@@ -277,6 +382,211 @@ class Database:
logger.debug(f"更新游戏统计: user_id={user_id}, game_type={game_type}")
# ===== 积分相关操作 =====
def get_user_points(self, user_id: int) -> Dict:
"""获取用户积分信息
Args:
user_id: 用户ID
Returns:
积分信息字典
"""
cursor = self.conn.cursor()
cursor.execute(
"SELECT * FROM user_points WHERE user_id = ?",
(user_id,)
)
row = cursor.fetchone()
if row:
return dict(row)
else:
# 创建新用户积分记录
current_time = int(time.time())
cursor.execute(
"INSERT INTO user_points (user_id, points, created_at, updated_at) VALUES (?, 0, ?, ?)",
(user_id, current_time, current_time)
)
return {
'user_id': user_id,
'points': 0,
'last_checkin_date': None,
'created_at': current_time,
'updated_at': current_time
}
def add_points(self, user_id: int, points: int, source: str, description: str = None) -> bool:
"""添加积分
Args:
user_id: 用户ID
points: 积分数量
source: 积分来源
description: 描述
Returns:
是否成功
"""
if points <= 0:
logger.warning(f"积分数量无效: {points}")
return False
cursor = self.conn.cursor()
current_time = int(time.time())
try:
# 确保用户存在
self.get_or_create_user(user_id)
# 更新用户积分(不触碰 last_checkin_date
# 若不存在则插入;存在则 points += ? 且更新 updated_at
cursor.execute("""
INSERT INTO user_points (user_id, points, created_at, updated_at)
VALUES (?, ?, ?, ?)
ON CONFLICT(user_id) DO UPDATE SET
points = user_points.points + excluded.points,
updated_at = excluded.updated_at
""", (user_id, points, current_time, current_time))
logger.info(f"用户 {user_id} 成功获得 {points} 积分,来源:{source}")
return True
except Exception as e:
logger.error(f"添加积分失败: user_id={user_id}, points={points}, error={e}", exc_info=True)
return False
def consume_points(self, user_id: int, points: int, source: str, description: str = None) -> bool:
"""消费积分
Args:
user_id: 用户ID
points: 积分数量
source: 消费来源
description: 描述
Returns:
是否成功
"""
if points <= 0:
return False
cursor = self.conn.cursor()
current_time = int(time.time())
try:
# 检查积分是否足够
cursor.execute(
"SELECT points FROM user_points WHERE user_id = ?",
(user_id,)
)
row = cursor.fetchone()
if not row or row[0] < points:
logger.warning(f"用户 {user_id} 积分不足,需要 {points},当前可用 {row[0] if row else 0}")
return False
# 消费积分
cursor.execute(
"UPDATE user_points SET points = points - ?, updated_at = ? WHERE user_id = ?",
(points, current_time, user_id)
)
logger.debug(f"用户 {user_id} 消费 {points} 积分,来源:{source}")
return True
except Exception as e:
logger.error(f"消费积分失败: {e}")
return False
def check_daily_checkin(self, user_id: int, date: str) -> bool:
"""检查用户是否已签到
Args:
user_id: 用户ID
date: 日期字符串 (YYYY-MM-DD)
Returns:
是否已签到
"""
cursor = self.conn.cursor()
cursor.execute(
"SELECT last_checkin_date FROM user_points WHERE user_id = ?",
(user_id,)
)
row = cursor.fetchone()
return row and row[0] == date
def daily_checkin(self, user_id: int, points: int) -> bool:
"""每日签到
Args:
user_id: 用户ID
points: 签到积分
Returns:
是否成功
"""
from datetime import datetime
today = datetime.now().strftime('%Y-%m-%d')
if self.check_daily_checkin(user_id, today):
logger.warning(f"用户 {user_id} 今日已签到")
return False
cursor = self.conn.cursor()
current_time = int(time.time())
try:
# 确保用户存在
self.get_or_create_user(user_id)
# 获取签到前积分
points_before = self.get_user_points(user_id)
logger.info(f"用户 {user_id} 签到前积分: {points_before['points']}")
# 更新积分和签到日期
cursor.execute("""
INSERT OR REPLACE INTO user_points (user_id, points, last_checkin_date, created_at, updated_at)
VALUES (?, COALESCE((SELECT points FROM user_points WHERE user_id = ?), 0) + ?, ?, ?, ?)
""", (user_id, user_id, points, today, current_time, current_time))
# 验证积分是否真的增加了
points_after = self.get_user_points(user_id)
logger.info(f"用户 {user_id} 签到后积分: {points_after['points']}")
if points_after['points'] > points_before['points']:
logger.info(f"用户 {user_id} 签到成功,积分增加: {points_after['points'] - points_before['points']}")
return True
else:
logger.error(f"用户 {user_id} 签到失败,积分未增加")
return False
except Exception as e:
logger.error(f"每日签到失败: {e}", exc_info=True)
return False
def get_points_leaderboard(self, limit: int = 10) -> List[Dict]:
"""获取积分排行榜
Args:
limit: 限制数量
Returns:
排行榜列表
"""
cursor = self.conn.cursor()
cursor.execute("""
SELECT u.user_id, u.username, up.points
FROM users u
LEFT JOIN user_points up ON u.user_id = up.user_id
ORDER BY COALESCE(up.points, 0) DESC
LIMIT ?
""", (limit,))
rows = cursor.fetchall()
return [dict(row) for row in rows]
def close(self):
"""关闭数据库连接"""
if self._conn:

6
data/ai_config.json Normal file
View File

@@ -0,0 +1,6 @@
{
"host": "0.0.0.0",
"port": 11434,
"model": "qwen3:0.6b"
}

Binary file not shown.

View File

@@ -1,146 +1,147 @@
{
"fortunes": [
{
"level": "大吉",
"color": "#FF4757",
"emoji": "🌟",
"description": "今天运势爆棚!做什么都顺利,是实现愿望的好日子!",
"advice": "抓住机会,勇敢行动!"
},
{
"level": "吉",
"color": "#FF6348",
"emoji": "✨",
"description": "运势不错,事情会朝着好的方向发展。",
"advice": "保持积极心态,好运自然来。"
},
{
"level": "中吉",
"color": "#FFA502",
"emoji": "🍀",
"description": "平稳的一天,虽无大喜但也无大忧。",
"advice": "脚踏实地,稳中求进。"
},
{
"level": "小吉",
"color": "#F79F1F",
"emoji": "🌈",
"description": "有一些小确幸会出现,注意把握。",
"advice": "留心身边的小美好。"
},
{
"level": "平",
"color": "#A3A3A3",
"emoji": "☁️",
"description": "平淡的一天,没有特别的起伏。",
"advice": "平常心对待,顺其自然。"
},
{
"level": "小凶",
"color": "#747D8C",
"emoji": "🌧️",
"description": "可能会遇到一些小困难,需要谨慎应对。",
"advice": "小心行事,三思而后行。"
},
{
"level": "凶",
"color": "#57606F",
"emoji": "⚡",
"description": "今天不太顺利,建议低调行事。",
"advice": "韬光养晦,静待时机。"
}
],
"tarot": [
{
"name": "愚者",
"emoji": "🃏",
"meaning": "新的开始、冒险、天真",
"advice": "勇敢踏出第一步,迎接新的旅程。"
},
{
"name": "魔术师",
"emoji": "🎩",
"meaning": "创造力、技能、意志力",
"advice": "发挥你的才能,创造属于自己的奇迹。"
},
{
"name": "女祭司",
"emoji": "🔮",
"meaning": "直觉、神秘、内在智慧",
"advice": "倾听内心的声音,答案就在你心中。"
},
{
"name": "皇后",
"emoji": "👑",
"meaning": "丰盛、养育、美好",
"advice": "享受生活的美好,善待自己和他人。"
},
{
"name": "皇帝",
"emoji": "⚔️",
"meaning": "权威、秩序、掌控",
"advice": "建立规则,掌控局面。"
},
{
"name": "恋人",
"emoji": "💕",
"meaning": "爱情、选择、和谐",
"advice": "跟随你的心,做出正确的选择。"
},
{
"name": "战车",
"emoji": "🏎️",
"meaning": "胜利、决心、前进",
"advice": "坚定信念,勇往直前。"
},
{
"name": "力量",
"emoji": "💪",
"meaning": "勇气、耐心、内在力量",
"advice": "发掘内在的力量,温柔而坚定。"
},
{
"name": "隐士",
"emoji": "🕯️",
"meaning": "内省、寻找、孤独",
"advice": "静下心来,寻找内心的答案。"
},
{
"name": "命运之轮",
"emoji": "🎡",
"meaning": "转变、命运、循环",
"advice": "接受变化,一切都在轮转中。"
},
{
"name": "正义",
"emoji": "⚖️",
"meaning": "公平、真相、因果",
"advice": "坚持正义,真相终会大白。"
},
{
"name": "星星",
"emoji": "⭐",
"meaning": "希望、灵感、宁静",
"advice": "保持希望,光明就在前方。"
},
{
"name": "月亮",
"emoji": "🌙",
"meaning": "潜意识、幻想、不确定",
"advice": "信任直觉,但要分辨幻想与现实。"
},
{
"name": "太阳",
"emoji": "☀️",
"meaning": "快乐、成功、活力",
"advice": "享受阳光,分享你的快乐。"
},
{
"name": "世界",
"emoji": "🌍",
"meaning": "完成、成就、圆满",
"advice": "庆祝你的成就,准备迎接新的循环。"
}
]
}
"fortunes": [
{
"level": "大吉",
"color": "#FF4757",
"emoji": "🌟",
"description": "今天运势爆棚!做什么都顺利,是实现愿望的好日子!",
"advice": "抓住机会,勇敢行动!"
},
{
"level": "吉",
"color": "#FF6348",
"emoji": "✨",
"description": "运势不错,事情会朝着好的方向发展。",
"advice": "保持积极心态,好运自然来。"
},
{
"level": "中吉",
"color": "#FFA502",
"emoji": "🍀",
"description": "平稳的一天,虽无大喜但也无大忧。",
"advice": "脚踏实地,稳中求进。"
},
{
"level": "小吉",
"color": "#F79F1F",
"emoji": "🌈",
"description": "有一些小确幸会出现,注意把握。",
"advice": "留心身边的小美好。"
},
{
"level": "平",
"color": "#A3A3A3",
"emoji": "☁️",
"description": "平淡的一天,没有特别的起伏。",
"advice": "平常心对待,顺其自然。"
},
{
"level": "小凶",
"color": "#747D8C",
"emoji": "🌧️",
"description": "可能会遇到一些小困难,需要谨慎应对。",
"advice": "小心行事,三思而后行。"
},
{
"level": "凶",
"color": "#57606F",
"emoji": "⚡",
"description": "今天不太顺利,建议低调行事。",
"advice": "韬光养晦,静待时机。"
}
],
"tarot": [
{
"name": "愚者",
"emoji": "🃏",
"meaning": "新的开始、冒险、天真",
"advice": "勇敢踏出第一步,迎接新的旅程。"
},
{
"name": "魔术师",
"emoji": "🎩",
"meaning": "创造力、技能、意志力",
"advice": "发挥你的才能,创造属于自己的奇迹。"
},
{
"name": "女祭司",
"emoji": "🔮",
"meaning": "直觉、神秘、内在智慧",
"advice": "倾听内心的声音,答案就在你心中。"
},
{
"name": "皇后",
"emoji": "👑",
"meaning": "丰盛、养育、美好",
"advice": "享受生活的美好,善待自己和他人。"
},
{
"name": "皇帝",
"emoji": "⚔️",
"meaning": "权威、秩序、掌控",
"advice": "建立规则,掌控局面。"
},
{
"name": "恋人",
"emoji": "💕",
"meaning": "爱情、选择、和谐",
"advice": "跟随你的心,做出正确的选择。"
},
{
"name": "战车",
"emoji": "🏎️",
"meaning": "胜利、决心、前进",
"advice": "坚定信念,勇往直前。"
},
{
"name": "力量",
"emoji": "💪",
"meaning": "勇气、耐心、内在力量",
"advice": "发掘内在的力量,温柔而坚定。"
},
{
"name": "隐士",
"emoji": "🕯️",
"meaning": "内省、寻找、孤独",
"advice": "静下心来,寻找内心的答案。"
},
{
"name": "命运之轮",
"emoji": "🎡",
"meaning": "转变、命运、循环",
"advice": "接受变化,一切都在轮转中。"
},
{
"name": "正义",
"emoji": "⚖️",
"meaning": "公平、真相、因果",
"advice": "坚持正义,真相终会大白。"
},
{
"name": "星星",
"emoji": "⭐",
"meaning": "希望、灵感、宁静",
"advice": "保持希望,光明就在前方。"
},
{
"name": "月亮",
"emoji": "🌙",
"meaning": "潜意识、幻想、不确定",
"advice": "信任直觉,但要分辨幻想与现实。"
},
{
"name": "太阳",
"emoji": "☀️",
"meaning": "快乐、成功、活力",
"advice": "享受阳光,分享你的快乐。"
},
{
"name": "世界",
"emoji": "🌍",
"meaning": "完成、成就、圆满",
"advice": "庆祝你的成就,准备迎接新的循环。"
}
]
}

View File

@@ -1,5 +1,6 @@
{
"blacklist": [],
"description": "成语接龙游戏全局黑名单,被拒绝的词语将永久不可使用"
"blacklist": [],
"description": "成语接龙游戏全局黑名单,被拒绝的词语将永久不可使用"
}

View File

@@ -1,125 +1,126 @@
{
"questions": [
{
"id": 1,
"question": "Python之父是谁",
"answer": "Guido van Rossum",
"keywords": ["Guido", "吉多", "van Rossum"],
"hint": "荷兰程序员创建了Python语言",
"category": "编程"
},
{
"id": 2,
"question": "世界上最高的山峰是什么?",
"answer": "珠穆朗玛峰",
"keywords": ["珠穆朗玛", "珠峰", "Everest", "Mt. Everest"],
"hint": "位于喜马拉雅山脉",
"category": "地理"
},
{
"id": 3,
"question": "一年有多少天?",
"answer": "365",
"keywords": ["365", "三百六十五"],
"hint": "平年的天数",
"category": "常识"
},
{
"id": 4,
"question": "中国的首都是哪个城市?",
"answer": "北京",
"keywords": ["北京", "Beijing"],
"hint": "位于华北地区",
"category": "地理"
},
{
"id": 5,
"question": "光速是多少?",
"answer": "300000",
"keywords": ["300000", "30万", "3*10^8", "3e8"],
"hint": "单位:千米/秒约30万",
"category": "物理"
},
{
"id": 6,
"question": "世界上最大的海洋是什么?",
"answer": "太平洋",
"keywords": ["太平洋", "Pacific"],
"hint": "占地球表面积约46%",
"category": "地理"
},
{
"id": 7,
"question": "一个字节(Byte)等于多少位(bit)",
"answer": "8",
"keywords": ["8", "八", "8bit"],
"hint": "计算机基础知识",
"category": "计算机"
},
{
"id": 8,
"question": "人类的正常体温约是多少摄氏度?",
"answer": "37",
"keywords": ["37", "三十七", "36.5", "37度"],
"hint": "36-37度之间",
"category": "生物"
},
{
"id": 9,
"question": "HTTP协议默认使用哪个端口",
"answer": "80",
"keywords": ["80", "八十"],
"hint": "HTTPS使用443",
"category": "计算机"
},
{
"id": 10,
"question": "一个小时有多少分钟?",
"answer": "60",
"keywords": ["60", "六十"],
"hint": "基础时间单位",
"category": "常识"
},
{
"id": 11,
"question": "太阳系中最大的行星是什么?",
"answer": "木星",
"keywords": ["木星", "Jupiter"],
"hint": "体积和质量都是最大的",
"category": "天文"
},
{
"id": 12,
"question": "二进制中10等于十进制的多少",
"answer": "2",
"keywords": ["2", "二", "两"],
"hint": "1*2^1 + 0*2^0",
"category": "数学"
},
{
"id": 13,
"question": "中国有多少个省级行政区?",
"answer": "34",
"keywords": ["34", "三十四"],
"hint": "23个省+5个自治区+4个直辖市+2个特别行政区",
"category": "地理"
},
{
"id": 14,
"question": "圆周率π约等于多少?(保留两位小数)",
"answer": "3.14",
"keywords": ["3.14", "3.1415", "3.14159"],
"hint": "圆的周长与直径的比值",
"category": "数学"
},
{
"id": 15,
"question": "世界上使用人数最多的语言是什么?",
"answer": "中文",
"keywords": ["中文", "汉语", "Chinese", "普通话"],
"hint": "中国的官方语言",
"category": "语言"
}
]
}
"questions": [
{
"id": 1,
"question": "Python之父是谁",
"answer": "Guido van Rossum",
"keywords": ["Guido", "吉多", "van Rossum"],
"hint": "荷兰程序员创建了Python语言",
"category": "编程"
},
{
"id": 2,
"question": "世界上最高的山峰是什么?",
"answer": "珠穆朗玛峰",
"keywords": ["珠穆朗玛", "珠峰", "Everest", "Mt. Everest"],
"hint": "位于喜马拉雅山脉",
"category": "地理"
},
{
"id": 3,
"question": "一年有多少天?",
"answer": "365",
"keywords": ["365", "三百六十五"],
"hint": "平年的天数",
"category": "常识"
},
{
"id": 4,
"question": "中国的首都是哪个城市?",
"answer": "北京",
"keywords": ["北京", "Beijing"],
"hint": "位于华北地区",
"category": "地理"
},
{
"id": 5,
"question": "光速是多少?",
"answer": "300000",
"keywords": ["300000", "30万", "3*10^8", "3e8"],
"hint": "单位:千米/秒约30万",
"category": "物理"
},
{
"id": 6,
"question": "世界上最大的海洋是什么?",
"answer": "太平洋",
"keywords": ["太平洋", "Pacific"],
"hint": "占地球表面积约46%",
"category": "地理"
},
{
"id": 7,
"question": "一个字节(Byte)等于多少位(bit)",
"answer": "8",
"keywords": ["8", "八", "8bit"],
"hint": "计算机基础知识",
"category": "计算机"
},
{
"id": 8,
"question": "人类的正常体温约是多少摄氏度?",
"answer": "37",
"keywords": ["37", "三十七", "36.5", "37度"],
"hint": "36-37度之间",
"category": "生物"
},
{
"id": 9,
"question": "HTTP协议默认使用哪个端口",
"answer": "80",
"keywords": ["80", "八十"],
"hint": "HTTPS使用443",
"category": "计算机"
},
{
"id": 10,
"question": "一个小时有多少分钟?",
"answer": "60",
"keywords": ["60", "六十"],
"hint": "基础时间单位",
"category": "常识"
},
{
"id": 11,
"question": "太阳系中最大的行星是什么?",
"answer": "木星",
"keywords": ["木星", "Jupiter"],
"hint": "体积和质量都是最大的",
"category": "天文"
},
{
"id": 12,
"question": "二进制中10等于十进制的多少",
"answer": "2",
"keywords": ["2", "二", "两"],
"hint": "1*2^1 + 0*2^0",
"category": "数学"
},
{
"id": 13,
"question": "中国有多少个省级行政区?",
"answer": "34",
"keywords": ["34", "三十四"],
"hint": "23个省+5个自治区+4个直辖市+2个特别行政区",
"category": "地理"
},
{
"id": 14,
"question": "圆周率π约等于多少?(保留两位小数)",
"answer": "3.14",
"keywords": ["3.14", "3.1415", "3.14159"],
"hint": "圆的周长与直径的比值",
"category": "数学"
},
{
"id": 15,
"question": "世界上使用人数最多的语言是什么?",
"answer": "中文",
"keywords": ["中文", "汉语", "Chinese", "普通话"],
"hint": "中国的官方语言",
"category": "语言"
}
]
}

75
diagnose_checkin.py Normal file
View File

@@ -0,0 +1,75 @@
#!/usr/bin/env python3
"""签到问题诊断脚本"""
import os
import sys
import sqlite3
from pathlib import Path
from datetime import datetime
import time
def diagnose_checkin_issue():
"""诊断签到问题"""
print("=== 签到问题诊断 ===")
# 1. 检查数据库文件
db_path = "data/bot.db"
print(f"1. 数据库文件路径: {db_path}")
print(f" 文件存在: {os.path.exists(db_path)}")
if os.path.exists(db_path):
stat = os.stat(db_path)
print(f" 文件大小: {stat.st_size} bytes")
print(f" 文件权限: {oct(stat.st_mode)}")
print(f" 最后修改: {datetime.fromtimestamp(stat.st_mtime)}")
# 2. 检查数据库目录
db_dir = Path(db_path).parent
print(f"2. 数据库目录: {db_dir}")
print(f" 目录存在: {db_dir.exists()}")
print(f" 目录可写: {os.access(db_dir, os.W_OK)}")
# 3. 测试数据库连接
print("3. 测试数据库连接...")
try:
conn = sqlite3.connect(db_path, timeout=10.0)
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = cursor.fetchall()
print(f" 数据库表: {[table[0] for table in tables]}")
# 检查用户积分表
if ('user_points',) in tables:
cursor.execute("SELECT COUNT(*) FROM user_points")
count = cursor.fetchone()[0]
print(f" 用户积分记录数: {count}")
if count > 0:
cursor.execute("SELECT user_id, available_points FROM user_points LIMIT 3")
users = cursor.fetchall()
print(f" 示例用户积分: {users}")
conn.close()
print(" 数据库连接: 成功")
except Exception as e:
print(f" 数据库连接: 失败 - {e}")
# 4. 检查磁盘空间
print("4. 检查磁盘空间...")
try:
statvfs = os.statvfs(db_dir)
free_space = statvfs.f_frsize * statvfs.f_bavail
total_space = statvfs.f_frsize * statvfs.f_blocks
print(f" 可用空间: {free_space / (1024*1024):.1f} MB")
print(f" 总空间: {total_space / (1024*1024):.1f} MB")
except Exception as e:
print(f" 磁盘空间检查失败: {e}")
# 5. 检查进程权限
print("5. 检查进程权限...")
print(f" 当前用户: {os.getuid() if hasattr(os, 'getuid') else 'N/A'}")
print(f" 当前组: {os.getgid() if hasattr(os, 'getgid') else 'N/A'}")
print("=== 诊断完成 ===")
if __name__ == "__main__":
diagnose_checkin_issue()

287
games/adventure.py Normal file
View File

@@ -0,0 +1,287 @@
"""冒险系统游戏模块"""
import random
import time
import logging
from datetime import datetime
from games.base import BaseGame
from utils.parser import CommandParser
from core.database import get_db
from typing import *
logger = logging.getLogger(__name__)
class AdventureGame(BaseGame):
"""冒险系统游戏"""
def __init__(self):
"""初始化游戏"""
super().__init__()
self.db = get_db()
# 奖品池配置
self.prize_pool: List[Tuple[int, float, str]] = [
# (权重, 倍率, 描述)
(500, 0.5, "少量积分"),
(350, 1, "中等积分"),
(200, 2, "大量积分"),
(100, 5, "丰厚积分"),
(50, 10, "丰厚积分"),
(10, 100, "🌟 巨额积分"),
(1, 1000, "💎 传说积分"),
]
self.total_weight: int = 0
for weight,_,_ in self.prize_pool:
self.total_weight += weight
async def handle(self, command: str, chat_id: int, user_id: int) -> str:
"""处理冒险相关指令
Args:
command: 指令,如 ".adventure", ".adventure 10", ".adventure stats"
chat_id: 会话ID
user_id: 用户ID
Returns:
回复消息
"""
try:
# 提取参数
_, args = CommandParser.extract_command_args(command)
args = args.strip().lower()
# 冒险说明
if args in ['help', '帮助', 'info']:
return self._get_adventure_help()
# 放弃当前冒险(按最低倍率结算已冒险时间)
if args in ['abandon', '放弃']:
return await self._abandon_adventure(chat_id, user_id)
# 默认冒险耗时1分钟
else:
# 解析消耗时间
cost_time = 1 # 默认消耗1分钟
if args.isdigit():
cost_time = int(args)
return await self._perform_adventure(chat_id, user_id, cost_time)
except Exception as e:
logger.error(f"处理冒险指令错误: {e}", exc_info=True)
return f"❌ 处理指令出错: {str(e)}"
async def _perform_adventure(self, chat_id: int, user_id: int, cost_time: int) -> str:
"""执行冒险耗时
Args:
chat_id: 会话ID使用0作为用户级标识
user_id: 用户ID
cost_time: 消耗时间(分钟)
Returns:
抽奖结果消息
"""
# 参数验证
if cost_time < 1:
return "❌ 冒险时间至少需要1分钟"
# 查询冒险状态使用chat_id=0表示用户级状态
state = self.db.get_game_state(0, user_id, 'adventure')
current_time = int(time.time())
# 情况1用户已有未完成的冒险
if state:
try:
state_data = state['state_data']
start_time = state_data.get('start_time', 0)
saved_cost_time = state_data.get('cost_time', 1)
end_time = start_time + saved_cost_time * 60
remaining_seconds = end_time - current_time
# 情况1.1:冒险已完成(时间已到或过期)
if remaining_seconds <= 0:
# 选择奖品池
prize_pool = self.prize_pool
# 执行抽奖
reward = self._draw_prize(prize_pool)
reward_points = int(reward['value'] * saved_cost_time)
# 处理奖励
self.db.add_points(user_id, reward_points, "adventure", f"冒险奖励")
# 删除冒险状态
self.db.delete_game_state(0, user_id, 'adventure')
# 获取更新后的积分信息
updated_points = self.db.get_user_points(user_id)
# 格式化输出
text = f"## ⚡️ 冒险结果\n\n"
text += f"**消耗时间**: {saved_cost_time} 分钟\n\n"
text += f"**{reward['description']}**: 获得 {reward_points} 积分\n\n"
text += f"**当前积分**: {updated_points['points']}\n\n"
text += "---\n\n"
text += "💡 提示:冒险进行时不能炼金!"
return text
# 情况1.2:冒险未完成,返回等待提示
remaining_minutes = remaining_seconds // 60
remaining_secs = remaining_seconds % 60
if remaining_minutes > 0:
wait_msg = f"{remaining_minutes}{remaining_secs}"
else:
wait_msg = f"{remaining_secs}"
text = f"## ⚡️ 冒险进行中\n\n"
text += f"你正在进行一次冒险,还需等待 **{wait_msg}** 才能完成。\n\n"
text += f"**当前冒险时长**: {saved_cost_time} 分钟\n\n"
text += "---\n\n"
text += "💡 提示:冒险期间无法进行炼金,请耐心等待!"
return text
except Exception as e:
# 状态数据异常,删除损坏状态并允许重新开始
logger.error(f"冒险状态数据异常: {e}", exc_info=True)
self.db.delete_game_state(0, user_id, 'adventure')
# 继续执行到情况3开始新冒险
# 情况3用户没有冒险状态开始新的冒险
state_data = {
'start_time': current_time,
'cost_time': cost_time
}
# 保存冒险状态
self.db.save_game_state(0, user_id, 'adventure', state_data)
# 计算预计完成时间
end_time = current_time + cost_time * 60
end_datetime = datetime.fromtimestamp(end_time)
end_time_str = end_datetime.strftime('%H:%M:%S')
text = f"## ⚡️ 冒险开始\n\n"
text += f"你已经开始了冒险之旅,本次冒险将持续 **{cost_time}** 分钟。\n\n"
text += f"**预计完成时间**: {end_time_str}\n\n"
text += "---\n\n"
text += "💡 提示:冒险期间无法进行炼金,完成后使用 `.adventure` 获取奖励!"
return text
async def _abandon_adventure(self, chat_id: int, user_id: int) -> str:
"""放弃当前冒险,按最低倍率结算已冒险时间
Args:
chat_id: 会话ID使用0作为用户级标识
user_id: 用户ID
Returns:
放弃结果消息
"""
try:
# 查询冒险状态
state = self.db.get_game_state(0, user_id, 'adventure')
if not state:
return "❌ 当前没有进行中的冒险,可使用 `.adventure` 开始新的冒险。"
state_data = state.get('state_data', {})
start_time = state_data.get('start_time')
cost_time = state_data.get('cost_time')
if start_time is None or cost_time is None:
# 状态异常,清理并提示
self.db.delete_game_state(0, user_id, 'adventure')
return "⚠️ 冒险状态异常已清理,请使用 `.adventure` 重新开始。"
current_time = int(time.time())
elapsed_seconds = max(0, current_time - int(start_time))
elapsed_minutes = elapsed_seconds // 60
if elapsed_minutes < 1:
elapsed_minutes = 1
# 计算最低倍率
try:
min_multiplier = min(m for _, m, _ in self.prize_pool)
except Exception:
# 兜底若奖池异常按0.5处理
min_multiplier = 0.5
reward_points = int(min_multiplier * elapsed_minutes)
if reward_points < 0:
reward_points = 0
# 发放奖励并清理状态
if reward_points > 0:
self.db.add_points(user_id, reward_points, "adventure", "冒险放弃奖励")
self.db.delete_game_state(0, user_id, 'adventure')
# 查询当前积分
updated_points = self.db.get_user_points(user_id)
# 输出
text = f"## ⚡️ 冒险放弃\n\n"
text += f"**已计入时间**: {elapsed_minutes} 分钟\n\n"
text += f"**最低倍率**: {min_multiplier}\n\n"
text += f"**获得积分**: {reward_points}\n\n"
text += f"**当前积分**: {updated_points['points']}\n\n"
text += "---\n\n"
text += "💡 提示:可随时使用 `.adventure` 再次踏上冒险之旅!"
return text
except Exception as e:
logger.error(f"放弃冒险时出错: {e}", exc_info=True)
# 失败时不影响原状态,返回提示
return f"❌ 放弃冒险失败:{str(e)}"
def _draw_prize(self, prize_pool: list) -> dict:
"""从奖品池中抽取奖品
Args:
prize_pool: 奖品池,格式为 (权重, 倍率, 描述)
Returns:
奖品信息,格式为 {'value': 倍率, 'description': 描述}
"""
# 生成随机数
rand = random.random()*self.total_weight
cumulative_prob = 0.0
for weight, multiplier, description in prize_pool:
cumulative_prob += weight
if rand <= cumulative_prob:
return {
'value': multiplier,
'description': description
}
# 兜底返回第一个奖品
return {
'value': prize_pool[0][1],
'description': prize_pool[0][2]
}
def _get_adventure_help(self) -> str:
"""获取冒险帮助信息
Returns:
帮助信息消息
"""
text = f"## ⚡️ 冒险系统\n\n"
text += f"### 基础用法\n"
text += f"- `.adventure` - 消耗1分钟进行冒险\n"
text += f"- `.adventure time` - 消耗time分钟进行冒险, 最少一分钟\n"
text += f"### 其他功能\n"
text += f"- `.adventure abandon` - 放弃当前冒险,按最低倍率结算已冒险时间\n"
text += f"- `.adventure 放弃` - 放弃当前冒险,按最低倍率结算已冒险时间\n"
text += f"- `.adventure help` - 查看帮助\n\n"
return text
def get_help(self) -> str:
"""获取帮助信息"""
return self._get_adventure_help()

530
games/ai_chat.py Normal file
View File

@@ -0,0 +1,530 @@
"""AI对话游戏模块"""
import json
import logging
import asyncio
import time
from pathlib import Path
from typing import Optional, Dict, Any, List
from games.base import BaseGame
from utils.parser import CommandParser
logger = logging.getLogger(__name__)
# 全局字典存储每个chat_id的延迟任务句柄
_pending_tasks: Dict[int, asyncio.Task] = {}
# 全局字典存储每个chat_id的待处理消息队列
_message_queues: Dict[int, List[Dict[str, Any]]] = {}
# 全局字典存储每个chat_id的ChatEngine实例
_chat_engines: Dict[int, Any] = {}
class AIChatGame(BaseGame):
"""AI对话游戏"""
def __init__(self):
"""初始化游戏"""
super().__init__()
self.config_file = Path(__file__).parent.parent / "data" / "ai_config.json"
self.wait_window = 10 # 固定10秒等待窗口
async def handle(self, command: str, chat_id: int, user_id: int) -> str:
"""处理AI对话指令
Args:
command: 指令,如 ".ai 问题"".aiconfig host=xxx port=xxx model=xxx"
chat_id: 会话ID
user_id: 用户ID
Returns:
回复消息
"""
try:
# 提取指令和参数
cmd, args = CommandParser.extract_command_args(command)
args = args.strip()
# 判断是配置指令还是AI对话指令
if cmd == '.aiconfig':
return await self._handle_config(args, chat_id, user_id)
else:
# .ai 指令
return await self._handle_ai(args, chat_id, user_id)
except Exception as e:
logger.error(f"处理AI对话指令错误: {e}", exc_info=True)
return f"❌ 处理指令出错: {str(e)}"
async def _handle_ai(self, content: str, chat_id: int, user_id: int) -> str:
"""处理AI对话请求
Args:
content: 消息内容
chat_id: 会话ID
user_id: 用户ID
Returns:
回复消息
"""
# 如果内容为空,返回帮助信息
if not content:
return self.get_help()
# 将消息加入队列
self._add_to_queue(chat_id, user_id, content)
# 取消旧的延迟任务(如果存在)
if chat_id in _pending_tasks:
old_task = _pending_tasks[chat_id]
if not old_task.done():
old_task.cancel()
try:
await old_task
except asyncio.CancelledError:
pass
# 创建新的延迟任务
task = asyncio.create_task(self._delayed_response(chat_id))
_pending_tasks[chat_id] = task
# 不返回确认消息,静默处理
return ""
async def _handle_config(self, args: str, chat_id: int, user_id: int) -> str:
"""处理配置请求
Args:
args: 配置参数,格式如 "host=localhost port=11434 model=llama3.1"
chat_id: 会话ID
user_id: 用户ID
Returns:
配置确认消息
"""
if not args:
return "❌ 请提供配置参数\n\n格式:`.aiconfig host=xxx port=xxx model=xxx`\n\n示例:`.aiconfig host=localhost port=11434 model=llama3.1`"
# 解析配置参数
config_updates = {}
parts = args.split()
for part in parts:
if '=' in part:
key, value = part.split('=', 1)
key = key.strip().lower()
value = value.strip()
if key == 'host':
config_updates['host'] = value
elif key == 'port':
try:
config_updates['port'] = int(value)
except ValueError:
return f"❌ 端口号必须是数字:{value}"
elif key == 'model':
config_updates['model'] = value
else:
return f"❌ 未知的配置项:{key}\n\n支持的配置项host, port, model"
if not config_updates:
return "❌ 未提供有效的配置参数"
# 加载现有配置
current_config = self._load_config()
# 更新配置
current_config.update(config_updates)
# 保存配置
if self._save_config(current_config):
# 清除所有ChatEngine缓存配置变更需要重新创建
_chat_engines.clear()
return f"✅ 配置已更新\n\n**当前配置**\n- 地址:{current_config['host']}\n- 端口:{current_config['port']}\n- 模型:{current_config['model']}"
else:
return "❌ 保存配置失败,请稍后重试"
def _add_to_queue(self, chat_id: int, user_id: int, content: str) -> None:
"""将消息加入等待队列
Args:
chat_id: 会话ID
user_id: 用户ID
content: 消息内容
"""
if chat_id not in _message_queues:
_message_queues[chat_id] = []
_message_queues[chat_id].append({
"user_id": user_id,
"content": content,
"timestamp": int(time.time())
})
async def _delayed_response(self, chat_id: int) -> None:
"""延迟回答任务
Args:
chat_id: 会话ID
"""
try:
# 等待固定时间窗口
await asyncio.sleep(self.wait_window)
# 检查队列中是否有消息
if chat_id in _message_queues and _message_queues[chat_id]:
# 生成回答
response = await self._generate_response(chat_id)
# 清空队列
_message_queues[chat_id] = []
# 发送回答
if response:
from utils.message import get_message_sender
sender = get_message_sender()
await sender.send_text(response)
# 从pending_tasks中移除任务句柄
if chat_id in _pending_tasks:
del _pending_tasks[chat_id]
except asyncio.CancelledError:
# 任务被取消,正常情况,不需要记录错误
logger.debug(f"延迟任务被取消: chat_id={chat_id}")
if chat_id in _pending_tasks:
del _pending_tasks[chat_id]
except Exception as e:
logger.error(f"延迟回答任务错误: {e}", exc_info=True)
if chat_id in _pending_tasks:
del _pending_tasks[chat_id]
async def _generate_response(self, chat_id: int) -> Optional[str]:
"""使用LLM生成回答
Args:
chat_id: 会话ID
Returns:
回答文本
"""
try:
# 获取队列消息
if chat_id not in _message_queues or not _message_queues[chat_id]:
return None
messages = _message_queues[chat_id].copy()
# 获取ChatEngine实例
chat_engine = self._get_chat_engine(chat_id)
if not chat_engine:
return "❌ AI服务初始化失败请检查配置"
# 将消息按用户角色格式化并添加到ChatMemoryBuffer
# 构建合并的消息内容(包含用户信息)
merged_content = ""
for msg in messages:
user_id = msg['user_id']
role = self._get_user_role(chat_id, user_id)
merged_content += f"[{role}]: {msg['content']}\n"
# 去掉最后的换行
merged_content = merged_content.strip()
# 调用ChatEngine生成回答
# chat_engine是一个字典包含llm, memory, system_prompt
llm = chat_engine['llm']
memory = chat_engine['memory']
system_prompt = chat_engine['system_prompt']
# 构建完整的消息(包含系统提示和历史对话)
full_message = f"{system_prompt}\n\n{merged_content}"
# 使用LLM生成回答同步调用在线程池中执行
response = await asyncio.to_thread(llm.complete, full_message)
# 返回回答文本
return str(response)
except Exception as e:
error_msg = str(e)
logger.error(f"生成AI回答错误: {e}", exc_info=True)
# 获取当前配置以便在错误消息中显示
try:
config = self._load_config()
config_info = f"\n\n当前配置:\n- 地址: {config['host']}\n- 端口: {config['port']}\n- 模型: {config['model']}"
except:
config_info = ""
# 提供更友好的错误消息
if "Server disconnected" in error_msg or "RemoteProtocolError" in error_msg:
config = self._load_config()
test_cmd = f"curl -X POST http://{config.get('host', 'localhost')}:{config.get('port', 11434)}/api/generate -d '{{\"model\": \"{config.get('model', 'qwen3:0.6b')}\", \"prompt\": \"你是谁\", \"stream\": false}}'"
return f"❌ AI服务连接失败请检查\n1. Ollama服务是否已启动在笔记本上\n2. NPS端口转发是否正常工作\n3. 配置的地址是否为服务器IP不是localhost\n4. 模型名称是否正确\n\n测试命令(在服务器上执行):\n{test_cmd}{config_info}\n\n使用 `.aiconfig` 命令检查和修改配置"
elif "ConnectionError" in error_msg or "ConnectTimeout" in error_msg or "actively refused" in error_msg.lower() or "Empty reply" in error_msg:
return f"❌ 无法连接到Ollama服务连接被拒绝\n\n根据NPS日志笔记本上的Ollama服务拒绝了连接。\n\n**解决方案**\n\n1. **检查Ollama是否运行**\n 在笔记本上运行:`tasklist | findstr ollama` 或 `Get-Process | Where-Object {{$_.ProcessName -like '*ollama*'}}`\n\n2. **设置Ollama监听地址**(重要!):\n 在Windows上Ollama默认只监听127.0.0.1需要设置为0.0.0.0才能被NPS访问\n \n 方法A设置环境变量推荐\n 1. 打开系统环境变量设置\n 2. 添加环境变量:`OLLAMA_HOST=0.0.0.0:11434`\n 3. 重启Ollama服务或重启电脑\n \n 方法B在命令行启动时指定\n ```powershell\n $env:OLLAMA_HOST=\"0.0.0.0:11434\"\n ollama serve\n ```\n\n3. **检查Windows防火墙**\n 确保允许11434端口的入站连接\n\n4. **验证监听地址**\n 在笔记本上运行:`netstat -an | findstr 11434`\n 应该看到 `0.0.0.0:11434` 而不是 `127.0.0.1:11434`\n\n5. **测试本地连接**\n 在笔记本上运行:`curl http://localhost:11434/api/tags`\n 应该返回模型列表\n\n提示如果使用NPS转发配置中的host应该是服务器的IP地址不是localhost{config_info}\n\n使用 `.aiconfig host=服务器IP` 修改配置"
elif "timeout" in error_msg.lower():
return f"❌ AI服务响应超时请稍后重试{config_info}"
else:
return f"❌ 生成回答时出错: {error_msg}{config_info}"
def _get_chat_engine(self, chat_id: int) -> Any:
"""获取或创建ChatEngine实例
Args:
chat_id: 会话ID
Returns:
ChatEngine实例
"""
# 检查是否已存在
if chat_id in _chat_engines:
return _chat_engines[chat_id]
try:
# 加载配置
config = self._load_config()
# 导入llama_index模块
from llama_index.llms.ollama import Ollama
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core import ChatPromptTemplate, Settings
# 创建Ollama LLM实例
# 添加超时设置以避免长时间等待
llm = Ollama(
model=config['model'],
base_url=f"http://{config['host']}:{config['port']}",
timeout=120.0 # 120秒超时
)
# 设置全局LLM
Settings.llm = llm
# 创建ChatMemoryBuffer设置足够的token_limit确保保留30+轮对话)
memory = ChatMemoryBuffer.from_defaults(token_limit=8000)
# 从配置文件加载系统提示词(如果存在),否则使用默认值并保存
system_prompt = self._get_system_prompt()
# 创建对话引擎
# 由于llama_index的API可能在不同版本有变化这里使用基本的chat接口
# 实际使用时可能需要根据llama_index的版本调整
chat_engine = {
'llm': llm,
'memory': memory,
'system_prompt': system_prompt
}
# 存储到全局字典
_chat_engines[chat_id] = chat_engine
return chat_engine
except ImportError as e:
logger.error(f"导入llama_index模块失败: {e}")
return None
except Exception as e:
logger.error(f"创建ChatEngine失败: {e}", exc_info=True)
return None
def _get_user_role(self, chat_id: int, user_id: int) -> str:
"""获取用户角色名称(创建或获取映射)
Args:
chat_id: 会话ID
user_id: 用户ID
Returns:
角色名称
"""
# 获取现有映射
user_mapping, user_count = self._get_user_mapping(chat_id)
user_id_str = str(user_id)
# 如果用户已存在,返回角色名称
if user_id_str in user_mapping:
return user_mapping[user_id_str]
# 新用户,分配角色
user_count += 1
role_name = f"用户{user_count}"
user_mapping[user_id_str] = role_name
# 保存到数据库
state_data = {
"user_mapping": user_mapping,
"user_count": user_count
}
self.db.save_game_state(chat_id, 0, 'ai_chat', state_data)
return role_name
def _get_user_mapping(self, chat_id: int) -> tuple[Dict[str, str], int]:
"""获取用户角色映射和计数
Args:
chat_id: 会话ID
Returns:
(用户映射字典, 用户计数)
"""
# 从数据库获取映射
state = self.db.get_game_state(chat_id, 0, 'ai_chat')
if state and state.get('state_data'):
user_mapping = state['state_data'].get('user_mapping', {})
user_count = state['state_data'].get('user_count', 0)
else:
user_mapping = {}
user_count = 0
return user_mapping, user_count
def _get_default_system_prompt(self) -> str:
"""获取默认系统提示词
Returns:
默认系统提示词
"""
return (
"你是WPS游戏机器人的AI助手这是一个多功能的游戏和娱乐机器人系统。"
"这是一个多用户对话场景,不同用户的发言会用不同的角色标识(如'用户1''用户2'等)。"
"你需要理解不同用户的发言内容,并根据上下文给出合适的回复。"
"\n\n"
"**你的身份和职责:**\n"
"1. 你是WPS协作平台的游戏机器人AI助手\n"
"2. 你可以帮助用户了解和使用机器人的各种功能\n"
"3. 你可以回答用户的问题,提供游戏指导和建议\n"
"4. 当用户询问功能时,主动推荐相关游戏和功能\n"
"\n\n"
"**机器人支持的功能:**\n"
"1. 🎲 骰娘系统:`.r XdY` - 掷骰子游戏\n"
"2. ✊ 石头剪刀布:`.rps 石头/剪刀/布` - 对战游戏\n"
"3. 🔮 运势占卜:`.fortune` 或 `.运势` - 查看今日运势\n"
"4. 🔢 猜数字:`.guess start` - 开始猜数字游戏\n"
"5. 📝 问答游戏:`.quiz` - 回答问题挑战\n"
"6. 🀄 成语接龙:`.idiom start` - 成语接龙游戏\n"
"7. ⚫ 五子棋:`.gomoku challenge` - 发起五子棋对战\n"
"8. 💎 积分系统:`.points` - 查看积分,`.checkin` - 每日签到\n"
"9. ⚗️ 炼金系统:`.alchemy` - 消耗积分进行炼金\n"
"10. ⚡️ 冒险系统:`.adventure` - 消耗时间进行冒险\n"
"11. 🎁 积分赠送:`.gift <用户ID> <积分>` - 赠送积分给其他用户\n"
"12. 📊 统计信息:`.stats` - 查看个人游戏统计\n"
"13. ❓ 帮助信息:`.help` - 查看完整的帮助文档\n"
"\n\n"
"**回复指南:**\n"
"- 用自然、友好、热情的方式与用户交流\n"
"- 当用户不知道玩什么时,主动推荐适合的游戏\n"
"- 详细解释功能的使用方法和规则\n"
"- 鼓励用户尝试不同的游戏和功能\n"
"- 如果是多人场景,可以推荐适合多人参与的游戏(如成语接龙、五子棋)\n"
"- 记住用户的偏好和之前的对话内容,提供个性化建议\n"
)
def _get_system_prompt(self) -> str:
"""获取系统提示词(从配置文件加载,如果不存在则使用默认值并保存)
Returns:
系统提示词
"""
config = self._load_config()
# 如果配置中存在系统提示词,直接返回
if 'system_prompt' in config and config['system_prompt']:
return config['system_prompt']
# 否则使用默认值并保存到配置文件
default_prompt = self._get_default_system_prompt()
config['system_prompt'] = default_prompt
self._save_config(config)
return default_prompt
def _load_config(self) -> Dict[str, Any]:
"""从JSON文件加载配置
Returns:
配置字典
"""
# 如果文件不存在,创建默认配置
if not self.config_file.exists():
default_config = {
"host": "localhost",
"port": 11434,
"model": "llama3.1"
}
self._save_config(default_config)
return default_config
try:
with open(self.config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
# 确保所有必需的字段存在
if 'host' not in config:
config['host'] = "localhost"
if 'port' not in config:
config['port'] = 11434
if 'model' not in config:
config['model'] = "llama3.1"
return config
except Exception as e:
logger.error(f"加载配置文件失败: {e}", exc_info=True)
# 返回默认配置
return {
"host": "localhost",
"port": 11434,
"model": "llama3.1"
}
def _save_config(self, config: Dict[str, Any]) -> bool:
"""保存配置到JSON文件
Args:
config: 配置字典
Returns:
是否成功
"""
try:
# 确保目录存在
self.config_file.parent.mkdir(parents=True, exist_ok=True)
# 写入JSON文件
with open(self.config_file, 'w', encoding='utf-8') as f:
json.dump(config, f, indent=4, ensure_ascii=False)
return True
except Exception as e:
logger.error(f"保存配置文件失败: {e}", exc_info=True)
return False
def get_help(self) -> str:
"""获取帮助信息
Returns:
帮助文本
"""
return """## 🤖 AI对话系统帮助
### 基本用法
- `.ai <问题>` - 向AI提问支持多用户对话等待10秒后回答
- `.aiconfig host=xxx port=xxx model=xxx` - 配置Ollama服务地址和模型
### 配置示例
`.aiconfig host=localhost port=11434 model=llama3.1`
### 说明
- 多个用户可以在同一个会话中提问
- 系统会等待10秒收集所有问题后统一回答
- 如果在等待期间有新消息,会重新计时
---
💡 提示确保Ollama服务已启动并配置正确
"""

207
games/alchemy.py Normal file
View File

@@ -0,0 +1,207 @@
"""炼金系统游戏模块"""
import random
import time
import logging
from datetime import datetime
from games.base import BaseGame
from utils.parser import CommandParser
from core.database import get_db
from typing import *
logger = logging.getLogger(__name__)
class AlchemyGame(BaseGame):
"""炼金系统游戏"""
def __init__(self):
"""初始化游戏"""
super().__init__()
self.db = get_db()
# 奖品池配置 - 确保数学期望等于消耗积分
self.prize_pool: List[Tuple[int, str, float, str]] = [
# (权重, 类型, 倍率, 描述)
(500, "penalty", 0, "炼金失败"),
(100, "penalty", -1, "炼金爆炸"),
(100, "points", 0.1, "少量积分"),
(390, "points", 0.5, "少量积分"),
(500, "points", 1, "等值积分"),
(390, "points", 2, "丰厚积分"),
(200, "points", 5, "丰厚积分"),
(9, "points", 10, "🌟 巨额积分"),
(1, "points", 100, "💎 传说积分"),
]
self.total_weight: int = 0
for weight,_,_,_ in self.prize_pool:
self.total_weight += weight
async def handle(self, command: str, chat_id: int, user_id: int) -> str:
"""处理炼金相关指令
Args:
command: 指令,如 ".alchemy", ".alchemy 10", ".alchemy stats"
chat_id: 会话ID
user_id: 用户ID
Returns:
回复消息
"""
try:
# 提取参数
_, args = CommandParser.extract_command_args(command)
args = args.strip().lower()
# 炼金说明
if args in ['help', '帮助', 'info']:
return self._get_alchemy_help()
# 默认:炼金抽奖
else:
# 解析消耗积分数量
cost_points = 10 # 默认消耗10积分
if args.isdigit():
cost_points = int(args)
return self._perform_alchemy(user_id, cost_points)
except Exception as e:
logger.error(f"处理炼金指令错误: {e}", exc_info=True)
return f"❌ 处理指令出错: {str(e)}"
def _perform_alchemy(self, user_id: int, cost_points: int) -> str:
"""执行炼金抽奖
Args:
user_id: 用户ID
cost_points: 消耗积分
Returns:
抽奖结果消息
"""
# 检查用户是否正在冒险中
state = self.db.get_game_state(0, user_id, 'adventure')
if state:
try:
state_data = state['state_data']
start_time = state_data.get('start_time', 0)
cost_time = state_data.get('cost_time', 1)
current_time = int(time.time())
end_time = start_time + cost_time * 60
remaining_seconds = end_time - current_time
# 如果冒险已完成,自动清理状态,允许炼金
if remaining_seconds <= 0:
self.db.delete_game_state(0, user_id, 'adventure')
else:
# 冒险未完成,返回错误提示
remaining_minutes = remaining_seconds // 60
remaining_secs = remaining_seconds % 60
if remaining_minutes > 0:
wait_msg = f"{remaining_minutes}{remaining_secs}"
else:
wait_msg = f"{remaining_secs}"
return f"❌ 冒险进行中,无法进行炼金!\n\n还需等待 **{wait_msg}** 才能完成冒险。\n\n💡 提示:冒险期间无法进行炼金,请先完成冒险!"
except Exception as e:
# 状态数据异常,删除损坏状态,允许继续
logger.error(f"冒险状态数据异常: {e}", exc_info=True)
self.db.delete_game_state(0, user_id, 'adventure')
# 检查用户积分是否足够
user_points = self.db.get_user_points(user_id)
if user_points['points'] < cost_points:
return f"❌ 积分不足!需要 {cost_points} 积分,当前可用 {user_points['points']} 积分"
# 选择奖品池
prize_pool = self.prize_pool
# 执行抽奖
reward = self._draw_prize(prize_pool)
reward_points = int(reward['value']*cost_points)
# 消费积分
if not self.db.consume_points(user_id, cost_points, "alchemy", f"炼金抽奖消耗"):
return "❌ 积分消费失败,请稍后重试"
# 处理奖励
if reward['type'] == 'points' and reward['value'] > 0:
# 获得积分奖励
self.db.add_points(user_id, reward_points, "alchemy", f"炼金奖励")
elif reward['type'] == 'penalty' and reward['value'] < 0:
# 负面奖励(扣分)
penalty_points = abs(reward_points)
self.db.consume_points(user_id, penalty_points, "alchemy", f"炼金失败")
# 炼金系统已简化,不再记录历史
# 获取更新后的积分信息
updated_points = self.db.get_user_points(user_id)
# 格式化输出
text = f"## ⚗️ 炼金结果\n\n"
text += f"**消耗积分**{cost_points}\n\n"
if reward['type'] == 'points':
text += f"**{reward['description']}**: 获得{reward_points} 积分\n\n"
elif reward['type'] == 'penalty':
text += f"**{reward['description']}**: 损失 {abs(reward_points)} 积分\n\n"
text += f"**当前积分**{updated_points['points']}\n\n"
text += "---\n\n"
text += "💡 提示:炼金有风险,投资需谨慎!"
return text
def _draw_prize(self, prize_pool: list) -> dict:
"""从奖品池中抽取奖品
Args:
prize_pool: 奖品池
Returns:
奖品信息
"""
# 生成随机数
rand = random.random()*self.total_weight
cumulative_prob = 0.0
for weight, reward_type, reward_value, description in prize_pool:
cumulative_prob += weight
if rand <= cumulative_prob:
return {
'type': reward_type,
'value': reward_value,
'description': description
}
# 兜底返回第一个奖品
return {
'type': prize_pool[0][1],
'value': prize_pool[0][2],
'description': prize_pool[0][3]
}
def _get_alchemy_help(self) -> str:
"""获取炼金帮助信息
Returns:
帮助信息消息
"""
text = f"## ⚗️ 炼金系统\n\n"
text += f"### 基础用法\n"
text += f"- `.alchemy` - 消耗10积分进行炼金\n"
text += f"- `.alchemy cost` - 消耗cost积分进行炼金, 最少1积分\n"
text += f"### 其他功能\n"
text += f"- `.alchemy help` - 查看帮助\n\n"
return text
def get_help(self) -> str:
"""获取帮助信息"""
return self._get_alchemy_help()

View File

@@ -82,6 +82,44 @@ def get_help_message() -> str:
- `.gomoku list` - 列出所有对战
- `.gomoku stats` - 查看战绩
### ⚔️ 三国杀
- `.sgs create` - 创建游戏
- `.sgs join` - 加入游戏
- `.sgs start` - 开始游戏
- `.sgs status` - 查看状态
- `.sgs help` - 查看帮助
### 💎 积分系统
- `.points` - 查看个人积分
- `.积分` - 查看个人积分
- `.checkin` - 每日签到
- `.签到` - 每日签到
- `.打卡` - 每日签到
- `.points leaderboard` - 积分排行榜
### ⚗️ 炼金系统
- `.alchemy` - 消耗10积分进行炼金
- `.炼金` - 消耗10积分进行炼金
- `.alchemy 20` - 消耗20积分进行炼金
- `.alchemy 50` - 消耗50积分进行炼金
### ⚡️ 冒险系统
- `.adventure` - 消耗1分钟进行冒险
- `.冒险` - 消耗1分钟进行冒险
- `.adventure 5` - 消耗5分钟进行冒险
- `.adventure abandon` - 放弃当前冒险,按最低倍率结算已冒险时间
- `.adventure 放弃` - 放弃当前冒险,按最低倍率结算已冒险时间
- `.adventure help` - 查看冒险帮助
### 🎁 积分赠送系统
- `.gift <用户ID> <积分数量> [消息]` - 赠送积分
- `.赠送 <用户ID> <积分数量> [消息]` - 赠送积分
- `.送 <用户ID> <积分数量> [消息]` - 赠送积分
### 🤖 AI对话系统
- `.ai <问题>` - 向AI提问支持多用户对话等待10秒后回答
- `.aiconfig host=xxx port=xxx model=xxx` - 配置Ollama服务地址和模型
### 其他
- `.help` - 显示帮助
- `.stats` - 查看个人统计

View File

@@ -7,6 +7,7 @@ from datetime import datetime
from pathlib import Path
from games.base import BaseGame
from utils.parser import CommandParser
from games.points import PointsGame
logger = logging.getLogger(__name__)
@@ -19,6 +20,7 @@ class FortuneGame(BaseGame):
super().__init__()
self._fortunes = None
self._tarot = None
self.points_game = PointsGame()
def _load_data(self):
"""懒加载运势数据"""
@@ -95,6 +97,11 @@ class FortuneGame(BaseGame):
# 重置随机seed
random.seed()
# 尝试获得积分奖励30%概率)
points_earned = 0
if random.random() < 0.3: # 30%概率获得积分
points_earned = self.points_game.add_fortune_points(user_id)
# 格式化输出
text = f"## 🔮 今日运势\n\n"
text += f"**日期**{today}\n\n"
@@ -103,8 +110,13 @@ class FortuneGame(BaseGame):
text += f"**建议**{fortune['advice']}\n\n"
text += f"**幸运数字**{lucky_number}\n\n"
text += f"**幸运颜色**{lucky_color}\n\n"
# 添加积分奖励信息
if points_earned > 0:
text += f"**🎁 积分奖励**+{points_earned}\n\n"
text += "---\n\n"
text += "💡 提示:运势仅供娱乐参考~"
text += "💡 提示:运势仅供娱乐参考,查看运势有机会获得积分奖励"
return text

268
games/gift.py Normal file
View File

@@ -0,0 +1,268 @@
"""积分赠送系统游戏模块"""
import logging
from datetime import datetime
from games.base import BaseGame
from utils.parser import CommandParser
from core.database import get_db
logger = logging.getLogger(__name__)
class GiftGame(BaseGame):
"""积分赠送系统游戏"""
def __init__(self):
"""初始化游戏"""
super().__init__()
self.db = get_db()
async def handle(self, command: str, chat_id: int, user_id: int) -> str:
"""处理积分赠送相关指令
Args:
command: 指令,如 ".gift 123 50 生日快乐", ".gift sent", ".gift received"
chat_id: 会话ID
user_id: 用户ID
Returns:
回复消息
"""
try:
# 提取参数
_, args = CommandParser.extract_command_args(command)
args = args.strip()
# 赠送帮助
if args in ['help', '帮助']:
return self._get_gift_help()
# 默认:执行赠送
else:
return self._process_gift_command(args, user_id)
except Exception as e:
logger.error(f"处理积分赠送指令错误: {e}", exc_info=True)
return f"❌ 处理指令出错: {str(e)}"
def _process_gift_command(self, args: str, sender_id: int) -> str:
"""处理赠送指令
Args:
args: 指令参数
sender_id: 发送者ID
Returns:
处理结果消息
"""
# 解析参数:.gift <receiver_identifier> <points> [message]
parts = args.split(maxsplit=2)
if len(parts) < 2:
return "❌ 指令格式错误!\n\n正确格式:`.gift <用户名或ID> <积分数量> [附赠消息]`\n\n示例:\n`.gift 张三 50 生日快乐`\n`.gift 123 50`\n`.gift 456 100`"
# 解析积分数量
try:
points = int(parts[1])
message = parts[2] if len(parts) > 2 else None
except ValueError:
return "❌ 积分数量必须是数字!"
# 检查第一部分是用户名还是ID
receiver_input = parts[0]
if receiver_input.isdigit():
# 是数字作为用户ID处理
receiver_id = int(receiver_input)
else:
# 是用户名,通过数据库查找
user = self.db.get_user_by_name(receiver_input)
if not user:
return f"❌ 未找到用户: {receiver_input}\n\n请确认用户名是否正确或使用用户ID。"
receiver_id = user['user_id']
# 获取接收者名称用于显示
receiver_user = self.db.get_or_create_user(receiver_id)
receiver_name = receiver_user.get('username', f"用户{receiver_id}")
# 获取发送者名称用于显示
sender_user = self.db.get_or_create_user(sender_id)
sender_name = sender_user.get('username', f"用户{sender_id}")
# 验证参数
if points <= 0:
return "❌ 赠送积分数量必须大于0"
if points > 1000:
return "❌ 单次赠送积分不能超过1000"
if sender_id == receiver_id:
return "❌ 不能赠送积分给自己!"
# 检查赠送者积分是否足够
sender_points = self.db.get_user_points(sender_id)
if sender_points['points'] < points:
return f"❌ 积分不足!需要 {points} 积分,当前可用 {sender_points['points']} 积分"
# 执行赠送(消费赠送者积分,增加接收者积分)
if (self.db.consume_points(sender_id, points, "gift_send", f"赠送积分给用户{receiver_id}") and
self.db.add_points(receiver_id, points, "gift_receive", f"收到用户{sender_id}的积分赠送")):
# 获取更新后的积分信息
sender_points_after = self.db.get_user_points(sender_id)
receiver_points_after = self.db.get_user_points(receiver_id)
text = f"## 🎁 积分赠送成功!\n\n"
text += f"**赠送者**{sender_name}\n\n"
text += f"**接收者**{receiver_name}\n\n"
text += f"**赠送积分**{points}\n\n"
if message:
text += f"**附赠消息**{message}\n\n"
text += f"**赠送者剩余积分**{sender_points_after['points']}\n\n"
text += f"**接收者当前积分**{receiver_points_after['points']}\n\n"
text += "---\n\n"
text += "💝 感谢您的慷慨赠送!"
return text
else:
return "❌ 赠送失败请检查积分是否足够或用户ID是否正确。"
def _get_gift_stats(self, user_id: int) -> str:
"""获取赠送统计信息
Args:
user_id: 用户ID
Returns:
统计信息消息
"""
stats = self.db.get_gift_stats(user_id)
if stats['sent_count'] == 0 and stats['received_count'] == 0:
return "📊 你还没有任何积分赠送记录哦~"
text = f"## 🎁 积分赠送统计\n\n"
text += f"**发送统计**\n"
text += f"- 赠送次数:{stats['sent_count']}\n"
text += f"- 赠送积分:{stats['total_sent']}\n\n"
text += f"**接收统计**\n"
text += f"- 接收次数:{stats['received_count']}\n"
text += f"- 接收积分:{stats['total_received']}\n\n"
net_gift = stats['net_gift']
if net_gift > 0:
text += f"**净收益**+{net_gift} 分 🎉\n\n"
elif net_gift < 0:
text += f"**净收益**{net_gift} 分 😅\n\n"
else:
text += f"**净收益**0 分 ⚖️\n\n"
text += "---\n\n"
text += "💡 提示:使用 `.gift sent` 查看发送记录,`.gift received` 查看接收记录"
return text
def _get_gift_records_sent(self, user_id: int, limit: int = 10) -> str:
"""获取发送的赠送记录
Args:
user_id: 用户ID
limit: 限制数量
Returns:
记录信息消息
"""
records = self.db.get_gift_records_sent(user_id, limit)
if not records:
return "📝 暂无发送记录"
text = f"## 🎁 发送记录(最近 {len(records)} 条)\n\n"
for record in records:
timestamp = datetime.fromtimestamp(record['created_at']).strftime('%m-%d %H:%M')
receiver_name = record.get('receiver_name', f"用户{record['receiver_id']}")
points = record['points']
message = record.get('message', '')
text += f"**{timestamp}** 赠送 {points} 分给 {receiver_name}\n"
if message:
text += f" 💬 {message}\n"
text += "\n"
return text
def _get_gift_records_received(self, user_id: int, limit: int = 10) -> str:
"""获取接收的赠送记录
Args:
user_id: 用户ID
limit: 限制数量
Returns:
记录信息消息
"""
records = self.db.get_gift_records_received(user_id, limit)
if not records:
return "📝 暂无接收记录"
text = f"## 🎁 接收记录(最近 {len(records)} 条)\n\n"
for record in records:
timestamp = datetime.fromtimestamp(record['created_at']).strftime('%m-%d %H:%M')
sender_name = record.get('sender_name', f"用户{record['sender_id']}")
points = record['points']
message = record.get('message', '')
text += f"**{timestamp}** 收到 {sender_name}{points}\n"
if message:
text += f" 💬 {message}\n"
text += "\n"
return text
def _get_gift_help(self) -> str:
"""获取赠送帮助信息
Returns:
帮助信息消息
"""
text = f"## 🎁 积分赠送系统\n\n"
text += f"### 基础用法\n"
text += f"- `.gift <用户名或ID> <积分数量> [附赠消息]` - 赠送积分\n"
text += f"- `.gift stats` - 查看赠送统计\n"
text += f"- `.gift sent` - 查看发送记录\n"
text += f"- `.gift received` - 查看接收记录\n"
text += f"- `.gift help` - 查看帮助\n\n"
text += f"### 赠送规则\n"
text += f"- **积分限制**单次最多赠送1000积分\n"
text += f"- **自赠限制**:不能赠送给自己\n"
text += f"- **积分检查**:必须有足够积分才能赠送\n"
text += f"- **附赠消息**可选最多100字符\n\n"
text += f"### 示例\n"
text += f"```\n"
text += f".gift 张三 50 生日快乐\n"
text += f".gift 123 50 (使用用户ID)\n"
text += f".gift 李四 100 感谢你的帮助\n"
text += f".gift 王五 200\n"
text += f".gift stats\n"
text += f"```\n\n"
text += f"### 说明\n"
text += f"- 支持使用用户名或用户ID进行赠送\n"
text += f"- 使用用户名需要先通过 `.register` 注册名称\n"
text += f"- 所有赠送都有完整记录\n"
text += f"- 赠送和接收都会在积分记录中显示\n"
text += f"- 赠送是单向的,无需对方确认\n"
return text
def get_help(self) -> str:
"""获取帮助信息"""
return self._get_gift_help()

184
games/points.py Normal file
View File

@@ -0,0 +1,184 @@
"""积分系统游戏模块"""
import random
import logging
from datetime import datetime
from games.base import BaseGame
from utils.parser import CommandParser
from core.database import get_db
logger = logging.getLogger(__name__)
class PointsGame(BaseGame):
"""积分系统游戏"""
def __init__(self):
"""初始化游戏"""
super().__init__()
self.db = get_db()
async def handle(self, command: str, chat_id: int, user_id: int) -> str:
"""处理积分相关指令
Args:
command: 指令,如 ".points", ".checkin", ".leaderboard"
chat_id: 会话ID
user_id: 用户ID
Returns:
回复消息
"""
try:
# 提取参数
_, args = CommandParser.extract_command_args(command)
args = args.strip().lower()
# 积分排行榜
if args in ['leaderboard', '排行榜', '排行']:
return self._get_leaderboard()
# 默认:每日签到
else:
return self._daily_checkin(user_id)
except Exception as e:
logger.error(f"处理积分指令错误: {e}", exc_info=True)
return f"❌ 处理指令出错: {str(e)}"
def _daily_checkin(self, user_id: int) -> str:
"""每日签到
Args:
user_id: 用户ID
Returns:
签到结果消息
"""
# 固定签到积分
checkin_points = 100
# 检查是否已签到
today = datetime.now().strftime('%Y-%m-%d')
if self.db.check_daily_checkin(user_id, today):
return self._get_user_points(user_id)
# 执行签到
try:
result = self.db.daily_checkin(user_id, checkin_points)
if result:
# 获取用户积分信息
points_info = self.db.get_user_points(user_id)
text = f"## ✅ 签到成功!\n\n"
text += f"**获得积分**+{checkin_points}\n\n"
text += f"**当前积分**{points_info['points']}\n\n"
text += f"📅 签到日期:{today}\n\n"
text += "💡 提示:每天签到可获得固定积分奖励!"
return text
else:
return "❌ 签到失败,请稍后重试"
except Exception as e:
logger.error(f"签到过程中发生错误: {e}", exc_info=True)
return f"❌ 签到过程中发生错误: {str(e)}"
def _get_user_points(self, user_id: int) -> str:
"""获取用户积分信息
Args:
user_id: 用户ID
Returns:
积分信息消息
"""
points_info = self.db.get_user_points(user_id)
text = f"## 💎 个人积分\n\n"
text += f"**当前积分**{points_info['points']}\n\n"
text += "---\n\n"
text += "💡 提示:\n"
text += "• 每日签到可获得 100 积分\n"
text += "• 查看运势可获得随机积分\n"
text += "• 使用 `.points leaderboard` 查看排行榜"
return text
def _get_leaderboard(self, limit: int = 10) -> str:
"""获取积分排行榜
Args:
limit: 限制数量
Returns:
排行榜消息
"""
leaderboard = self.db.get_points_leaderboard(limit)
if not leaderboard:
return "📊 暂无排行榜数据"
text = f"## 🏆 积分排行榜(前 {len(leaderboard)} 名)\n\n"
medals = ["🥇", "🥈", "🥉"] + ["🏅"] * (limit - 3)
for i, user in enumerate(leaderboard):
rank = i + 1
medal = medals[i] if i < len(medals) else "🏅"
username = user.get('username', f"用户{user['user_id']}")
points = user.get('points', 0)
text += f"{medal} **第 {rank} 名** {username}\n"
text += f" 积分:{points}\n\n"
text += "---\n\n"
text += "💡 提示:使用 `.points` 查看个人积分"
return text
def add_fortune_points(self, user_id: int) -> int:
"""为运势游戏添加随机积分奖励
Args:
user_id: 用户ID
Returns:
获得的积分数量
"""
# 随机积分范围50-200分
points = random.randint(50, 200)
if self.db.add_points(user_id, points, "fortune", "运势奖励"):
logger.info(f"用户 {user_id} 通过运势获得 {points} 积分")
return points
else:
logger.error(f"用户 {user_id} 运势积分奖励失败")
return 0
def get_help(self) -> str:
"""获取帮助信息"""
return """## 💎 积分系统
### 基础用法
- `.points` - 查看个人积分
- `.checkin` - 每日签到
- `.points leaderboard` - 积分排行榜
### 积分获取方式
- **每日签到**:固定 10 积分
- **运势占卜**:随机 1-20 积分
- **游戏奖励**:根据游戏表现获得
### 示例
```
.points
.checkin
.points leaderboard
```
### 说明
- 每日签到只能进行一次
- 运势积分每次查看都有机会获得
- 积分系统已简化,不再保留历史记录
"""

534
games/sanguosha.py Normal file
View File

@@ -0,0 +1,534 @@
"""三国杀游戏主控制器"""
import logging
from typing import Optional, Tuple
from games.base import BaseGame
from games.sgs_game import GameState, get_game_manager
from games.sgs_core import Phase, Role
from utils.parser import CommandParser
from core.database import get_db
logger = logging.getLogger(__name__)
class SanguoshaGame(BaseGame):
"""三国杀游戏"""
def __init__(self):
"""初始化游戏"""
super().__init__()
self.manager = get_game_manager()
async def handle(self, command: str, chat_id: int, user_id: int) -> str:
"""处理游戏指令
Args:
command: 指令
chat_id: 会话ID
user_id: 用户ID
Returns:
回复消息
"""
try:
# 提取指令和参数
cmd, args = CommandParser.extract_command_args(command)
args = args.strip().lower()
# 获取用户信息
user = self.db.get_or_create_user(user_id)
username = user.get('username') or f'用户{user_id}'
# 路由指令
if not args or args == 'help':
return self.get_help()
elif args == 'create' or args == '创建':
return await self._handle_create(chat_id, user_id, username)
elif args == 'join' or args == '加入':
return await self._handle_join(chat_id, user_id, username)
elif args == 'leave' or args == '离开':
return await self._handle_leave(chat_id, user_id)
elif args == 'start' or args == '开始':
return await self._handle_start(chat_id, user_id)
elif args == 'status' or args == '状态':
return await self._handle_status(chat_id, user_id)
elif args == 'players' or args == '玩家':
return await self._handle_players(chat_id)
elif args == 'hand' or args == '手牌':
return await self._handle_hand(chat_id, user_id)
elif args == 'next' or args == '下一阶段':
return await self._handle_next_phase(chat_id, user_id)
elif args.startswith('play ') or args.startswith('出牌 '):
card_name = args.split(maxsplit=1)[1]
return await self._handle_play_card(chat_id, user_id, card_name)
elif args.startswith('use ') or args.startswith('使用 '):
card_name = args.split(maxsplit=1)[1]
return await self._handle_use_card(chat_id, user_id, card_name)
elif args == 'draw' or args == '摸牌':
return await self._handle_draw(chat_id, user_id)
elif args == 'discard' or args == '弃牌':
return await self._handle_discard(chat_id, user_id)
elif args == 'cancel' or args == '取消':
return await self._handle_cancel(chat_id, user_id)
elif args == 'stats' or args == '战绩':
return await self._handle_stats(user_id)
else:
return f"❌ 未知指令: {args}\n\n使用 `.sgs help` 查看帮助"
except Exception as e:
logger.error(f"处理三国杀指令错误: {e}", exc_info=True)
return f"❌ 处理指令出错: {str(e)}"
async def _handle_create(self, chat_id: int, user_id: int, username: str) -> str:
"""创建游戏"""
# 检查是否已有游戏
if self.manager.has_active_game(chat_id):
return "❌ 当前已有进行中的游戏"
# 创建游戏
game = self.manager.create_game(chat_id, user_id, username)
return f"""✅ 三国杀游戏已创建!
**房主**: {username}
**游戏ID**: {game.game_id}
📝 其他玩家使用 `.sgs join` 加入游戏
📝 人数达到 {game.min_players}-{game.max_players} 人后,房主使用 `.sgs start` 开始游戏
当前玩家: 1/{game.max_players}"""
async def _handle_join(self, chat_id: int, user_id: int, username: str) -> str:
"""加入游戏"""
game = self.manager.get_game(chat_id)
if not game:
return "❌ 当前没有游戏,使用 `.sgs create` 创建游戏"
if game.is_started:
return "❌ 游戏已经开始,无法加入"
if game.get_player_by_id(user_id):
return "❌ 你已经在游戏中了"
if not game.add_player(user_id, username):
return f"❌ 加入失败,游戏已满({game.max_players}人)"
return f"""{username} 加入游戏!
当前玩家: {len(game.players)}/{game.max_players}
玩家列表: {', '.join(p.username for p in game.players)}
{f'📝 人数已满,房主可以使用 `.sgs start` 开始游戏' if len(game.players) >= game.min_players else f'📝 还需要 {game.min_players - len(game.players)} 人才能开始'}"""
async def _handle_leave(self, chat_id: int, user_id: int) -> str:
"""离开游戏"""
game = self.manager.get_game(chat_id)
if not game:
return "❌ 当前没有游戏"
if game.is_started:
return "❌ 游戏已经开始,无法离开"
player = game.get_player_by_id(user_id)
if not player:
return "❌ 你不在游戏中"
if not game.remove_player(user_id):
return "❌ 离开失败"
# 如果房主离开且还有其他玩家,转移房主
if user_id == game.host_id and game.players:
game.host_id = game.players[0].user_id
return f"{player.username} 离开游戏,房主已转移给 {game.players[0].username}"
# 如果没有玩家了,删除游戏
if not game.players:
self.manager.remove_game(chat_id)
return f"{player.username} 离开游戏,游戏已解散"
return f"{player.username} 离开游戏\n\n当前玩家: {len(game.players)}/{game.max_players}"
async def _handle_start(self, chat_id: int, user_id: int) -> str:
"""开始游戏"""
game = self.manager.get_game(chat_id)
if not game:
return "❌ 当前没有游戏"
if user_id != game.host_id:
return "❌ 只有房主可以开始游戏"
success, message = game.start_game()
if not success:
return f"{message}"
# 构建游戏开始消息
result = "## 🎮 三国杀游戏开始!\n\n"
# 显示主公信息
lord = game.lord_player
if lord:
result += f"### 👑 主公\n"
result += f"**{lord.username}** - {lord.general.name}{lord.general.kingdom}\n"
result += f"体力: {lord.hp}/{lord.general.max_hp}\n"
result += f"技能: {', '.join(s.name for s in lord.general.skills)}\n\n"
# 显示其他玩家信息(不显示身份)
result += f"### 👥 玩家列表\n"
for idx, player in enumerate(game.players, 1):
if player.role != Role.LORD:
result += f"{idx}. **{player.username}** - {player.general.name}{player.general.kingdom}\n"
result += f" 体力: {player.hp}/{player.general.max_hp}\n"
result += f"\n### 📋 游戏信息\n"
result += f"- 玩家数: {len(game.players)}\n"
result += f"- 当前回合: {game.current_player.username}\n"
result += f"- 当前阶段: {game.current_phase.value}\n\n"
result += "💡 使用 `.sgs status` 查看游戏状态\n"
result += "💡 使用 `.sgs hand` 查看手牌\n"
result += "💡 使用 `.sgs next` 进入下一阶段"
return result
async def _handle_status(self, chat_id: int, user_id: int) -> str:
"""查看游戏状态"""
game = self.manager.get_game(chat_id)
if not game:
return "❌ 当前没有游戏"
if not game.is_started:
return f"""## 📋 游戏状态 (准备中)
**房主**: {game.players[0].username if game.players else ''}
**玩家数**: {len(game.players)}/{game.max_players}
**玩家列表**: {', '.join(p.username for p in game.players)}
📝 使用 `.sgs start` 开始游戏"""
# 游戏进行中
result = f"## 📋 游戏状态\n\n"
result += f"### 🕒 当前回合\n"
result += f"- 玩家: **{game.current_player.username}**\n"
result += f"- 阶段: **{game.current_phase.value}**\n"
result += f"- 回合数: {game.round_number}\n\n"
result += f"### 👥 玩家状态\n"
for idx, player in enumerate(game.players, 1):
status = "💀" if not player.is_alive else ""
role_display = player.role.value if player.role == Role.LORD or not player.is_alive else "???"
result += f"{idx}. {status} **{player.username}** ({role_display})\n"
result += f" 武将: {player.general.name}{player.general.kingdom}\n"
result += f" 体力: {'❤️' * player.hp}{'🖤' * (player.general.max_hp - player.hp)} {player.hp}/{player.general.max_hp}\n"
result += f" 手牌: {player.hand_count}\n"
if player.equipment:
equip_list = ', '.join(f"{k}:{v.name}" for k, v in player.equipment.items())
result += f" 装备: {equip_list}\n"
result += "\n"
return result
async def _handle_players(self, chat_id: int) -> str:
"""查看玩家列表"""
game = self.manager.get_game(chat_id)
if not game:
return "❌ 当前没有游戏"
result = f"## 👥 玩家列表\n\n"
for idx, player in enumerate(game.players, 1):
result += f"{idx}. **{player.username}**"
if game.is_started:
result += f" - {player.general.name}"
if player.role == Role.LORD or not player.is_alive:
result += f"{player.role.value}"
result += "\n"
return result
async def _handle_hand(self, chat_id: int, user_id: int) -> str:
"""查看手牌"""
game = self.manager.get_game(chat_id)
if not game:
return "❌ 当前没有游戏"
if not game.is_started:
return "❌ 游戏还未开始"
player = game.get_player_by_id(user_id)
if not player:
return "❌ 你不在游戏中"
if not player.is_alive:
return "❌ 你已经阵亡"
if not player.hand_cards:
return "📋 你的手牌为空"
result = f"## 🃏 你的手牌({len(player.hand_cards)}张)\n\n"
# 按类型分组
basic_cards = [c for c in player.hand_cards if c.card_type.value == "基本牌"]
trick_cards = [c for c in player.hand_cards if c.card_type.value == "锦囊牌"]
equip_cards = [c for c in player.hand_cards if c.card_type.value == "装备牌"]
if basic_cards:
result += "### 基本牌\n"
for idx, card in enumerate(basic_cards, 1):
result += f"{idx}. {card}\n"
result += "\n"
if trick_cards:
result += "### 锦囊牌\n"
for idx, card in enumerate(trick_cards, 1):
result += f"{idx}. {card}\n"
result += "\n"
if equip_cards:
result += "### 装备牌\n"
for idx, card in enumerate(equip_cards, 1):
result += f"{idx}. {card}\n"
result += "\n💡 使用 `.sgs play 卡牌名` 出牌"
return result
async def _handle_next_phase(self, chat_id: int, user_id: int) -> str:
"""进入下一阶段"""
game = self.manager.get_game(chat_id)
if not game:
return "❌ 当前没有游戏"
if not game.is_started:
return "❌ 游戏还未开始"
if game.current_player.user_id != user_id:
return "❌ 不是你的回合"
# 执行阶段转换
new_phase, current_player = game.next_phase()
# 检查游戏是否结束
is_over, winner_role = game.check_game_over()
if is_over:
game.is_finished = True
game.winner_role = winner_role
return await self._handle_game_over(game)
result = f"✅ 进入下一阶段\n\n"
result += f"**当前玩家**: {current_player.username}\n"
result += f"**当前阶段**: {new_phase.value}\n\n"
# 阶段提示
if new_phase == Phase.DRAW:
result += "💡 摸牌阶段,使用 `.sgs draw` 摸牌"
elif new_phase == Phase.PLAY:
result += "💡 出牌阶段,使用 `.sgs play 卡牌名` 出牌"
elif new_phase == Phase.DISCARD:
result += "💡 弃牌阶段,使用 `.sgs discard` 弃牌"
else:
result += "请使用正确的语法"
return result
async def _handle_draw(self, chat_id: int, user_id: int) -> str:
"""摸牌"""
game = self.manager.get_game(chat_id)
if not game:
return "❌ 当前没有游戏"
if game.current_player.user_id != user_id:
return "❌ 不是你的回合"
if game.current_phase != Phase.DRAW:
return f"❌ 当前不是摸牌阶段(当前: {game.current_phase.value}"
# 摸2张牌
cards = game.deck.draw(2)
game.current_player.hand_cards.extend(cards)
result = f"✅ 摸牌成功!\n\n"
result += f"摸到: {', '.join(str(c) for c in cards)}\n"
result += f"当前手牌数: {game.current_player.hand_count}\n\n"
result += "💡 使用 `.sgs next` 进入出牌阶段"
return result
async def _handle_play_card(self, chat_id: int, user_id: int, card_name: str) -> str:
"""出牌"""
game = self.manager.get_game(chat_id)
if not game:
return "❌ 当前没有游戏"
if game.current_player.user_id != user_id:
return "❌ 不是你的回合"
if game.current_phase != Phase.PLAY:
return f"❌ 当前不是出牌阶段(当前: {game.current_phase.value}"
player = game.current_player
# 查找卡牌
card = None
for c in player.hand_cards:
if c.name == card_name:
card = c
break
if not card:
return f"❌ 你没有【{card_name}】这张牌"
# 简化处理:直接打出(实际游戏需要选择目标等)
player.remove_card(card)
game.deck.discard([card])
return f"✅ 使用了【{card}\n\n💡 继续出牌或使用 `.sgs next` 进入弃牌阶段"
async def _handle_use_card(self, chat_id: int, user_id: int, card_name: str) -> str:
"""使用卡牌同play_card"""
return await self._handle_play_card(chat_id, user_id, card_name)
async def _handle_discard(self, chat_id: int, user_id: int) -> str:
"""弃牌"""
game = self.manager.get_game(chat_id)
if not game:
return "❌ 当前没有游戏"
if game.current_player.user_id != user_id:
return "❌ 不是你的回合"
if game.current_phase != Phase.DISCARD:
return f"❌ 当前不是弃牌阶段(当前: {game.current_phase.value}"
player = game.current_player
# 检查是否需要弃牌
max_hand = player.hp
if player.hand_count <= max_hand:
return f"✅ 手牌数({player.hand_count})未超过体力值({max_hand}),无需弃牌\n\n💡 使用 `.sgs next` 结束回合"
# 简化处理:自动弃掉多余的牌
discard_count = player.hand_count - max_hand
discarded = player.hand_cards[:discard_count]
player.hand_cards = player.hand_cards[discard_count:]
game.deck.discard(discarded)
return f"✅ 弃置了 {discard_count} 张牌\n\n💡 使用 `.sgs next` 结束回合"
async def _handle_cancel(self, chat_id: int, user_id: int) -> str:
"""取消游戏"""
game = self.manager.get_game(chat_id)
if not game:
return "❌ 当前没有游戏"
if user_id != game.host_id:
return "❌ 只有房主可以取消游戏"
self.manager.remove_game(chat_id)
return "✅ 游戏已取消"
async def _handle_stats(self, user_id: int) -> str:
"""查看战绩"""
stats = self.db.get_game_stats(user_id, 'sanguosha')
if stats['total_plays'] == 0:
return "三国杀游戏记录"
win_rate = (stats['wins'] / stats['total_plays'] * 100) if stats['total_plays'] > 0 else 0
result ="战绩\n\n"
result += f"- 总局数: {stats['total_plays']}\n"
result += f"- 胜利: {stats['wins']}\n"
result += f"- 失败: {stats['losses']}\n"
result += f"- 胜率: {win_rate:.1f}%\n"
return result
async def _handle_game_over(self, game: GameState) -> str:
"""处理游戏结束"""
result = "## 🎉 游戏结束!\n\n"
if game.winner_role == Role.LORD:
result += "## 🎉公和忠臣获胜!\n\n"
winners = [p for p in game.players if p.role in [Role.LORD, Role.LOYAL]]
losers = [p for p in game.players if p.role in [Role.REBEL, Role.SPY]]
else:
result += "### ⚔️ 反贼获胜!\n\n"
winners = [p for p in game.players if p.role == Role.REBEL]
losers = [p for p in game.players if p.role in [Role.LORD, Role.LOYAL, Role.SPY]]
result += "**获胜方**:\n"
for player in winners:
result += f"- {player.username} ({player.role.value}) - {player.general.name}\n"
# 更新战绩
self.db.update_game_stats(player.user_id, 'sanguosha', win=True)
result += "\n**失败方**:\n"
for player in losers:
result += f"- {player.username} ({player.role.value}) - {player.general.name}\n"
# 更新战绩
self.db.update_game_stats(player.user_id, 'sanguosha', loss=True)
result += f"\n游戏时长: {game.round_number} 回合"
# 清理游戏
self.manager.remove_game(game.chat_id)
return result
def get_help(self) -> str:
"""获取帮助信息"""
return """## 🎮 三国杀游戏帮助
### 游戏准备
- `.sgs create` - 创建游戏房间
- `.sgs join` - 加入游戏
- `.sgs leave` - 离开游戏
- `.sgs start` - 开始游戏(房主)
- `.sgs cancel` - 取消游戏(房主)
### 游戏中
- `.sgs status` - 查看游戏状态
- `.sgs players` - 查看玩家列表
- `.sgs hand` - 查看手牌
- `.sgs next` - 进入下一阶段
- `.sgs draw` - 摸牌(摸牌阶段)
- `.sgs play 卡牌名` - 出牌(出牌阶段)
- `.sgs discard` - 弃牌(弃牌阶段)
### 其他
- `.sgs stats` - 查看个人战绩
- `.sgs help` - 显示帮助
### 游戏规则
1. **身份**: 主公、忠臣、反贼、内奸
2. **胜利条件**:
- 主公+忠臣: 消灭所有反贼和内奸
- 反贼: 击杀主公
- 内奸: 成为最后存活的人
3. **回合流程**:
- 准备阶段 → 判定阶段 → 摸牌阶段 → 出牌阶段 → 弃牌阶段 → 结束阶段
### 卡牌类型
- **基本牌**: 杀、闪、桃
- **锦囊牌**: 决斗、过河拆桥、顺手牵羊、南蛮入侵、万箭齐发等
- **装备牌**: 武器、防具、马
---
💡 提示:游戏支持 2-8 人,建议 5-8 人游戏体验最佳
"""

529
games/sgs_core.py Normal file
View File

@@ -0,0 +1,529 @@
"""三国杀游戏核心逻辑模块"""
import logging
from typing import List, Dict, Optional, Set
from enum import Enum
from dataclasses import dataclass, field
import random
logger = logging.getLogger(__name__)
class CardType(Enum):
"""卡牌类型"""
BASIC = "基本牌"
TRICK = "锦囊牌"
EQUIPMENT = "装备牌"
class CardSuit(Enum):
"""卡牌花色"""
SPADE = "" # 黑桃
HEART = "" # 红桃
CLUB = "" # 梅花
DIAMOND = "" # 方块
class CardColor(Enum):
"""卡牌颜色"""
RED = "红色"
BLACK = "黑色"
class Role(Enum):
"""角色身份"""
LORD = "主公"
LOYAL = "忠臣"
REBEL = "反贼"
SPY = "内奸"
class Phase(Enum):
"""回合阶段"""
PREPARE = "准备阶段"
JUDGE = "判定阶段"
DRAW = "摸牌阶段"
PLAY = "出牌阶段"
DISCARD = "弃牌阶段"
END = "结束阶段"
@dataclass
class Card:
"""卡牌"""
name: str # 卡牌名称
card_type: CardType # 卡牌类型
suit: CardSuit # 花色
number: int # 点数 (1-13)
description: str = "" # 描述
@property
def color(self) -> CardColor:
"""获取卡牌颜色"""
if self.suit in [CardSuit.HEART, CardSuit.DIAMOND]:
return CardColor.RED
return CardColor.BLACK
def __str__(self) -> str:
"""字符串表示"""
return f"{self.suit.value}{self.number} {self.name}"
def to_dict(self) -> Dict:
"""转换为字典"""
return {
"name": self.name,
"type": self.card_type.value,
"suit": self.suit.value,
"number": self.number,
"color": self.color.value,
"description": self.description
}
@dataclass
class Skill:
"""技能"""
name: str # 技能名称
description: str # 技能描述
skill_type: str = "主动" # 技能类型: 主动/锁定/限定/觉醒
def to_dict(self) -> Dict:
"""转换为字典"""
return {
"name": self.name,
"description": self.description,
"type": self.skill_type
}
@dataclass
class General:
"""武将"""
name: str # 武将名称
max_hp: int # 体力上限
skills: List[Skill] # 技能列表
kingdom: str = "" # 势力
def to_dict(self) -> Dict:
"""转换为字典"""
return {
"name": self.name,
"max_hp": self.max_hp,
"kingdom": self.kingdom,
"skills": [skill.to_dict() for skill in self.skills]
}
@dataclass
class Player:
"""玩家"""
user_id: int # 用户ID
username: str # 用户名
general: Optional[General] = None # 武将
role: Optional[Role] = None # 身份
hp: int = 0 # 当前体力
hand_cards: List[Card] = field(default_factory=list) # 手牌
equipment: Dict[str, Card] = field(default_factory=dict) # 装备区
judge_area: List[Card] = field(default_factory=list) # 判定区
is_alive: bool = True # 是否存活
is_chained: bool = False # 是否横置
def __post_init__(self):
"""初始化后处理"""
if self.general and self.hp == 0:
self.hp = self.general.max_hp
@property
def hand_count(self) -> int:
"""手牌数量"""
return len(self.hand_cards)
@property
def attack_range(self) -> int:
"""攻击距离"""
weapon = self.equipment.get("武器")
if weapon:
# 根据武器名称返回距离
weapon_ranges = {
"诸葛连弩": 1,
"青釭剑": 2,
"雌雄双股剑": 2,
"青龙偃月刀": 3,
"丈八蛇矛": 3,
"方天画戟": 4,
"麒麟弓": 5
}
return weapon_ranges.get(weapon.name, 1)
return 1
def add_card(self, card: Card):
"""添加手牌"""
self.hand_cards.append(card)
def remove_card(self, card: Card) -> bool:
"""移除手牌"""
if card in self.hand_cards:
self.hand_cards.remove(card)
return True
return False
def take_damage(self, damage: int) -> bool:
"""受到伤害
Returns:
是否死亡
"""
self.hp -= damage
if self.hp <= 0:
self.is_alive = False
return True
return False
def recover(self, amount: int):
"""回复体力"""
if self.general:
self.hp = min(self.hp + amount, self.general.max_hp)
def to_dict(self) -> Dict:
"""转换为字典"""
return {
"user_id": self.user_id,
"username": self.username,
"general": self.general.to_dict() if self.general else None,
"role": self.role.value if self.role else None,
"hp": self.hp,
"max_hp": self.general.max_hp if self.general else 0,
"hand_count": self.hand_count,
"equipment": {k: v.to_dict() for k, v in self.equipment.items()},
"is_alive": self.is_alive,
"is_chained": self.is_chained
}
class CardDeck:
"""牌堆"""
def __init__(self):
"""初始化牌堆"""
self.cards: List[Card] = []
self.discard_pile: List[Card] = []
self._init_standard_deck()
def _init_standard_deck(self):
"""初始化标准牌堆(简化版)"""
# 杀 (30张)
for suit, numbers in [
(CardSuit.SPADE, [7, 8, 8, 9, 9, 10, 10]),
(CardSuit.CLUB, [2, 3, 4, 5, 6, 7, 8, 8, 9, 9, 10, 10, 11]),
(CardSuit.HEART, [10, 10, 11]),
(CardSuit.DIAMOND, [6, 7, 8, 9, 10, 13])
]:
for num in numbers:
self.cards.append(Card("", CardType.BASIC, suit, num, "对攻击范围内的一名角色造成1点伤害"))
# 闪 (15张)
for suit, numbers in [
(CardSuit.HEART, [2, 2, 13]),
(CardSuit.DIAMOND, [2, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 11])
]:
for num in numbers:
self.cards.append(Card("", CardType.BASIC, suit, num, "抵消一张【杀】的效果"))
# 桃 (8张)
for suit, numbers in [
(CardSuit.HEART, [3, 4, 6, 7, 8, 9, 12]),
(CardSuit.DIAMOND, [12])
]:
for num in numbers:
self.cards.append(Card("", CardType.BASIC, suit, num, "回复1点体力"))
# 锦囊牌
# 无懈可击 (3张)
for suit, num in [(CardSuit.SPADE, 11), (CardSuit.CLUB, 12), (CardSuit.CLUB, 13)]:
self.cards.append(Card("无懈可击", CardType.TRICK, suit, num, "抵消一张锦囊牌的效果"))
# 决斗 (3张)
for suit, num in [(CardSuit.SPADE, 1), (CardSuit.CLUB, 1), (CardSuit.DIAMOND, 1)]:
self.cards.append(Card("决斗", CardType.TRICK, suit, num, "与目标角色拼点失败者受到1点伤害"))
# 过河拆桥 (6张)
for suit, numbers in [
(CardSuit.SPADE, [3, 4, 12]),
(CardSuit.CLUB, [3, 4]),
(CardSuit.HEART, [12])
]:
for num in numbers:
self.cards.append(Card("过河拆桥", CardType.TRICK, suit, num, "弃置目标角色的一张牌"))
# 顺手牵羊 (5张)
for suit, numbers in [
(CardSuit.SPADE, [3, 4, 11]),
(CardSuit.DIAMOND, [3, 4])
]:
for num in numbers:
self.cards.append(Card("顺手牵羊", CardType.TRICK, suit, num, "获得目标角色的一张牌"))
# 南蛮入侵 (3张)
for suit, num in [(CardSuit.SPADE, 7), (CardSuit.SPADE, 13), (CardSuit.CLUB, 7)]:
self.cards.append(Card("南蛮入侵", CardType.TRICK, suit, num, "所有其他角色需打出【杀】否则受到1点伤害"))
# 万箭齐发 (1张)
self.cards.append(Card("万箭齐发", CardType.TRICK, CardSuit.HEART, 1, "所有其他角色需打出【闪】否则受到1点伤害"))
# 桃园结义 (1张)
self.cards.append(Card("桃园结义", CardType.TRICK, CardSuit.HEART, 1, "所有角色回复1点体力"))
# 五谷丰登 (2张)
for num in [3, 4]:
self.cards.append(Card("五谷丰登", CardType.TRICK, CardSuit.HEART, num, "所有角色依次获得一张牌"))
# 装备牌(简化版,只添加几种)
# 诸葛连弩
self.cards.append(Card("诸葛连弩", CardType.EQUIPMENT, CardSuit.CLUB, 1, "武器攻击范围1出牌阶段可以使用任意张【杀】"))
self.cards.append(Card("诸葛连弩", CardType.EQUIPMENT, CardSuit.DIAMOND, 1, "武器攻击范围1出牌阶段可以使用任意张【杀】"))
# 青釭剑
self.cards.append(Card("青釭剑", CardType.EQUIPMENT, CardSuit.SPADE, 6, "武器攻击范围2无视目标防具"))
# 八卦阵
self.cards.append(Card("八卦阵", CardType.EQUIPMENT, CardSuit.SPADE, 2, "防具,判定为红色时视为使用了【闪】"))
self.cards.append(Card("八卦阵", CardType.EQUIPMENT, CardSuit.CLUB, 2, "防具,判定为红色时视为使用了【闪】"))
# 的卢
self.cards.append(Card("的卢", CardType.EQUIPMENT, CardSuit.CLUB, 5, "+1马其他角色计算与你的距离+1"))
# 赤兔
self.cards.append(Card("赤兔", CardType.EQUIPMENT, CardSuit.HEART, 5, "-1马你计算与其他角色的距离-1"))
# 洗牌
self.shuffle()
def shuffle(self):
"""洗牌"""
random.shuffle(self.cards)
logger.info(f"牌堆已洗牌,共 {len(self.cards)} 张牌")
def draw(self, count: int = 1) -> List[Card]:
"""摸牌
Args:
count: 摸牌数量
Returns:
摸到的牌列表
"""
if len(self.cards) < count:
# 牌不够,将弃牌堆洗入牌堆
self.cards.extend(self.discard_pile)
self.discard_pile.clear()
self.shuffle()
drawn = self.cards[:count]
self.cards = self.cards[count:]
return drawn
def discard(self, cards: List[Card]):
"""弃牌"""
self.discard_pile.extend(cards)
class GeneralPool:
"""武将池"""
def __init__(self):
"""初始化武将池"""
self.generals: List[General] = []
self._init_standard_generals()
def _init_standard_generals(self):
"""初始化标准武将(简化版)"""
# 刘备
self.generals.append(General(
name="刘备",
max_hp=4,
kingdom="",
skills=[
Skill("仁德", "出牌阶段你可以将任意张手牌交给其他角色若你给出的牌达到两张或更多你回复1点体力", "主动"),
Skill("激将", "主公技,当你需要使用或打出【杀】时,你可以令其他蜀势力角色打出一张【杀】(视为由你使用或打出)", "主动")
]
))
# 关羽
self.generals.append(General(
name="关羽",
max_hp=4,
kingdom="",
skills=[
Skill("武圣", "你可以将一张红色牌当【杀】使用或打出", "主动")
]
))
# 张飞
self.generals.append(General(
name="张飞",
max_hp=4,
kingdom="",
skills=[
Skill("咆哮", "锁定技,出牌阶段,你使用【杀】无次数限制", "锁定")
]
))
# 诸葛亮
self.generals.append(General(
name="诸葛亮",
max_hp=3,
kingdom="",
skills=[
Skill("观星", "准备阶段你可以观看牌堆顶的X张牌X为存活角色数且至多为5将任意数量的牌置于牌堆顶其余的牌置于牌堆底", "主动"),
Skill("空城", "锁定技,当你没有手牌时,你不能成为【杀】或【决斗】的目标", "锁定")
]
))
# 赵云
self.generals.append(General(
name="赵云",
max_hp=4,
kingdom="",
skills=[
Skill("龙胆", "你可以将【杀】当【闪】、【闪】当【杀】使用或打出", "主动")
]
))
# 曹操
self.generals.append(General(
name="曹操",
max_hp=4,
kingdom="",
skills=[
Skill("奸雄", "当你受到伤害后,你可以获得对你造成伤害的牌", "主动"),
Skill("护驾", "主公技,当你需要使用或打出【闪】时,你可以令其他魏势力角色打出一张【闪】(视为由你使用或打出)", "主动")
]
))
# 司马懿
self.generals.append(General(
name="司马懿",
max_hp=3,
kingdom="",
skills=[
Skill("反馈", "当你受到1点伤害后你可以获得伤害来源的一张牌", "主动"),
Skill("鬼才", "在任意角色的判定牌生效前,你可以打出一张手牌代替之", "主动")
]
))
# 夏侯惇
self.generals.append(General(
name="夏侯惇",
max_hp=4,
kingdom="",
skills=[
Skill("刚烈", "当你受到伤害后你可以进行判定若结果不为♥则伤害来源选择一项弃置两张手牌或受到你造成的1点伤害", "主动")
]
))
# 甄姬
self.generals.append(General(
name="甄姬",
max_hp=3,
kingdom="",
skills=[
Skill("洛神", "准备阶段,你可以进行判定:当黑色判定牌生效后,你获得之。若结果为黑色,你可以重复此流程", "主动"),
Skill("倾国", "你可以将一张黑色手牌当【闪】使用或打出", "主动")
]
))
# 孙权
self.generals.append(General(
name="孙权",
max_hp=4,
kingdom="",
skills=[
Skill("制衡", "出牌阶段限一次,你可以弃置任意张牌,然后摸等量的牌", "主动"),
Skill("救援", "主公技,锁定技,其他吴势力角色对你使用【桃】时,该角色摸一张牌", "锁定")
]
))
# 周瑜
self.generals.append(General(
name="周瑜",
max_hp=3,
kingdom="",
skills=[
Skill("英姿", "摸牌阶段,你可以额外摸一张牌", "主动"),
Skill("反间", "出牌阶段限一次你可以令一名其他角色选择一种花色然后该角色获得你的一张手牌并展示之若此牌与所选花色不同你对其造成1点伤害", "主动")
]
))
# 吕蒙
self.generals.append(General(
name="吕蒙",
max_hp=4,
kingdom="",
skills=[
Skill("克己", "若你于出牌阶段内没有使用或打出过【杀】,你可以跳过此回合的弃牌阶段", "主动")
]
))
# 黄盖
self.generals.append(General(
name="黄盖",
max_hp=4,
kingdom="",
skills=[
Skill("苦肉", "出牌阶段你可以失去1点体力然后摸两张牌", "主动")
]
))
# 吕布
self.generals.append(General(
name="吕布",
max_hp=4,
kingdom="",
skills=[
Skill("无双", "锁定技,当你使用【杀】指定一个目标后,该角色需依次使用两张【闪】才能抵消此【杀】;当你使用【决斗】指定一个目标后,该角色每次响应此【决斗】需依次打出两张【杀】", "锁定")
]
))
# 貂蝉
self.generals.append(General(
name="貂蝉",
max_hp=3,
kingdom="",
skills=[
Skill("离间", "出牌阶段限一次,你可以弃置一张牌并选择两名男性角色,后选择的角色视为对先选择的角色使用一张【决斗】(此【决斗】不能被【无懈可击】响应)", "主动"),
Skill("闭月", "结束阶段,你可以摸一张牌", "主动")
]
))
# 华佗
self.generals.append(General(
name="华佗",
max_hp=3,
kingdom="",
skills=[
Skill("急救", "你的回合外,你可以将一张红色牌当【桃】使用", "主动"),
Skill("青囊", "出牌阶段限一次你可以弃置一张手牌并令一名角色回复1点体力", "主动")
]
))
def get_random_generals(self, count: int, exclude: List[str] = None) -> List[General]:
"""随机获取武将
Args:
count: 数量
exclude: 排除的武将名称列表
Returns:
武将列表
"""
available = [g for g in self.generals if not exclude or g.name not in exclude]
if len(available) < count:
return available
return random.sample(available, count)
def get_general_by_name(self, name: str) -> Optional[General]:
"""根据名称获取武将"""
for general in self.generals:
if general.name == name:
return general
return None

337
games/sgs_game.py Normal file
View File

@@ -0,0 +1,337 @@
"""三国杀游戏状态管理"""
import logging
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass, field
import random
import time
from games.sgs_core import (
Player, Card, CardDeck, GeneralPool, Role, Phase,
CardType, CardSuit, General
)
logger = logging.getLogger(__name__)
@dataclass
class GameState:
"""游戏状态"""
game_id: str # 游戏ID
chat_id: int # 会话ID
host_id: int # 房主ID
players: List[Player] = field(default_factory=list) # 玩家列表
deck: CardDeck = field(default_factory=CardDeck) # 牌堆
current_player_idx: int = 0 # 当前玩家索引
current_phase: Phase = Phase.PREPARE # 当前阶段
round_number: int = 1 # 回合数
is_started: bool = False # 是否已开始
is_finished: bool = False # 是否已结束
winner_role: Optional[Role] = None # 获胜身份
created_at: int = field(default_factory=lambda: int(time.time()))
# 游戏配置
max_players: int = 8 # 最大玩家数
min_players: int = 2 # 最小玩家数简化版标准是5人
# 临时状态
pending_action: Optional[Dict] = None # 等待的操作
action_queue: List[Dict] = field(default_factory=list) # 操作队列
@property
def current_player(self) -> Optional[Player]:
"""获取当前玩家"""
if 0 <= self.current_player_idx < len(self.players):
return self.players[self.current_player_idx]
return None
@property
def alive_players(self) -> List[Player]:
"""获取存活玩家"""
return [p for p in self.players if p.is_alive]
@property
def lord_player(self) -> Optional[Player]:
"""获取主公"""
for player in self.players:
if player.role == Role.LORD:
return player
return None
def get_player_by_id(self, user_id: int) -> Optional[Player]:
"""根据用户ID获取玩家"""
for player in self.players:
if player.user_id == user_id:
return player
return None
def get_next_alive_player(self, from_idx: int = None) -> Optional[Player]:
"""获取下一个存活玩家"""
if from_idx is None:
from_idx = self.current_player_idx
idx = (from_idx + 1) % len(self.players)
checked = 0
while checked < len(self.players):
if self.players[idx].is_alive:
return self.players[idx]
idx = (idx + 1) % len(self.players)
checked += 1
return None
def add_player(self, user_id: int, username: str) -> bool:
"""添加玩家
Returns:
是否成功
"""
if self.is_started:
return False
if len(self.players) >= self.max_players:
return False
if self.get_player_by_id(user_id):
return False
player = Player(user_id=user_id, username=username)
self.players.append(player)
logger.info(f"玩家 {username} 加入游戏")
return True
def remove_player(self, user_id: int) -> bool:
"""移除玩家(仅限游戏未开始时)"""
if self.is_started:
return False
player = self.get_player_by_id(user_id)
if player:
self.players.remove(player)
logger.info(f"玩家 {player.username} 离开游戏")
return True
return False
def start_game(self) -> Tuple[bool, str]:
"""开始游戏
Returns:
(是否成功, 消息)
"""
if self.is_started:
return False, "游戏已经开始"
if len(self.players) < self.min_players:
return False, f"人数不足,至少需要 {self.min_players}"
if len(self.players) > self.max_players:
return False, f"人数过多,最多 {self.max_players}"
# 分配身份
self._assign_roles()
# 选择武将
self._assign_generals()
# 初始化体力
for player in self.players:
if player.general:
player.hp = player.general.max_hp
# 主公额外+1体力上限
if player.role == Role.LORD:
player.hp += 1
if player.general:
player.general.max_hp += 1
# 发初始手牌
self._deal_initial_cards()
# 确定起始玩家(主公先手)
for idx, player in enumerate(self.players):
if player.role == Role.LORD:
self.current_player_idx = idx
break
self.is_started = True
self.current_phase = Phase.PREPARE
logger.info(f"游戏开始,共 {len(self.players)} 名玩家")
return True, "游戏开始!"
def _assign_roles(self):
"""分配身份"""
player_count = len(self.players)
# 根据人数分配身份
role_distribution = {
2: [Role.LORD, Role.REBEL],
3: [Role.LORD, Role.REBEL, Role.SPY],
4: [Role.LORD, Role.LOYAL, Role.REBEL, Role.SPY],
5: [Role.LORD, Role.LOYAL, Role.REBEL, Role.REBEL, Role.SPY],
6: [Role.LORD, Role.LOYAL, Role.REBEL, Role.REBEL, Role.REBEL, Role.SPY],
7: [Role.LORD, Role.LOYAL, Role.LOYAL, Role.REBEL, Role.REBEL, Role.REBEL, Role.SPY],
8: [Role.LORD, Role.LOYAL, Role.LOYAL, Role.REBEL, Role.REBEL, Role.REBEL, Role.REBEL, Role.SPY]
}
roles = role_distribution.get(player_count, [Role.LORD] + [Role.REBEL] * (player_count - 1))
random.shuffle(roles)
for player, role in zip(self.players, roles):
player.role = role
logger.info(f"玩家 {player.username} 的身份是 {role.value}")
def _assign_generals(self):
"""分配武将"""
general_pool = GeneralPool()
# 主公先选
lord = self.lord_player
if lord:
# 主公从3个武将中选择这里简化为随机
lord_options = general_pool.get_random_generals(3)
lord.general = lord_options[0] # 简化:直接选第一个
logger.info(f"主公 {lord.username} 选择了武将 {lord.general.name}")
# 其他玩家随机分配
used_generals = [lord.general.name] if lord and lord.general else []
for player in self.players:
if player.role != Role.LORD:
generals = general_pool.get_random_generals(1, exclude=used_generals)
if generals:
player.general = generals[0]
used_generals.append(player.general.name)
logger.info(f"玩家 {player.username} 获得武将 {player.general.name}")
def _deal_initial_cards(self):
"""发初始手牌"""
for player in self.players:
# 每人发4张手牌
cards = self.deck.draw(4)
player.hand_cards.extend(cards)
logger.info(f"玩家 {player.username} 获得 {len(cards)} 张初始手牌")
def next_phase(self) -> Tuple[Phase, Optional[Player]]:
"""进入下一阶段
Returns:
(新阶段, 当前玩家)
"""
current = self.current_player
if not current:
return self.current_phase, None
# 阶段流转
phase_order = [
Phase.PREPARE,
Phase.JUDGE,
Phase.DRAW,
Phase.PLAY,
Phase.DISCARD,
Phase.END
]
current_idx = phase_order.index(self.current_phase)
if current_idx < len(phase_order) - 1:
# 进入下一阶段
self.current_phase = phase_order[current_idx + 1]
else:
# 回合结束,轮到下一个玩家
next_player = self.get_next_alive_player()
if next_player:
for idx, p in enumerate(self.players):
if p.user_id == next_player.user_id:
self.current_player_idx = idx
break
self.current_phase = Phase.PREPARE
self.round_number += 1
return self.current_phase, self.current_player
def check_game_over(self) -> Tuple[bool, Optional[Role]]:
"""检查游戏是否结束
Returns:
(是否结束, 获胜身份)
"""
lord = self.lord_player
# 主公死亡
if not lord or not lord.is_alive:
# 反贼胜利
return True, Role.REBEL
# 检查是否所有反贼和内奸都死亡
rebels_alive = any(p.is_alive and p.role == Role.REBEL for p in self.players)
spies_alive = any(p.is_alive and p.role == Role.SPY for p in self.players)
if not rebels_alive and not spies_alive:
# 主公和忠臣胜利
return True, Role.LORD
return False, None
def to_dict(self) -> Dict:
"""转换为字典"""
return {
"game_id": self.game_id,
"chat_id": self.chat_id,
"host_id": self.host_id,
"players": [p.to_dict() for p in self.players],
"current_player_idx": self.current_player_idx,
"current_phase": self.current_phase.value,
"round_number": self.round_number,
"is_started": self.is_started,
"is_finished": self.is_finished,
"winner_role": self.winner_role.value if self.winner_role else None,
"created_at": self.created_at
}
class GameManager:
"""游戏管理器"""
def __init__(self):
"""初始化"""
self.games: Dict[int, GameState] = {} # chat_id -> GameState
def create_game(self, chat_id: int, host_id: int, host_name: str) -> GameState:
"""创建游戏"""
game_id = f"sgs_{chat_id}_{int(time.time())}"
game = GameState(
game_id=game_id,
chat_id=chat_id,
host_id=host_id
)
game.add_player(host_id, host_name)
self.games[chat_id] = game
logger.info(f"创建游戏: {game_id}")
return game
def get_game(self, chat_id: int) -> Optional[GameState]:
"""获取游戏"""
return self.games.get(chat_id)
def remove_game(self, chat_id: int):
"""移除游戏"""
if chat_id in self.games:
del self.games[chat_id]
logger.info(f"移除游戏: chat_id={chat_id}")
def has_active_game(self, chat_id: int) -> bool:
"""是否有活跃游戏"""
game = self.get_game(chat_id)
return game is not None and not game.is_finished
# 全局游戏管理器
_game_manager: Optional[GameManager] = None
def get_game_manager() -> GameManager:
"""获取全局游戏管理器"""
global _game_manager
if _game_manager is None:
_game_manager = GameManager()
return _game_manager

View File

@@ -18,5 +18,9 @@ psutil==7.1.2
# 拼音处理
pypinyin==0.51.0
# AI对话框架
llama-index-core>=0.10.0
llama-index-llms-ollama>=0.1.0
# 注意使用Python标准库sqlite3不引入SQLAlchemy

View File

@@ -76,14 +76,20 @@ async def callback_receive(request: Request):
# 发送回复
if response_text:
sender = get_message_sender()
# 根据内容选择消息类型
if response_text.startswith('#'):
# Markdown格式
await sender.send_markdown(response_text)
# AI 对话:统一按 Markdown 发送(按任务决策)
if game_type == 'ai_chat':
try:
await sender.send_markdown(response_text)
except Exception as send_md_err:
logger.error(f"发送Markdown消息失败改用文本发送: {send_md_err}")
await sender.send_text(response_text)
else:
# 普通文本
await sender.send_text(response_text)
# 其他模块保持原有启发式:以 # 开头视为 Markdown否则文本
if response_text.startswith('#'):
await sender.send_markdown(response_text)
else:
await sender.send_text(response_text)
return JSONResponse({"result": "ok"})
@@ -117,6 +123,10 @@ async def handle_command(game_type: str, command: str,
from games.base import get_stats_message
return get_stats_message(user_id)
# 注册系统
if game_type == 'register':
return await handle_register_command(command, chat_id, user_id)
# 骰娘游戏
if game_type == 'dice':
from games.dice import DiceGame
@@ -158,6 +168,51 @@ async def handle_command(game_type: str, command: str,
from games.gomoku import GomokuGame
game = GomokuGame()
return await game.handle(command, chat_id, user_id)
# 三国杀
if game_type == 'sanguosha':
from games.sanguosha import SanguoshaGame
game = SanguoshaGame()
return await game.handle(command, chat_id, user_id)
# 积分系统
if game_type == 'points':
from games.points import PointsGame
game = PointsGame()
return await game.handle(command, chat_id, user_id)
# 炼金系统
if game_type == 'alchemy':
from games.alchemy import AlchemyGame
game = AlchemyGame()
return await game.handle(command, chat_id, user_id)
# 冒险系统
if game_type == 'adventure':
from games.adventure import AdventureGame
game = AdventureGame()
return await game.handle(command, chat_id, user_id)
# 积分赠送系统
if game_type == 'gift':
from games.gift import GiftGame
game = GiftGame()
return await game.handle(command, chat_id, user_id)
# 复述功能
if game_type == 'say':
# 提取参数并原样返回
_, args = CommandParser.extract_command_args(command)
args = args.strip()
if not args:
return "用法:.say 你想让我说的话\n别名:.说 / .复述"
return args
# AI对话系统
if game_type == 'ai_chat':
from games.ai_chat import AIChatGame
game = AIChatGame()
return await game.handle(command, chat_id, user_id)
# 未知游戏类型
logger.warning(f"未知游戏类型: {game_type}")
@@ -167,3 +222,40 @@ async def handle_command(game_type: str, command: str,
logger.error(f"处理游戏指令异常: {e}", exc_info=True)
return f"❌ 处理指令时出错: {str(e)}"
async def handle_register_command(command: str, chat_id: int, user_id: int) -> str:
"""处理注册命令
Args:
command: 完整指令 ".register name"
chat_id: 会话ID
user_id: 用户ID
Returns:
注册结果消息
"""
try:
# 提取参数
_, args = CommandParser.extract_command_args(command)
args = args.strip()
# 验证参数
if not args:
return "❌ 请提供要注册的名称!\n\n正确格式:`.register <名称>`\n\n示例:\n`.register 张三`\n`.register 小明`"
if len(args) > 20:
return "❌ 名称过长最多支持20个字符。"
# 更新用户名称
db = get_db()
success = db.update_user_name(user_id, args)
if success:
return f"✅ 注册成功!\n\n**您的名称**{args}\n\n之后您可以使用这个名称参与各种游戏和功能。"
else:
return "❌ 注册失败!请稍后重试。"
except Exception as e:
logger.error(f"处理注册指令错误: {e}", exc_info=True)
return f"❌ 处理指令出错: {str(e)}"

150
start.bat Normal file
View File

@@ -0,0 +1,150 @@
@echo off
REM WPS Bot Game Windows启动脚本
setlocal enabledelayedexpansion
REM 默认配置
set DEFAULT_WEBHOOK_URL=
set DEFAULT_HOST=0.0.0.0
set DEFAULT_PORT=11000
set DEFAULT_WORKERS=1
set DEFAULT_LOG_LEVEL=info
REM 显示帮助信息
:show_help
echo WPS Bot Game Windows启动脚本
echo.
echo 用法: %0 [选项]
echo.
echo 选项:
echo -w, --webhook-url URL 设置WPS Webhook URL
echo -H, --host HOST 服务器主机地址 (默认: %DEFAULT_HOST%)
echo -p, --port PORT 服务器端口 (默认: %DEFAULT_PORT%)
echo -W, --workers NUM 工作进程数 (默认: %DEFAULT_WORKERS%)
echo -l, --log-level LEVEL 日志级别 (默认: %DEFAULT_LOG_LEVEL%)
echo -h, --help 显示此帮助信息
echo.
echo 示例:
echo %0 -w "https://xz.wps.cn/api/v1/webhook/send?key=your_key"
echo %0 -w "https://xz.wps.cn/api/v1/webhook/send?key=your_key" -p 8080
echo %0 --webhook-url "https://xz.wps.cn/api/v1/webhook/send?key=your_key" --port 8080 --log-level debug
goto :eof
REM 初始化变量
set WEBHOOK_URL=%DEFAULT_WEBHOOK_URL%
set HOST=%DEFAULT_HOST%
set PORT=%DEFAULT_PORT%
set WORKERS=%DEFAULT_WORKERS%
set LOG_LEVEL=%DEFAULT_LOG_LEVEL%
REM 解析命令行参数
:parse_args
if "%~1"=="" goto :start_app
if "%~1"=="-w" (
set WEBHOOK_URL=%~2
shift
shift
goto :parse_args
)
if "%~1"=="--webhook-url" (
set WEBHOOK_URL=%~2
shift
shift
goto :parse_args
)
if "%~1"=="-H" (
set HOST=%~2
shift
shift
goto :parse_args
)
if "%~1"=="--host" (
set HOST=%~2
shift
shift
goto :parse_args
)
if "%~1"=="-p" (
set PORT=%~2
shift
shift
goto :parse_args
)
if "%~1"=="--port" (
set PORT=%~2
shift
shift
goto :parse_args
)
if "%~1"=="-W" (
set WORKERS=%~2
shift
shift
goto :parse_args
)
if "%~1"=="--workers" (
set WORKERS=%~2
shift
shift
goto :parse_args
)
if "%~1"=="-l" (
set LOG_LEVEL=%~2
shift
shift
goto :parse_args
)
if "%~1"=="--log-level" (
set LOG_LEVEL=%~2
shift
shift
goto :parse_args
)
if "%~1"=="-h" (
call :show_help
exit /b 0
)
if "%~1"=="--help" (
call :show_help
exit /b 0
)
echo 未知参数: %~1
call :show_help
exit /b 1
:start_app
REM 检查Python环境
python --version >nul 2>&1
if errorlevel 1 (
echo 错误: 未找到Python
exit /b 1
)
REM 检查依赖
python -c "import fastapi, uvicorn" >nul 2>&1
if errorlevel 1 (
echo 错误: 缺少必要的Python依赖
echo 请运行: pip install -r requirements.txt
exit /b 1
)
REM 构建启动命令
set CMD=python app.py --host %HOST% --port %PORT% --workers %WORKERS% --log-level %LOG_LEVEL%
if not "%WEBHOOK_URL%"=="" (
set CMD=%CMD% --webhook-url "%WEBHOOK_URL%"
)
REM 显示启动信息
echo 启动WPS Bot Game...
echo 主机: %HOST%
echo 端口: %PORT%
echo 工作进程: %WORKERS%
echo 日志级别: %LOG_LEVEL%
if not "%WEBHOOK_URL%"=="" (
echo Webhook URL: %WEBHOOK_URL%
)
echo.
REM 启动应用
%CMD%

104
start.sh Normal file
View File

@@ -0,0 +1,104 @@
#!/bin/bash
# WPS Bot Game 启动脚本
# 默认配置
DEFAULT_WEBHOOK_URL=""
DEFAULT_HOST="0.0.0.0"
DEFAULT_PORT="11000"
DEFAULT_WORKERS="1"
DEFAULT_LOG_LEVEL="info"
# 显示帮助信息
show_help() {
echo "WPS Bot Game 启动脚本"
echo ""
echo "用法: $0 [选项]"
echo ""
echo "选项:"
echo " -w, --webhook-url URL 设置WPS Webhook URL"
echo " -H, --host HOST 服务器主机地址 (默认: $DEFAULT_HOST)"
echo " -p, --port PORT 服务器端口 (默认: $DEFAULT_PORT)"
echo " -W, --workers NUM 工作进程数 (默认: $DEFAULT_WORKERS)"
echo " -l, --log-level LEVEL 日志级别 (默认: $DEFAULT_LOG_LEVEL)"
echo " -h, --help 显示此帮助信息"
echo ""
echo "示例:"
echo " $0 -w 'https://xz.wps.cn/api/v1/webhook/send?key=your_key'"
echo " $0 -w 'https://xz.wps.cn/api/v1/webhook/send?key=your_key' -p 8080"
echo " $0 --webhook-url 'https://xz.wps.cn/api/v1/webhook/send?key=your_key' --port 8080 --log-level debug"
}
# 解析命令行参数
WEBHOOK_URL="$DEFAULT_WEBHOOK_URL"
HOST="$DEFAULT_HOST"
PORT="$DEFAULT_PORT"
WORKERS="$DEFAULT_WORKERS"
LOG_LEVEL="$DEFAULT_LOG_LEVEL"
while [[ $# -gt 0 ]]; do
case $1 in
-w|--webhook-url)
WEBHOOK_URL="$2"
shift 2
;;
-H|--host)
HOST="$2"
shift 2
;;
-p|--port)
PORT="$2"
shift 2
;;
-W|--workers)
WORKERS="$2"
shift 2
;;
-l|--log-level)
LOG_LEVEL="$2"
shift 2
;;
-h|--help)
show_help
exit 0
;;
*)
echo "未知参数: $1"
show_help
exit 1
;;
esac
done
# 检查Python环境
if ! command -v python3 &> /dev/null; then
echo "错误: 未找到python3"
exit 1
fi
# 检查依赖
if ! python3 -c "import fastapi, uvicorn" &> /dev/null; then
echo "错误: 缺少必要的Python依赖"
echo "请运行: pip install -r requirements.txt"
exit 1
fi
# 构建启动命令
CMD="python3 app.py --host $HOST --port $PORT --workers $WORKERS --log-level $LOG_LEVEL"
if [ -n "$WEBHOOK_URL" ]; then
CMD="$CMD --webhook-url '$WEBHOOK_URL'"
fi
# 显示启动信息
echo "启动WPS Bot Game..."
echo "主机: $HOST"
echo "端口: $PORT"
echo "工作进程: $WORKERS"
echo "日志级别: $LOG_LEVEL"
if [ -n "$WEBHOOK_URL" ]; then
echo "Webhook URL: $WEBHOOK_URL"
fi
echo ""
# 启动应用
eval $CMD

View File

@@ -2,7 +2,7 @@
import httpx
import logging
from typing import Dict, Any, Optional
from config import WEBHOOK_URL
from config import GetWebhookURL
logger = logging.getLogger(__name__)
@@ -10,12 +10,14 @@ logger = logging.getLogger(__name__)
class MessageSender:
"""消息发送器"""
def __init__(self, webhook_url: str = WEBHOOK_URL):
def __init__(self, webhook_url: Optional[str] = None):
"""初始化消息发送器
Args:
webhook_url: Webhook URL
"""
if webhook_url is None:
webhook_url = GetWebhookURL()
self.webhook_url = webhook_url
self.client: Optional[httpx.AsyncClient] = None
@@ -128,5 +130,8 @@ def get_message_sender() -> MessageSender:
global _sender_instance
if _sender_instance is None:
_sender_instance = MessageSender()
else:
# 更新Webhook URL以确保使用最新的值
_sender_instance.webhook_url = GetWebhookURL()
return _sender_instance

View File

@@ -11,6 +11,10 @@ class CommandParser:
# 指令映射表
COMMAND_MAP = {
# 用户注册系统(必须在骰娘之前)
'.register': 'register',
'.注册': 'register',
# 骰娘
'.r': 'dice',
'.roll': 'dice',
@@ -39,6 +43,39 @@ class CommandParser:
'.gomoku': 'gomoku',
'.五子棋': 'gomoku',
'.gobang': 'gomoku',
# 三国杀
'.sgs': 'sanguosha',
'.三国杀': 'sanguosha',
# 积分系统
'.points': 'points',
'.积分': 'points',
'.checkin': 'points',
'.签到': 'points',
'.打卡': 'points',
# 炼金系统
'.alchemy': 'alchemy',
'.炼金': 'alchemy',
# 冒险系统
'.adventure': 'adventure',
'.冒险': 'adventure',
# 积分赠送系统
'.gift': 'gift',
'.赠送': 'gift',
'.送': 'gift',
# AI对话系统
'.ai': 'ai_chat',
'.aiconfig': 'ai_chat',
# 复述
'.say': 'say',
'.说': 'say',
'.复述': 'say',
# 帮助
'.help': 'help',
@@ -50,7 +87,7 @@ class CommandParser:
}
# 机器人名称模式(用于从@消息中提取)
AT_PATTERN = re.compile(r'@\s*\S+\s+(.+)', re.DOTALL)
AT_PATTERN = re.compile(r'@[^\s]+\s+(.+)', re.DOTALL)
@classmethod
def parse(cls, content: str) -> Optional[Tuple[str, str]]:
@@ -70,11 +107,26 @@ class CommandParser:
if at_match:
content = at_match.group(1).strip()
# 检查是否以指令开头
for cmd_prefix, game_type in cls.COMMAND_MAP.items():
if content.startswith(cmd_prefix):
# 返回游戏类型和完整指令
return game_type, content
# 拦截全角空格与全角标点(不允许)
# 范围包含:全角空格\u3000、全角标点\uFF01-\uFF60、兼容区\uFFE0-\uFFEE
if re.search(r"[\u3000\uFF01-\uFF60\uFFE0-\uFFEE]", content):
logger.debug(f"包含全角字符,忽略: {content}")
return None
# 大小写不敏感匹配(仅用于匹配,不改变返回的原始内容)
content_lower = content.lower()
# 使用最长前缀优先,避免 .r 误匹配 .roll 等更长前缀
command_keys_sorted = sorted(cls.COMMAND_MAP.keys(), key=len, reverse=True)
for cmd_prefix in command_keys_sorted:
if content_lower.startswith(cmd_prefix.lower()):
return cls.COMMAND_MAP[cmd_prefix], content
# 特殊处理:.ai 和 .aiconfig 指令支持参数
if content.startswith('.ai '):
return 'ai_chat', content
if content.startswith('.aiconfig '):
return 'ai_chat', content
# 没有匹配的指令
logger.debug(f"未识别的指令: {content}")

53
verify_webhook.py Normal file
View File

@@ -0,0 +1,53 @@
#!/usr/bin/env python3
"""验证Webhook URL动态更新功能"""
import os
import sys
import importlib
# 添加项目路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
def verify_webhook_url():
"""验证Webhook URL动态更新功能"""
print("验证Webhook URL动态更新功能...")
# 1. 导入配置模块
import config
original_url = config.WEBHOOK_URL
print(f"原始Webhook URL: {original_url}")
# 2. 设置新的环境变量
new_url = "https://verified.test.com/webhook?key=verified_key"
os.environ['WEBHOOK_URL'] = new_url
print(f"设置新的环境变量: {new_url}")
# 3. 重新加载配置模块
importlib.reload(config)
updated_url = config.WEBHOOK_URL
print(f"更新后的Webhook URL: {updated_url}")
# 4. 验证更新是否成功
if updated_url == new_url:
print("✅ Webhook URL动态更新功能正常!")
return True
else:
print(f"❌ Webhook URL动态更新功能异常! 期望: {new_url}, 实际: {updated_url}")
return False
if __name__ == "__main__":
print("=" * 50)
print("WPS Bot Game Webhook URL动态更新验证")
print("=" * 50)
success = verify_webhook_url()
print("\n" + "=" * 50)
if success:
print("🎉 验证通过! 现在可以使用 -w 参数指定Webhook URL了!")
print("\n使用方法:")
print("python app.py --webhook-url 'https://your-webhook-url'")
print("python app.py -w 'https://your-webhook-url'")
else:
print("❌ 验证失败! 需要进一步修复!")
print("=" * 50)