积分抽奖更新奖池模式

This commit is contained in:
2025-11-13 20:31:17 +08:00
parent 2853119f58
commit a121a2478f
2 changed files with 365 additions and 273 deletions

View File

@@ -5,7 +5,7 @@ import random
from collections import defaultdict, Counter
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Sequence, Set, Tuple, override
from typing import Any, Dict, Iterable, List, Optional, Sequence, Set, Tuple, override
from PWF.Convention.Runtime.Architecture import Architecture
from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
@@ -27,7 +27,20 @@ from .WPSFortuneSystem import WPSFortuneSystem
logger: ProjectConfig = Architecture.Get(ProjectConfig)
FORTUNE_COEFF: float = logger.FindItem("alchemy_fortune_coeff", 0.03)
COOLDOWN_MINUTES: int = logger.FindItem("alchemy_cooldown_minutes", 2)
MAX_POINTS_PER_BATCH:int = logger.FindItem("alchemy_max_points_per_batch", 100)
MAX_POINTS_PER_BATCH: int = logger.FindItem("alchemy_max_points_per_batch", 10000)
POINT_ALCHEMY_FEE: int = logger.FindItem("alchemy_point_fee", 100)
POOL_INITIAL_BALANCE: int = logger.FindItem("alchemy_pool_initial_balance", 10000)
REWARD_SLOT_CONFIG = logger.FindItem("alchemy_reward_slots", sorted([
(50,0.01),
(45,0.02),
(40,0.03),
(35,0.05),
(30,0.07),
(20,0.1),
(10,0.2)
], key=lambda x: x[0]))
COMMON_BONUS_RATE: float = logger.FindItem("alchemy_common_bonus_rate", 0.3333333)
COMMON_BONUS_BLACKLIST: Sequence[str] = logger.FindItem("alchemy_common_blacklist", [])
logger.SaveProperties()
@@ -71,16 +84,30 @@ class WPSAlchemyGame(WPSAPI):
self._cooldown_ms = int(self._cooldown_minutes * 60 * 1000)
# 从配置读取单次积分炼金上限
self._max_points_per_batch = MAX_POINTS_PER_BATCH
self._point_fee = max(1, int(POINT_ALCHEMY_FEE))
self._pool_initial_balance = max(0, int(POOL_INITIAL_BALANCE))
self._reward_slot_config = REWARD_SLOT_CONFIG
self._reward_total_probability = sum(
probability for _, probability in self._reward_slot_config
)
self._common_bonus_rate = clamp01(float(COMMON_BONUS_RATE))
self._common_bonus_blacklist = self._parse_identifier_collection(
COMMON_BONUS_BLACKLIST
)
self._pool_balance_cache: Optional[int] = None
logger.SaveProperties()
def get_guide_subtitle(self) -> str:
return "积分炼金与材料合成系统"
def get_guide_metadata(self) -> Dict[str, str]:
pool_balance = self._get_reward_pool_balance(refresh=True)
return {
"配方数量": str(len(self._recipes)),
"冷却时间(分钟)": str(self._cooldown_minutes),
"单次积分上限": str(self._max_points_per_batch),
"单次费用上限": str(self._max_points_per_batch),
"当前奖池": str(pool_balance),
"单次费用": str(self._point_fee),
}
def collect_command_entries(self) -> Sequence[GuideEntry]:
@@ -112,8 +139,8 @@ class WPSAlchemyGame(WPSAPI):
{
"title": "积分炼金",
"description": (
f"`炼金 <积分>` 消耗积分尝试炼金,单次上限 {self._max_points_per_batch}"
"结果与运势、阶段概率表 `_PHASE_TABLE` 相关"
f"`炼金 <次数>` 按次扣除费用(每次 {self._point_fee} 分)"
"所有费用注入奖池,再依据挡位概率领取奖池百分比奖励"
),
"icon": "💎",
},
@@ -281,10 +308,11 @@ class WPSAlchemyGame(WPSAPI):
def dependencies(self) -> List[type]:
return [WPSAPI, WPSBackpackSystem, WPSConfigAPI, WPSFortuneSystem, WPSStoreSystem]
@override
def register_db_model(self) -> DatabaseModel:
"""注册炼金记录数据库表"""
return DatabaseModel(
_POOL_TABLE = "alchemy_reward_pool"
def register_db_model(self) -> DatabaseModel | List[DatabaseModel]:
"""注册炼金记录数据库表与奖池表"""
records_model = DatabaseModel(
table_name="alchemy_records",
column_defs={
"alchemy_id": "INTEGER PRIMARY KEY AUTOINCREMENT",
@@ -299,6 +327,15 @@ class WPSAlchemyGame(WPSAPI):
"scheduled_task_id": "INTEGER",
},
)
pool_model = DatabaseModel(
table_name=self._POOL_TABLE,
column_defs={
"pool_id": "INTEGER PRIMARY KEY",
"balance": "INTEGER NOT NULL",
"updated_at": "TEXT NOT NULL",
},
)
return [records_model, pool_model]
@override
def wake_up(self) -> None:
@@ -309,6 +346,7 @@ class WPSAlchemyGame(WPSAPI):
self.register_plugin("alchemy")
self.register_plugin("炼金")
self._register_alchemy_items()
self._ensure_reward_pool()
# 恢复过期炼金
self.recover_overdue_alchemy()
@@ -454,6 +492,84 @@ class WPSAlchemyGame(WPSAPI):
"fail": [self._recipes[key] for key in fail_keys],
}
@staticmethod
def _parse_identifier_collection(raw: object) -> Set[str]:
if raw is None:
return set()
items: Iterable[str]
if isinstance(raw, str):
stripped = raw.strip()
if not stripped:
return set()
try:
parsed = json.loads(stripped)
if isinstance(parsed, (list, tuple, set)):
items = [str(x) for x in parsed]
else:
items = [str(parsed)]
except json.JSONDecodeError:
items = [token.strip() for token in stripped.split(",")]
elif isinstance(raw, Iterable):
items = [str(item) for item in raw]
else:
items = [str(raw)]
normalized = {
item.strip().lower()
for item in items
if isinstance(item, str) and item.strip()
}
return normalized
def _ensure_reward_pool(self) -> None:
cursor = get_db().conn.cursor()
cursor.execute(
f"SELECT balance FROM {self._POOL_TABLE} WHERE pool_id = 1"
)
row = cursor.fetchone()
if row is None:
now = datetime.now().isoformat()
cursor.execute(
f"INSERT INTO {self._POOL_TABLE} (pool_id, balance, updated_at) VALUES (1, ?, ?)",
(self._pool_initial_balance, now),
)
get_db().conn.commit()
self._pool_balance_cache = self._pool_initial_balance
elif self._pool_balance_cache is None:
self._pool_balance_cache = int(row["balance"])
def _get_reward_pool_balance(self, *, refresh: bool = False) -> int:
if refresh or self._pool_balance_cache is None:
self._ensure_reward_pool()
cursor = get_db().conn.cursor()
cursor.execute(
f"SELECT balance FROM {self._POOL_TABLE} WHERE pool_id = 1"
)
row = cursor.fetchone()
balance = int(row["balance"]) if row else 0
self._pool_balance_cache = balance
return self._pool_balance_cache or 0
def _update_reward_pool(self, delta: int) -> int:
self._ensure_reward_pool()
cursor = get_db().conn.cursor()
cursor.execute(
f"UPDATE {self._POOL_TABLE} SET balance = balance + ?, updated_at = ? WHERE pool_id = 1",
(delta, datetime.now().isoformat()),
)
get_db().conn.commit()
return self._get_reward_pool_balance(refresh=True)
def _set_reward_pool(self, balance: int) -> int:
self._ensure_reward_pool()
cursor = get_db().conn.cursor()
cursor.execute(
f"UPDATE {self._POOL_TABLE} SET balance = ?, updated_at = ? WHERE pool_id = 1",
(balance, datetime.now().isoformat()),
)
get_db().conn.commit()
self._pool_balance_cache = balance
return balance
async def callback(
self, message: str, chat_id: int, user_id: int
) -> Optional[str]:
@@ -474,9 +590,9 @@ class WPSAlchemyGame(WPSAPI):
return await self.send_markdown_message(response, chat_id, user_id)
if len(tokens) == 1 and tokens[0].isdigit():
points = int(tokens[0])
times = int(tokens[0])
response = await self._handle_point_alchemy(
chat_id, user_id, points
chat_id, user_id, times
)
return await self.send_markdown_message(response, chat_id, user_id)
@@ -500,30 +616,43 @@ class WPSAlchemyGame(WPSAPI):
)
async def _handle_point_alchemy(
self, chat_id: int, user_id: int, points: int
self, chat_id: int, user_id: int, times: int
) -> str:
if points <= 0:
return "投入积分必须大于 0"
if times <= 0:
return "炼金次数必须大于 0"
if times > self.MAX_BATCH_TIMES:
return f"❌ 每次最多只能炼金 {self.MAX_BATCH_TIMES}"
# 检查冷却
is_on_cooldown, cooldown_msg = self._check_cooldown(user_id)
if is_on_cooldown:
return cooldown_msg
total_cost = self._point_fee * times
config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI)
current_points = config_api.get_user_points(user_id)
if points > self._max_points_per_batch:
return f"❌ 单次炼金积分不能超过 {self._max_points_per_batch}"
if current_points < points:
return f"❌ 积分不足,需要 {points} 分,当前仅有 {current_points}"
if total_cost > self._max_points_per_batch:
return f"❌ 单次炼金总费用不能超过 {self._max_points_per_batch}"
if current_points < total_cost:
return (
f"❌ 积分不足,需要 {total_cost} 分,当前仅有 {current_points}"
)
# 扣除积分
await config_api.adjust_user_points(
chat_id, user_id, -points, "炼金消耗"
chat_id, user_id, -total_cost, "炼金消耗"
)
# 奖池增加投入
pool_balance = self._update_reward_pool(total_cost)
# 创建炼金记录
input_data = {"type": "point", "points": points}
input_data = {
"type": "point",
"times": times,
"cost_per_attempt": self._point_fee,
"pool_deposit": total_cost,
}
alchemy_id = self._create_alchemy_record(
user_id, chat_id, "point", input_data
)
@@ -559,12 +688,29 @@ class WPSAlchemyGame(WPSAPI):
time_str = "立即结算" if self._cooldown_minutes == 0 else f"{self._cooldown_minutes} 分钟"
reward_probability_line = ""
slot_brief_line = ""
if self._reward_slot_config:
reward_probability_line = (
f"- 有奖励挡位总概率:{self._reward_total_probability:.0%}\n"
)
preview = ", ".join(
f"{percent:.0f}%{probability:.2%}"
for percent, probability in self._reward_slot_config[:5]
)
if preview:
slot_brief_line = f"- 挡位示例:{preview}{'' if len(self._reward_slot_config) > 5 else ''}\n"
return (
f"# ⚗️ 炼金开始{debug_hint}\n"
f"- 类型:积分炼金\n"
f"- 投入积分`{points}`\n"
f"- 炼金次数`{times}`(每次费用 {self._point_fee} 分)\n"
f"- 本次消耗:`{total_cost}` 分\n"
f"- 当前奖池:`{pool_balance}` 分\n"
f"- 预计耗时:{time_str}\n"
f"- 预计完成:{expected_end_time.strftime('%Y-%m-%d %H:%M')}\n"
f"{reward_probability_line}"
f"{slot_brief_line}"
f"- 状态:炼金进行中..."
)
@@ -579,6 +725,43 @@ class WPSAlchemyGame(WPSAPI):
return multiplier, label, text
return self._PHASE_TABLE[-1][1:]
def _draw_reward_slot(
self, fortune_value: float
) -> Tuple[Optional[float], float]:
if not self._reward_slot_config:
return None, 0.0
offset = fortune_value * self._fortune_coeff
landing = clamp01(random.random() + offset)
cumulative = 0.0
for percent, probability in self._reward_slot_config:
cumulative += probability
if landing <= cumulative:
return percent, probability
return None, max(0.0, 1.0 - cumulative)
@staticmethod
def _calculate_slot_reward(pool_balance: int, percent: float) -> int:
if percent <= 0 or pool_balance <= 0:
return 0
reward = pool_balance * (percent / 100.0)
return int(reward)
def _should_drop_common_bonus(self) -> bool:
if self._common_bonus_rate <= 0:
return False
return random.random() < self._common_bonus_rate
def _draw_common_bonus_item(
self, backpack: WPSBackpackSystem
) -> Optional[BackpackItemDefinition]:
candidates = backpack.get_items_by_tier(
BackpackItemTier.COMMON,
blacklist=self._common_bonus_blacklist,
)
if not candidates:
return None
return random.choice(candidates)
async def _handle_item_alchemy(
self,
chat_id: int,
@@ -809,41 +992,103 @@ class WPSAlchemyGame(WPSAPI):
try:
if alchemy_type == "point":
# 积分炼金结算
points = input_data["points"]
multiplier, phase_label, phase_text = self._draw_point_multiplier(
fortune_value
)
reward = int(points * multiplier)
if reward:
config_api.adjust_user_points_sync(
user_id, reward, f"炼金收益({phase_label}"
)
# 积分炼金结算(奖池模式)
times = int(input_data.get("times", 1))
cost_per_attempt = int(input_data.get("cost_per_attempt", self._point_fee))
total_cost = cost_per_attempt * times
pool_balance = self._get_reward_pool_balance(refresh=True)
slot_counter: Dict[float, int] = defaultdict(int)
empty_count = 0
total_reward_points = 0
ash_reward = 0
if multiplier == 0.0:
ash_reward = min(points // 10, 99)
if ash_reward > 0:
backpack.add_item(user_id, self.ASH_ITEM_ID, ash_reward)
bonus_items: Dict[str, int] = {}
attempt_records: List[Dict[str, Any]] = []
for _ in range(times):
slot_percent, _ = self._draw_reward_slot(fortune_value)
if slot_percent is None:
empty_count += 1
attempt_records.append({"slot": None, "reward": 0})
continue
slot_counter[slot_percent] += 1
reward_points = self._calculate_slot_reward(pool_balance, slot_percent)
if reward_points > 0:
pool_balance = self._update_reward_pool(-reward_points)
total_reward_points += reward_points
config_api.adjust_user_points_sync(
user_id,
reward_points,
f"炼金奖池奖励({slot_percent:.0f}%挡位)",
)
attempt_record: Dict[str, Any] = {
"slot": slot_percent,
"reward": reward_points,
}
if slot_percent <= 0:
backpack.add_item(user_id, self.ASH_ITEM_ID, 1)
ash_reward += 1
attempt_record["ash"] = True
if slot_percent < 50 and self._should_drop_common_bonus():
bonus_item = self._draw_common_bonus_item(backpack)
if bonus_item is not None:
backpack.add_item(user_id, bonus_item.item_id, 1)
bonus_items[bonus_item.item_id] = bonus_items.get(bonus_item.item_id, 0) + 1
attempt_record["bonus_item"] = bonus_item.item_id
attempt_records.append(attempt_record)
final_points = config_api.get_user_points(user_id)
extra_line = ""
if ash_reward > 0:
extra_line = f"- 额外获得:{self.ASH_ITEM_NAME} × `{ash_reward}`\n"
slot_lines: List[str] = []
if slot_counter:
for percent in sorted(slot_counter.keys()):
slot_lines.append(
f" - {percent:.0f}% 挡位:{slot_counter[percent]}"
)
if empty_count > 0:
slot_lines.append(f" - 空白挡位:{empty_count}")
message_lines.extend([
f"- 投入积分`{points}`\n",
f"- 结果:{phase_text}\n",
f"- 获得倍率:×{multiplier:.1f},返还 `+{reward}` \n",
extra_line,
f"- 当前积分:`{final_points}`",
f"- 炼金次数`{times}`(单次扣除 {cost_per_attempt} 分)\n",
f"- 本次总消耗:`{total_cost}` 分\n",
f"- 奖池剩余:`{pool_balance}` 分\n",
f"- 总计返还:`+{total_reward_points}` 分\n",
f"- 当前积分:`{final_points}`\n",
])
if slot_lines:
message_lines.append("- 挡位统计:\n" + "\n".join(slot_lines))
if ash_reward > 0:
message_lines.append(
f"- 额外获得:{self.ASH_ITEM_NAME} × `{ash_reward}`"
)
if bonus_items:
bonus_lines: List[str] = []
for item_id, count in bonus_items.items():
try:
definition = backpack._get_definition(item_id)
item_name = definition.name
except Exception:
item_name = item_id
bonus_lines.append(f" - {item_name} × {count}")
message_lines.append("\n- 普通物品掉落:\n" + "\n".join(bonus_lines))
result_data = {
"multiplier": multiplier,
"reward_points": reward,
"times": times,
"total_cost": total_cost,
"total_reward": total_reward_points,
"slots": {str(percent): count for percent, count in slot_counter.items()},
"empty": empty_count,
"ash_reward": ash_reward,
"bonus_items": bonus_items,
"pool_balance": pool_balance,
"attempts": attempt_records,
}
elif alchemy_type == "item":
@@ -988,6 +1233,7 @@ class WPSAlchemyGame(WPSAPI):
return (
"# ⚗️ 炼金状态\n"
"- 状态:无进行中的炼金\n"
f"- 当前奖池:`{self._get_reward_pool_balance(refresh=True)}` 分\n"
"- 可以开始新的炼金"
)
@@ -1004,14 +1250,34 @@ class WPSAlchemyGame(WPSAPI):
remaining_minutes = int(remaining.total_seconds() / 60) + 1
remaining_str = f"{remaining_minutes} 分钟"
return (
"# ⚗️ 炼金状态\n"
f"- 状态:进行中\n"
f"- 类型:{alchemy_type_name}\n"
f"- 开始时间:{start_time.strftime('%Y-%m-%d %H:%M')}\n"
f"- 预计完成:{expected_end_time.strftime('%Y-%m-%d %H:%M')}\n"
f"- 剩余时间:{remaining_str}"
lines = [
"# ⚗️ 炼金状态",
"- 状态:进行中",
f"- 类型:{alchemy_type_name}",
f"- 开始时间:{start_time.strftime('%Y-%m-%d %H:%M')}",
f"- 预计完成:{expected_end_time.strftime('%Y-%m-%d %H:%M')}",
f"- 剩余时间:{remaining_str}",
]
if alchemy_type == "point":
try:
input_data = json.loads(record["input_data"])
except Exception:
input_data = {}
times = int(input_data.get("times", 1))
cost_per_attempt = int(input_data.get("cost_per_attempt", self._point_fee))
total_cost = cost_per_attempt * times
lines.extend(
[
f"- 计划次数:{times}",
f"- 单次费用:{cost_per_attempt}",
f"- 总投入:{total_cost}",
]
)
lines.append(
f"- 当前奖池:`{self._get_reward_pool_balance(refresh=True)}` 分"
)
return "\n".join(lines)
def recover_overdue_alchemy(self) -> None:
"""恢复过期但未结算的炼金"""
@@ -1048,206 +1314,10 @@ class WPSAlchemyGame(WPSAPI):
def _help_message(self) -> str:
return (
"# ⚗️ 炼金指令帮助\n"
f"- `炼金 <积分>`投入积分尝试炼金(单次最多 {self._max_points_per_batch} 分)\n"
f"- `炼金 <次数>`按次数扣费参与积分炼金(每次 {self._point_fee} 分,单次总费用不超过 {self._max_points_per_batch} 分)\n"
"- `炼金 <材料1> <材料2> <材料3> [次数]`:使用三件材料进行炼金(可选次数,默认 1\n"
"- `炼金 状态`:查询当前炼金状态\n"
"> 建议提前备足材料及积分,谨慎开启炼金流程。炼金需要等待一定时间后才会获得结果"
)
class WPSAlchemyRecipeLookup(WPSAPI):
def __init__(self) -> None:
super().__init__()
self._alchemy: Optional[WPSAlchemyGame] = None
self._backpack: Optional[WPSBackpackSystem] = None
def get_guide_subtitle(self) -> str:
return "查询指定物品涉及的炼金配方"
def collect_command_entries(self) -> Sequence[GuideEntry]:
return (
{
"title": "炼金配方",
"identifier": "炼金配方",
"description": "展示物品作为材料、成功产物或失败产物的所有配方。",
"metadata": {"别名": "alchemy_recipe"},
},
)
def collect_guide_entries(self) -> Sequence[GuideEntry]:
return (
{
"title": "查询格式",
"description": "`炼金配方 <物品ID>` 或 `炼金配方 <物品名称>`,忽略大小写。",
},
{
"title": "输出结构",
"description": "结果按材料/成功/失败三类分组,列出配方材料与成功率。",
},
)
def dependencies(self) -> List[type]:
return [WPSAlchemyGame, WPSBackpackSystem]
def is_enable_plugin(self) -> bool:
return True
def wake_up(self) -> None:
self._alchemy = Architecture.Get(WPSAlchemyGame)
self._backpack = Architecture.Get(WPSBackpackSystem)
self.register_plugin("炼金配方")
self.register_plugin("alchemy_recipe")
logger.Log(
"Info",
f"{ConsoleFrontColor.GREEN}WPSAlchemyRecipeLookup 插件已加载{ConsoleFrontColor.RESET}",
)
async def callback(
self, message: str, chat_id: int, user_id: int
) -> Optional[str]:
payload = self.parse_message_after_at(message).strip()
if not payload:
return await self.send_markdown_message(self._help_text(), chat_id, user_id)
backpack = self._backpack or Architecture.Get(WPSBackpackSystem)
definition = self._resolve_definition(payload, backpack)
if definition is None:
return await self.send_markdown_message(
f"❌ 未找到物品 `{payload}`,请确认输入的物品 ID 或名称。",
chat_id,
user_id,
)
alchemy = self._alchemy or Architecture.Get(WPSAlchemyGame)
material_recipes = alchemy.get_recipes_using_item(definition.item_id)
produce_map = alchemy.get_recipes_producing_item(definition.item_id)
success_recipes = produce_map["success"]
fail_recipes = produce_map["fail"]
message_text = self._format_markdown(
definition,
material_recipes,
success_recipes,
fail_recipes,
backpack,
)
return await self.send_markdown_message(message_text, chat_id, user_id)
def _resolve_definition(
self, identifier: str, backpack: WPSBackpackSystem
) -> Optional[BackpackItemDefinition]:
lowered = identifier.strip().lower()
cursor = get_db().conn.cursor()
cursor.execute(
f"""
SELECT item_id
FROM {WPSBackpackSystem.ITEMS_TABLE}
WHERE lower(item_id) = ? OR lower(name) = ?
LIMIT 1
""",
(lowered, lowered),
)
row = cursor.fetchone()
item_id = row["item_id"] if row else identifier.strip()
try:
return backpack._get_definition(item_id) # noqa: SLF001
except Exception:
return None
def _format_markdown(
self,
target: BackpackItemDefinition,
material_recipes: List[AlchemyRecipe],
success_recipes: List[AlchemyRecipe],
fail_recipes: List[AlchemyRecipe],
backpack: WPSBackpackSystem,
) -> str:
lines = [
f"# 🔍 炼金配方查询|{target.name}",
f"- 物品 ID`{target.item_id}`",
"---",
]
lines.append("## 作为配方材料")
lines.extend(
self._format_recipe_entries(material_recipes, backpack)
or ["- 暂无记录"]
)
lines.append("\n## 作为成功产物")
lines.extend(
self._format_recipe_entries(success_recipes, backpack, role="success")
or ["- 暂无记录"]
)
lines.append("\n## 作为失败产物")
lines.extend(
self._format_recipe_entries(fail_recipes, backpack, role="fail")
or ["- 暂无记录"]
)
return "\n".join(lines)
def _format_recipe_entries(
self,
recipes: List[AlchemyRecipe],
backpack: WPSBackpackSystem,
*,
role: str = "material",
) -> List[str]:
if not recipes:
return []
entries: List[str] = []
for recipe in recipes:
materials = self._summarize_materials(recipe, backpack)
success_name = self._resolve_item_name(recipe.success_item_id, backpack)
fail_name = self._resolve_item_name(recipe.fail_item_id, backpack)
rate = f"{recipe.base_success_rate:.0%}"
if role == "material":
entry = (
f"- 材料:{materials}|成功产物:`{success_name}`"
f"失败产物:`{fail_name}`|成功率:{rate}"
)
elif role == "success":
entry = (
f"- 材料:{materials}|成功率:{rate}"
f"失败产物:`{fail_name}`"
)
else:
entry = (
f"- 材料:{materials}|成功率:{rate}"
f"成功产物:`{success_name}`"
)
entries.append(entry)
return entries
def _summarize_materials(
self, recipe: AlchemyRecipe, backpack: WPSBackpackSystem
) -> str:
counter = Counter(recipe.materials)
parts: List[str] = []
for item_id, count in sorted(counter.items()):
name = self._resolve_item_name(item_id, backpack)
if count == 1:
parts.append(f"`{name}`")
else:
parts.append(f"`{name}` × {count}")
return " + ".join(parts)
def _resolve_item_name(
self, item_id: str, backpack: WPSBackpackSystem
) -> str:
try:
definition = backpack._get_definition(item_id) # noqa: SLF001
return definition.name
except Exception:
return item_id
def _help_text(self) -> str:
return (
"# ⚗️ 炼金配方查询帮助\n"
"- `炼金配方 <物品ID>`\n"
"- `炼金配方 <物品名称>`\n"
"> 输入需要精确匹配注册物品,名称不区分大小写。"
"> 积分炼金费用将进入奖池,根据挡位概率抽取奖池奖励;部分挡位会额外掉落普通物品"
)

View File

@@ -2,8 +2,8 @@ from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional, Sequence, override
from PWF.Convention.Runtime.Config import *
from PWF.Convention.Runtime.Architecture import Architecture
from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
from PWF.CoreModules.database import get_db
@@ -174,6 +174,28 @@ class WPSBackpackSystem(WPSAPI):
return ()
return tuple(self._item_cache.values())
def get_items_by_tier(
self,
tier: BackpackItemTier,
*,
blacklist: Optional[Collection[str]] = None,
) -> List[BackpackItemDefinition]:
normalized_blacklist: Set[str] = set()
if blacklist:
normalized_blacklist = {
str(item_id).strip().lower()
for item_id in blacklist
if str(item_id).strip()
}
items: List[BackpackItemDefinition] = []
for definition in self._iter_registered_items():
if definition.tier != tier:
continue
if normalized_blacklist and definition.item_id.lower() in normalized_blacklist:
continue
items.append(definition)
return items
def collect_additional_sections(self) -> Sequence[GuideSection]:
sections = list(super().collect_additional_sections())
item_entries: List[GuideEntry] = []