382 lines
14 KiB
Python
382 lines
14 KiB
Python
from __future__ import annotations
|
||
|
||
import random
|
||
from dataclasses import dataclass
|
||
from typing import Dict, List, Optional, Sequence, Tuple, override
|
||
|
||
from PWF.Convention.Runtime.Architecture import Architecture
|
||
from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
|
||
from PWF.CoreModules.database import get_db
|
||
|
||
from .WPSAPI import WPSAPI
|
||
from .WPSBackpackSystem import (
|
||
BackpackItemDefinition,
|
||
BackpackItemTier,
|
||
WPSBackpackSystem,
|
||
)
|
||
from .WPSConfigSystem import WPSConfigAPI
|
||
from .WPSStoreSystem import WPSStoreSystem
|
||
from .WPSFortuneSystem import WPSFortuneSystem
|
||
|
||
|
||
# Shared ProjectConfig instance. Despite the name it is the project
# configuration object; this module uses it both for config lookups
# (FindItem / SaveProperties) and as its logging sink (Log).
logger: ProjectConfig = Architecture.Get(ProjectConfig)

# Factor that a user's fortune value is multiplied by before it is added to
# alchemy rolls. Looked up under "alchemy_fortune_coeff" with 0.03 passed as
# the fallback argument.
FORTUNE_COEFF: float = logger.FindItem("alchemy_fortune_coeff", 0.03)

# NOTE(review): presumably writes the (possibly defaulted) key back to the
# config store - confirm against ProjectConfig.SaveProperties.
logger.SaveProperties()
|
||
|
||
|
||
def clamp01(value: float) -> float:
    """Clamp *value* into the closed interval [0.0, 1.0].

    Values above 1.0 come back as 1.0, values below 0.0 come back as 0.0,
    and anything already inside the interval is returned as is.
    """
    capped_above = min(1.0, value)
    return max(0.0, capped_above)
|
||
|
||
|
||
@dataclass(frozen=True)
class AlchemyRecipe:
    """Immutable description of one three-material alchemy recipe."""

    # The three material item ids that make up the recipe.
    materials: Tuple[str, str, str]
    # Item id granted when an attempt succeeds.
    success_item_id: str
    # Item id granted when an attempt fails.
    fail_item_id: str
    # Success probability before any fortune adjustment is applied.
    base_success_rate: float
|
||
|
||
|
||
class WPSAlchemyGame(WPSAPI):
    """Alchemy mini-game plugin.

    ``callback`` dispatches to one of two modes:

    * point alchemy - the user stakes points and gets them back multiplied
      by a factor drawn from ``_PHASE_TABLE``;
    * item alchemy - three materials are consumed per attempt and rolled
      against a registered ``AlchemyRecipe``; combinations without a recipe
      always produce ash.
    """

    ASH_ITEM_ID = "alchemy_ash"
    ASH_ITEM_NAME = "炉灰"
    SLAG_ITEM_ID = "alchemy_slag"
    SLAG_ITEM_NAME = "炉渣"
    # Upper bound on the number of attempts in one item-alchemy command.
    MAX_BATCH_TIMES = 99

    # Rows are (upper bound, multiplier, label, message). The table is scanned
    # top to bottom and the first row whose upper bound is >= the landing
    # value wins, so the bounds must stay in ascending order.
    _PHASE_TABLE: List[Tuple[float, float, str, str]] = [
        (0.1481481481, 0.0, "爆炸", "💥 炼金反噬,积分化为飞灰……"),
        (0.4444444444, 0.5, "失败", "😖 炼金失败,仅保留半数积分。"),
        (0.7407407407, 2.0, "成功", "😊 炼金成功,积分翻倍!"),
        (0.8888888888, 5.0, "丰厚积分", "😁 运气加成,收获丰厚积分!"),
        (0.9629629630, 10.0, "巨额积分", "🤩 巨额积分入账,今日欧气爆棚!"),
        (1.0, 100.0, "传说积分", "🌈 传说级好运!积分暴涨一百倍!"),
    ]

    def __init__(self) -> None:
        """Create the plugin with an empty recipe book."""
        super().__init__()
        # Keyed by the *sorted* material triple so lookup ignores input order.
        self._recipes: Dict[Tuple[str, str, str], AlchemyRecipe] = {}
        self._fortune_coeff = FORTUNE_COEFF

    @override
    def dependencies(self) -> List[type]:
        """Return the plugin types this plugin declares as dependencies."""
        return [WPSAPI, WPSBackpackSystem, WPSConfigAPI, WPSFortuneSystem, WPSStoreSystem]

    @override
    def wake_up(self) -> None:
        """Log the load, register both command names and the built-in items."""
        logger.Log(
            "Info",
            f"{ConsoleFrontColor.GREEN}WPSAlchemyGame 插件已加载{ConsoleFrontColor.RESET}",
        )
        self.register_plugin("alchemy")
        self.register_plugin("炼金")
        self._register_alchemy_items()

    @staticmethod
    def _warn(text: str) -> None:
        """Emit *text* as a yellow warning through the shared logger."""
        logger.Log(
            "Warning",
            f"{ConsoleFrontColor.YELLOW}{text}{ConsoleFrontColor.RESET}",
        )

    def _register_alchemy_items(self) -> None:
        """Register ash, its store entry, slag, and the ash -> slag recipe.

        Every step is best effort: a failure is logged as a warning and the
        remaining steps still run. The store entry for ash is only attempted
        when the ash item itself registered successfully.
        """
        backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)

        try:
            backpack.register_item(
                self.ASH_ITEM_ID,
                self.ASH_ITEM_NAME,
                BackpackItemTier.COMMON,
            )
        except Exception as exc:
            self._warn(f"注册炉灰物品时出现问题: {exc}")
        else:
            try:
                store_system = Architecture.Get(WPSStoreSystem)
                store_system.register_mode(
                    item_id=self.ASH_ITEM_ID,
                    price=8,
                    limit_amount=999,
                )
            except Exception as exc:
                self._warn(f"注册炉灰商店模式时出现问题: {exc}")

        try:
            backpack.register_item(
                self.SLAG_ITEM_ID,
                self.SLAG_ITEM_NAME,
                BackpackItemTier.COMMON,
            )
        except Exception as exc:
            self._warn(f"注册炉渣物品时出现问题: {exc}")

        try:
            # Three ash always refine into slag (success rate 1.0).
            self.register_recipe(
                (self.ASH_ITEM_ID, self.ASH_ITEM_ID, self.ASH_ITEM_ID),
                self.SLAG_ITEM_ID,
                self.ASH_ITEM_ID,
                1.0,
            )
        except Exception as exc:
            self._warn(f"注册炉渣配方时出现问题: {exc}")

    def register_recipe(
        self,
        materials: Sequence[str],
        success_item_id: str,
        fail_item_id: str,
        base_success_rate: float,
    ) -> None:
        """Register (or overwrite) the recipe for a three-material combination.

        Args:
            materials: Exactly three material item ids; order is irrelevant
                because the ids are stripped and sorted before storage.
            success_item_id: Item id granted on success; must not be blank.
            fail_item_id: Item id granted on failure; a blank value falls
                back to ash.
            base_success_rate: Probability in the range 0..1.

        Raises:
            ValueError: If the material count is not three, any material id
                or the success id is blank, or the rate is outside 0..1.
        """
        if len(materials) != 3:
            raise ValueError("炼金配方必须提供三种材料")
        sorted_materials = tuple(sorted(mat.strip() for mat in materials))
        if not all(sorted_materials):
            raise ValueError("炼金材料 ID 不能为空")
        if clamp01(base_success_rate) != base_success_rate:
            raise ValueError("配方成功率必须位于 0~1 之间")
        success_id = success_item_id.strip()
        if not success_id:
            # Reject here rather than failing at craft time, after the
            # player's materials have already been consumed.
            raise ValueError("炼金成功产物 ID 不能为空")

        recipe = AlchemyRecipe(
            materials=sorted_materials,
            success_item_id=success_id,
            fail_item_id=fail_item_id.strip() or self.ASH_ITEM_ID,
            base_success_rate=base_success_rate,
        )
        self._recipes[sorted_materials] = recipe
        logger.Log(
            "Info",
            f"{ConsoleFrontColor.CYAN}已注册炼金配方 {sorted_materials} -> {success_id} ({base_success_rate:.2%}){ConsoleFrontColor.RESET}",
        )

    @staticmethod
    def _parse_points(token: str) -> Optional[int]:
        """Return *token* as a non-negative int, or None if it is not one.

        ``str.isdigit`` also accepts characters such as superscript digits
        that ``int`` rejects, so ``isdecimal`` is used instead. The
        conversion is still guarded because ``int`` may refuse extremely
        long digit strings.
        """
        if not token.isdecimal():
            return None
        try:
            return int(token)
        except ValueError:
            return None

    async def callback(
        self, message: str, chat_id: int, user_id: int
    ) -> Optional[str]:
        """Parse the command text and run point or item alchemy.

        * one numeric token -> point alchemy with that stake;
        * three or more tokens -> item alchemy with the first three tokens
          as materials and an optional fourth token as the attempt count;
        * anything else -> the help text.

        Returns whatever ``send_markdown_message`` returns.
        """
        tokens = self.parse_message_after_at(message).split()

        points = self._parse_points(tokens[0]) if len(tokens) == 1 else None
        if points is not None:
            response = await self._handle_point_alchemy(chat_id, user_id, points)
        elif len(tokens) >= 3:
            times = 1
            if len(tokens) >= 4:
                try:
                    times = int(tokens[3])
                except ValueError:
                    return await self.send_markdown_message(
                        "❌ 炼金次数必须是整数", chat_id, user_id
                    )
            response = await self._handle_item_alchemy(
                chat_id, user_id, tokens[:3], times
            )
        else:
            response = self._help_message()

        return await self.send_markdown_message(response, chat_id, user_id)

    async def _handle_point_alchemy(
        self, chat_id: int, user_id: int, points: int
    ) -> str:
        """Stake *points*, draw a multiplier and pay out the result.

        The stake is deducted first, then ``int(points * multiplier)`` is
        credited. A multiplier of zero additionally grants ash.

        Returns the markdown report, or an error line when the stake is not
        positive or exceeds the user's balance.
        """
        if points <= 0:
            return "❌ 投入积分必须大于 0"
        config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI)
        current_points = config_api.get_user_points(user_id)
        if current_points < points:
            return f"❌ 积分不足,需要 {points} 分,当前仅有 {current_points} 分"

        await config_api.adjust_user_points(
            chat_id, user_id, -points, "炼金消耗"
        )

        fortune_system: WPSFortuneSystem = Architecture.Get(WPSFortuneSystem)
        fortune_value = fortune_system.get_fortune_value(user_id)
        multiplier, phase_label, phase_text = self._draw_point_multiplier(
            fortune_value
        )
        reward = int(points * multiplier)
        if reward:
            await config_api.adjust_user_points(
                chat_id, user_id, reward, f"炼金收益({phase_label})"
            )

        # Consolation for a total loss: one ash per 10 staked points, max 99.
        ash_reward = min(points // 10, 99) if multiplier == 0.0 else 0
        extra_line = ""
        if ash_reward > 0:
            backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
            backpack.add_item(user_id, self.ASH_ITEM_ID, ash_reward)
            extra_line = f"- 额外获得:{self.ASH_ITEM_NAME} × `{ash_reward}`\n"

        final_points = config_api.get_user_points(user_id)
        return (
            "# 🔮 炼金结算\n"
            f"- 投入积分:`{points}`\n"
            f"- 结果:{phase_text}\n"
            f"- 获得倍率:×{multiplier:.1f},返还 `+{reward}` 积分\n"
            f"{extra_line}"
            f"- 当前积分:`{final_points}`"
        )

    def _draw_point_multiplier(
        self, fortune_value: float
    ) -> Tuple[float, str, str]:
        """Draw a ``(multiplier, label, message)`` row from ``_PHASE_TABLE``.

        A uniform random number is shifted by ``fortune_value`` times the
        fortune coefficient, clamped to 0..1, and matched against the
        table's ascending upper bounds.
        """
        landing = clamp01(fortune_value * self._fortune_coeff + random.random())
        for threshold, multiplier, label, text in self._PHASE_TABLE:
            if landing <= threshold:
                return multiplier, label, text
        # Defensive fallback: use the last row.
        _, multiplier, label, text = self._PHASE_TABLE[-1]
        return multiplier, label, text

    async def _handle_item_alchemy(
        self,
        chat_id: int,
        user_id: int,
        materials: Sequence[str],
        times: int,
    ) -> str:
        """Consume three materials *times* times and roll for rewards.

        Args:
            chat_id: Chat the command came from (not used by this method).
            user_id: Owner of the backpack that is debited and credited.
            materials: Three item ids or display names.
            times: Number of attempts, 1..``MAX_BATCH_TIMES``.

        Returns the markdown report, or an error line when the count is out
        of range, a material is unknown, or the user owns too few of one.
        """
        if times <= 0:
            return "❌ 炼金次数必须大于 0"
        if times > self.MAX_BATCH_TIMES:
            return f"❌ 每次最多只能炼金 {self.MAX_BATCH_TIMES} 次"

        resolved: List[BackpackItemDefinition] = []
        for identifier in materials:
            definition = self._resolve_item(identifier)
            if definition is None:
                return f"❌ 未找到材料 `{identifier}`,请确认已注册"
            resolved.append(definition)
        material_ids = [item.item_id for item in resolved]

        # Total the requirement per distinct item: a material that fills
        # several slots (for example ash + ash + ash) costs `times` per slot.
        required: Dict[str, int] = {}
        display_names: Dict[str, str] = {}
        for item in resolved:
            required[item.item_id] = required.get(item.item_id, 0) + times
            display_names.setdefault(item.item_id, item.name)

        owned = {
            item_id: self._get_user_quantity(user_id, item_id)
            for item_id in required
        }
        for item_id, needed in required.items():
            if owned[item_id] < needed:
                return (
                    f"❌ 材料 `{display_names[item_id]}` 数量不足,需要 {needed} 个,当前仅有 {owned[item_id]} 个"
                )

        # All checks passed; only now touch the inventory.
        backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
        for item_id, needed in required.items():
            backpack.set_item_quantity(user_id, item_id, owned[item_id] - needed)

        recipe = self._recipes.get(tuple(sorted(material_ids)))
        fortune_system: WPSFortuneSystem = Architecture.Get(WPSFortuneSystem)
        fortune_value = fortune_system.get_fortune_value(user_id)
        adjusted_rate = (
            clamp01(recipe.base_success_rate + fortune_value * self._fortune_coeff)
            if recipe
            else 0.0
        )

        success_count = 0
        fail_count = 0
        rewards: Dict[str, int] = {}
        for _ in range(times):
            if recipe and random.random() < adjusted_rate:
                reward_id = recipe.success_item_id
                success_count += 1
            else:
                # No recipe for this combination: every attempt yields ash.
                reward_id = recipe.fail_item_id if recipe else self.ASH_ITEM_ID
                fail_count += 1
            rewards[reward_id] = rewards.get(reward_id, 0) + 1

        details = []
        for item_id, count in rewards.items():
            # One inventory write per distinct reward instead of per attempt.
            backpack.add_item(user_id, item_id, count)
            try:
                item_name = backpack._get_definition(item_id).name
            except Exception:
                # Fall back to the raw id when no definition can be fetched.
                item_name = item_id
            details.append(f"- {item_name} × **{count}**")

        if recipe:
            success_line = f"- 成功次数:`{success_count}`"
            rate_line = f"- 基础成功率:`{recipe.base_success_rate:.2%}`"
        else:
            success_line = "- 成功次数:`0`(未知配方必定失败)"
            rate_line = "- ✅ 未知配方仅产出炉灰"
        fail_line = f"- 失败次数:`{fail_count}`"
        rewards_block = "\n".join(details) if details else "- (无物品获得)"

        return (
            "# ⚗️ 物品炼金结果\n"
            f"- 投入材料:{'、'.join([item.name for item in resolved])} × {times}\n"
            f"{success_line}\n"
            f"{fail_line}\n"
            f"{rate_line}\n"
            "- 获得物品:\n"
            f"{rewards_block}"
        )

    def _resolve_item(
        self, identifier: str
    ) -> Optional[BackpackItemDefinition]:
        """Look up an item definition by id or display name.

        The items table is searched case-insensitively on both ``item_id``
        and ``name``. If no row matches, the stripped identifier itself is
        tried as an item id. Returns None when no definition can be fetched.
        """
        identifier_lower = identifier.strip().lower()
        cursor = get_db().conn.cursor()
        # Only the table name (a class constant) is interpolated; the user
        # supplied identifier is passed as a bound parameter.
        cursor.execute(
            f"""
            SELECT item_id
            FROM {WPSBackpackSystem.ITEMS_TABLE}
            WHERE lower(item_id) = ? OR lower(name) = ?
            LIMIT 1
            """,
            (identifier_lower, identifier_lower),
        )
        row = cursor.fetchone()
        item_id = row["item_id"] if row else identifier.strip()
        backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
        try:
            return backpack._get_definition(item_id)
        except Exception:
            return None

    def _get_user_quantity(self, user_id: int, item_id: str) -> int:
        """Return how many of *item_id* the user holds (0 when absent)."""
        backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
        for item in backpack.get_user_items(user_id):
            if item.item_id == item_id:
                return item.quantity
        return 0

    def _help_message(self) -> str:
        """Return the markdown usage text for the alchemy commands."""
        return (
            "# ⚗️ 炼金指令帮助\n"
            "- `炼金 <积分>`:投入积分尝试炼金\n"
            "- `炼金 <材料1> <材料2> <材料3> [次数]`:使用三件材料进行炼金(可选次数,默认 1)\n"
            "> 建议提前备足材料及积分,谨慎开启炼金流程。"
        )
|
||
|
||
|
||
# Names exported by `from <this module> import *`: only the plugin class.
__all__ = ["WPSAlchemyGame"]
|
||
|