Files
NewWPSBot/Plugins/WPSGardenSystem/garden_service.py
2025-11-15 17:06:12 +08:00

515 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Garden service handling planting, harvesting, stealing, and selling."""
from __future__ import annotations
import json
import random
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from PWF.Convention.Runtime.Architecture import Architecture
from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
from PWF.CoreModules.database import get_db
from PWF.CoreModules.plugin_interface import PluginInterface
from PWF.CoreModules.flags import get_internal_debug
from pydantic import BaseModel
from .garden_models import (
GARDEN_CROPS,
GARDEN_FRUITS,
GARDEN_MISC_ITEMS,
GARDEN_TRAPS_DICT,
GardenCropDefinition,
GardenTrapDefinition,
get_garden_db_models,
)
# Alias for timestamps carried as plain strings.
# NOTE(review): presumably these are ``datetime.isoformat()`` strings, which is
# what this module writes to and reads from the database — confirm with callers.
Timestamp = str
def _local_now() -> datetime:
return datetime.now()
def _parse_local_iso(ts: str) -> datetime:
return datetime.fromisoformat(ts)
class GardenConfig(BaseModel):
    """Read-only snapshot of the garden settings taken from the project config."""

    # Upper bound on the number of plots a single user may occupy.
    max_plots: int
    # Factor applied to a crop's seed price when pricing fruit / capping point rewards.
    sale_multiplier: int
    # Weight of the caller-supplied fortune value in the extra-reward probability.
    fortune_coeff: float
    # Fraction of a plot's base yield at or below which stealing is refused.
    theft_threshold_ratio: float
    # Read from "garden_seed_store_limit"; not consumed inside this module.
    seed_store_limit: int
    # Read from "garden_crops_config_path"; not consumed inside this module.
    crops_config_path: str

    class Config:
        allow_mutation = False

    @classmethod
    def load(cls) -> "GardenConfig":
        """Build a config from ``ProjectConfig``, falling back to built-in defaults.

        Every key is looked up with ``FindItem(key, default)`` and coerced to
        the field's type; ``SaveProperties()`` is invoked once all lookups are
        done, before the model is constructed.
        """
        settings: ProjectConfig = Architecture.Get(ProjectConfig)
        lookup = settings.FindItem
        # Dict literals evaluate in source order, so the lookups run in the
        # same sequence as the field declarations above.
        values = {
            "max_plots": int(lookup("garden_max_plots_per_user", 4)),
            "sale_multiplier": int(lookup("garden_sale_multiplier", 10)),
            "fortune_coeff": float(lookup("garden_fortune_coeff", 0.03)),
            "theft_threshold_ratio": float(lookup("garden_theft_threshold_ratio", 0.5)),
            "seed_store_limit": int(lookup("garden_seed_store_limit", 5)),
            "crops_config_path": str(lookup("garden_crops_config_path", "Plugins/garden_crops.json")),
        }
        settings.SaveProperties()
        return cls(**values)
class GardenService:
    """Database-backed operations on garden plots.

    Covers planting, maturity tracking, harvesting, stealing, traps, fruit
    pricing and a maintenance sweep. All persistence goes through
    ``self._db.conn``; fetched rows are converted with ``dict(row)``, so the
    connection must yield mapping-style rows. Domain failures are reported by
    raising ``ValueError`` with a user-facing message.
    """

    def __init__(self) -> None:
        """Load the garden configuration and grab the shared DB and logger handles."""
        self._config = GardenConfig.load()
        self._db = get_db()
        self._logger: ProjectConfig = Architecture.Get(ProjectConfig)

    @property
    def config(self) -> "GardenConfig":
        """The configuration snapshot loaded when the service was created."""
        return self._config

    # region Query helpers
    def list_plots(self, user_id: int) -> List[Dict[str, object]]:
        """Return every plot row owned by ``user_id``, ordered by ``plot_index``."""
        cursor = self._db.conn.cursor()
        cursor.execute(
            "SELECT * FROM garden_plots WHERE user_id = ? ORDER BY plot_index ASC",
            (user_id,),
        )
        return [dict(row) for row in cursor.fetchall()]

    def get_plot(self, user_id: int, plot_index: int) -> Optional[Dict[str, object]]:
        """Return the plot row for ``(user_id, plot_index)``, or ``None`` if absent."""
        cursor = self._db.conn.cursor()
        cursor.execute(
            "SELECT * FROM garden_plots WHERE user_id = ? AND plot_index = ?",
            (user_id, plot_index),
        )
        row = cursor.fetchone()
        return dict(row) if row else None
    # endregion

    # region Planting
    def plant(
        self,
        *,
        user_id: int,
        chat_id: int,
        seed_id: str,
        plot_index: Optional[int] = None,
        register_callback: Optional[Tuple["PluginInterface", str]] = None,
    ) -> Tuple[int, str]:
        """Plant ``seed_id`` on a plot and optionally schedule a maturity callback.

        Args:
            user_id: Owner of the plot.
            chat_id: Chat the planting originated from; stored on the row.
            seed_id: Key into ``GARDEN_CROPS``.
            plot_index: Plot to use. When ``None`` the lowest free index in
                ``1..max_plots`` is picked.
            register_callback: Optional ``(plugin, method_name)`` pair. The
                named method is registered via ``plugin.register_clock`` with
                the growth delay in milliseconds, and the returned task id is
                stored on the row.

        Returns:
            ``(plot_index, mature_at)`` where ``mature_at`` is an ISO-format
            string (not a ``datetime``).

        Raises:
            ValueError: Unknown seed, no free plot, the requested plot is
                already occupied, or no plot index could be allocated.
        """
        crop = GARDEN_CROPS.get(seed_id)
        if not crop:
            raise ValueError("未知的种子")

        used_indices = {int(plot["plot_index"]) for plot in self.list_plots(user_id)}
        if len(used_indices) >= self._config.max_plots:
            raise ValueError("没有空闲土地")
        if plot_index is None:
            plot_index = next(
                (
                    idx
                    for idx in range(1, self._config.max_plots + 1)
                    if idx not in used_indices
                ),
                None,
            )
        elif plot_index in used_indices:
            # An explicit index must not collide with an existing row;
            # otherwise the INSERT below would duplicate (or violate) the slot.
            raise ValueError("该地块已被占用")
        if plot_index is None:
            raise ValueError("无法分配地块")

        planted_at = _local_now()
        mature_at = planted_at + timedelta(minutes=crop.growth_minutes)
        debug_mode = get_internal_debug()
        if debug_mode:
            # Internal debug mode makes crops mature immediately.
            mature_at = planted_at

        # Crops without an extra reward store NULL for both extra columns.
        extra = crop.extra_reward
        extra_type = extra.kind if extra else None
        extra_payload = json.dumps(extra.payload) if extra else None

        cursor = self._db.conn.cursor()
        cursor.execute(
            """
            INSERT INTO garden_plots (
                user_id, chat_id, plot_index, seed_id, seed_quality, planted_at, mature_at,
                is_mature, base_yield, extra_type, extra_payload, remaining_fruit, theft_users, scheduled_task_id
            ) VALUES (?, ?, ?, ?, ?, ?, ?, 0, ?, ?, ?, ?, ?, NULL)
            """,
            (
                user_id,
                chat_id,
                plot_index,
                crop.seed_id,
                crop.tier,
                planted_at.isoformat(),
                mature_at.isoformat(),
                crop.base_yield,
                extra_type,
                extra_payload,
                crop.base_yield,  # remaining_fruit starts at the full yield
                json.dumps([]),  # nobody has stolen from this plot yet
            ),
        )
        self._db.conn.commit()

        if register_callback:
            plugin, callback_name = register_callback
            delay_ms = 0 if debug_mode else int(crop.growth_minutes * 60 * 1000)
            task_id = plugin.register_clock(
                getattr(plugin, callback_name),
                delay_ms,
                kwargs={"user_id": user_id, "chat_id": chat_id, "plot_index": plot_index},
            )
            cursor.execute(
                "UPDATE garden_plots SET scheduled_task_id = ? WHERE user_id = ? AND plot_index = ?",
                (task_id, user_id, plot_index),
            )
            self._db.conn.commit()
        return plot_index, mature_at.isoformat()

    def mark_mature(self, user_id: int, plot_index: int) -> None:
        """Flag the plot as mature and clear its scheduled task id."""
        cursor = self._db.conn.cursor()
        cursor.execute(
            "UPDATE garden_plots SET is_mature = 1, scheduled_task_id = NULL WHERE user_id = ? AND plot_index = ?",
            (user_id, plot_index),
        )
        self._db.conn.commit()
    # endregion

    # region Harvest
    def _roll_extra_reward(
        self,
        crop: "GardenCropDefinition",
        initial_yield: int,
        fortune_value: float,
    ) -> Optional[Dict[str, object]]:
        """Roll for the crop's extra reward; return its description or ``None``."""
        reward = crop.extra_reward
        if not reward:
            return None
        probability = max(
            0.0,
            min(1.0, reward.base_rate + fortune_value * self._config.fortune_coeff),
        )
        # random.random() lies in [0.0, 1.0): a strict comparison means a
        # probability of 0.0 never fires and 1.0 always fires.
        if random.random() >= probability:
            return None

        if reward.kind == "points":
            data = reward.payload
            # The configured maximum is capped at the fruit sale price.
            max_points = min(
                data.get("max", initial_yield * crop.seed_price),
                crop.seed_price * self._config.sale_multiplier,
            )
            if max_points <= 0:
                return None
            # Clamp so a configured min above the capped max cannot make
            # randint() raise on an empty range.
            min_points = min(data.get("min", 0), max_points)
            return {"type": "points", "amount": random.randint(min_points, max_points)}

        if reward.kind == "item" and crop.extra_item_id:
            data = reward.payload
            min_qty = max(0, data.get("min", 0))
            max_qty = max(min_qty, data.get("max", min_qty))
            if max_qty <= 0:
                return None
            return {
                "type": "item",
                "item_id": crop.extra_item_id,
                "quantity": random.randint(min_qty, max_qty),
            }
        return None

    def harvest(self, *, user_id: int, plot_index: int, fortune_value: float) -> Dict[str, object]:
        """Harvest a mature plot and delete its row.

        Args:
            user_id: Owner of the plot.
            plot_index: Plot to harvest.
            fortune_value: Caller-supplied value that shifts the extra-reward
                probability by ``fortune_value * fortune_coeff``.

        Returns:
            Dict with ``crop`` (the crop definition), ``base_yield`` (fruit
            still on the plot, never negative), ``extra`` (extra-reward dict or
            ``None``) and ``initial_yield`` (the plot's stored base yield).

        Raises:
            ValueError: The plot does not exist, is not mature, or refers to
                an unknown crop.
        """
        plot = self.get_plot(user_id, plot_index)
        if not plot:
            raise ValueError("指定地块不存在")
        if int(plot["is_mature"]) != 1:
            raise ValueError("作物尚未成熟")
        crop = GARDEN_CROPS.get(plot["seed_id"])
        if not crop:
            raise ValueError("未知作物")

        initial_yield = int(plot["base_yield"])
        remaining_fruit = max(0, int(plot["remaining_fruit"]))
        result = {
            "crop": crop,
            "base_yield": remaining_fruit,
            "extra": self._roll_extra_reward(crop, initial_yield, fortune_value),
            "initial_yield": initial_yield,
        }

        cursor = self._db.conn.cursor()
        cursor.execute(
            "DELETE FROM garden_plots WHERE user_id = ? AND plot_index = ?",
            (user_id, plot_index),
        )
        self._db.conn.commit()
        return result

    def clear_plot(self, *, user_id: int, plot_index: int) -> bool:
        """Delete the plot row; return ``True`` if a row was actually removed."""
        cursor = self._db.conn.cursor()
        cursor.execute(
            "DELETE FROM garden_plots WHERE user_id = ? AND plot_index = ?",
            (user_id, plot_index),
        )
        deleted = cursor.rowcount > 0
        if deleted:
            self._db.conn.commit()
        return deleted
    # endregion

    # region Steal
    def _is_theft_banned(self, user_id: int) -> Tuple[bool, Optional[str]]:
        """Check whether the user is currently banned from stealing.

        An expired ban record is deleted as a side effect.

        Returns:
            ``(banned, banned_until)`` where ``banned_until`` is the stored
            ISO-format unban time when banned, otherwise ``None``.
        """
        cursor = self._db.conn.cursor()
        cursor.execute(
            "SELECT banned_until FROM garden_theft_ban WHERE user_id = ?",
            (user_id,),
        )
        row = cursor.fetchone()
        if not row:
            return False, None
        banned_until_str = row["banned_until"]
        if _parse_local_iso(banned_until_str) > _local_now():
            return True, banned_until_str
        # The ban has expired: drop the stale record.
        cursor.execute("DELETE FROM garden_theft_ban WHERE user_id = ?", (user_id,))
        self._db.conn.commit()
        return False, None

    def _ban_theft(self, user_id: int, ban_hours: int) -> str:
        """Ban the user from stealing for ``ban_hours`` hours from now.

        An existing ban for the same user is overwritten.

        Returns:
            The unban time as an ISO-format string.
        """
        banned_until_str = (_local_now() + timedelta(hours=ban_hours)).isoformat()
        cursor = self._db.conn.cursor()
        cursor.execute(
            """
            INSERT INTO garden_theft_ban (user_id, banned_until)
            VALUES (?, ?)
            ON CONFLICT(user_id) DO UPDATE SET banned_until = excluded.banned_until
            """,
            (user_id, banned_until_str),
        )
        self._db.conn.commit()
        return banned_until_str

    def _check_trap(self, plot: Dict[str, object], thief_id: int) -> Optional[Dict[str, object]]:
        """Roll the plot's trap against a thief and apply its effects.

        When the trap fires, the thief is banned for ``trap.ban_hours`` and the
        trap loses one durability point (it is removed from the plot when
        durability reaches zero).

        Returns:
            A dict describing the triggered trap, or ``None`` when the plot has
            no usable trap or the roll did not fire.
        """
        trap_item_id = plot.get("trap_item_id")
        if not trap_item_id:
            return None
        # A NULL durability column is treated as zero (no usable trap).
        trap_durability = int(plot.get("trap_durability") or 0)
        if trap_durability <= 0:
            return None
        trap = GARDEN_TRAPS_DICT.get(trap_item_id)
        if not trap:
            return None
        # random.random() lies in [0.0, 1.0): a rate of 0.0 must never fire.
        if random.random() >= trap.trigger_rate:
            return None

        # Trap fired: ban the thief first, then wear the trap down.
        banned_until_str = self._ban_theft(thief_id, trap.ban_hours)
        new_durability = trap_durability - 1
        user_id = int(plot["user_id"])
        plot_index = int(plot["plot_index"])
        cursor = self._db.conn.cursor()
        if new_durability <= 0:
            # Durability exhausted: remove the trap from the plot.
            cursor.execute(
                """
                UPDATE garden_plots SET trap_item_id = NULL, trap_config = NULL, trap_durability = 0
                WHERE user_id = ? AND plot_index = ?
                """,
                (user_id, plot_index),
            )
        else:
            cursor.execute(
                """
                UPDATE garden_plots SET trap_durability = ?
                WHERE user_id = ? AND plot_index = ?
                """,
                (new_durability, user_id, plot_index),
            )
        self._db.conn.commit()
        return {
            "trap": trap,
            "fine_points": trap.fine_points,
            "ban_hours": trap.ban_hours,
            "banned_until": banned_until_str,
            "durability": new_durability,
            "durability_exhausted": new_durability <= 0,
            "trigger_message": trap.trigger_message.format(
                fine=trap.fine_points,
                hours=trap.ban_hours,
            ),
        }

    def steal(self, *, thief_id: int, owner_id: int, plot_index: int) -> Dict[str, object]:
        """Steal one fruit from another user's mature plot.

        The plot's trap (if any) is rolled before the fruit is taken; the
        theft is recorded regardless of the trap outcome, and the outcome is
        reported under ``trap_result``.

        Returns:
            Dict with ``crop``, ``stolen_quantity`` (always 1), ``remaining``,
            ``chat_id`` (from the plot row) and ``trap_result``.

        Raises:
            ValueError: The plot is missing or immature, the thief is banned,
                the crop is unknown, too little fruit remains, or the thief
                already stole from this plot.
        """
        plot = self.get_plot(owner_id, plot_index)
        if not plot:
            raise ValueError("目标地块不存在")
        if int(plot["is_mature"]) != 1:
            raise ValueError("目标作物尚未成熟")

        # Refuse thieves that are currently serving a ban.
        is_banned, banned_until_str = self._is_theft_banned(thief_id)
        if is_banned:
            banned_until = _parse_local_iso(banned_until_str)
            remaining_hours = (banned_until - _local_now()).total_seconds() / 3600
            raise ValueError(f"你已被禁止偷盗,解封时间:{self.format_display_time(banned_until_str)}(剩余约{int(remaining_hours)}小时)")

        crop = GARDEN_CROPS.get(plot["seed_id"])
        if not crop:
            raise ValueError("未知作物")

        remaining = int(plot["remaining_fruit"])
        # Fruit at or below this count is protected from theft.
        threshold = int(round(int(plot["base_yield"]) * self._config.theft_threshold_ratio))
        if remaining <= threshold:
            raise ValueError("果实剩余不足,无法偷取")

        # A NULL theft_users column is treated as "nobody has stolen yet".
        theft_users = set(json.loads(plot["theft_users"] or "[]"))
        if thief_id in theft_users:
            raise ValueError("你已经偷取过该作物")

        # The trap is rolled before the fruit is actually taken.
        trap_result = self._check_trap(plot, thief_id)

        theft_users.add(thief_id)
        remaining -= 1
        cursor = self._db.conn.cursor()
        cursor.execute(
            """
            UPDATE garden_plots SET remaining_fruit = ?, theft_users = ?
            WHERE user_id = ? AND plot_index = ?
            """,
            (
                remaining,
                json.dumps(list(theft_users)),
                owner_id,
                plot_index,
            ),
        )
        self._db.conn.commit()
        return {
            "crop": crop,
            "stolen_quantity": 1,
            "remaining": remaining,
            "chat_id": plot["chat_id"],
            "trap_result": trap_result,
        }
    # endregion

    # region Trap
    def place_trap(self, *, user_id: int, plot_index: int, trap_item_id: str) -> None:
        """Install a trap on the plot, replacing any trap already there.

        Raises:
            ValueError: The plot does not exist or the trap item is unknown.
        """
        plot = self.get_plot(user_id, plot_index)
        if not plot:
            raise ValueError("目标地块不存在")
        trap = GARDEN_TRAPS_DICT.get(trap_item_id)
        if not trap:
            raise ValueError("未知陷阱物品")
        # Snapshot of the trap's descriptive fields, stored as JSON.
        trap_config = json.dumps({
            "item_id": trap.item_id,
            "display_name": trap.display_name,
            "tier": trap.tier,
        })
        cursor = self._db.conn.cursor()
        cursor.execute(
            """
            UPDATE garden_plots SET trap_item_id = ?, trap_config = ?, trap_durability = ?
            WHERE user_id = ? AND plot_index = ?
            """,
            (trap_item_id, trap_config, trap.durability, user_id, plot_index),
        )
        self._db.conn.commit()

    def remove_trap(self, *, user_id: int, plot_index: int) -> None:
        """Remove the trap from the plot.

        Raises:
            ValueError: The plot does not exist.
        """
        plot = self.get_plot(user_id, plot_index)
        if not plot:
            raise ValueError("目标地块不存在")
        cursor = self._db.conn.cursor()
        cursor.execute(
            """
            UPDATE garden_plots SET trap_item_id = NULL, trap_config = NULL, trap_durability = 0
            WHERE user_id = ? AND plot_index = ?
            """,
            (user_id, plot_index),
        )
        self._db.conn.commit()
    # endregion

    # region Selling
    def sell_fruit(self, *, user_id: int, fruit_id: str, quantity: int) -> Tuple[int, int]:
        """Price a fruit sale; this method itself does not touch the database.

        Returns:
            ``(total_points, price_per_fruit)`` where the unit price is
            ``seed_price * sale_multiplier``.

        Raises:
            ValueError: Unknown fruit or a non-positive quantity.
        """
        crop = GARDEN_FRUITS.get(fruit_id)
        if not crop:
            raise ValueError("未知果实")
        if quantity <= 0:
            raise ValueError("数量必须大于0")
        price_per = crop.seed_price * self._config.sale_multiplier
        total_points = price_per * quantity
        return total_points, price_per
    # endregion

    # region Maintenance
    def recover_overdue_plots(self) -> None:
        """Mark every immature plot whose ``mature_at`` has passed as mature.

        Logs the number of plots updated when at least one was changed.
        """
        cursor = self._db.conn.cursor()
        cursor.execute(
            "SELECT user_id, plot_index, mature_at, is_mature FROM garden_plots WHERE is_mature = 0",
        )
        rows = cursor.fetchall()
        now = _local_now()
        updated = 0
        for row in rows:
            if _parse_local_iso(row["mature_at"]) <= now:
                self.mark_mature(row["user_id"], row["plot_index"])
                updated += 1
        if updated:
            self._logger.Log(
                "Info",
                f"{ConsoleFrontColor.GREEN}同步成熟地块 {updated}{ConsoleFrontColor.RESET}",
            )
    # endregion

    # region Utilities
    def format_display_time(self, iso_ts: str) -> str:
        """Render an ISO timestamp as ``YYYY年MM月DD日HH时MM分`` for display.

        Best effort: when the value cannot be parsed or formatted, the input
        is returned unchanged.
        """
        try:
            dt = _parse_local_iso(iso_ts)
            # "日" separates the day from the hour; without it the two numbers
            # run together (e.g. "1517时").
            return dt.strftime("%Y年%m月%d日%H时%M分")
        except (TypeError, ValueError):
            return iso_ts
    # endregion
# Public names of this module; ``get_garden_db_models`` is imported from
# ``.garden_models`` and re-exported here.
__all__ = ["GardenService", "GardenConfig", "get_garden_db_models"]