Files
NewWPSBot/Plugins/WPSGardenSystem/garden_plugin_steal.py

160 lines
6.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Steal plugin for garden system."""
from __future__ import annotations
from PWF.Convention.Runtime.Architecture import *
from PWF.CoreModules.database import get_db
from Plugins.WPSAPI import GuideEntry
from Plugins.WPSBackpackSystem import WPSBackpackSystem
from Plugins.WPSConfigSystem import WPSConfigAPI
from .garden_plugin_base import WPSGardenBase
class WPSGardenSteal(WPSGardenBase):
@override
def get_webhook_url(self, message: str, chat_id: int, user_id: int) -> str:
return self.get_main_webhook_url()
def get_guide_subtitle(self) -> str:
return "偷取其他用户成熟果实的互动指令"
def collect_command_entries(self) -> Sequence[GuideEntry]:
return (
GuideEntry(
title="偷取",
identifier="偷取 <用户> <地块序号>",
description="从其他用户的成熟作物中偷取果实。",
metadata={"别名": "steal"},
icon="🕵️",
details=[
{
"type": "steps",
"items": [
"输入目标用户 ID 或昵称,以及成熟地块序号。",
"系统校验目标用户菜园与成熟状态。",
"成功后获得 1 个果实并通知对方被偷记录。",
],
},
"同一地块可多次被不同用户偷取,超限后将提示果实不足。",
],
),
)
def collect_guide_entries(self) -> Sequence[GuideEntry]:
return (
{
"title": "指令格式",
"description": "`偷取 <用户ID|昵称> <地块序号>`,不可针对自己。",
},
{
"title": "通知机制",
"description": "成功偷取后会向目标用户推送警报消息,包含剩余果实数量。",
},
)
def wake_up(self) -> None:
super().wake_up()
self.register_plugin("steal")
self.register_plugin("偷取")
self.register_plugin("偷菜")
async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
payload = self.parse_message_after_at(message).strip()
if not payload:
return await self.send_markdown_message("❌ 指令格式:`偷取 <用户> <地块序号>`", chat_id, user_id)
tokens = [token.strip() for token in payload.split() if token.strip()]
if len(tokens) < 2:
return await self.send_markdown_message("❌ 指令格式:`偷取 <用户> <地块序号>`", chat_id, user_id)
target_identifier = tokens[0]
if not tokens[1].isdigit():
return await self.send_markdown_message("❌ 指令格式:`偷取 <用户> <地块序号>`", chat_id, user_id)
plot_index = int(tokens[1])
owner_id = self._resolve_user_identifier(target_identifier)
if owner_id is None:
return await self.send_markdown_message("❌ 未找到目标用户", chat_id, user_id)
if owner_id == user_id:
return await self.send_markdown_message("❌ 不能偷取自己的菜园", chat_id, user_id)
try:
result = self.service().steal(
thief_id=user_id,
owner_id=owner_id,
plot_index=plot_index,
)
except ValueError as exc:
return await self.send_markdown_message(f"{exc}", chat_id, user_id)
crop = result["crop"]
backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
backpack.add_item(user_id, crop.fruit_id, result["stolen_quantity"])
remaining = result["remaining"]
trap_result = result.get("trap_result")
# 处理陷阱触发
if trap_result:
trap = trap_result["trap"]
config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI)
# 扣除罚金
current_points = config_api.get_user_points(user_id)
actual_fine = min(trap_result["fine_points"], current_points)
if actual_fine > 0:
await config_api.adjust_user_points(
chat_id, user_id, -actual_fine, f"触发陷阱罚金:{trap.display_name}"
)
# 发送陷阱触发消息给偷盗者
trap_message = (
f"# 🚨 陷阱触发\n"
f"{trap_result['trigger_message']}\n"
f"- 扣除罚金:{actual_fine}"
)
await self.send_markdown_message(trap_message, chat_id, user_id)
message = (
"# 🕵️ 偷取成功\n"
f"- 目标:{crop.display_name}\n"
f"- 获得:{crop.display_name}的果实 × {result['stolen_quantity']}\n"
f"- 目标剩余果实:{remaining}"
)
if trap_result:
message += f"\n- ⚠️ 注意:你触发了陷阱!"
await self.send_markdown_message(message, chat_id, user_id)
owner_chat = result.get("chat_id")
if owner_chat:
owner_message = (
"# ⚠️ 菜园警报\n"
f"- 你的 {crop.display_name} 被偷走了 1 个果实\n"
f"- 当前剩余果实:{remaining}"
)
if trap_result:
trap = trap_result["trap"]
durability = trap_result.get("durability", 0)
if trap_result.get("durability_exhausted", False):
owner_message += f"\n- 🎯 你的{trap.display_name}成功触发了!但陷阱耐久度已耗尽并被移除。"
else:
owner_message += f"\n- 🎯 好消息:你的{trap.display_name}成功触发了!剩余耐久度:{durability}"
await self.send_markdown_message(owner_message, owner_chat, owner_id)
return None
def _resolve_user_identifier(self, identifier: str) -> Optional[int]:
text = identifier.strip()
if text.isdigit():
return int(text)
cursor = get_db().conn.cursor()
cursor.execute(
"SELECT user_id FROM user_info WHERE username = ? COLLATE NOCASE",
(text,),
)
row = cursor.fetchone()
if row:
return int(row["user_id"])
return None
__all__ = ["WPSGardenSteal"]