Files
NewWPSBot/Plugins/WPSAlchemyGame.py

622 lines
23 KiB
Python
Raw Normal View History

2025-11-09 01:31:12 +08:00
from __future__ import annotations
import random
2025-11-11 00:06:56 +08:00
from collections import defaultdict, Counter
2025-11-09 01:31:12 +08:00
from dataclasses import dataclass
2025-11-11 00:06:56 +08:00
from typing import Dict, List, Optional, Sequence, Set, Tuple, override
2025-11-09 01:31:12 +08:00
from PWF.Convention.Runtime.Architecture import Architecture
from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
from PWF.CoreModules.database import get_db
from .WPSAPI import WPSAPI
from .WPSBackpackSystem import (
BackpackItemDefinition,
BackpackItemTier,
WPSBackpackSystem,
)
from .WPSConfigSystem import WPSConfigAPI
from .WPSStoreSystem import WPSStoreSystem
2025-11-09 01:31:12 +08:00
from .WPSFortuneSystem import WPSFortuneSystem
# Shared project configuration object. Despite its name, ``logger`` is a
# ProjectConfig instance; this module uses it both to emit log lines (``Log``)
# and to read settings (``FindItem``).
logger: ProjectConfig = Architecture.Get(ProjectConfig)
# Fortune coefficient looked up under the key "alchemy_fortune_coeff".
# NOTE(review): 0.03 is presumably the fallback used when the key is absent —
# confirm against ProjectConfig.FindItem.
FORTUNE_COEFF:float = logger.FindItem("alchemy_fortune_coeff", 0.03)
# NOTE(review): presumably persists the configuration so the key above is
# written back with its value — confirm against ProjectConfig.SaveProperties.
logger.SaveProperties()
def clamp01(value: float) -> float:
    """Clamp *value* into the closed interval [0.0, 1.0].

    Args:
        value: Any number; values below 0 map to 0.0, values above 1 map to 1.0.

    Returns:
        The clamped value.
    """
    return max(0.0, min(1.0, value))
@dataclass(frozen=True)
class AlchemyRecipe:
    """Immutable description of one three-material alchemy recipe."""

    # The three material item IDs that make up the recipe.
    materials: Tuple[str, str, str]
    # Item ID granted when an attempt succeeds.
    success_item_id: str
    # Item ID granted when an attempt fails.
    fail_item_id: str
    # Success probability before any fortune adjustment.
    base_success_rate: float
class WPSAlchemyGame(WPSAPI):
    """Alchemy mini-game plugin.

    Two modes are offered through ``callback``:

    * point alchemy - a single integer token stakes that many points and a
      row of ``_PHASE_TABLE`` decides the payout multiplier;
    * item alchemy - three material tokens (plus an optional repeat count)
      are consumed and a registered recipe decides the produced item.

    Other code can add recipes through ``register_recipe`` and query them
    through ``get_recipes_using_item`` / ``get_recipes_producing_item``.
    """

    ASH_ITEM_ID = "alchemy_ash"
    ASH_ITEM_NAME = "炉灰"
    SLAG_ITEM_ID = "alchemy_slag"
    SLAG_ITEM_NAME = "炉渣"
    # Largest repeat count accepted by a single item-alchemy command.
    MAX_BATCH_TIMES = 99

    # Rows are (upper threshold, point multiplier, label, result text).
    # ``_draw_point_multiplier`` picks the first row whose threshold is not
    # below the landing value, so rows must stay sorted by threshold.
    _PHASE_TABLE: List[Tuple[float, float, str, str]] = [
        (0.1481481481, 0.0, "爆炸", "💥 炼金反噬,积分化为飞灰……"),
        (0.4444444444, 0.5, "失败", "😖 炼金失败,仅保留半数积分。"),
        (0.7407407407, 2.0, "成功", "😊 炼金成功,积分翻倍!"),
        (0.8888888888, 5.0, "丰厚积分", "😁 运气加成,收获丰厚积分!"),
        (0.9629629630, 10.0, "巨额积分", "🤩 巨额积分入账,今日欧气爆棚!"),
        (1.0, 100.0, "传说积分", "🌈 传说级好运!积分暴涨一百倍!"),
    ]

    def __init__(self) -> None:
        """Create empty recipe storage and the three reverse indexes."""
        super().__init__()
        # Recipes keyed by their sorted material triple.
        self._recipes: Dict[Tuple[str, str, str], AlchemyRecipe] = {}
        # Reverse indexes: item ID -> keys of the recipes that use the item
        # as a material / produce it on success / produce it on failure.
        self._material_index: Dict[str, Set[Tuple[str, str, str]]] = defaultdict(set)
        self._success_index: Dict[str, Set[Tuple[str, str, str]]] = defaultdict(set)
        self._fail_index: Dict[str, Set[Tuple[str, str, str]]] = defaultdict(set)
        self._fortune_coeff = FORTUNE_COEFF

    @override
    def dependencies(self) -> List[type]:
        """Return the plugin types this plugin declares as dependencies."""
        return [WPSAPI, WPSBackpackSystem, WPSConfigAPI, WPSFortuneSystem, WPSStoreSystem]

    @override
    def wake_up(self) -> None:
        """Log the load, register the command keywords and the built-in items."""
        logger.Log(
            "Info",
            f"{ConsoleFrontColor.GREEN}WPSAlchemyGame 插件已加载{ConsoleFrontColor.RESET}",
        )
        self.register_plugin("alchemy")
        self.register_plugin("炼金")
        self._register_alchemy_items()

    def _register_alchemy_items(self) -> None:
        """Register ash, its store entry, slag, and the ash -> slag recipe.

        Every step is best-effort: a failure is logged as a warning and the
        remaining steps still run. The store entry for ash is only attempted
        when the ash item itself registered without error.
        """
        backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)

        try:
            backpack.register_item(
                self.ASH_ITEM_ID,
                self.ASH_ITEM_NAME,
                BackpackItemTier.COMMON,
                "炼金失败时残留的炉灰,可作为低阶材料或出售。",
            )
        except Exception as exc:
            logger.Log(
                "Warning",
                f"{ConsoleFrontColor.YELLOW}注册炉灰物品时出现问题: {exc}{ConsoleFrontColor.RESET}",
            )
        else:
            try:
                store_system = Architecture.Get(WPSStoreSystem)
                store_system.register_mode(
                    item_id=self.ASH_ITEM_ID,
                    price=8,
                    limit_amount=999,
                )
            except Exception as exc:
                logger.Log(
                    "Warning",
                    f"{ConsoleFrontColor.YELLOW}注册炉灰商店模式时出现问题: {exc}{ConsoleFrontColor.RESET}",
                )

        try:
            backpack.register_item(
                self.SLAG_ITEM_ID,
                self.SLAG_ITEM_NAME,
                BackpackItemTier.COMMON,
                "经高温提炼后的炉渣,可在特殊任务中使用。",
            )
        except Exception as exc:
            logger.Log(
                "Warning",
                f"{ConsoleFrontColor.YELLOW}注册炉渣物品时出现问题: {exc}{ConsoleFrontColor.RESET}",
            )

        try:
            # Three ashes always refine into one slag.
            self.register_recipe(
                (self.ASH_ITEM_ID, self.ASH_ITEM_ID, self.ASH_ITEM_ID),
                self.SLAG_ITEM_ID,
                self.ASH_ITEM_ID,
                1.0,
            )
        except Exception as exc:
            logger.Log(
                "Warning",
                f"{ConsoleFrontColor.YELLOW}注册炉渣配方时出现问题: {exc}{ConsoleFrontColor.RESET}",
            )

    def register_recipe(
        self,
        materials: Tuple[str, str, str]|Sequence[str],
        success_item_id: str,
        fail_item_id: str,
        base_success_rate: float,
    ) -> None:
        """Register (or replace) the recipe for a material triple.

        Material order does not matter: IDs are stripped and sorted to form
        the recipe key, and an existing recipe under the same key is replaced.

        Args:
            materials: Exactly three material item IDs.
            success_item_id: Item granted on success.
            fail_item_id: Item granted on failure; blank falls back to ash.
            base_success_rate: Probability within [0, 1].

        Raises:
            ValueError: If the material count is not three, a material ID is
                blank, or the rate lies outside [0, 1].
        """
        if len(materials) != 3:
            raise ValueError("炼金配方必须提供三种材料")
        recipe_key = tuple(sorted(mat.strip() for mat in materials))
        if not all(recipe_key):
            raise ValueError("炼金材料 ID 不能为空")
        # Written as a negated chain so NaN is rejected as well.
        if not 0.0 <= base_success_rate <= 1.0:
            raise ValueError("配方成功率必须位于 0~1 之间")

        previous = self._recipes.get(recipe_key)
        if previous is not None:
            # Drop stale reverse-index entries before overwriting.
            self._unindex_recipe(recipe_key, previous)

        recipe = AlchemyRecipe(
            materials=recipe_key,
            success_item_id=success_item_id.strip(),
            fail_item_id=fail_item_id.strip() or self.ASH_ITEM_ID,
            base_success_rate=base_success_rate,
        )
        self._recipes[recipe_key] = recipe
        self._index_recipe(recipe_key, recipe)
        logger.Log(
            "Info",
            f"{ConsoleFrontColor.CYAN}已注册炼金配方 {recipe_key} -> {success_item_id} ({base_success_rate:.2%}){ConsoleFrontColor.RESET}",
        )

    def _index_recipe(
        self, materials_key: Tuple[str, str, str], recipe: AlchemyRecipe
    ) -> None:
        """Add *materials_key* to the material, success and fail indexes."""
        for material in recipe.materials:
            self._material_index[material].add(materials_key)
        self._success_index[recipe.success_item_id].add(materials_key)
        self._fail_index[recipe.fail_item_id].add(materials_key)

    def _unindex_recipe(
        self, materials_key: Tuple[str, str, str], recipe: AlchemyRecipe
    ) -> None:
        """Remove *materials_key* from every index, pruning emptied buckets."""
        buckets = [(self._material_index, material) for material in recipe.materials]
        buckets.append((self._success_index, recipe.success_item_id))
        buckets.append((self._fail_index, recipe.fail_item_id))
        for index, item_id in buckets:
            # ``get`` avoids creating a bucket in the defaultdict; a repeated
            # material simply finds its bucket already pruned.
            keys = index.get(item_id)
            if not keys or materials_key not in keys:
                continue
            keys.discard(materials_key)
            if not keys:
                del index[item_id]

    def get_recipes_using_item(self, item_id: str) -> List[AlchemyRecipe]:
        """Return recipes that consume *item_id*, ordered by material key."""
        if not item_id:
            return []
        keys = self._material_index.get(item_id, set())
        return [self._recipes[key] for key in sorted(keys)]

    def get_recipes_producing_item(
        self, item_id: str
    ) -> Dict[str, List[AlchemyRecipe]]:
        """Return recipes that yield *item_id*, split by outcome.

        Returns:
            ``{"success": [...], "fail": [...]}``; each list is ordered by
            descending base success rate, then by material key.
        """
        if not item_id:
            return {"success": [], "fail": []}

        def ranked(index: Dict[str, Set[Tuple[str, str, str]]]) -> List[AlchemyRecipe]:
            found = [self._recipes[key] for key in index.get(item_id, set())]
            found.sort(key=lambda rec: (-rec.base_success_rate, rec.materials))
            return found

        return {
            "success": ranked(self._success_index),
            "fail": ranked(self._fail_index),
        }

    async def callback(
        self, message: str, chat_id: int, user_id: int
    ) -> Optional[str]:
        """Parse the command text and dispatch to point or item alchemy.

        One decimal token starts point alchemy; three or more tokens start
        item alchemy (a fourth token is the repeat count). Anything else
        gets the help text.
        """
        payload = self.parse_message_after_at(message).strip()
        tokens = payload.split()
        if not tokens:
            return await self.send_markdown_message(
                self._help_message(), chat_id, user_id
            )

        # isdecimal() rather than isdigit(): isdigit() also accepts characters
        # such as "²" that int() rejects with ValueError.
        if len(tokens) == 1 and tokens[0].isdecimal():
            response = await self._handle_point_alchemy(
                chat_id, user_id, int(tokens[0])
            )
            return await self.send_markdown_message(response, chat_id, user_id)

        if len(tokens) >= 3:
            times = 1
            if len(tokens) >= 4:
                try:
                    times = int(tokens[3])
                except ValueError:
                    return await self.send_markdown_message(
                        "❌ 炼金次数必须是整数", chat_id, user_id
                    )
            response = await self._handle_item_alchemy(
                chat_id, user_id, tokens[:3], times
            )
            return await self.send_markdown_message(response, chat_id, user_id)

        return await self.send_markdown_message(
            self._help_message(), chat_id, user_id
        )

    async def _handle_point_alchemy(
        self, chat_id: int, user_id: int, points: int
    ) -> str:
        """Stake *points*, draw a multiplier and return the markdown summary.

        The stake is deducted first, then ``int(points * multiplier)`` is
        paid back when non-zero. A 0x draw additionally grants
        ``min(points // 10, 99)`` ash.
        """
        if points <= 0:
            return "❌ 投入积分必须大于 0"
        config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI)
        current_points = config_api.get_user_points(user_id)
        if current_points < points:
            return f"❌ 积分不足,需要 {points} 分,当前仅有 {current_points}"

        await config_api.adjust_user_points(
            chat_id, user_id, -points, "炼金消耗"
        )
        fortune_system: WPSFortuneSystem = Architecture.Get(WPSFortuneSystem)
        fortune_value = fortune_system.get_fortune_value(user_id)
        multiplier, phase_label, phase_text = self._draw_point_multiplier(
            fortune_value
        )

        reward = int(points * multiplier)
        if reward:
            await config_api.adjust_user_points(
                chat_id, user_id, reward, f"炼金收益({phase_label})"
            )

        # Consolation ash is only granted on the 0x row.
        ash_reward = min(points // 10, 99) if multiplier == 0.0 else 0
        extra_line = ""
        if ash_reward > 0:
            backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
            backpack.add_item(user_id, self.ASH_ITEM_ID, ash_reward)
            extra_line = (
                f"- 额外获得:{self.ASH_ITEM_NAME} × `{ash_reward}`\n"
            )

        final_points = config_api.get_user_points(user_id)
        return (
            "# 🔮 炼金结算\n"
            f"- 投入积分:`{points}`\n"
            f"- 结果:{phase_text}\n"
            f"- 获得倍率:×{multiplier:.1f},返还 `+{reward}` 积分\n"
            f"{extra_line}"
            f"- 当前积分:`{final_points}`"
        )

    def _draw_point_multiplier(
        self, fortune_value: float
    ) -> Tuple[float, str, str]:
        """Draw one ``_PHASE_TABLE`` row, shifted by the user's fortune.

        The landing value is ``clamp01(fortune_value * coeff + random())``;
        the first row whose threshold is >= the landing value wins.

        Returns:
            ``(multiplier, label, text)`` of the selected row.
        """
        landing = clamp01(fortune_value * self._fortune_coeff + random.random())
        for threshold, multiplier, label, text in self._PHASE_TABLE:
            if landing <= threshold:
                return multiplier, label, text
        # Fallback in case no threshold matched: use the last row.
        return self._PHASE_TABLE[-1][1:]

    async def _handle_item_alchemy(
        self,
        chat_id: int,
        user_id: int,
        materials: Sequence[str],
        times: int,
    ) -> str:
        """Consume the materials *times* over and roll the recipe each time.

        Unknown material combinations always fail and yield ash. The recipe's
        base rate is shifted by the user's fortune and clamped to [0, 1].

        Returns:
            A markdown summary, or an error line when the input is invalid or
            the user lacks materials (nothing is consumed in that case).
        """
        if times <= 0:
            return "❌ 炼金次数必须大于 0"
        if times > self.MAX_BATCH_TIMES:
            return f"❌ 每次最多只能炼金 {self.MAX_BATCH_TIMES}"

        resolved: List[BackpackItemDefinition] = []
        for identifier in materials:
            resolved_item = self._resolve_item(identifier)
            if resolved_item is None:
                return f"❌ 未找到材料 `{identifier}`,请确认已注册"
            resolved.append(resolved_item)
        material_ids = [item.item_id for item in resolved]

        backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)

        # A material may fill more than one slot (e.g. ash x3), so the cost
        # per item is occurrences x times, not just times.
        per_run = Counter(material_ids)
        display_names = {item.item_id: item.name for item in resolved}
        owned_by_id: Dict[str, int] = {}
        for item_id, slots in per_run.items():
            needed = slots * times
            owned = self._get_user_quantity(user_id, item_id)
            if owned < needed:
                return (
                    f"❌ 材料 `{display_names[item_id]}` 数量不足,需要 {needed} 个,当前仅有 {owned}"
                )
            owned_by_id[item_id] = owned
        # Deduct only after every material passed the check.
        for item_id, slots in per_run.items():
            backpack.set_item_quantity(
                user_id, item_id, owned_by_id[item_id] - slots * times
            )

        recipe = self._recipes.get(tuple(sorted(material_ids)))
        fortune_system: WPSFortuneSystem = Architecture.Get(WPSFortuneSystem)
        fortune_value = fortune_system.get_fortune_value(user_id)
        adjusted_rate = (
            clamp01(recipe.base_success_rate + fortune_value * self._fortune_coeff)
            if recipe
            else 0.0
        )

        success_count = 0
        fail_count = 0
        rewards: Dict[str, int] = {}
        for _ in range(times):
            if recipe and random.random() < adjusted_rate:
                reward_id = recipe.success_item_id
                success_count += 1
            else:
                reward_id = recipe.fail_item_id if recipe else self.ASH_ITEM_ID
                fail_count += 1
            backpack.add_item(user_id, reward_id, 1)
            rewards[reward_id] = rewards.get(reward_id, 0) + 1

        details = []
        for item_id, count in rewards.items():
            try:
                item_name = backpack._get_definition(item_id).name
            except Exception:
                # Fall back to the raw ID when the definition lookup fails.
                item_name = item_id
            details.append(f"- {item_name} × **{count}**")

        if recipe:
            success_line = f"- 成功次数:`{success_count}`"
            rate_line = f"- 基础成功率:`{recipe.base_success_rate:.2%}`"
        else:
            success_line = "- 成功次数:`0`(未知配方必定失败)"
            rate_line = "- ✅ 未知配方仅产出炉灰"
        # Without a recipe every run fails, so fail_count equals times.
        fail_line = f"- 失败次数:`{fail_count}`"
        rewards_block = "\n".join(details) if details else "- (无物品获得)"
        # Same " + " separator the recipe lookup uses for material lists.
        material_names = " + ".join(item.name for item in resolved)
        return (
            "# ⚗️ 物品炼金结果\n"
            f"- 投入材料:{material_names} × {times}\n"
            f"{success_line}\n"
            f"{fail_line}\n"
            f"{rate_line}\n"
            "- 获得物品:\n"
            f"{rewards_block}"
        )

    def _resolve_item(
        self, identifier: str
    ) -> Optional[BackpackItemDefinition]:
        """Resolve an item ID or name (case-insensitive) to its definition.

        Returns:
            The definition, or ``None`` when the backpack lookup raises.
        """
        stripped = identifier.strip()
        lowered = stripped.lower()
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"""
            SELECT item_id
            FROM {WPSBackpackSystem.ITEMS_TABLE}
            WHERE lower(item_id) = ? OR lower(name) = ?
            LIMIT 1
            """,
            (lowered, lowered),
        )
        row = cursor.fetchone()
        # No table hit: try the raw identifier as an item ID.
        item_id = row["item_id"] if row else stripped
        backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
        try:
            return backpack._get_definition(item_id)
        except Exception:
            return None

    def _get_user_quantity(self, user_id: int, item_id: str) -> int:
        """Return how many of *item_id* the user holds (0 when absent)."""
        backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
        return next(
            (
                entry.quantity
                for entry in backpack.get_user_items(user_id)
                if entry.item_id == item_id
            ),
            0,
        )

    def _help_message(self) -> str:
        """Return the markdown help text for the alchemy commands."""
        return (
            "# ⚗️ 炼金指令帮助\n"
            "- `炼金 <积分>`:投入积分尝试炼金\n"
            "- `炼金 <材料1> <材料2> <材料3> [次数]`:使用三件材料进行炼金(可选次数,默认 1)\n"
            "> 建议提前备足材料及积分,谨慎开启炼金流程。"
        )
2025-11-11 00:06:56 +08:00
class WPSAlchemyRecipeLookup(WPSAPI):
    """Plugin that lists the alchemy recipes related to one item.

    For the requested item it reports the recipes that consume it, the
    recipes that produce it on success, and those that produce it on failure.
    """

    def __init__(self) -> None:
        """Initialise with both collaborators unresolved until ``wake_up``."""
        super().__init__()
        self._alchemy: Optional[WPSAlchemyGame] = None
        self._backpack: Optional[WPSBackpackSystem] = None

    @override
    def dependencies(self) -> List[type]:
        """Return the plugin types this plugin declares as dependencies."""
        return [WPSAlchemyGame, WPSBackpackSystem]

    def is_enable_plugin(self) -> bool:
        """Report this plugin as enabled."""
        return True

    @override
    def wake_up(self) -> None:
        """Cache the collaborators, register the keywords and log the load."""
        self._alchemy = Architecture.Get(WPSAlchemyGame)
        self._backpack = Architecture.Get(WPSBackpackSystem)
        self.register_plugin("炼金配方")
        self.register_plugin("alchemy_recipe")
        logger.Log(
            "Info",
            f"{ConsoleFrontColor.GREEN}WPSAlchemyRecipeLookup 插件已加载{ConsoleFrontColor.RESET}",
        )

    async def callback(
        self, message: str, chat_id: int, user_id: int
    ) -> Optional[str]:
        """Answer a recipe query for the item named in *message*.

        An empty query gets the help text; an unknown item gets an error line.
        """
        payload = self.parse_message_after_at(message).strip()
        if not payload:
            return await self.send_markdown_message(self._help_text(), chat_id, user_id)

        # Fall back to a fresh lookup when wake_up has not cached these yet.
        backpack = self._backpack or Architecture.Get(WPSBackpackSystem)
        definition = self._resolve_definition(payload, backpack)
        if definition is None:
            return await self.send_markdown_message(
                f"❌ 未找到物品 `{payload}`,请确认输入的物品 ID 或名称。",
                chat_id,
                user_id,
            )

        alchemy = self._alchemy or Architecture.Get(WPSAlchemyGame)
        produce_map = alchemy.get_recipes_producing_item(definition.item_id)
        message_text = self._format_markdown(
            definition,
            alchemy.get_recipes_using_item(definition.item_id),
            produce_map["success"],
            produce_map["fail"],
            backpack,
        )
        return await self.send_markdown_message(message_text, chat_id, user_id)

    def _resolve_definition(
        self, identifier: str, backpack: WPSBackpackSystem
    ) -> Optional[BackpackItemDefinition]:
        """Resolve an item ID or name (case-insensitive) to its definition.

        Returns:
            The definition, or ``None`` when the backpack lookup raises.
        """
        stripped = identifier.strip()
        lowered = stripped.lower()
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"""
            SELECT item_id
            FROM {WPSBackpackSystem.ITEMS_TABLE}
            WHERE lower(item_id) = ? OR lower(name) = ?
            LIMIT 1
            """,
            (lowered, lowered),
        )
        row = cursor.fetchone()
        # No table hit: try the raw identifier as an item ID.
        item_id = row["item_id"] if row else stripped
        try:
            return backpack._get_definition(item_id)  # noqa: SLF001
        except Exception:
            return None

    def _format_markdown(
        self,
        target: BackpackItemDefinition,
        material_recipes: List[AlchemyRecipe],
        success_recipes: List[AlchemyRecipe],
        fail_recipes: List[AlchemyRecipe],
        backpack: WPSBackpackSystem,
    ) -> str:
        """Build the full markdown report for *target*.

        Each of the three sections shows a placeholder line when it has no
        recipes.
        """
        empty_section = ["- 暂无记录"]
        lines = [
            f"# 🔍 炼金配方查询|{target.name}",
            f"- 物品 ID`{target.item_id}`",
            "---",
            "## 作为配方材料",
        ]
        lines.extend(
            self._format_recipe_entries(material_recipes, backpack)
            or empty_section
        )
        lines.append("\n## 作为成功产物")
        lines.extend(
            self._format_recipe_entries(success_recipes, backpack, role="success")
            or empty_section
        )
        lines.append("\n## 作为失败产物")
        lines.extend(
            self._format_recipe_entries(fail_recipes, backpack, role="fail")
            or empty_section
        )
        return "\n".join(lines)

    def _format_recipe_entries(
        self,
        recipes: List[AlchemyRecipe],
        backpack: WPSBackpackSystem,
        *,
        role: str = "material",
    ) -> List[str]:
        """Render one bullet line per recipe.

        Args:
            recipes: Recipes to render.
            backpack: Used to turn item IDs into display names.
            role: ``"material"`` shows both products; ``"success"`` shows only
                the fail product; any other value shows only the success
                product.

        Returns:
            The bullet lines; empty when *recipes* is empty.
        """
        entries: List[str] = []
        for recipe in recipes:
            materials = self._summarize_materials(recipe, backpack)
            success_name = self._resolve_item_name(recipe.success_item_id, backpack)
            fail_name = self._resolve_item_name(recipe.fail_item_id, backpack)
            rate = f"{recipe.base_success_rate:.0%}"
            # Every field is separated by "|" so adjacent fields do not run
            # together in the rendered line.
            if role == "material":
                fields = [
                    f"材料:{materials}",
                    f"成功产物:`{success_name}`",
                    f"失败产物:`{fail_name}`",
                    f"成功率:{rate}",
                ]
            elif role == "success":
                fields = [
                    f"材料:{materials}",
                    f"成功率:{rate}",
                    f"失败产物:`{fail_name}`",
                ]
            else:
                fields = [
                    f"材料:{materials}",
                    f"成功率:{rate}",
                    f"成功产物:`{success_name}`",
                ]
            entries.append("- " + "|".join(fields))
        return entries

    def _summarize_materials(
        self, recipe: AlchemyRecipe, backpack: WPSBackpackSystem
    ) -> str:
        """Render the recipe's materials, collapsing repeats into ``× n``."""
        parts: List[str] = []
        for item_id, count in sorted(Counter(recipe.materials).items()):
            name = self._resolve_item_name(item_id, backpack)
            parts.append(f"`{name}`" if count == 1 else f"`{name}` × {count}")
        return " + ".join(parts)

    def _resolve_item_name(
        self, item_id: str, backpack: WPSBackpackSystem
    ) -> str:
        """Return the item's display name, or *item_id* if the lookup raises."""
        try:
            return backpack._get_definition(item_id).name  # noqa: SLF001
        except Exception:
            return item_id

    def _help_text(self) -> str:
        """Return the markdown help text for the recipe lookup command."""
        return (
            "# ⚗️ 炼金配方查询帮助\n"
            "- `炼金配方 <物品ID>`\n"
            "- `炼金配方 <物品名称>`\n"
            "> 输入需要精确匹配注册物品,名称不区分大小写。"
        )
# Public API of this module: the two plugin classes defined above.
__all__ = ["WPSAlchemyGame", "WPSAlchemyRecipeLookup"]
2025-11-09 01:31:12 +08:00