41 Commits

Author SHA1 Message Date
a70a53383d 修复狼人杀游戏白天不播报死亡名单的bug 2025-11-12 17:44:36 +08:00
79924a713d 删除积分赠送上限 2025-11-11 16:27:32 +08:00
ed6406cfc9 修改炼金期望收益为1 2025-11-10 15:17:53 +08:00
3d89dbf8f4 修复狼人投票bug 2025-11-10 10:37:43 +08:00
97692b120a 1.status现在列出玩家列表2.修复狼人投票bug 2025-11-10 10:28:08 +08:00
c7a4d3d047 新增空刀 2025-11-07 11:41:37 +08:00
58e6ee851c 1.解决狼人的杀技能立即生效且消耗的bug2.狼人杀人后不应该播报全局消息 2025-11-07 11:36:46 +08:00
aef45eb9a4 尝试修复逻辑错误 2025-11-07 11:07:57 +08:00
8eeec67730 新增日志流程用以解决游戏状态的bug 2025-11-07 10:48:54 +08:00
6c2227debe 取消对全角标点的拦截 2025-11-06 15:21:38 +08:00
ninemine
7c4d0a0ef2 新增热重载 2025-11-05 17:32:08 +08:00
4027771a65 改进阶段提示和自动流转功能 2025-11-04 17:43:27 +08:00
ninemine
9ed8fffcf7 修复自动脚本 2025-11-03 19:58:52 +08:00
bcf93e910a 添加狼人杀游戏系统:支持开房加入、身份分配私聊、技能使用和私聊功能 2025-11-03 12:30:02 +08:00
8487e6e931 新增指令私发功能 2025-11-03 11:29:46 +08:00
aaff6cee86 1.私聊功能通过 2025-11-03 11:21:24 +08:00
6e63b361e4 去除私聊时向主url发送消息的功能 2025-11-03 10:21:26 +08:00
d88edc31fc 1.新增私聊接口2.新增.talk指令 2025-11-03 10:14:38 +08:00
ff709eadca 冒险任务输出格式优化 2025-10-31 18:01:33 +08:00
156a0e5752 更新启动脚本 2025-10-31 17:42:11 +08:00
e99d8f4914 1.调整冒险任务的单位2.提升冒险任务的收益 2025-10-31 17:37:02 +08:00
c1e3082f08 合并赌场系统分支 2025-10-31 17:26:30 +08:00
3f3c21c3d6 修复停牌操作的字典key错误 2025-10-31 16:27:15 +08:00
b07b044035 21点游戏规则更新 2025-10-31 15:57:26 +08:00
9e16774d2f 修复21点中的结算错误 2025-10-31 15:39:53 +08:00
17da2e6111 修复数据库错误 2025-10-31 12:19:00 +08:00
1f10799562 实现21点与轮盘游戏 2025-10-31 12:09:07 +08:00
cef684f64b Save 2025-10-31 11:58:35 +08:00
7962852685 feat: 鎵╁睍鏁版嵁搴撴敮鎸佽疆鐩樺拰21鐐规父鎴?- 娣诲姞鍒楀瓨鍦ㄦ€ф鏌ヨ緟鍔╂柟娉?- 鎵╁睍casino_sessions鍜宑asino_bets琛ㄦ坊鍔犳柊瀛楁 - 鍒涘缓casino_blackjack_hands琛ㄥ瓨鍌?1鐐规墜鐗?- 淇敼create_casino_session鏀寔鍗曞満闄愬埗鍜屾柊瀛楁 - 鎵╁睍create_casino_bet鏀寔杞洏鍜?1鐐逛笓鐢ㄥ瓧娈?- 娣诲姞21鐐规墜鐗岀鐞嗘柟娉曢泦 2025-10-31 11:46:29 +08:00
cc97374e98 同步get_user_display_name 2025-10-31 11:16:38 +08:00
a62d5d66b7 t branch
Merge branch 'task/add-idiom-chain-game_2025-10-28_1'
2025-10-31 11:10:55 +08:00
19cde88acf 更新统一接口BaseGame.db.get_user_display_name获取用户名称 2025-10-31 11:07:41 +08:00
6f05ca98f1 修复冒险完成后不回收奖励就进行炼金会丢失奖励的问题 2025-10-31 10:30:29 +08:00
5a8a4ac026 比大小游戏修改 2025-10-30 17:27:15 +08:00
12aac846cc 新增赌场系统 2025-10-30 16:11:50 +08:00
4bddd4339f 发现的加分覆盖签到与长时间任务被清理的错误, 已经在数据库修复中尝试解决 2025-10-30 15:03:00 +08:00
927c16e1fc 新增say指令 2025-10-30 14:27:38 +08:00
8ffd261bb0 1.修改ai返回格式为markdown2.新增冒险放弃指令 2025-10-30 11:57:46 +08:00
3d4f754a0a Merge branch 'main' of http://www.liubai.site:3000/ninemine/WPSBot 2025-10-30 11:05:27 +08:00
0a60a7fc4b 修复指令解析错误 2025-10-30 11:05:08 +08:00
82dc616495 Merge feature: AI瀵硅瘽鍔熻兘瀹炵幇瀹屾垚 2025-10-30 01:11:29 +08:00
24 changed files with 6133 additions and 90 deletions

3
.gitignore vendored
View File

@@ -183,4 +183,5 @@ cython_debug/
.vscode/ .vscode/
# Database # Database
data/bot.db data/bot.db
liubai_web.pid

View File

@@ -107,6 +107,39 @@ WPSBotGame/
- 阻碍因素:无 - 阻碍因素:无
- 状态:成功 - 状态:成功
## 2025-10-30_00:00:00
- 已修改:
- games/adventure.py新增放弃指令分支新增 `_abandon_adventure`,更新帮助)
- games/base.py在全局帮助中添加放弃用法
- 更改:
1. 新增冒险放弃功能:
- 支持指令:`.adventure abandon``.adventure 放弃`
- 结算规则:最低倍率 × 已冒险分钟向下取整至少1分钟
- 发放奖励后删除状态
2. 帮助信息更新:
- 本地帮助与全局帮助均加入放弃用法说明
- 原因:允许用户在冒险过程中主动放弃并按最低倍率获得奖励
- 阻碍因素:无
- 状态:成功
## 2025-10-31_10:27:59
- 已修改:
- games/alchemy.py修复冒险任务完成后自动删除状态导致奖励丢失的bug
- 更改:
1. **Bug修复**:修复冒险任务完成后奖励丢失问题
- **问题**:在 `_perform_alchemy` 中,当检测到冒险任务已完成时,代码会自动删除冒险状态(`self.db.delete_game_state`),但没有发放奖励,导致用户奖励丢失
- **修复**:移除自动删除逻辑,改为提示用户先使用 `.adventure` 回收奖励
- **修改前**:冒险完成后自动删除状态,允许炼金(导致奖励丢失)
- **修改后**:冒险完成后提示用户先回收奖励,不允许炼金,确保奖励只能通过 `.adventure` 命令回收
2. **行为变更**
- 冒险进行中:提示剩余时间,不允许炼金(保持不变)
- 冒险已完成:提示先回收奖励,不允许炼金(修复后)
- 用户使用 `.adventure`:发放奖励并删除状态(保持不变)
- 状态已删除:可以正常炼金(保持不变)
- 原因修复冒险任务完成后自动删除状态导致奖励丢失的严重bug确保用户必须先主动回收奖励才能继续其他操作
- 阻碍因素:无
- 状态:成功
# 详细实施记录 # 详细实施记录
## 文件修改清单 ## 文件修改清单
@@ -135,7 +168,7 @@ WPSBotGame/
- 使用 `get_game_state(0, user_id, 'adventure')` 查询状态 - 使用 `get_game_state(0, user_id, 'adventure')` 查询状态
- 如果存在状态: - 如果存在状态:
* 计算剩余时间 * 计算剩余时间
* 如果已完成:自动删除状态,允许继续 * 如果已完成:提示用户先使用 `.adventure` 回收奖励不允许炼金2025-10-31修复避免奖励丢失移除自动删除逻辑
* 如果未完成返回错误消息显示剩余时间X分Y秒 * 如果未完成返回错误消息显示剩余时间X分Y秒
- 异常处理:捕获状态数据异常,自动清理损坏状态 - 异常处理:捕获状态数据异常,自动清理损坏状态
@@ -191,7 +224,7 @@ state_data = {
### 游戏互斥机制 ### 游戏互斥机制
- 炼金前检查:查询冒险状态 - 炼金前检查:查询冒险状态
- 如果冒险进行中:返回错误,显示剩余时间 - 如果冒险进行中:返回错误,显示剩余时间
- 如果冒险已完成:自动清理状态,允许炼金 - 如果冒险已完成:提示用户先使用 `.adventure` 回收奖励,不允许炼金(修复后:确保奖励不会丢失)
- 状态异常:自动清理,允许继续操作 - 状态异常:自动清理,允许继续操作
# 最终审查 # 最终审查
@@ -205,6 +238,7 @@ state_data = {
- ✅ 游戏互斥:冒险期间阻止炼金操作 - ✅ 游戏互斥:冒险期间阻止炼金操作
- ✅ 指令注册:`.adventure` 和 `.冒险` 指令正常工作 - ✅ 指令注册:`.adventure` 和 `.冒险` 指令正常工作
- ✅ 帮助信息:显示在全局帮助中 - ✅ 帮助信息:显示在全局帮助中
- ✅ 冒险放弃:`.adventure abandon` / `.adventure 放弃` 按最低倍率结算已冒险分钟并清理状态
## 代码质量 ## 代码质量
- ✅ 所有语法检查通过 - ✅ 所有语法检查通过

View File

@@ -364,3 +364,65 @@ llama-index-llms-ollama>=0.1.0
## 实施与计划匹配度 ## 实施与计划匹配度
实施与计划完全匹配 ✅ 实施与计划完全匹配 ✅
## 补充分析Markdown 渲染与发送通道2025-10-30
### 现状观察
- `routers/callback.py` 中仅当 `response_text.startswith('#')` 时才通过 `send_markdown()` 发送,否则使用 `send_text()`。这意味着即使 AI 返回了合法的 Markdown但不以 `#` 开头例如代码块、列表、表格、普通段落等也会被按纯文本通道发送导致下游WPS 侧)不进行 Markdown 渲染。
- `games/ai_chat.py` 的 `_generate_response()` 直接返回 `str(response)`,未对内容类型进行标注或判定,上层仅依赖首字符为 `#` 的启发式判断来选择发送通道。
- `utils/message.py` 已具备 `send_markdown()` 与 `send_text()` 两种发送方式,对应 `{"msgtype":"markdown"}` 与 `{"msgtype":"text"}` 消息结构;当前缺少自动识别 Markdown 的逻辑。
### 影响
- 当 AI 返回包含 Markdown 元素但非标题(不以 `#` 开头)的内容时,用户端看到的是未渲染的原始 Markdown 文本,表现为“格式不能成功排版”。
### 待确认问题(不含解决方案,需产品/实现口径)
1. 目标平台WPS 机器人)对 Markdown 的要求是否仅需 `msgtype=markdown` 即可渲染?是否存在必须以标题开头的限制?
2. 期望策略:
- 是否希望“.ai 的所有回复”统一走 Markdown 通道?
- 还是需要基于 Markdown 特征进行判定(如代码块、列表、链接、表格、行内格式等)?
3. 兼容性:若统一改为 Markdown 通道,是否会影响既有纯文本展示(例如换行、转义、表情)?
4. 其他指令模块是否也可能返回 Markdown若有是否一并纳入同一策略
### 相关代码参照点(路径)
- `routers/callback.py`:回复通道选择逻辑(基于 `startswith('#')`
- `games/ai_chat.py`AI 回复内容生成与返回(直接返回字符串)
- `utils/message.py``send_markdown()` 与 `send_text()` 的消息结构
### 决策结论与范围2025-10-30
- 分支策略:不创建新分支,继续在当前任务上下文内推进。
- 发送策略:`.ai` 产生的回复统一按 Markdown 发送。
- 影响范围:仅限 AI 对话功能(`.ai`/`ai_chat`),不扩展到其他指令模块。
# 任务进度(补充)
## [2025-10-30_??:??:??] 标注 Markdown 渲染问题(记录现状与待确认项)
- 已修改:
- `.tasks/2025-10-29_3_ai_chat.md`补充“Markdown 渲染与发送通道”分析与待确认清单(仅问题陈述,无解决方案)。
- 更改:
- 明确当前仅以标题开头触发 Markdown 发送的启发式导致部分 Markdown 未被渲染。
- 原因:
- 用户反馈“AI 返回内容支持 Markdown但当前直接当作文本返回导致无法正确排版”。
- 阻碍因素:
- 目标平台的 Markdown 渲染细节与统一策略选择待确认。
- 状态:
- 未确认(等待策略口径与平台渲染规范确认)。
## [2025-10-30_11:40:31] 执行AI 回复统一按 Markdown 发送(仅限 AI
- 已修改:
- `routers/callback.py`:在 `callback_receive()` 的发送阶段,当 `game_type == 'ai_chat'` 且存在 `response_text` 时,无条件调用 `send_markdown(response_text)`;若发送异常,记录日志并回退到 `send_text(response_text)`;其他指令模块继续沿用 `startswith('#')` 的启发式逻辑。
- 更改:
- 使 `.ai` 产生的回复在 WPS 端稳定触发 Markdown 渲染,不再依赖以 `#` 开头。
- 原因:
- 对齐“统一按 Markdown 发送(仅限 AI”的决策解决 Markdown 文本被当作纯文本发送导致的排版问题。
- 阻碍因素:
- 暂无。
- 状态:
- 成功。

View File

@@ -0,0 +1,531 @@
# 背景
文件名2025-10-30_1_add_casino_games.md
创建于2025-10-30_15:16:56
创建者admin
主分支main
任务分支task/add_casino_games_2025-01-14_1
Yolo模式Off
# 任务描述
在项目中新增赌场常见游戏,支持多个玩家下注等功能,使用积分系统模拟真实的赌场环境。
要求:
- 指令格式:`.赌场 <游戏类型> <参数>`
- 采用模块化设计,便于扩展多种赌场游戏
- 支持多人同时下注
- 集成现有积分系统
- 记录下注和收益数据
# 项目概览
基于WPS协作开放平台的自定义机器人游戏系统。使用FastAPI + SQLite架构已有完善的积分系统和多款游戏五子棋、成语接龙等。需要通过模块化设计添加赌场游戏功能。
# 分析
## 现有系统分析
1. **积分系统**:已实现 `add_points()``consume_points()` 方法
2. **游戏基类**`BaseGame` 提供统一的接口
3. **路由系统**:通过 `CommandParser` 解析指令,在 `callback.py` 中路由
4. **数据库**SQLite已有用户表、游戏状态表、统计表
## 需要新增的内容
1. **数据库表**
- `casino_bets`:记录所有下注
- `casino_results`:记录游戏结果和结算
- `casino_games`:记录游戏房间(可选)
2. **游戏模块**
- `games/casino.py`:主赌场模块
- 第一期支持:大小游戏
- 第二期计划:轮盘、二十一点等
3. **指令映射**
- `.赌场` -> casino 游戏类型
- 子指令:`轮盘``大小``21点`
## 设计要点
1. **模块化设计**:每种赌场游戏作为独立类
2. **下注流程**:创建房间 -> 玩家下注 -> 结算 -> 分发奖励
3. **安全性**:下注前检查积分,结算时原子性操作
4. **多玩家支持**:以 chat_id 为单位创建游戏房间
# 提议的解决方案
## 指令设计
采用 `.赌场 <游戏类型> <操作> <参数>` 的模块化结构:
### 大小游戏
- **庄家开启游戏**`.赌场 大小 open <最小下注> <最大下注> <赔率>`
- 示例:`.赌场 大小 open 10 100 2.0` 最小10分最大100分赔率2.0倍)
- **玩家下注**`.赌场 大小 bet <大小/小> <下注金额>`
- 示例:`.赌场 大小 bet 大 50` 下注50分压大
- 示例:`.赌场 大小 bet 小 30` 下注30分压小
- **查看状态**`.赌场 大小 status`
- **庄家结算**`.赌场 大小 settle <结果>`
- 示例:`.赌场 大小 settle 大` (开大)
- 示例:`.赌场 大小 settle 小` (开小)
### 轮盘游戏(二期实现)
- 暂不实现,等大小游戏完善后再扩展
### 21点游戏二期实现
- 暂不实现,等大小游戏完善后再扩展
## 游戏流程
1. 庄家开启游戏(指定下注限额和赔率参数)
2. 玩家下注(可多人同时参与)
3. 庄家确认结算(手动触发结果)
4. 系统自动分发奖励
## 数据库设计
### 新增表casino_bets下注记录表
```sql
CREATE TABLE IF NOT EXISTS casino_bets (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chat_id INTEGER NOT NULL,
game_type TEXT NOT NULL,
user_id INTEGER NOT NULL,
bet_type TEXT NOT NULL, -- '大' 或 '小'
amount INTEGER NOT NULL,
multiplier REAL NOT NULL, -- 赔率
status TEXT DEFAULT 'pending', -- pending/settled/cancelled
result TEXT, -- 游戏结果
win_amount INTEGER, -- 赢得金额
created_at INTEGER NOT NULL,
settled_at INTEGER,
FOREIGN KEY (user_id) REFERENCES users(user_id)
)
```
### 新增表casino_sessions游戏会话表
```sql
CREATE TABLE IF NOT EXISTS casino_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chat_id INTEGER NOT NULL,
game_type TEXT NOT NULL,
banker_id INTEGER NOT NULL, -- 庄家ID
min_bet INTEGER NOT NULL,
max_bet INTEGER NOT NULL,
multiplier REAL NOT NULL,
house_fee REAL DEFAULT 0.05, -- 抽水率默认5%
status TEXT DEFAULT 'open', -- open/settling/closed
created_at INTEGER NOT NULL,
settled_at INTEGER,
UNIQUE(chat_id, game_type, status)
)
```
### 索引
- `casino_bets(chat_id, game_type, status)` - 快速查询待结算下注
- `casino_sessions(chat_id, game_type, status)` - 快速查询活跃游戏
## 方案对比
### 方案A纯数据库表方案推荐
**优点**
- 数据结构清晰,便于查询统计
- 支持历史记录追踪
- 并发安全,利用数据库事务
- 易于扩展复杂查询
**缺点**
- 需要维护额外的表结构
- 稍微复杂一些
**决策**:采用此方案
### 方案Bgame_states + JSON方案
**优点**
- 复用现有系统
- 实现简单
**缺点**
- 难以进行复杂统计查询
- JSON解析性能较差
- 数据格式不够规范化
## 核心实现细节
### 1. 游戏流程控制
- **开启游戏**检查是否已有活跃游戏同一chat_id只能有一个进行中的游戏
- **下注限制**检查session状态、下注金额范围、玩家积分
- **结算控制**只有庄家可以结算结算后自动关闭session
### 2. 下注流程
1. 检查是否有活跃的session
2. 检查下注金额是否符合min/max限制
3. 检查用户积分是否充足
4. 扣除下注金额consume_points
5. 记录下注到casino_bets表
### 3. 结算流程
1. 验证是否为庄家操作
2. 查询所有pending状态的下注
3. 计算每个玩家的输赢
4. 使用数据库事务确保原子性:
- 更新bets状态
- 发放/扣除积分
- 更新session状态
5. 返回结算报告
### 4. 抽水机制
- **抽水率**5%可配置存储在session.house_fee中
- **抽水时机**:从玩家的赢得金额中扣除
- **抽水归属**:归系统所有(不返还给庄家)
- **计算方式**
- 玩家赢得 = 下注金额 × 赔率
- 实际发放 = 赢得金额 × (1 - 抽水率)
- 抽水金额 = 赢得金额 × 抽水率
### 5. 错误处理
- 下注时积分不足:给出明确提示
- 重复下注:允许(可下多注)
- 非法下注金额:给出范围提示
- 非庄家尝试结算:拒绝
## 安全性
- 下注前检查积分
- 结算时使用数据库事务保证原子性
- 抽水机制保护庄家(虽然抽水归系统)
- 验证庄家身份
- 防止重复结算
# 详细实施计划
## 文件1: core/database.py
### 修改函数: init_tables()
在现有表创建之后约第130行添加赌场相关表的创建
位置:在 `user_points` 表创建之后约第130行添加
```python
# 赌场下注记录表
cursor.execute("""
CREATE TABLE IF NOT EXISTS casino_bets (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chat_id INTEGER NOT NULL,
game_type TEXT NOT NULL,
user_id INTEGER NOT NULL,
bet_type TEXT NOT NULL,
amount INTEGER NOT NULL,
multiplier REAL NOT NULL,
status TEXT DEFAULT 'pending',
result TEXT,
win_amount INTEGER,
created_at INTEGER NOT NULL,
settled_at INTEGER,
FOREIGN KEY (user_id) REFERENCES users (user_id)
)
""")
# 赌场游戏会话表
cursor.execute("""
CREATE TABLE IF NOT EXISTS casino_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chat_id INTEGER NOT NULL,
game_type TEXT NOT NULL,
banker_id INTEGER NOT NULL,
min_bet INTEGER NOT NULL,
max_bet INTEGER NOT NULL,
multiplier REAL NOT NULL,
house_fee REAL DEFAULT 0.05,
status TEXT DEFAULT 'open',
created_at INTEGER NOT NULL,
settled_at INTEGER,
UNIQUE(chat_id, game_type, status)
)
""")
# 创建索引
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_casino_bets
ON casino_bets(chat_id, game_type, status)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_casino_sessions
ON casino_sessions(chat_id, game_type, status)
""")
```
### 新增函数: create_casino_session()
函数签名:
```python
def create_casino_session(self, chat_id: int, game_type: str, banker_id: int,
min_bet: int, max_bet: int, multiplier: float,
house_fee: float = 0.05) -> int:
```
功能创建新的赌场游戏会话返回session_id
### 新增函数: get_active_casino_session()
函数签名:
```python
def get_active_casino_session(self, chat_id: int, game_type: str) -> Optional[Dict]:
```
功能:获取活跃的游戏会话
### 新增函数: create_casino_bet()
函数签名:
```python
def create_casino_bet(self, chat_id: int, game_type: str, user_id: int,
bet_type: str, amount: int, multiplier: float) -> int:
```
功能创建下注记录返回bet_id
### 新增函数: get_pending_bets()
函数签名:
```python
def get_pending_bets(self, chat_id: int, game_type: str) -> List[Dict]:
```
功能:获取待结算的下注列表
### 新增函数: settle_casino_bets()
函数签名:
```python
def settle_casino_bets(self, chat_id: int, game_type: str, result: str,
banker_id: int) -> Dict:
```
功能结算所有下注返回结算详情字典winners, losers, total_win等
### 新增函数: close_casino_session()
函数签名:
```python
def close_casino_session(self, chat_id: int, game_type: str):
```
功能:关闭游戏会话
## 文件2: games/casino.py新建
### 类: CasinoGame
继承自 `BaseGame`
### 方法: __init__()
初始化数据库连接
### 方法: async handle(command, chat_id, user_id) -> str
主处理函数,解析指令并调用相应的处理方法
解析逻辑:
- 提取命令参数,格式:`.赌场 <游戏类型> <操作> <参数>`
- 识别游戏类型(第一期只支持"大小"
- 分发到相应的处理方法
### 方法: async _handle_bigsmall(command, args, chat_id, user_id) -> str
处理大小游戏的各种操作
支持的操作:
- open: 开启游戏
- bet: 下注
- status: 查看状态
- settle: 结算
### 方法: async _open_bigsmall(args, chat_id, user_id) -> str
庄家开启大小游戏
参数解析:`<最小下注> <最大下注> <赔率>`
参数验证和限制
### 方法: async _bet_bigsmall(args, chat_id, user_id) -> str
玩家下注
参数解析:`<大小/小> <下注金额>`
检查session、金额范围、用户积分
### 方法: async _status_bigsmall(chat_id, game_type) -> str
查看当前游戏状态
### 方法: async _settle_bigsmall(args, chat_id, user_id) -> str
庄家结算游戏
参数解析:`<大/小>`
验证庄家身份,结算所有下注
### 方法: get_help() -> str
返回帮助信息
## 文件3: utils/parser.py
### 修改: COMMAND_MAP
添加赌场指令映射:
```python
# 赌场系统
'.赌场': 'casino',
'.casino': 'casino',
```
## 文件4: routers/callback.py
### 修改: async handle_command()
在AI对话系统之后约第209行添加
```python
# 赌场系统
if game_type == 'casino':
from games.casino import CasinoGame
game = CasinoGame()
return await game.handle(command, chat_id, user_id)
```
## 文件5: games/base.py
### 修改: get_help_message()
在积分赠送系统之后添加赌场游戏帮助:
```python
### 🎰 赌场系统
- `.赌场 大小 open <最小> <最大> <赔率>` - 庄家开启大小游戏
- `.赌场 大小 bet </> <金额>` - 下注
- `.赌场 大小 status` - 查看状态
- `.赌场 大小 settle </>` - 庄家结算
```
## 实施清单
1. 修改 `core/database.py``init_tables()` 方法,添加赌场表创建和索引
2.`core/database.py` 中添加 `create_casino_session()` 方法
3.`core/database.py` 中添加 `get_active_casino_session()` 方法
4.`core/database.py` 中添加 `create_casino_bet()` 方法
5.`core/database.py` 中添加 `get_pending_bets()` 方法
6.`core/database.py` 中添加 `settle_casino_bets()` 方法
7.`core/database.py` 中添加 `close_casino_session()` 方法
8. 创建文件 `games/casino.py`,定义 `CasinoGame`
9.`games/casino.py` 中实现 `__init__()` 方法
10.`games/casino.py` 中实现 `async handle()` 方法
11.`games/casino.py` 中实现 `async _handle_bigsmall()` 方法
12.`games/casino.py` 中实现 `async _open_bigsmall()` 方法
13.`games/casino.py` 中实现 `async _bet_bigsmall()` 方法
14.`games/casino.py` 中实现 `async _status_bigsmall()` 方法
15.`games/casino.py` 中实现 `async _settle_bigsmall()` 方法
16.`games/casino.py` 中实现 `get_help()` 方法
17. 修改 `utils/parser.py`,在 COMMAND_MAP 中添加赌场指令映射
18. 修改 `routers/callback.py`,在 `handle_command()` 中添加赌场路由
19. 修改 `games/base.py`,在 `get_help_message()` 中添加赌场帮助信息
20. 测试所有功能点,确保无错误
# 当前执行步骤:"2. 详细技术规划完成,等待进入实现阶段"
# 任务进度
[2025-10-30_15:16:56]
- 已修改:创建任务文件 `.tasks/2025-10-30_1_add_casino_games.md`
- 更改:创建任务分支 `task/add_casino_games_2025-01-14_1` 和任务文件
- 原因按照RIPER-5协议建立工作基础
- 阻碍因素:无
- 状态:成功
[2025-10-30_16:30:00](预估时间)
- 已修改:完成详细技术规划
- 更改:设计数据库表结构、游戏流程、抽水机制等细节
- 原因:为实施阶段提供详细技术规范
- 阻碍因素:无
- 状态:成功
[2025-10-30_16:07:57]
- 已修改core/database.py, games/casino.py, utils/parser.py, routers/callback.py, games/base.py
- 更改完成所有实施步骤1-19
- 添加赌场表创建和索引
- 实现6个数据库方法create_casino_session, get_active_casino_session, create_casino_bet, get_pending_bets, settle_casino_bets, close_casino_session
- 创建完整的CasinoGame类实现大小游戏所有功能
- 注册指令映射和路由
- 添加帮助信息
- 原因:按照详细实施计划完成全部功能开发
- 阻碍因素:无
- 状态:成功
[2025-10-30_17:20:00](预估时间)
- 已修改games/casino.py
- 更改:修改结算逻辑,从庄家指定结果改为系统随机生成
- 移除庄家输入种子/结果的参数
- 使用random.random()生成随机结果50%大/50%小)
- 更新帮助信息settle命令不再需要参数
- 原因:用户反馈庄家不应该能够操控游戏结果,庄家也是玩家
- 阻碍因素:无
- 状态:成功
[2025-10-30_17:26:19]
- 已修改games/casino.py, games/base.py
- 更改:添加庄家放弃游戏功能
- 新增_cancel_bigsmall()方法处理放弃逻辑
- 放弃时返还所有玩家下注
- 关闭会话并标记下注为cancelled
- 添加cancel命令支持cancel/放弃/关闭)
- 更新帮助信息和base.py中的帮助
- 原因:用户要求庄家可以放弃本轮游戏
- 阻碍因素:无
- 状态:成功
[2025-10-31_11:35:18]
- 已修改core/database.py
- 更改扩展数据库支持轮盘和21点游戏
- 添加列存在性检查辅助方法_column_exists, _add_column_if_not_exists
- 扩展casino_sessions表添加current_phase和blackjack_multiplier字段兼容性检查
- 扩展casino_bets表添加bet_category、bet_number、bet_value、hand_status字段兼容性检查
- 创建casino_blackjack_hands表存储21点游戏手牌数据
- 修改create_casino_session()支持单场限制检查get_any_active_casino_session和新字段
- 扩展create_casino_bet()支持轮盘和21点专用字段参数
- 添加21点手牌管理方法create_blackjack_hand、get_blackjack_hand、update_blackjack_hand、get_all_blackjack_hands
- 原因为轮盘和21点游戏提供数据库支持确保字段分离和向后兼容
- 阻碍因素:无
- 状态:成功
[2025-10-31_15:15:08]
- 已修改core/database.py
- 更改修复大小游戏结算时的UNIQUE约束冲突问题
- 移除casino_sessions表的UNIQUE(chat_id, game_type, status)约束
- 原因status='closed'时需要允许多条历史记录UNIQUE约束阻止了结算时更新status
- 添加兼容性迁移逻辑检测旧版本表结构自动重建表以移除UNIQUE约束
- 迁移时复制所有历史数据,处理外键关系(临时禁用/启用外键检查)
- 单场限制通过应用层逻辑get_any_active_casino_session保证
- 原因:用户测试大小游戏结算时遇到"UNIQUE constraint failed"错误
- 阻碍因素:无
- 状态:成功
[2025-10-31_15:15:08]
- 已修改core/database.py
- 更改修复21点游戏结算逻辑问题
- 修正losers统计逻辑将条件从`not player_is_busted and player_points != banker_points`改为`player_points != banker_points`
- 原因原条件排除了爆牌玩家导致爆牌玩家未被统计到losers列表
- 修正数据库更新逻辑:明确区分三种情况
- 赢家:发放奖励并更新数据库
- 平局player_points == banker_points已返还下注更新数据库
- 输家else分支包括爆牌和点数小于庄家更新数据库
- 改进结果字符串显示:包含玩家和庄家的状态信息(爆牌、黑杰克等)
- 例如:"庄家19点 vs 玩家爆牌" 或 "庄家19点 vs 玩家20点黑杰克"
- 原因用户测试21点游戏时发现3人游戏中只有1个赢家被结算1个爆牌玩家和1个平局玩家未被结算
- 阻碍因素:无
- 状态:成功
[2025-10-31_17:24:08]
- 已修改games/casino.py
- 更改重构21点游戏指令流程改为更符合标准的玩法
- 修改_open_blackjack改为`.赌场 21点 open <底注> <黑杰克倍数>`移除max_bet参数
- 新增_join_blackjack添加`.赌场 21点 join`指令,玩家加入游戏时扣除底注,检查积分是否足够
- 修改_bet_blackjack改为加注功能仅在playing阶段可用加注金额必须不低于底注
- 修改_deal_blackjack实现标准发牌顺序先玩家1张→庄家明牌→玩家第2张→庄家暗牌庄家隐藏一张暗牌
- 修改_status_blackjack游戏阶段隐藏庄家暗牌只显示明牌结算后显示完整手牌
- 修改_stand_blackjack检查所有玩家是否都已完成停牌或爆牌如果所有玩家都完成则自动触发结算
- 修改_hit_blackjack如果爆牌后所有玩家都完成也自动触发结算
- 更新_get_blackjack_help反映新的指令流程和规则
- 原因:用户要求新的指令流程:启动(open)→加入(join)→发牌(deal)→操作(hit/stand/bet加注)→自动结算
- 阻碍因素:无
- 状态:成功
[2025-10-31_17:24:08]
- 已修改games/casino.py
- 更改:修复停牌和要牌功能中的字典键访问错误
- 修复_hit_blackjack中自动结算检查`player_hand['hand_status']`改为`player_hand['status']`
- 修复_stand_blackjack中自动结算检查`player_hand['hand_status']`改为`player_hand['status']`
- 原因:`get_all_blackjack_hands`返回的字典结构为`{user_id: {'cards': [...], 'status': ...}}`,应使用`status`而不是`hand_status`
- 原因用户测试停牌功能时遇到KeyError: 'hand_status'错误
- 阻碍因素:无
- 状态:成功
# 最终审查
待完成

View File

@@ -0,0 +1,93 @@
# 背景
文件名2025-10-31_1_change-adventure-time-to-seconds.md
创建于2025-10-31_17:30:37
创建者admin
主分支main
任务分支main
Yolo模式Off
# 任务描述
将冒险系统的计时单位从分钟更改为秒。需要修改所有相关的时间计算、验证逻辑、显示文本和帮助信息。
## 后续需求
支持时间格式输入(如 `1h30m10s`),并在显示时按时分秒格式显示,避免只显示秒数过于冗余。
# 项目概览
修改 `games/adventure.py` 文件,将冒险时间单位从分钟改为秒。
# 分析
当前代码中所有时间相关逻辑都以分钟为单位:
- 默认值1分钟
- 参数验证至少1分钟
- 时间计算:`cost_time * 60` 转换为秒
- 显示文本:所有提示信息都显示"分钟"
- 奖励计算:倍率 × 分钟数
- 放弃冒险:按分钟计算已消耗时间
需要将所有相关逻辑改为秒为单位。
# 提议的解决方案
将所有时间单位从分钟改为秒:
1. 修改默认值和参数验证
2. 移除时间计算中的 `* 60` 转换
3. 更新所有显示文本
4. 简化剩余时间显示逻辑(直接显示秒)
5. 更新帮助信息
# 当前执行步骤:"已完成"
# 任务进度
## 2025-10-31_17:32:32
- 已修改games/adventure.py
- 更改:
1. 修改默认值注释:从"1分钟"改为"1秒"
2. 修改函数文档cost_time参数从"(分钟)"改为"(秒)"
3. 修改参数验证:错误提示从"至少需要1分钟"改为"至少需要1秒"
4. 移除时间转换:将`end_time = start_time + saved_cost_time * 60`改为`end_time = start_time + saved_cost_time`
5. 更新显示文本:所有"分钟"改为"秒"(冒险结果、冒险进行中、冒险开始、冒险放弃)
6. 简化剩余时间显示:移除分钟/秒的转换逻辑,直接显示秒数
7. 修改放弃冒险逻辑时间计算改为直接使用秒数elapsed_seconds移除分钟转换
8. 更新帮助信息:所有"分钟"改为"秒"
- 原因:将冒险系统计时单位从分钟改为秒,使时间控制更精确
- 阻碍因素:无
- 状态:成功
## 2025-10-31_17:35:06
- 已修改games/adventure.py
- 更改:
1. 添加 `re` 模块导入,用于正则表达式解析
2. 新增 `_parse_time_string` 方法:解析时间格式字符串,支持以下格式:
- 纯数字(按秒):`60` -> 60秒
- 时分秒组合:`1h30m10s` -> 5410秒
- 分钟秒组合:`30m10s` -> 1810秒
- 只有小时:`1h` -> 3600秒
- 只有分钟:`30m` -> 1800秒
- 只有秒:`10s` -> 10秒
3. 新增 `_format_time` 方法:将秒数格式化为 "X时X分X秒" 格式自动省略为0的部分
4. 修改 `handle` 方法:使用 `_parse_time_string` 解析时间参数,提供格式错误提示
5. 更新所有时间显示位置:
- 冒险结果:使用 `_format_time` 格式化消耗时间
- 冒险进行中:使用 `_format_time` 格式化剩余时间和总时长
- 冒险开始:使用 `_format_time` 格式化持续时间
- 冒险放弃:使用 `_format_time` 格式化已计入时间
6. 更新帮助信息:添加时间格式说明和示例
- 原因:支持更灵活的时间输入格式,提升用户体验;时间显示按时分秒格式,避免冗长的秒数显示
- 阻碍因素:无
- 状态:成功
## 2025-10-31_17:49:24
- 已修改games/adventure.py
- 更改:
1. 修复预计完成时间显示问题:
- 原问题:只显示小时时刻(`%H:%M:%S`),跨天的冒险无法正确显示,且秒数显示不够明确
- 第一次尝试根据冒险时长是否超过24小时判断不准确
- 最终解决方案:根据完成时间是否跨天来判断
- 跨天或跨年:显示完整日期时间 `YYYY-MM-DD HH:MM:SS`(包含年月日和时分秒)
- 同一天:显示时间 `HH:MM:SS`(包含时分秒)
- 原因:修复跨天冒险无法正确显示完成时间的问题,只要跨天就显示完整日期,确保秒数清晰显示
- 阻碍因素:无
- 状态:成功
# 最终审查

View File

@@ -0,0 +1,480 @@
# 背景
文件名2025-11-03_1_user-webhook-url.md
创建于2025-11-03_09:38:30
创建者admin
主分支main
任务分支task/user-webhook-url_2025-11-03_1
Yolo模式Off
# 任务描述
在WPS Bot Game项目中添加用户专属webhook URL功能允许每个用户注册自己的个人webhook URL作为私聊途径。
## 核心需求
1. 用户可以通过 `.register url <url>` 指令注册个人webhook URL
2. 私聊消息发送功能将被封装为API接口供其他系统调用
3. 提供检测用户是否具有个人URL的接口用于系统运行时确保参与用户都能被私聊
4. 服务器启动时使用的webhook URL称为主URL私聊用的URL称为个人URL
## 术语定义
- **主URL**: 服务器启动时使用的webhook URL用于群聊消息发送
- **个人URL**: 用户注册的专属webhook URL用于私聊消息发送
## 功能要求
1. **注册功能**: 支持 `.register url <url>` 指令注册/更新个人URL
2. **私聊接口**: 封装私聊消息发送功能为API接口暂不对用户开放命令
3. **检测接口**: 提供单个和批量检测用户是否有个人URL的接口
4. **数据库支持**: 在users表中添加webhook_url字段
# 项目概览
## 项目结构
```
WPSBotGame/
├── app.py # FastAPI主应用
├── config.py # 配置管理
├── core/
│ ├── database.py # SQLite数据库操作
│ ├── middleware.py # 中间件
│ └── models.py # 数据模型
├── routers/
│ ├── callback.py # Callback路由处理
│ ├── health.py # 健康检查
│ └── private.py # 私聊相关API新增
├── games/ # 游戏模块
│ └── ... # 各种游戏
└── utils/
├── parser.py # 指令解析
└── message.py # 消息发送
```
# 分析
## 当前状态
1. `users` 表已有基础字段user_id, username, created_at, last_active
2. `routers/callback.py` 中已有 `.register` 命令处理名称注册
3. `utils/message.py` 中的 `MessageSender` 类使用全局webhook URL发送消息
4. 数据库已支持动态添加列(`_add_column_if_not_exists`方法)
5. `init_tables()` 方法在表创建后会进行兼容性检查,使用 `_add_column_if_not_exists` 安全添加新列
## 关键技术点
1. **数据库层**:
-`init_tables()`中使用`_add_column_if_not_exists`添加`webhook_url`字段TEXT类型可为NULL
- 确保兼容性:如果表已存在且没有该列,会自动添加
- 添加`set_user_webhook_url(user_id, webhook_url)`方法
- 添加`get_user_webhook_url(user_id)`方法
- 添加`has_webhook_url(user_id)`方法
- 添加`check_users_webhook_urls(user_ids)`批量检查方法
2. **注册命令扩展**:
- 修改`handle_register_command`支持`.register url <url>`子命令
- 保留原有的`.register <name>`功能
- URL验证基本格式检查
3. **私聊消息发送**:
- 封装私聊消息发送功能到`utils/message.py`
- 创建`send_private_message(user_id, content, msg_type='text')`函数
- 如果用户有个人URL则使用个人URL否则返回错误
4. **API接口**:
- 创建`routers/private.py`路由文件
- `POST /api/private/send` - 发送私聊消息
- `GET /api/private/check/{user_id}` - 检查单个用户是否有个人URL
- `POST /api/private/check-batch` - 批量检查多个用户
# 提议的解决方案
## 方案概述
1. **数据库扩展**: 在users表添加webhook_url字段并实现相关CRUD方法
2. **注册命令扩展**: 扩展`.register`命令支持`url`子命令
3. **私聊功能封装**: 创建私聊消息发送工具函数
4. **API接口**: 创建私聊相关的RESTful API接口
## 设计决策
- 个人URL存储在users表中与用户信息关联
- 私聊功能暂不提供用户命令仅作为API接口供系统调用
- URL验证采用基本格式检查http/https开头
- 批量检查接口支持传入用户ID列表返回每个用户的URL状态
# 当前执行步骤:"3. 执行阶段完成"
实施清单:
1. 在core/database.py的init_tables()方法末尾添加webhook_url字段兼容性检查
2. 在core/database.py中添加set_user_webhook_url方法
3. 在core/database.py中添加get_user_webhook_url方法
4. 在core/database.py中添加has_webhook_url方法
5. 在core/database.py中添加check_users_webhook_urls方法
6. 在core/models.py文件末尾添加PrivateMessageRequest模型
7. 在core/models.py中添加CheckBatchRequest模型
8. 在core/models.py中添加CheckBatchResponse模型
9. 在core/models.py的导入中添加List类型
10. 修改routers/callback.py的handle_register_command函数支持url子命令
11. 在utils/message.py文件末尾添加send_private_message函数
12. 创建新文件routers/private.py包含所有私聊相关API接口
13. 在app.py中导入private路由模块
14. 在app.py中注册private路由
# 详细实施计划
## 文件1: core/database.py
### 修改点1: 在init_tables()方法中添加webhook_url字段兼容性检查
**位置**: 在`init_tables()`方法的末尾第324行`logger.info("数据库表初始化完成")`之前
**修改内容**:
```python
# 兼容性检查为users表添加webhook_url字段
self._add_column_if_not_exists('users', 'webhook_url', 'TEXT')
```
### 修改点2: 添加set_user_webhook_url方法
**位置**: 在`# ===== 用户相关操作 =====`部分,`update_user_name`方法之后约第414行之后
**方法签名**:
```python
def set_user_webhook_url(self, user_id: int, webhook_url: str) -> bool:
"""设置用户webhook URL
Args:
user_id: 用户ID
webhook_url: Webhook URL
Returns:
是否成功
"""
```
**实现逻辑**:
- 使用try-except包装
- 确保用户存在调用get_or_create_user
- UPDATE users SET webhook_url = ? WHERE user_id = ?
- 记录成功/失败日志
- 返回True/False异常时返回False
### 修改点3: 添加get_user_webhook_url方法
**位置**: 紧接`set_user_webhook_url`方法之后
**方法签名**:
```python
def get_user_webhook_url(self, user_id: int) -> Optional[str]:
"""获取用户webhook URL
Args:
user_id: 用户ID
Returns:
Webhook URL如果不存在返回None
"""
```
**实现逻辑**:
- SELECT webhook_url FROM users WHERE user_id = ?
- 如果查询结果为None返回None
- 如果webhook_url为None或空字符串返回None
- 否则返回URL字符串
### 修改点4: 添加has_webhook_url方法
**位置**: 紧接`get_user_webhook_url`方法之后
**方法签名**:
```python
def has_webhook_url(self, user_id: int) -> bool:
"""检查用户是否有个人webhook URL
Args:
user_id: 用户ID
Returns:
是否有个人URL
"""
```
**实现逻辑**:
- 调用get_user_webhook_url
- 检查返回值是否不为None且不为空字符串
### 修改点5: 添加check_users_webhook_urls方法批量检查
**位置**: 紧接`has_webhook_url`方法之后
**方法签名**:
```python
def check_users_webhook_urls(self, user_ids: List[int]) -> Dict[int, bool]:
"""批量检查用户是否有个人webhook URL
Args:
user_ids: 用户ID列表
Returns:
字典 {user_id: has_url}
"""
```
**实现逻辑**:
- 如果user_ids为空返回空字典
- 使用IN子句查询SELECT user_id, webhook_url FROM users WHERE user_id IN (?)
- 构建结果字典初始化为所有user_id为False
- 遍历查询结果如果webhook_url不为None且不为空字符串则设为True
- 返回结果字典
## 文件2: routers/callback.py
### 修改点1: 修改handle_register_command函数支持url子命令
**位置**: 第226-260行的`handle_register_command`函数
**修改内容**:
- 提取命令和参数后,检查第一个参数是否为"url"
- 如果是"url"提取URL参数验证URL格式http/https开头调用`db.set_user_webhook_url`
- 如果不是"url",保持原有逻辑(注册名称)
- 更新帮助信息,包含两种用法
**新的函数逻辑**:
```python
# 提取参数
_, args = CommandParser.extract_command_args(command)
args = args.strip()
# 检查是否为url子命令
parts = args.split(maxsplit=1)
if parts and parts[0].lower() == 'url':
# 处理URL注册
if len(parts) < 2:
return "❌ 请提供webhook URL\n\n正确格式:`.register url <URL>`\n\n示例:\n`.register url https://example.com/webhook?key=xxx`"
webhook_url = parts[1].strip()
# URL验证
if not webhook_url.startswith(('http://', 'https://')):
return "❌ URL格式无效必须以 http:// 或 https:// 开头。"
# 设置URL
db = get_db()
success = db.set_user_webhook_url(user_id, webhook_url)
if success:
return f"✅ Webhook URL注册成功\n\n**您的个人URL**{webhook_url}\n\n私聊消息将发送到此URL。"
else:
return "❌ 注册失败!请稍后重试。"
else:
# 原有的名称注册逻辑
...
```
## 文件3: utils/message.py
### 修改点1: 添加send_private_message函数
**位置**: 在文件末尾,`get_message_sender`函数之后
**函数签名**:
```python
async def send_private_message(user_id: int, content: str, msg_type: str = 'text') -> bool:
"""发送私聊消息到用户个人webhook URL
Args:
user_id: 目标用户ID
content: 消息内容
msg_type: 消息类型 ('text' 或 'markdown')
Returns:
是否发送成功如果用户没有个人URL则返回False
"""
```
**实现逻辑**:
- 从数据库获取用户webhook URL
- 如果URL不存在记录日志并返回False
- 创建MessageSender实例使用用户的个人URL
- 根据msg_type调用send_text或send_markdown
- 返回发送结果
## 文件4: core/models.py (新增数据模型)
### 修改点1: 添加PrivateMessageRequest模型
**位置**: 文件末尾
**模型定义**:
```python
class PrivateMessageRequest(BaseModel):
"""私聊消息请求模型"""
user_id: int = Field(..., description="目标用户ID")
content: str = Field(..., description="消息内容")
msg_type: str = Field(default="text", description="消息类型: text 或 markdown")
```
### 修改点2: 添加CheckBatchRequest模型
**位置**: 紧接PrivateMessageRequest之后
**模型定义**:
```python
class CheckBatchRequest(BaseModel):
"""批量检查请求模型"""
user_ids: List[int] = Field(..., description="用户ID列表")
```
### 修改点3: 添加CheckBatchResponse模型
**位置**: 紧接CheckBatchRequest之后
**模型定义**:
```python
class CheckBatchResponse(BaseModel):
"""批量检查响应模型"""
results: Dict[int, bool] = Field(..., description="用户ID到是否有URL的映射")
```
**注意**: core/models.py需要添加`from typing import List`导入(如果尚未导入)
## 文件5: routers/private.py (新建文件)
### 文件结构:
```python
"""私聊相关API路由"""
import logging
from typing import List, Dict
from fastapi import APIRouter, HTTPException
from fastapi.responses import JSONResponse
from core.database import get_db
from core.models import PrivateMessageRequest, CheckBatchRequest, CheckBatchResponse
from utils.message import send_private_message
logger = logging.getLogger(__name__)
router = APIRouter()
```
### 接口1: POST /api/private/send
**位置**: router定义之后
**函数签名**:
```python
@router.post("/private/send")
async def send_private(request: PrivateMessageRequest):
"""发送私聊消息
请求体:
{
"user_id": 123456,
"content": "消息内容",
"msg_type": "text" // 可选,默认为"text"
}
"""
```
**实现逻辑**:
- 验证msg_type必须是"text"或"markdown"否则返回400错误
- 调用send_private_message
- 如果返回False用户没有个人URL或发送失败返回400错误和相应消息
- 成功则返回JSONResponse({"success": True, "message": "消息发送成功"})
### 接口2: GET /api/private/check/{user_id}
**位置**: send_private之后
**函数签名**:
```python
@router.get("/private/check/{user_id}")
async def check_user_webhook(user_id: int):
"""检查用户是否有个人webhook URL"""
```
**实现逻辑**:
- 调用db.has_webhook_url(user_id)
- 返回JSONResponse({"user_id": user_id, "has_webhook_url": bool})
### 接口3: POST /api/private/check-batch
**位置**: check_user_webhook之后
**函数签名**:
```python
@router.post("/private/check-batch")
async def check_users_webhook_batch(request: CheckBatchRequest):
"""批量检查用户是否有个人webhook URL
请求体:
{
"user_ids": [123456, 789012, ...]
}
"""
```
**实现逻辑**:
- 调用db.check_users_webhook_urls(request.user_ids)
- 返回CheckBatchResponse(results=...)
## 文件6: app.py
### 修改点1: 导入private路由
**位置**: 第13行导入语句中
**修改内容**:
```python
from routers import callback, health, private
```
### 修改点2: 注册private路由
**位置**: 第75-76行路由注册部分
**修改内容**:
```python
app.include_router(callback.router, prefix="/api", tags=["callback"])
app.include_router(health.router, tags=["health"])
app.include_router(private.router, prefix="/api", tags=["private"])
```
# 任务进度
[2025-11-03_09:45:56]
- 已修改:
1. core/database.py - 添加webhook_url字段兼容性检查和4个数据库方法
2. core/models.py - 添加3个API数据模型和List类型导入
3. routers/callback.py - 扩展handle_register_command支持url子命令
4. utils/message.py - 添加send_private_message函数
5. routers/private.py - 新建文件包含3个私聊相关API接口
6. app.py - 导入并注册private路由
- 更改:
1. 在users表中添加webhook_url字段支持兼容性检查
2. 实现用户webhook URL的CRUD操作设置、获取、检查、批量检查
3. 扩展.register命令支持`.register url <url>`子命令
4. 封装私聊消息发送功能为独立函数
5. 创建私聊相关的RESTful API接口发送、单个检查、批量检查
6. 注册新的API路由到FastAPI应用
- 原因:
实现用户专属webhook URL注册和私聊消息发送功能为其他系统提供API接口调用
- 阻碍因素:
- 状态:成功
[2025-11-03_后续]
- 已修改:
1. utils/parser.py - 添加.talk和.私聊指令映射
2. routers/callback.py - 添加handle_talk_command函数实现私聊指令
- 更改:
1. 添加.talk <username> <content>指令,允许用户通过用户名发送私聊消息
2. 实现用户名和URL验证确保目标用户已注册名称和个人URL
3. 私聊消息发送成功时不向主URL发送提示消息保持私密性
- 原因:
实现用户可用的私聊功能,作为私聊功能的开始
- 阻碍因素:
- 状态:成功(测试通过)
# 最终审查
待审查阶段完成...

View File

@@ -0,0 +1,395 @@
# 背景
文件名2025-11-03_2_werewolf-game.md
创建于2025-11-03_12:20:10
创建者admin
主分支main
任务分支task/werewolf_2025-11-03_1
Yolo模式Off
# 任务描述
在WPS Bot Game项目中添加狼人杀游戏系统支持6-12人游戏包含身份分配、私聊功能、技能使用等核心功能。
## 核心需求
1. 支持6-12人狼人杀游戏配置2-4狼 1预言家 1女巫 2-6平民
2. 主持人开房:`.狼人杀 open`
3. 玩家加入:`.狼人杀 join`必须注册用户名和个人URL
4. 开始游戏:`.狼人杀 start`,自动分配身份并通过私聊发送
5. 私聊功能:`.狼人杀 <玩家代号> <消息>`
6. 狼人群聊:`.狼人杀 狼人 <消息>`
7. 技能系统:杀、验、救、毒
8. 游戏状态查询:`.狼人杀 status`
9. 结束游戏:`.狼人杀 end`
## 游戏规则
**人数配置**
- 6人2狼 1预言家 1女巫 2平民
- 8人2狼 1预言家 1女巫 4平民
- 10人3狼 1预言家 1女巫 5平民
- 12人4狼 1预言家 1女巫 6平民
**胜利条件**
- 狼人阵营:杀死所有神职和平民
- 好人阵营:消灭所有狼人
**技能**
- 狼人:每晚投票刀人
- 预言家:每晚查验一个玩家身份
- 女巫:拥有一瓶解药(仅可使用一次)和一瓶毒药(仅可使用一次)
- 平民:无特殊技能
# 项目概览
## 项目结构
```
WPSBotGame/
├── app.py # FastAPI主应用
├── config.py # 配置管理
├── core/
│ ├── database.py # SQLite数据库操作
│ ├── middleware.py # 中间件
│ └── models.py # 数据模型
├── routers/
│ ├── callback.py # Callback路由处理
│ ├── health.py # 健康检查
│ └── private.py # 私聊相关API
├── games/ # 游戏模块
│ ├── werewolf.py # 狼人杀游戏(新增)
│ └── ... # 其他游戏
└── utils/
├── parser.py # 指令解析
└── message.py # 消息发送
```
# 分析
## 当前状态
1. 已有私聊功能支持用户可注册个人webhook URL
2. 已有游戏架构BaseGame基类、数据库状态管理
3. 已有成语接龙等多人类游戏参考
4. 数据库支持game_states表存储游戏状态
## 关键技术点
1. **数据库层**:
- 使用game_states表存储游戏状态user_id=0表示群级别状态
- 通过state_data JSON字段存储玩家列表、身份、阶段等信息
2. **私聊系统**:
- 利用现有的send_private_message函数
- 身份信息、技能结果等通过私聊发送
- 支持发送者标识显示
3. **游戏状态管理**:
- 游戏状态存储在state_data中
- 包含:玩家列表、身份映射、狼人列表、技能使用记录等
4. **技能系统**:
- 狼人刀人:投票机制
- 预言家验人:私聊返回结果
- 女巫用药:限制使用次数
5. **指令路由**:
- 在parser.py注册.werewolf和.狼人杀指令
- 在callback.py中导入并注册游戏处理器
# 提议的解决方案
## 方案概述
1. **创建狼人杀游戏类**`games/werewolf.py`继承BaseGame
2. **状态数据结构**使用JSON存储在state_data中
3. **身份分配**:随机分配角色,狼人互相认识
4. **私聊通知**:游戏开始时通过私聊发送身份信息
5. **技能系统**:支持杀、验、救、毒四种技能
6. **指令注册**:添加解析和路由支持
## 设计决策
- 使用user_id=0存储群级别游戏状态参考成语接龙
- 通过私聊发送敏感信息(身份、技能结果)
- 简化实现:主持人手动推进阶段(暂不实现自动流转)
- 使用数字代号1-N标识玩家
- 支持狼人群聊功能
# 当前执行步骤:"实施完成"
# 详细实施计划
## 文件1: games/werewolf.py新建文件
### 主要方法
1. **handle()** - 主处理逻辑,指令路由
2. **get_help()** - 帮助信息
3. **_open_game()** - 主持人开房
4. **_join_game()** - 玩家加入
5. **_start_game()** - 开始游戏,分配身份
6. **_send_identities()** - 私聊发送身份信息
7. **_private_chat()** - 玩家私聊
8. **_wolf_group_chat()** - 狼人群聊
9. **_handle_skill()** - 技能处理
10. **_wolf_kill()** - 狼人刀人
11. **_seer_check()** - 预言家验人
12. **_witch_save()** - 女巫救人
13. **_witch_poison()** - 女巫毒人
14. **_show_status()** - 显示游戏状态
15. **_end_game()** - 结束游戏
### 数据结构设计
```python
state_data = {
'creator_id': int, # 主持人ID
'status': str, # 'open', 'playing', 'finished'
'players': [
{
'user_id': int,
'name': str, # 注册的用户名
'id': int, # 游戏内代号 1-N
'role': str, # 'wolf', 'seer', 'witch', 'civilian'
'alive': bool,
'id_label': str # "1号玩家"
}
],
'phase': str, # 当前阶段
'round': int, # 当前回合数
'wolves': [int], # 狼人ID列表
'kill_target': int, # 狼人票决目标
'seer_result': {}, # 预言家验人结果
'witch_save': bool, # 女巫是否救人
'witch_poison': int, # 女巫毒杀目标
'discussed': False, # 讨论阶段是否完成
'wolf_know_each_other': False
}
```
## 文件2: utils/parser.py
### 修改点:添加指令映射
在COMMAND_MAP中添加
```python
'.werewolf': 'werewolf',
'.狼人杀': 'werewolf',
```
## 文件3: routers/callback.py
### 修改点:添加路由处理
在handle_command函数中添加
```python
# 狼人杀系统
if game_type == 'werewolf':
from games.werewolf import WerewolfGame
game = WerewolfGame()
return await game.handle(command, chat_id, user_id)
```
## 文件4: games/base.py
### 修改点:添加帮助信息
在get_help_message()函数中添加狼人杀帮助说明
# 任务进度
[2025-11-03_12:20:10]
- 已修改:
1. games/werewolf.py - 新建狼人杀游戏类,实现所有核心功能
2. utils/parser.py - 添加.werewolf和.狼人杀指令映射
3. routers/callback.py - 添加狼人杀路由处理
4. games/base.py - 添加狼人杀帮助信息
- 更改:
1. 创建完整的狼人杀游戏系统
2. 支持开房、加入、开始、私聊、技能使用等所有核心功能
3. 实现6-12人游戏配置和角色分配
4. 集成私聊系统发送身份信息
5. 支持狼人群聊功能
6. 添加帮助信息和指令注册
- 原因:
实现完整的狼人杀游戏系统,支持多人游戏、身份隐藏、技能使用等核心功能
- 阻碍因素:
- 状态:成功
[2025-11-04_17:41:14]
- 已修改:
1. games/werewolf.py - 改进阶段提示和自动流转功能
- 更改:
1. 添加阶段名称映射系统phase_configs定义各阶段的中文名称、行动角色和指令说明
2. 实现阶段管理方法_get_phase_description()、_get_next_phase()、_advance_phase()
3. 改进游戏开始提示,明确显示"第一夜 - 狼人行动阶段"和操作指令
4. 实现自动阶段流转:狼人刀人后自动进入预言家阶段,预言家验人后自动进入女巫阶段
5. 新增女巫跳过功能:支持`.werewolf 跳过`指令,女巫可以选择不行动
6. 改进状态显示_show_status()现在显示中文阶段名称、当前行动角色和操作指令
7. 更新身份说明和帮助信息,添加女巫跳过选项说明
8. 各技能方法添加阶段验证,确保在正确的阶段使用技能
- 原因:
解决用户反馈的游戏阶段不明显的问题,让玩家清楚知道当前是什么阶段、谁应该行动、下一步是什么阶段
- 阻碍因素:
- 状态:成功
[2025-11-07_10:59:09]
- 已修改:
1. games/werewolf.py - 支持在私聊中使用狼人杀技能方案2实施
- 更改:
1. 新增 _find_player_game(user_id) 方法根据玩家ID查找其参与的游戏
2. 修改 _handle_skill() 方法,支持从私聊中使用技能指令
3. 修改 _wolf_group_chat() 方法,支持从私聊中发送狼人群聊
4. 修改 _private_chat() 方法,支持从私聊中发送玩家私聊
5. 修改 _witch_pass() 方法,支持从私聊中跳过女巫行动
6. 添加日志输出,显示在私聊中使用功能的情况
7. 实现逻辑先尝试用当前chat_id查找游戏群聊场景找不到则通过user_id查找玩家游戏私聊场景
- 原因:
解决私聊中无法使用技能的问题。游戏在群里创建,但玩家需要在私聊中使用技能以保密。
之前的设计只能在游戏所在群使用技能,现在支持在私聊中使用,查找玩家参与的游戏并操作。
- 阻碍因素:
- 状态:成功
[2025-11-07_11:06:58]
- 已修改:
1. games/werewolf.py - 改为全局唯一游戏模式不再按chat_id区分
- 更改:
1. 修改 _get_game_state() 方法查询全局唯一游戏而非根据chat_id查询
2. 新增 _get_game_chat_id() 方法获取全局游戏所在的chat_id
3. 简化 _find_player_game() 方法,使用全局游戏查询
4. 修改 _open_game() 方法,检查全局是否已有游戏(而非仅检查当前群)
5. 简化所有需要查找玩家游戏的方法_handle_skill、_wolf_group_chat、_private_chat、_witch_pass
6. 保持数据库兼容性chat_id列仍然存在并记录游戏创建的群但查询时忽略
7. 实现逻辑:所有查询都获取全局最新的一个狼人杀游戏,不再区分群组
- 原因:
Bot全局只需要一个狼人杀游戏不同群的玩家可以参与同一个游戏。
简化逻辑消除按chat_id区分的复杂性同时保持数据库结构兼容。
- 阻碍因素:
- 状态:成功
[2025-11-07_11:13:56]
- 已修改:
1. games/werewolf.py - 改进狼人投票机制
- 更改:
1. 在游戏状态数据中添加 wolf_votes 字段,记录每个狼人的投票
2. 修改 _wolf_kill() 方法,实现完整的投票流程:
- 记录每个狼人的投票(支持改票)
- 检查是否所有存活狼人都已投票
- 未全部投票时提示等待其他狼人
- 全部投票后统计票数
- 票数唯一时确定目标并推进阶段
- 平票时清除投票记录并要求重新投票
3. 优化投票提示信息,显示投票进度和结果
- 原因:
解决之前"只要一个狼人投票就立即刀人"的问题。
现在要求所有狼人都投票,统计票数最多的目标,平票则重新投票,符合狼人杀游戏规则。
- 阻碍因素:
- 状态:成功
[2025-11-07_11:22:53]
- 已修改:
1. games/werewolf.py - 狼人投票结果保密处理
- 更改:
1. 修改狼人投票反馈机制,不在群里播报投票目标
2. 单个狼人投票时,通过私聊确认投票,群消息只显示"投票已记录"
3. 投票平票时,通过私聊通知狼人具体平票目标,群消息只显示"平票"
4. 投票完成时,通过私聊通知所有狼人击杀目标,群消息只显示"投票完成"
5. 所有敏感信息(投票目标、平票详情、击杀决定)均通过私聊发送给狼人
- 原因:
符合狼人杀游戏规则,狼人刀人的决定应该保密,不能在群里公开播报。
只有狼人自己知道投票情况和击杀目标,其他玩家在天亮时才知道结果。
- 阻碍因素:
- 状态:成功
[2025-11-07_11:40:44]
- 已修改:
1. games/werewolf.py - 新增空刀机制
- 更改:
1. 允许狼人投票给0号表示空刀不杀人
2. 修改 _wolf_kill() 方法对target_id=0不验证目标存在性
3. 修改投票确认消息,区分空刀和正常投票
4. 修改票数统计逻辑投票0不计入击杀目标统计
5. 新增全部空刀处理如果所有狼人都空刀kill_target设为0平安夜
6. 修改 _advance_phase() 为async进入女巫阶段时私聊通知女巫刀人情况
7. 女巫有解药时可知道今晚是否有人被刀0号为平安夜
- 原因:
符合狼人杀游戏规则,狼人可以选择空刀(不杀人)。
女巫在有解药时需要知道今晚是否有人被刀以决定是否使用解药。
- 阻碍因素:
- 状态:成功
[2025-11-10_10:20:38]
- 已修改:
1. games/werewolf.py - 调整status指令的房间名单展示
- 更改:
1. 房间开放阶段现在显示已加入玩家的房内ID与用户名
2. 游戏进行阶段的玩家状态显示同时包含房内ID和用户名
- 原因:
提升`.werewolf status`指令提供的信息量,方便玩家识别房间成员
- 阻碍因素:
- 状态:未确认
[2025-11-10_10:27:07]
- 已修改:
1. games/werewolf.py - 统一狼人投票记录的键类型
- 更改:
1. `_wolf_kill()` 读取和保存 `wolf_votes` 时转换为字符串键
2. 统计投票进度时将键重新转换为整数,确保与 `alive_wolves` 对齐
- 原因:
修复狼人全部投票后仍提示有人未投票的问题
- 阻碍因素:
- 状态:未确认
[2025-11-10_10:36:39]
- 已修改:
1. games/werewolf.py - 修复狼人投票流程与阶段推进
- 更改:
1. `_wolf_kill()` 读取 `wolf_votes` 时统一转换为整数键,存储与统计均使用整数
2. `_wolf_kill()``_seer_check()``_witch_save()``_witch_poison()``_witch_pass()` 中的 `_advance_phase` 调用改为 `await`
- 原因:
修复狼人投票完成后仍判定未投票、平安夜提示错误及阶段推进信息显示 `<coroutine>` 的问题
- 阻碍因素:
- 状态:未确认
# 最终审查
待审查阶段完成...

3
app.py
View File

@@ -10,7 +10,7 @@ import asyncio
from config import APP_CONFIG, SESSION_TIMEOUT, SetWebhookURL, GetWebhookURL from config import APP_CONFIG, SESSION_TIMEOUT, SetWebhookURL, GetWebhookURL
from core.middleware import ConcurrencyLimitMiddleware from core.middleware import ConcurrencyLimitMiddleware
from core.database import get_db from core.database import get_db
from routers import callback, health from routers import callback, health, private
# 配置日志 # 配置日志
logging.basicConfig( logging.basicConfig(
@@ -74,6 +74,7 @@ app.add_middleware(ConcurrencyLimitMiddleware)
# 注册路由 # 注册路由
app.include_router(callback.router, prefix="/api", tags=["callback"]) app.include_router(callback.router, prefix="/api", tags=["callback"])
app.include_router(health.router, tags=["health"]) app.include_router(health.router, tags=["health"])
app.include_router(private.router, prefix="/api", tags=["private"])
@app.get("/") @app.get("/")

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
"""数据模型定义""" """数据模型定义"""
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from typing import Optional, Dict, Any from typing import Optional, Dict, Any, List
class CallbackRequest(BaseModel): class CallbackRequest(BaseModel):
@@ -76,3 +76,20 @@ class QuizGameState(GameState):
attempts: int = Field(0, description="尝试次数") attempts: int = Field(0, description="尝试次数")
max_attempts: int = Field(3, description="最大尝试次数") max_attempts: int = Field(3, description="最大尝试次数")
class PrivateMessageRequest(BaseModel):
"""私聊消息请求模型"""
user_id: int = Field(..., description="目标用户ID")
content: str = Field(..., description="消息内容")
msg_type: str = Field(default="text", description="消息类型: text 或 markdown")
class CheckBatchRequest(BaseModel):
"""批量检查请求模型"""
user_ids: List[int] = Field(..., description="用户ID列表")
class CheckBatchResponse(BaseModel):
"""批量检查响应模型"""
results: Dict[int, bool] = Field(..., description="用户ID到是否有URL的映射")

View File

@@ -2,6 +2,7 @@
import random import random
import time import time
import logging import logging
import re
from datetime import datetime from datetime import datetime
from games.base import BaseGame from games.base import BaseGame
from utils.parser import CommandParser from utils.parser import CommandParser
@@ -22,19 +23,86 @@ class AdventureGame(BaseGame):
# 奖品池配置 # 奖品池配置
self.prize_pool: List[Tuple[int, float, str]] = [ self.prize_pool: List[Tuple[int, float, str]] = [
# (权重, 倍率, 描述) # (权重, 倍率, 描述)
(500, 0.5, "少量积分"), (300, 1, "少量积分"),
(350, 1, "中等积分"), (250, 2, "中等积分"),
(200, 2, "大量积分"), (200, 2, "大量积分"),
(100, 5, "丰厚积分"), (150, 5, "丰厚积分"),
(50, 10, "丰厚积分"), (100, 10, "丰厚积分"),
(10, 100, "🌟 巨额积分"), (50, 100, "🌟 巨额积分"),
(1, 1000, "💎 传说积分"), (10, 1000, "💎 传说积分"),
] ]
self.total_weight: int = 0 self.total_weight: int = 0
for weight,_,_ in self.prize_pool: for weight,_,_ in self.prize_pool:
self.total_weight += weight self.total_weight += weight
def _parse_time_string(self, time_str: str) -> int:
"""解析时间字符串,支持 h/m/s 格式
支持的格式示例:
- "1h30m10s" -> 5410秒
- "30m" -> 1800秒
- "10s" -> 10秒
- "1h30m" -> 5400秒
- "3600" -> 3600秒纯数字按秒处理
Args:
time_str: 时间字符串
Returns:
解析后的秒数如果解析失败返回None
"""
if not time_str:
return None
# 如果是纯数字,直接返回
if time_str.isdigit():
return int(time_str)
# 使用正则表达式匹配 h/m/s 格式,确保整个字符串匹配
pattern = r'^(?:(\d+)h)?(?:(\d+)m)?(?:(\d+)s)?$'
match = re.match(pattern, time_str.lower())
if not match:
return None
hours = int(match.group(1) or 0)
minutes = int(match.group(2) or 0)
seconds = int(match.group(3) or 0)
# 如果所有值都是0返回None
if hours == 0 and minutes == 0 and seconds == 0:
return None
total_seconds = hours * 3600 + minutes * 60 + seconds
return total_seconds
def _format_time(self, seconds: int) -> str:
"""将秒数格式化为 "X时X分X秒" 格式
Args:
seconds: 秒数
Returns:
格式化的时间字符串,如 "1时30分10秒""30分10秒""10秒"
"""
if seconds < 0:
seconds = 0
hours = seconds // 3600
minutes = (seconds % 3600) // 60
secs = seconds % 60
parts = []
if hours > 0:
parts.append(f"{hours}")
if minutes > 0:
parts.append(f"{minutes}")
if secs > 0 or not parts:
parts.append(f"{secs}")
return "".join(parts)
async def handle(self, command: str, chat_id: int, user_id: int) -> str: async def handle(self, command: str, chat_id: int, user_id: int) -> str:
"""处理冒险相关指令 """处理冒险相关指令
@@ -56,12 +124,20 @@ class AdventureGame(BaseGame):
if args in ['help', '帮助', 'info']: if args in ['help', '帮助', 'info']:
return self._get_adventure_help() return self._get_adventure_help()
# 默认冒险耗时1分钟 # 放弃当前冒险(按最低倍率结算已冒险时间)
if args in ['abandon', '放弃']:
return await self._abandon_adventure(chat_id, user_id)
# 默认冒险耗时1秒
else: else:
# 解析消耗时间 # 解析消耗时间
cost_time = 1 # 默认消耗1分钟 cost_time = 1 # 默认消耗1
if args.isdigit(): if args:
cost_time = int(args) parsed_time = self._parse_time_string(args)
if parsed_time is not None:
cost_time = parsed_time
else:
return f"❌ 时间格式错误!请使用以下格式:\n- 纯数字(秒):`.adventure 60`\n- 时分秒格式:`.adventure 1h30m10s`\n- 分钟秒格式:`.adventure 30m10s`\n- 只有秒:`.adventure 10s`"
return await self._perform_adventure(chat_id, user_id, cost_time) return await self._perform_adventure(chat_id, user_id, cost_time)
@@ -75,14 +151,14 @@ class AdventureGame(BaseGame):
Args: Args:
chat_id: 会话ID使用0作为用户级标识 chat_id: 会话ID使用0作为用户级标识
user_id: 用户ID user_id: 用户ID
cost_time: 消耗时间(分钟 cost_time: 消耗时间(
Returns: Returns:
抽奖结果消息 抽奖结果消息
""" """
# 参数验证 # 参数验证
if cost_time < 1: if cost_time < 1:
return "❌ 冒险时间至少需要1分钟" return "❌ 冒险时间至少需要1"
# 查询冒险状态使用chat_id=0表示用户级状态 # 查询冒险状态使用chat_id=0表示用户级状态
state = self.db.get_game_state(0, user_id, 'adventure') state = self.db.get_game_state(0, user_id, 'adventure')
@@ -94,7 +170,7 @@ class AdventureGame(BaseGame):
state_data = state['state_data'] state_data = state['state_data']
start_time = state_data.get('start_time', 0) start_time = state_data.get('start_time', 0)
saved_cost_time = state_data.get('cost_time', 1) saved_cost_time = state_data.get('cost_time', 1)
end_time = start_time + saved_cost_time * 60 end_time = start_time + saved_cost_time
remaining_seconds = end_time - current_time remaining_seconds = end_time - current_time
# 情况1.1:冒险已完成(时间已到或过期) # 情况1.1:冒险已完成(时间已到或过期)
@@ -116,8 +192,9 @@ class AdventureGame(BaseGame):
updated_points = self.db.get_user_points(user_id) updated_points = self.db.get_user_points(user_id)
# 格式化输出 # 格式化输出
time_str = self._format_time(saved_cost_time)
text = f"## ⚡️ 冒险结果\n\n" text = f"## ⚡️ 冒险结果\n\n"
text += f"**消耗时间**: {saved_cost_time} 分钟\n\n" text += f"**消耗时间**: {time_str}\n\n"
text += f"**{reward['description']}**: 获得 {reward_points} 积分\n\n" text += f"**{reward['description']}**: 获得 {reward_points} 积分\n\n"
text += f"**当前积分**: {updated_points['points']}\n\n" text += f"**当前积分**: {updated_points['points']}\n\n"
text += "---\n\n" text += "---\n\n"
@@ -126,17 +203,12 @@ class AdventureGame(BaseGame):
return text return text
# 情况1.2:冒险未完成,返回等待提示 # 情况1.2:冒险未完成,返回等待提示
remaining_minutes = remaining_seconds // 60 wait_msg = self._format_time(remaining_seconds)
remaining_secs = remaining_seconds % 60 saved_time_str = self._format_time(saved_cost_time)
if remaining_minutes > 0:
wait_msg = f"{remaining_minutes}{remaining_secs}"
else:
wait_msg = f"{remaining_secs}"
text = f"## ⚡️ 冒险进行中\n\n" text = f"## ⚡️ 冒险进行中\n\n"
text += f"你正在进行一次冒险,还需等待 **{wait_msg}** 才能完成。\n\n" text += f"你正在进行一次冒险,还需等待 **{wait_msg}** 才能完成。\n\n"
text += f"**当前冒险时长**: {saved_cost_time} 分钟\n\n" text += f"**当前冒险时长**: {saved_time_str}\n\n"
text += "---\n\n" text += "---\n\n"
text += "💡 提示:冒险期间无法进行炼金,请耐心等待!" text += "💡 提示:冒险期间无法进行炼金,请耐心等待!"
@@ -158,18 +230,88 @@ class AdventureGame(BaseGame):
self.db.save_game_state(0, user_id, 'adventure', state_data) self.db.save_game_state(0, user_id, 'adventure', state_data)
# 计算预计完成时间 # 计算预计完成时间
end_time = current_time + cost_time * 60 end_time = current_time + cost_time
end_datetime = datetime.fromtimestamp(end_time) end_datetime = datetime.fromtimestamp(end_time)
end_time_str = end_datetime.strftime('%H:%M:%S') current_datetime = datetime.fromtimestamp(current_time)
# 判断是否跨天:如果完成日期和当前日期不同,或跨年,则显示完整日期时间
if (end_datetime.date() != current_datetime.date() or
end_datetime.year != current_datetime.year):
end_time_str = end_datetime.strftime('%Y-%m-%d %H:%M:%S')
else:
end_time_str = end_datetime.strftime('%H:%M:%S')
cost_time_str = self._format_time(cost_time)
text = f"## ⚡️ 冒险开始\n\n" text = f"## ⚡️ 冒险开始\n\n"
text += f"你已经开始了冒险之旅,本次冒险将持续 **{cost_time}** 分钟\n\n" text += f"你已经开始了冒险之旅,本次冒险将持续 **{cost_time_str}**。\n\n"
text += f"**预计完成时间**: {end_time_str}\n\n" text += f"**预计完成时间**: {end_time_str}\n\n"
text += "---\n\n" text += "---\n\n"
text += "💡 提示:冒险期间无法进行炼金,完成后使用 `.adventure` 获取奖励!" text += "💡 提示:冒险期间无法进行炼金,完成后使用 `.adventure` 获取奖励!"
return text return text
async def _abandon_adventure(self, chat_id: int, user_id: int) -> str:
"""放弃当前冒险,按最低倍率结算已冒险时间
Args:
chat_id: 会话ID使用0作为用户级标识
user_id: 用户ID
Returns:
放弃结果消息
"""
try:
# 查询冒险状态
state = self.db.get_game_state(0, user_id, 'adventure')
if not state:
return "❌ 当前没有进行中的冒险,可使用 `.adventure` 开始新的冒险。"
state_data = state.get('state_data', {})
start_time = state_data.get('start_time')
cost_time = state_data.get('cost_time')
if start_time is None or cost_time is None:
# 状态异常,清理并提示
self.db.delete_game_state(0, user_id, 'adventure')
return "⚠️ 冒险状态异常已清理,请使用 `.adventure` 重新开始。"
current_time = int(time.time())
elapsed_seconds = max(0, current_time - int(start_time))
if elapsed_seconds < 1:
elapsed_seconds = 1
# 计算最低倍率
try:
min_multiplier = min(m for _, m, _ in self.prize_pool)
except Exception:
# 兜底若奖池异常按0.5处理
min_multiplier = 0.5
reward_points = int(min_multiplier * elapsed_seconds)
if reward_points < 0:
reward_points = 0
# 发放奖励并清理状态
if reward_points > 0:
self.db.add_points(user_id, reward_points, "adventure", "冒险放弃奖励")
self.db.delete_game_state(0, user_id, 'adventure')
# 查询当前积分
updated_points = self.db.get_user_points(user_id)
# 输出
elapsed_time_str = self._format_time(elapsed_seconds)
text = f"## ⚡️ 冒险放弃\n\n"
text += f"**已计入时间**: {elapsed_time_str}\n\n"
text += f"**最低倍率**: {min_multiplier}\n\n"
text += f"**获得积分**: {reward_points}\n\n"
text += f"**当前积分**: {updated_points['points']}\n\n"
text += "---\n\n"
text += "💡 提示:可随时使用 `.adventure` 再次踏上冒险之旅!"
return text
except Exception as e:
logger.error(f"放弃冒险时出错: {e}", exc_info=True)
# 失败时不影响原状态,返回提示
return f"❌ 放弃冒险失败:{str(e)}"
def _draw_prize(self, prize_pool: list) -> dict: def _draw_prize(self, prize_pool: list) -> dict:
"""从奖品池中抽取奖品 """从奖品池中抽取奖品
@@ -205,10 +347,15 @@ class AdventureGame(BaseGame):
""" """
text = f"## ⚡️ 冒险系统\n\n" text = f"## ⚡️ 冒险系统\n\n"
text += f"### 基础用法\n" text += f"### 基础用法\n"
text += f"- `.adventure` - 消耗1分钟进行冒险\n" text += f"- `.adventure` - 消耗1进行冒险\n"
text += f"- `.adventure time` - 消耗time分钟进行冒险, 最少一分钟\n" text += f"- `.adventure 60` - 消耗60秒进行冒险\n"
text += f"- `.adventure 1h30m10s` - 消耗1小时30分10秒进行冒险\n"
text += f"- `.adventure 30m` - 消耗30分钟进行冒险\n"
text += f"- `.adventure 10s` - 消耗10秒进行冒险\n\n"
text += f"**时间格式说明**:支持时分秒组合,如 `1h30m10s`、`30m`、`10s`,也支持纯数字(按秒计算)。\n\n"
text += f"### 其他功能\n" text += f"### 其他功能\n"
text += f"- `.adventure abandon` - 放弃当前冒险,按最低倍率结算已冒险时间\n"
text += f"- `.adventure 放弃` - 放弃当前冒险,按最低倍率结算已冒险时间\n"
text += f"- `.adventure help` - 查看帮助\n\n" text += f"- `.adventure help` - 查看帮助\n\n"
return text return text

View File

@@ -23,12 +23,12 @@ class AlchemyGame(BaseGame):
self.prize_pool: List[Tuple[int, str, float, str]] = [ self.prize_pool: List[Tuple[int, str, float, str]] = [
# (权重, 类型, 倍率, 描述) # (权重, 类型, 倍率, 描述)
(500, "penalty", 0, "炼金失败"), (500, "penalty", 0, "炼金失败"),
(100, "penalty", -1, "炼金爆炸"), (160, "penalty", -1, "炼金爆炸"),
(100, "points", 0.1, "少量积分"), (110, "points", 0.1, "少量积分"),
(390, "points", 0.5, "少量积分"), (390, "points", 0.5, "少量积分"),
(500, "points", 1, "等值积分"), (500, "points", 1, "等值积分"),
(390, "points", 2, "丰厚积分"), (390, "points", 2, "丰厚积分"),
(200, "points", 5, "丰厚积分"), (136, "points", 5, "丰厚积分"),
(9, "points", 10, "🌟 巨额积分"), (9, "points", 10, "🌟 巨额积分"),
(1, "points", 100, "💎 传说积分"), (1, "points", 100, "💎 传说积分"),
] ]
@@ -92,9 +92,9 @@ class AlchemyGame(BaseGame):
end_time = start_time + cost_time * 60 end_time = start_time + cost_time * 60
remaining_seconds = end_time - current_time remaining_seconds = end_time - current_time
# 如果冒险已完成,自动清理状态,允许炼金 # 如果冒险已完成,提示用户先回收奖励,不允许炼金
if remaining_seconds <= 0: if remaining_seconds <= 0:
self.db.delete_game_state(0, user_id, 'adventure') return f"❌ 你有待回收的冒险奖励!\n\n💡 请先使用 `.adventure` 回收冒险奖励后再进行炼金。"
else: else:
# 冒险未完成,返回错误提示 # 冒险未完成,返回错误提示
remaining_minutes = remaining_seconds // 60 remaining_minutes = remaining_seconds // 60

View File

@@ -100,6 +100,8 @@ def get_help_message() -> str:
- `.adventure` - 消耗1分钟进行冒险 - `.adventure` - 消耗1分钟进行冒险
- `.冒险` - 消耗1分钟进行冒险 - `.冒险` - 消耗1分钟进行冒险
- `.adventure 5` - 消耗5分钟进行冒险 - `.adventure 5` - 消耗5分钟进行冒险
- `.adventure abandon` - 放弃当前冒险,按最低倍率结算已冒险时间
- `.adventure 放弃` - 放弃当前冒险,按最低倍率结算已冒险时间
- `.adventure help` - 查看冒险帮助 - `.adventure help` - 查看冒险帮助
### 🎁 积分赠送系统 ### 🎁 积分赠送系统
@@ -111,6 +113,43 @@ def get_help_message() -> str:
- `.ai <问题>` - 向AI提问支持多用户对话等待10秒后回答 - `.ai <问题>` - 向AI提问支持多用户对话等待10秒后回答
- `.aiconfig host=xxx port=xxx model=xxx` - 配置Ollama服务地址和模型 - `.aiconfig host=xxx port=xxx model=xxx` - 配置Ollama服务地址和模型
### 🎰 赌场系统
**大小游戏**
- `.赌场 大小 open <最小> <最大> <赔率>` - 庄家开启大小游戏
- `.赌场 大小 bet <大/小> <金额>` - 下注
- `.赌场 大小 status` - 查看状态
- `.赌场 大小 settle` - 庄家结算(系统随机)
- `.赌场 大小 cancel` - 庄家放弃游戏(返还下注)
**轮盘游戏**
- `.赌场 轮盘 open <最小> <最大>` - 庄家开启轮盘游戏
- `.赌场 轮盘 bet <类型> <选项> <金额>` - 下注(数字/颜色/奇偶/大小/区间)
- `.赌场 轮盘 status` - 查看状态
- `.赌场 轮盘 settle` - 庄家结算系统随机0-36
- `.赌场 轮盘 cancel` - 庄家放弃游戏(返还下注)
**21点游戏**
- `.赌场 21点 open <最小> <最大> [黑杰克倍数]` - 庄家开启21点游戏
- `.赌场 21点 bet <金额>` - 下注
- `.赌场 21点 deal` - 庄家发牌
- `.赌场 21点 hit` - 玩家要牌
- `.赌场 21点 stand` - 玩家停牌
- `.赌场 21点 status` - 查看状态
- `.赌场 21点 settle` - 庄家结算
- `.赌场 21点 cancel` - 庄家放弃游戏(返还下注)
### 🐺 狼人杀
- `.狼人杀 open` - 主持人创建房间
- `.狼人杀 join` - 加入游戏
- `.狼人杀 start` - 主持人开始游戏
- `.狼人杀 <id> <消息>` - 私聊指定玩家
- `.狼人杀 狼人 <消息>` - 狼人群聊
- `.狼人杀 杀 <id>` - 狼人投票杀人
- `.狼人杀 验 <id>` - 预言家验人
- `.狼人杀 救 <id>` - 女巫救人
- `.狼人杀 毒 <id>` - 女巫毒人
- `.狼人杀 status` - 查看状态
### 其他 ### 其他
- `.help` - 显示帮助 - `.help` - 显示帮助
- `.stats` - 查看个人统计 - `.stats` - 查看个人统计

1473
games/casino.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -82,20 +82,15 @@ class GiftGame(BaseGame):
receiver_id = user['user_id'] receiver_id = user['user_id']
# 获取接收者名称用于显示 # 获取接收者名称用于显示
receiver_user = self.db.get_or_create_user(receiver_id) receiver_name = self.db.get_user_display_name(receiver_id)
receiver_name = receiver_user.get('username', f"用户{receiver_id}")
# 获取发送者名称用于显示 # 获取发送者名称用于显示
sender_user = self.db.get_or_create_user(sender_id) sender_name = self.db.get_user_display_name(sender_id)
sender_name = sender_user.get('username', f"用户{sender_id}")
# 验证参数 # 验证参数
if points <= 0: if points <= 0:
return "❌ 赠送积分数量必须大于0" return "❌ 赠送积分数量必须大于0"
if points > 1000:
return "❌ 单次赠送积分不能超过1000"
if sender_id == receiver_id: if sender_id == receiver_id:
return "❌ 不能赠送积分给自己!" return "❌ 不能赠送积分给自己!"
@@ -184,7 +179,7 @@ class GiftGame(BaseGame):
for record in records: for record in records:
timestamp = datetime.fromtimestamp(record['created_at']).strftime('%m-%d %H:%M') timestamp = datetime.fromtimestamp(record['created_at']).strftime('%m-%d %H:%M')
receiver_name = record.get('receiver_name', f"用户{record['receiver_id']}") receiver_name = self.db.get_user_display_name(record['receiver_id'])
points = record['points'] points = record['points']
message = record.get('message', '') message = record.get('message', '')
@@ -214,7 +209,7 @@ class GiftGame(BaseGame):
for record in records: for record in records:
timestamp = datetime.fromtimestamp(record['created_at']).strftime('%m-%d %H:%M') timestamp = datetime.fromtimestamp(record['created_at']).strftime('%m-%d %H:%M')
sender_name = record.get('sender_name', f"用户{record['sender_id']}") sender_name = self.db.get_user_display_name(record['sender_id'])
points = record['points'] points = record['points']
message = record.get('message', '') message = record.get('message', '')

View File

@@ -379,10 +379,12 @@ class IdiomGame(BaseGame):
text += f"**当前链长**{state_data['chain_length']}\n\n" text += f"**当前链长**{state_data['chain_length']}\n\n"
user_count = state_data['participants'][str(user_id)] user_count = state_data['participants'][str(user_id)]
text += f"@用户{user_id} 成功次数:{user_count}\n\n" user_display_name = self.db.get_user_display_name(user_id)
text += f"@{user_display_name} 成功次数:{user_count}\n\n"
if mentioned_user_id: if mentioned_user_id:
text += f"已指定 @用户{mentioned_user_id} 接龙\n\n" mentioned_display_name = self.db.get_user_display_name(mentioned_user_id)
text += f"已指定 @{mentioned_display_name} 接龙\n\n"
else: else:
text += "任何人都可以接龙\n\n" text += "任何人都可以接龙\n\n"
@@ -416,7 +418,8 @@ class IdiomGame(BaseGame):
state_data['next_user_id'] = next_user_id state_data['next_user_id'] = next_user_id
self.db.save_game_state(chat_id, 0, 'idiom', state_data) self.db.save_game_state(chat_id, 0, 'idiom', state_data)
return f"✅ 已指定 @用户{next_user_id} 接龙" next_user_display_name = self.db.get_user_display_name(next_user_id)
return f"✅ 已指定 @{next_user_display_name} 接龙"
def _reject_idiom(self, chat_id: int, user_id: int, idiom: str) -> str: def _reject_idiom(self, chat_id: int, user_id: int, idiom: str) -> str:
"""裁判拒绝词语 """裁判拒绝词语
@@ -509,7 +512,8 @@ class IdiomGame(BaseGame):
# 下一位 # 下一位
if state_data.get('next_user_id'): if state_data.get('next_user_id'):
text += f"**下一位**@用户{state_data['next_user_id']}\n\n" next_user_display_name = self.db.get_user_display_name(state_data['next_user_id'])
text += f"**下一位**@{next_user_display_name}\n\n"
else: else:
text += f"**下一位**:任何人都可以接龙\n\n" text += f"**下一位**:任何人都可以接龙\n\n"
@@ -522,7 +526,8 @@ class IdiomGame(BaseGame):
reverse=True reverse=True
) )
for idx, (uid, count) in enumerate(sorted_participants[:5], 1): for idx, (uid, count) in enumerate(sorted_participants[:5], 1):
text += f"{idx}. @用户{uid} - {count}\n" user_display_name = self.db.get_user_display_name(int(uid))
text += f"{idx}. @{user_display_name} - {count}\n"
text += "\n" text += "\n"
# 最近成语 # 最近成语
@@ -588,7 +593,8 @@ class IdiomGame(BaseGame):
reverse=True reverse=True
) )
for idx, (uid, count) in enumerate(sorted_participants, 1): for idx, (uid, count) in enumerate(sorted_participants, 1):
text += f"{idx}. @用户{uid} - {count}\n" user_display_name = self.db.get_user_display_name(int(uid))
text += f"{idx}. @{user_display_name} - {count}\n"
# 更新统计 # 更新统计
try: try:
for _ in range(count): for _ in range(count):

View File

@@ -126,7 +126,7 @@ class PointsGame(BaseGame):
for i, user in enumerate(leaderboard): for i, user in enumerate(leaderboard):
rank = i + 1 rank = i + 1
medal = medals[i] if i < len(medals) else "🏅" medal = medals[i] if i < len(medals) else "🏅"
username = user.get('username', f"用户{user['user_id']}") username = self.db.get_user_display_name(user['user_id'])
points = user.get('points', 0) points = user.get('points', 0)
text += f"{medal} **第 {rank} 名** {username}\n" text += f"{medal} **第 {rank} 名** {username}\n"

1261
games/werewolf.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,6 @@
"""Callback路由处理""" """Callback路由处理"""
import logging import logging
import re
from fastapi import APIRouter, Request from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
@@ -48,6 +49,15 @@ async def callback_receive(request: Request):
game_type, command = parse_result game_type, command = parse_result
logger.info(f"识别指令: game_type={game_type}, command={command}") logger.info(f"识别指令: game_type={game_type}, command={command}")
# 检查是否包含 @s 参数(私聊标志)
use_private_url = False
# 使用正则表达式匹配独立的 @s 参数(前后有空格或字符串边界)
if re.search(r'\s+@s\s+|\s+@s$|^@s\s+|^@s$', command):
use_private_url = True
# 从命令中移除 @s 参数,保持其他参数不变
command = re.sub(r'\s+@s(\s+|$)|^@s\s+', ' ', command).strip()
logger.info(f"检测到 @s 参数将优先使用个人URL发送反馈清理后的命令: {command}")
# 检查限流 # 检查限流
rate_limiter = get_rate_limiter() rate_limiter = get_rate_limiter()
if not rate_limiter.is_allowed(): if not rate_limiter.is_allowed():
@@ -75,15 +85,75 @@ async def callback_receive(request: Request):
# 发送回复 # 发送回复
if response_text: if response_text:
sender = get_message_sender() # 如果使用了 @s 参数优先发送到个人URL
if use_private_url:
# 根据内容选择消息类型 db = get_db()
if response_text.startswith('#'): user_webhook_url = db.get_user_webhook_url(callback_data.creator)
# Markdown格式
await sender.send_markdown(response_text) if user_webhook_url:
# 有个人URL发送到个人URL
from utils.message import send_private_message
# 判断消息类型
if game_type == 'ai_chat':
msg_type = 'markdown'
elif response_text.startswith('#'):
msg_type = 'markdown'
else:
msg_type = 'text'
success = await send_private_message(
user_id=callback_data.creator,
content=response_text,
msg_type=msg_type
)
if not success:
# 如果私聊发送失败回退到主URL
logger.warning(f"个人URL发送失败回退到主URL: user_id={callback_data.creator}")
sender = get_message_sender()
if game_type == 'ai_chat':
try:
await sender.send_markdown(response_text)
except Exception as send_md_err:
logger.error(f"发送Markdown消息失败改用文本发送: {send_md_err}")
await sender.send_text(response_text)
else:
if response_text.startswith('#'):
await sender.send_markdown(response_text)
else:
await sender.send_text(response_text)
# 成功发送到个人URL不向主URL发送
else:
# 没有个人URL回退到主URL
logger.info(f"用户 {callback_data.creator} 没有注册个人URL使用主URL发送")
sender = get_message_sender()
if game_type == 'ai_chat':
try:
await sender.send_markdown(response_text)
except Exception as send_md_err:
logger.error(f"发送Markdown消息失败改用文本发送: {send_md_err}")
await sender.send_text(response_text)
else:
if response_text.startswith('#'):
await sender.send_markdown(response_text)
else:
await sender.send_text(response_text)
else: else:
# 普通文本 # 没有 @s 参数正常发送到主URL
await sender.send_text(response_text) sender = get_message_sender()
# AI 对话:统一按 Markdown 发送(按任务决策)
if game_type == 'ai_chat':
try:
await sender.send_markdown(response_text)
except Exception as send_md_err:
logger.error(f"发送Markdown消息失败改用文本发送: {send_md_err}")
await sender.send_text(response_text)
else:
# 其他模块保持原有启发式:以 # 开头视为 Markdown否则文本
if response_text.startswith('#'):
await sender.send_markdown(response_text)
else:
await sender.send_text(response_text)
return JSONResponse({"result": "ok"}) return JSONResponse({"result": "ok"})
@@ -187,12 +257,37 @@ async def handle_command(game_type: str, command: str,
game = GiftGame() game = GiftGame()
return await game.handle(command, chat_id, user_id) return await game.handle(command, chat_id, user_id)
# 复述功能
if game_type == 'say':
# 提取参数并原样返回
_, args = CommandParser.extract_command_args(command)
args = args.strip()
if not args:
return "用法:.say 你想让我说的话\n别名:.说 / .复述"
return args
# 私聊功能
if game_type == 'talk':
return await handle_talk_command(command, chat_id, user_id)
# AI对话系统 # AI对话系统
if game_type == 'ai_chat': if game_type == 'ai_chat':
from games.ai_chat import AIChatGame from games.ai_chat import AIChatGame
game = AIChatGame() game = AIChatGame()
return await game.handle(command, chat_id, user_id) return await game.handle(command, chat_id, user_id)
# 赌场系统
if game_type == 'casino':
from games.casino import CasinoGame
game = CasinoGame()
return await game.handle(command, chat_id, user_id)
# 狼人杀系统
if game_type == 'werewolf':
from games.werewolf import WerewolfGame
game = WerewolfGame()
return await game.handle(command, chat_id, user_id)
# 未知游戏类型 # 未知游戏类型
logger.warning(f"未知游戏类型: {game_type}") logger.warning(f"未知游戏类型: {game_type}")
return "❌ 未知的游戏类型" return "❌ 未知的游戏类型"
@@ -206,7 +301,7 @@ async def handle_register_command(command: str, chat_id: int, user_id: int) -> s
"""处理注册命令 """处理注册命令
Args: Args:
command: 完整指令 ".register name" command: 完整指令 ".register name"".register url <url>"
chat_id: 会话ID chat_id: 会话ID
user_id: 用户ID user_id: 用户ID
@@ -220,21 +315,114 @@ async def handle_register_command(command: str, chat_id: int, user_id: int) -> s
# 验证参数 # 验证参数
if not args: if not args:
return "❌ 请提供要注册的名称\n\n正确格式:`.register <名称>`\n\n示例:\n`.register 张三`\n`.register 小明`" return "❌ 请提供要注册的内容\n\n正确格式:\n`.register <名称>` - 注册用户名\n`.register url <URL>` - 注册webhook URL\n\n示例:\n`.register 张三`\n`.register url https://example.com/webhook?key=xxx`"
if len(args) > 20: # 检查是否为url子命令
return "❌ 名称过长最多支持20个字符。" parts = args.split(maxsplit=1)
if parts and parts[0].lower() == 'url':
# 更新用户名称 # 处理URL注册
db = get_db() if len(parts) < 2:
success = db.update_user_name(user_id, args) return "❌ 请提供webhook URL\n\n正确格式:`.register url <URL>`\n\n示例:\n`.register url https://example.com/webhook?key=xxx`"
if success: webhook_url = parts[1].strip()
return f"✅ 注册成功!\n\n**您的名称**{args}\n\n之后您可以使用这个名称参与各种游戏和功能。"
# URL验证
if not webhook_url.startswith(('http://', 'https://')):
return "❌ URL格式无效必须以 http:// 或 https:// 开头。"
# 设置URL
db = get_db()
success = db.set_user_webhook_url(user_id, webhook_url)
if success:
return f"✅ Webhook URL注册成功\n\n**您的个人URL**{webhook_url}\n\n私聊消息将发送到此URL。"
else:
return "❌ 注册失败!请稍后重试。"
else: else:
return "❌ 注册失败!请稍后重试。" # 原有的名称注册逻辑
if len(args) > 20:
return "❌ 名称过长最多支持20个字符。"
# 更新用户名称
db = get_db()
success = db.update_user_name(user_id, args)
if success:
return f"✅ 注册成功!\n\n**您的名称**{args}\n\n之后您可以使用这个名称参与各种游戏和功能。"
else:
return "❌ 注册失败!请稍后重试。"
except Exception as e: except Exception as e:
logger.error(f"处理注册指令错误: {e}", exc_info=True) logger.error(f"处理注册指令错误: {e}", exc_info=True)
return f"❌ 处理指令出错: {str(e)}" return f"❌ 处理指令出错: {str(e)}"
async def handle_talk_command(command: str, chat_id: int, user_id: int) -> str:
"""处理私聊命令
Args:
command: 完整指令 ".talk <username> <content>"
chat_id: 会话ID
user_id: 发送者用户ID
Returns:
处理结果消息
"""
try:
# 提取参数
_, args = CommandParser.extract_command_args(command)
args = args.strip()
# 验证参数
if not args:
return "❌ 请提供用户名和消息内容!\n\n正确格式:`.talk <用户名> <消息内容>`\n\n示例:\n`.talk 张三 你好,想和你聊聊`\n`.talk 李四 这是一条私聊消息`"
# 解析username和content第一个单词是username剩余部分是content
parts = args.split(maxsplit=1)
if len(parts) < 2:
return "❌ 请提供用户名和消息内容!\n\n正确格式:`.talk <用户名> <消息内容>`\n\n示例:\n`.talk 张三 你好,想和你聊聊`"
target_username = parts[0].strip()
content = parts[1].strip()
if not target_username:
return "❌ 用户名不能为空!\n\n正确格式:`.talk <用户名> <消息内容>`"
if not content:
return "❌ 消息内容不能为空!\n\n正确格式:`.talk <用户名> <消息内容>`"
# 通过用户名查找目标用户
db = get_db()
target_user = db.get_user_by_name(target_username)
if not target_user:
return f"❌ 找不到用户名为「{target_username}」的用户!\n\n提示:目标用户需要使用 `.register <名称>` 注册用户名。"
target_user_id = target_user['user_id']
# 检查目标用户是否有注册名称(应该有,因为是通过名称找到的)
if not target_user.get('username'):
return f"❌ 用户「{target_username}」尚未注册用户名!"
# 检查目标用户是否有个人webhook URL
if not db.has_webhook_url(target_user_id):
return f"❌ 用户「{target_username}」尚未注册个人webhook URL\n\n提示:目标用户需要使用 `.register url <URL>` 注册个人URL后才能接收私聊消息。"
# 发送私聊消息
from utils.message import send_private_message
success = await send_private_message(
user_id=target_user_id,
content=content,
msg_type='text'
)
if success:
# 私聊消息发送成功不向主URL发送提示消息
return ""
else:
# 发送失败时仍然需要提示用户
return f"❌ 发送私聊消息失败,请稍后重试。"
except Exception as e:
logger.error(f"处理私聊指令错误: {e}", exc_info=True)
return f"❌ 处理指令出错: {str(e)}"

111
routers/private.py Normal file
View File

@@ -0,0 +1,111 @@
"""私聊相关API路由"""
import logging
from typing import List, Dict
from fastapi import APIRouter, HTTPException
from fastapi.responses import JSONResponse
from core.database import get_db
from core.models import PrivateMessageRequest, CheckBatchRequest, CheckBatchResponse
from utils.message import send_private_message
logger = logging.getLogger(__name__)
router = APIRouter()
@router.post("/private/send")
async def send_private(request: PrivateMessageRequest):
"""发送私聊消息
请求体:
{
"user_id": 123456,
"content": "消息内容",
"msg_type": "text" // 可选,默认为"text"
}
"""
try:
# 验证msg_type
if request.msg_type not in ['text', 'markdown']:
raise HTTPException(
status_code=400,
detail="msg_type必须是'text''markdown'"
)
# 调用send_private_message
success = await send_private_message(
user_id=request.user_id,
content=request.content,
msg_type=request.msg_type
)
if not success:
# 检查用户是否有个人URL
db = get_db()
has_url = db.has_webhook_url(request.user_id)
if not has_url:
raise HTTPException(
status_code=400,
detail=f"用户 {request.user_id} 没有注册个人webhook URL"
)
else:
raise HTTPException(
status_code=500,
detail="消息发送失败,请稍后重试"
)
return JSONResponse({
"success": True,
"message": "消息发送成功"
})
except HTTPException:
raise
except Exception as e:
logger.error(f"发送私聊消息API错误: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"服务器内部错误: {str(e)}"
)
@router.get("/private/check/{user_id}")
async def check_user_webhook(user_id: int):
"""检查用户是否有个人webhook URL"""
try:
db = get_db()
has_webhook_url = db.has_webhook_url(user_id)
return JSONResponse({
"user_id": user_id,
"has_webhook_url": has_webhook_url
})
except Exception as e:
logger.error(f"检查用户webhook URL错误: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"服务器内部错误: {str(e)}"
)
@router.post("/private/check-batch")
async def check_users_webhook_batch(request: CheckBatchRequest):
"""批量检查用户是否有个人webhook URL
请求体:
{
"user_ids": [123456, 789012, ...]
}
"""
try:
db = get_db()
results = db.check_users_webhook_urls(request.user_ids)
return CheckBatchResponse(results=results)
except Exception as e:
logger.error(f"批量检查用户webhook URL错误: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"服务器内部错误: {str(e)}"
)

44
start_background.sh Executable file
View File

@@ -0,0 +1,44 @@
#!/bin/bash
# LiuBai网站后台启动脚本
PID_FILE="./liubai_web.pid"
LOG_FILE="./liubai_web.log"
# 检查是否已经在运行
if [ -f "$PID_FILE" ]; then
PID=$(cat "$PID_FILE")
if kill -0 "$PID" 2>/dev/null; then
echo "服务已经在运行 (PID: $PID)"
exit 1
else
echo "删除旧的PID文件..."
rm -f "$PID_FILE"
fi
fi
echo "正在启动LiuBaiBlog网站服务器..."
# 使用nohup在后台运行
nohup venv/bin/python3 -m jurigged -v app.py > "$LOG_FILE" 2>&1 &
PID=$!
# 保存PID
echo $PID > "$PID_FILE"
echo "服务器已在后台启动!"
echo "PID: $PID"
echo "日志文件: $LOG_FILE"
echo "要停止服务,请运行: ./stop_background.sh"
# 等待一下确保服务器启动
sleep 2
# 检查进程是否还在运行
if kill -0 "$PID" 2>/dev/null; then
echo "服务器启动成功!"
else
echo "服务器启动失败,请检查日志文件: $LOG_FILE"
rm -f "$PID_FILE"
exit 1
fi

38
stop_background.sh Executable file
View File

@@ -0,0 +1,38 @@
#!/bin/bash
# LiuBai网站后台停止脚本
PID_FILE="./liubai_web.pid"
if [ ! -f "$PID_FILE" ]; then
echo "服务未运行或PID文件不存在"
exit 1
fi
PID=$(cat "$PID_FILE")
echo "正在停止服务器 (PID: $PID)..."
# 尝试优雅地停止进程
if kill -TERM "$PID" 2>/dev/null; then
echo "发送停止信号..."
# 等待进程结束
for i in {1..10}; do
if ! kill -0 "$PID" 2>/dev/null; then
echo "服务器已停止"
rm -f "$PID_FILE"
exit 0
fi
sleep 1
done
# 如果进程还在运行,强制杀死
echo "强制停止进程..."
kill -KILL "$PID" 2>/dev/null
rm -f "$PID_FILE"
echo "服务器已强制停止"
else
echo "进程不存在或已停止"
rm -f "$PID_FILE"
fi

View File

@@ -135,3 +135,40 @@ def get_message_sender() -> MessageSender:
_sender_instance.webhook_url = GetWebhookURL() _sender_instance.webhook_url = GetWebhookURL()
return _sender_instance return _sender_instance
async def send_private_message(user_id: int, content: str, msg_type: str = 'text') -> bool:
"""发送私聊消息到用户个人webhook URL
Args:
user_id: 目标用户ID
content: 消息内容
msg_type: 消息类型 ('text''markdown')
Returns:
是否发送成功如果用户没有个人URL则返回False
"""
from core.database import get_db
# 从数据库获取用户webhook URL
db = get_db()
webhook_url = db.get_user_webhook_url(user_id)
if not webhook_url:
logger.warning(f"用户 {user_id} 没有注册个人webhook URL无法发送私聊消息")
return False
# 创建MessageSender实例使用用户的个人URL
sender = MessageSender(webhook_url=webhook_url)
try:
# 根据msg_type调用相应方法
if msg_type == 'markdown':
return await sender.send_markdown(content)
else:
return await sender.send_text(content)
except Exception as e:
logger.error(f"发送私聊消息失败: user_id={user_id}, error={e}", exc_info=True)
return False
finally:
# 关闭HTTP客户端
await sender.close()

View File

@@ -68,6 +68,15 @@ class CommandParser:
'.ai': 'ai_chat', '.ai': 'ai_chat',
'.aiconfig': 'ai_chat', '.aiconfig': 'ai_chat',
# 复述
'.say': 'say',
'.说': 'say',
'.复述': 'say',
# 私聊
'.talk': 'talk',
'.私聊': 'talk',
# 帮助 # 帮助
'.help': 'help', '.help': 'help',
'.帮助': 'help', '.帮助': 'help',
@@ -75,6 +84,14 @@ class CommandParser:
# 统计 # 统计
'.stats': 'stats', '.stats': 'stats',
'.统计': 'stats', '.统计': 'stats',
# 赌场系统
'.赌场': 'casino',
'.casino': 'casino',
# 狼人杀系统
'.werewolf': 'werewolf',
'.狼人杀': 'werewolf',
} }
# 机器人名称模式(用于从@消息中提取) # 机器人名称模式(用于从@消息中提取)
@@ -98,11 +115,20 @@ class CommandParser:
if at_match: if at_match:
content = at_match.group(1).strip() content = at_match.group(1).strip()
# 检查是否以指令开头 # 拦截全角空格与全角标点(不允许)
for cmd_prefix, game_type in cls.COMMAND_MAP.items(): # 范围包含:全角空格\u3000、全角标点\uFF01-\uFF60、兼容区\uFFE0-\uFFEE
if content == cmd_prefix: # if re.search(r"[\u3000\uFF01-\uFF60\uFFE0-\uFFEE]", content):
# 返回游戏类型和完整指令 # logger.debug(f"包含全角字符,忽略: {content}")
return game_type, content # return None
# 大小写不敏感匹配(仅用于匹配,不改变返回的原始内容)
content_lower = content.lower()
# 使用最长前缀优先,避免 .r 误匹配 .roll 等更长前缀
command_keys_sorted = sorted(cls.COMMAND_MAP.keys(), key=len, reverse=True)
for cmd_prefix in command_keys_sorted:
if content_lower.startswith(cmd_prefix.lower()):
return cls.COMMAND_MAP[cmd_prefix], content
# 特殊处理:.ai 和 .aiconfig 指令支持参数 # 特殊处理:.ai 和 .aiconfig 指令支持参数
if content.startswith('.ai '): if content.startswith('.ai '):