Files
NewWPSBot/Plugins/WPSCombatSystem/combat_plugin_battle.py
2025-11-10 14:59:07 +08:00

252 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""PVP对战插件 - 回合制战斗"""
from __future__ import annotations
from typing import Optional
from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
from .combat_plugin_base import WPSCombatBase
logger: ProjectConfig = ProjectConfig()
class WPSCombatBattle(WPSCombatBase):
"""PVP对战插件"""
def is_enable_plugin(self) -> bool:
return True
def wake_up(self) -> None:
super().wake_up()
logger.Log(
"Info",
f"{ConsoleFrontColor.GREEN}WPSCombatBattle 插件已加载{ConsoleFrontColor.RESET}"
)
self.register_plugin("挑战")
self.register_plugin("接受挑战")
self.register_plugin("拒绝挑战")
self.register_plugin("战斗")
self.register_plugin("battle")
self.register_plugin("投降")
# 启动超时检查(定期轮询)
# TODO: 使用时钟调度器定期检查超时
async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
"""
处理PVP命令
命令格式:
- 挑战 <目标用户ID>
- 接受挑战 <挑战ID>
- 拒绝挑战 <挑战ID>
- 战斗 <战斗ID> <技能名>
- 投降 <战斗ID>
"""
message = self.parse_message_after_at(message).strip()
tokens = message.split()
if not tokens:
return await self.send_markdown_message(
self._help_message(),
chat_id,
user_id
)
command = tokens[0].lower()
if command in ["挑战", "challenge"]:
return await self._handle_challenge(chat_id, user_id, tokens[1:])
elif command in ["接受挑战", "accept"]:
return await self._handle_accept(chat_id, user_id, tokens[1:])
elif command in ["拒绝挑战", "reject"]:
return await self._handle_reject(chat_id, user_id, tokens[1:])
elif command in ["投降", "surrender"]:
return await self._handle_surrender(chat_id, user_id, tokens[1:])
else:
# 默认视为战斗动作
return await self._handle_battle_action(chat_id, user_id, tokens)
async def _handle_challenge(
self,
chat_id: int,
user_id: int,
args: list
) -> Optional[str]:
"""处理挑战命令"""
if not args:
return await self.send_markdown_message(
"❌ 请指定目标用户ID\n用法:`挑战 <目标用户ID>`",
chat_id,
user_id
)
try:
target_id = int(args[0])
except ValueError:
return await self.send_markdown_message(
"❌ 用户ID格式错误",
chat_id,
user_id
)
if target_id == user_id:
return await self.send_markdown_message(
"❌ 不能挑战自己",
chat_id,
user_id
)
service = self.service()
success, msg, challenge_id = service.create_pvp_challenge(user_id, target_id)
return await self.send_markdown_message(msg, chat_id, user_id)
async def _handle_accept(
self,
chat_id: int,
user_id: int,
args: list
) -> Optional[str]:
"""处理接受挑战"""
if not args:
return await self.send_markdown_message(
"❌ 请指定挑战ID\n用法:`接受挑战 <挑战ID>`",
chat_id,
user_id
)
try:
challenge_id = int(args[0])
except ValueError:
return await self.send_markdown_message(
"❌ 挑战ID格式错误",
chat_id,
user_id
)
service = self.service()
success, msg, battle_id = service.accept_challenge(challenge_id, user_id)
return await self.send_markdown_message(msg, chat_id, user_id)
async def _handle_reject(
self,
chat_id: int,
user_id: int,
args: list
) -> Optional[str]:
"""处理拒绝挑战"""
if not args:
return await self.send_markdown_message(
"❌ 请指定挑战ID\n用法:`拒绝挑战 <挑战ID>`",
chat_id,
user_id
)
try:
challenge_id = int(args[0])
except ValueError:
return await self.send_markdown_message(
"❌ 挑战ID格式错误",
chat_id,
user_id
)
service = self.service()
success, msg = service.reject_challenge(challenge_id, user_id)
return await self.send_markdown_message(msg, chat_id, user_id)
async def _handle_battle_action(
self,
chat_id: int,
user_id: int,
tokens: list
) -> Optional[str]:
"""处理战斗动作"""
if len(tokens) < 2:
return await self.send_markdown_message(
"❌ 命令格式错误\n用法:`战斗 <战斗ID> <技能名>`",
chat_id,
user_id
)
try:
battle_id = int(tokens[0])
except ValueError:
return await self.send_markdown_message(
"❌ 战斗ID格式错误",
chat_id,
user_id
)
skill_name = " ".join(tokens[1:])
service = self.service()
success, msg = service.execute_battle_action(battle_id, user_id, skill_name)
return await self.send_markdown_message(msg, chat_id, user_id)
async def _handle_surrender(
self,
chat_id: int,
user_id: int,
args: list
) -> Optional[str]:
"""处理投降"""
if not args:
return await self.send_markdown_message(
"❌ 请指定战斗ID\n用法:`投降 <战斗ID>`",
chat_id,
user_id
)
try:
battle_id = int(args[0])
except ValueError:
return await self.send_markdown_message(
"❌ 战斗ID格式错误",
chat_id,
user_id
)
service = self.service()
success, msg = service.surrender_battle(battle_id, user_id)
return await self.send_markdown_message(msg, chat_id, user_id)
def _help_message(self) -> str:
"""帮助信息"""
return """# ⚔️ PVP对战系统
**命令格式:**
- `挑战 <目标用户ID>`发起PVP挑战
- `接受挑战 <挑战ID>`:接受挑战
- `拒绝挑战 <挑战ID>`:拒绝挑战
- `战斗 <战斗ID> <技能名>`:执行战斗动作
- `投降 <战斗ID>`:投降
**说明:**
- 挑战有效期15分钟超时自动失效
- 回合制战斗,速度高者先手
- 胜者获得1000积分或失败者全部积分
- 超时未操作视为失败
- 可随时投降
**示例:**
- `挑战 12345`向用户12345发起挑战
- `接受挑战 1`接受挑战ID为1的挑战
- `战斗 1 攻击`在战斗1中使用"攻击"技能
- `投降 1`在战斗1中投降
"""
__all__ = ["WPSCombatBattle"]