"""Garden service handling planting, harvesting, stealing, and selling.""" from __future__ import annotations import json import random from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple from PWF.Convention.Runtime.Architecture import Architecture from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig from PWF.CoreModules.database import get_db from PWF.CoreModules.plugin_interface import PluginInterface from PWF.CoreModules.flags import get_internal_debug from pydantic import BaseModel from .garden_models import ( GARDEN_CROPS, GARDEN_FRUITS, GARDEN_MISC_ITEMS, GardenCropDefinition, get_garden_db_models, ) Timestamp = str def _local_now() -> datetime: return datetime.now() def _parse_local_iso(ts: str) -> datetime: return datetime.fromisoformat(ts) class GardenConfig(BaseModel): max_plots: int sale_multiplier: int fortune_coeff: float theft_threshold_ratio: float seed_store_limit: int crops_config_path: str class Config: allow_mutation = False @classmethod def load(cls) -> "GardenConfig": project_config: ProjectConfig = Architecture.Get(ProjectConfig) max_plots = int(project_config.FindItem("garden_max_plots_per_user", 4)) sale_multiplier = int(project_config.FindItem("garden_sale_multiplier", 10)) fortune_coeff = float(project_config.FindItem("garden_fortune_coeff", 0.03)) theft_ratio = float(project_config.FindItem("garden_theft_threshold_ratio", 0.5)) seed_store_limit = int(project_config.FindItem("garden_seed_store_limit", 5)) crops_config_path = str(project_config.FindItem("garden_crops_config_path", "Plugins/garden_crops.json")) project_config.SaveProperties() return cls( max_plots=max_plots, sale_multiplier=sale_multiplier, fortune_coeff=fortune_coeff, theft_threshold_ratio=theft_ratio, seed_store_limit=seed_store_limit, crops_config_path=crops_config_path, ) class GardenService: def __init__(self) -> None: self._config = GardenConfig.load() self._db = get_db() self._logger: ProjectConfig = Architecture.Get(ProjectConfig) @property def config(self) -> GardenConfig: return self._config # region Query helpers def list_plots(self, user_id: int) -> List[Dict[str, object]]: cursor = self._db.conn.cursor() cursor.execute( "SELECT * FROM garden_plots WHERE user_id = ? ORDER BY plot_index ASC", (user_id,), ) rows = cursor.fetchall() return [dict(row) for row in rows] def get_plot(self, user_id: int, plot_index: int) -> Optional[Dict[str, object]]: cursor = self._db.conn.cursor() cursor.execute( "SELECT * FROM garden_plots WHERE user_id = ? AND plot_index = ?", (user_id, plot_index), ) row = cursor.fetchone() return dict(row) if row else None # endregion # region Planting def plant( self, *, user_id: int, chat_id: int, seed_id: str, plot_index: Optional[int] = None, register_callback: Optional[ Tuple[PluginInterface, str] ] = None, ) -> Tuple[int, datetime]: crop = GARDEN_CROPS.get(seed_id) if not crop: raise ValueError("未知的种子") plots = self.list_plots(user_id) used_indices = {int(plot["plot_index"]) for plot in plots} if len(used_indices) >= self._config.max_plots: raise ValueError("没有空闲土地") if plot_index is None: for idx in range(1, self._config.max_plots + 1): if idx not in used_indices: plot_index = idx break if plot_index is None: raise ValueError("无法分配地块") planted_at = _local_now() mature_at = planted_at + timedelta(minutes=crop.growth_minutes) debug_mode = get_internal_debug() if debug_mode: mature_at = planted_at cursor = self._db.conn.cursor() cursor.execute( """ INSERT INTO garden_plots ( user_id, chat_id, plot_index, seed_id, seed_quality, planted_at, mature_at, is_mature, base_yield, extra_type, extra_payload, remaining_fruit, theft_users, scheduled_task_id ) VALUES (?, ?, ?, ?, ?, ?, ?, 0, ?, ?, ?, ?, ?, NULL) """, ( user_id, chat_id, plot_index, crop.seed_id, crop.tier, planted_at.isoformat(), mature_at.isoformat(), crop.base_yield, crop.extra_reward.kind, json.dumps(crop.extra_reward.payload) if crop.extra_reward else None, crop.base_yield, json.dumps([]), ), ) self._db.conn.commit() task_id = None if register_callback: plugin, callback_name = register_callback delay_ms = 0 if debug_mode else int(crop.growth_minutes * 60 * 1000) task_id = plugin.register_clock( getattr(plugin, callback_name), delay_ms, kwargs={"user_id": user_id, "chat_id": chat_id, "plot_index": plot_index}, ) cursor.execute( "UPDATE garden_plots SET scheduled_task_id = ? WHERE user_id = ? AND plot_index = ?", (task_id, user_id, plot_index), ) self._db.conn.commit() return plot_index, mature_at.isoformat() def mark_mature(self, user_id: int, plot_index: int) -> None: cursor = self._db.conn.cursor() cursor.execute( "UPDATE garden_plots SET is_mature = 1, scheduled_task_id = NULL WHERE user_id = ? AND plot_index = ?", (user_id, plot_index), ) self._db.conn.commit() # endregion # region Harvest def harvest(self, *, user_id: int, plot_index: int, fortune_value: float) -> Dict[str, object]: plot = self.get_plot(user_id, plot_index) if not plot: raise ValueError("指定地块不存在") if int(plot["is_mature"]) != 1: raise ValueError("作物尚未成熟") crop = GARDEN_CROPS.get(plot["seed_id"]) if not crop: raise ValueError("未知作物") initial_yield = int(plot["base_yield"]) remaining_fruit = int(plot["remaining_fruit"]) if remaining_fruit < 0: remaining_fruit = 0 extra_reward = None if crop.extra_reward: base_rate = crop.extra_reward.base_rate probability = max( 0.0, min(1.0, base_rate + fortune_value * self._config.fortune_coeff), ) if random.random() <= probability: if crop.extra_reward.kind == "points": data = crop.extra_reward.payload max_points = min( data.get("max", initial_yield * crop.seed_price), crop.seed_price * self._config.sale_multiplier, ) min_points = data.get("min", 0) if max_points > 0: amount = random.randint(min_points, max_points) extra_reward = {"type": "points", "amount": amount} elif crop.extra_reward.kind == "item" and crop.extra_item_id: data = crop.extra_reward.payload min_qty = max(0, data.get("min", 0)) max_qty = max(min_qty, data.get("max", min_qty)) if max_qty > 0: quantity = random.randint(min_qty, max_qty) extra_reward = { "type": "item", "item_id": crop.extra_item_id, "quantity": quantity, } result = { "crop": crop, "base_yield": remaining_fruit, "extra": extra_reward, } result["initial_yield"] = initial_yield cursor = self._db.conn.cursor() cursor.execute( "DELETE FROM garden_plots WHERE user_id = ? AND plot_index = ?", (user_id, plot_index), ) self._db.conn.commit() return result def clear_plot(self, *, user_id: int, plot_index: int) -> bool: cursor = self._db.conn.cursor() cursor.execute( "DELETE FROM garden_plots WHERE user_id = ? AND plot_index = ?", (user_id, plot_index), ) deleted = cursor.rowcount > 0 if deleted: self._db.conn.commit() return deleted # endregion # region Steal def steal(self, *, thief_id: int, owner_id: int, plot_index: int) -> Dict[str, object]: plot = self.get_plot(owner_id, plot_index) if not plot: raise ValueError("目标地块不存在") if int(plot["is_mature"]) != 1: raise ValueError("目标作物尚未成熟") crop = GARDEN_CROPS.get(plot["seed_id"]) if not crop: raise ValueError("未知作物") remaining = int(plot["remaining_fruit"]) threshold = int(round(int(plot["base_yield"]) * self._config.theft_threshold_ratio)) if remaining <= threshold: raise ValueError("果实剩余不足,无法偷取") theft_users = set(json.loads(plot["theft_users"])) if thief_id in theft_users: raise ValueError("你已经偷取过该作物") theft_users.add(thief_id) remaining -= 1 cursor = self._db.conn.cursor() cursor.execute( """ UPDATE garden_plots SET remaining_fruit = ?, theft_users = ? WHERE user_id = ? AND plot_index = ? """, ( remaining, json.dumps(list(theft_users)), owner_id, plot_index, ), ) self._db.conn.commit() return { "crop": crop, "stolen_quantity": 1, "remaining": remaining, "chat_id": plot["chat_id"], } # endregion # region Selling def sell_fruit(self, *, user_id: int, fruit_id: str, quantity: int) -> Tuple[int, int]: crop = GARDEN_FRUITS.get(fruit_id) if not crop: raise ValueError("未知果实") if quantity <= 0: raise ValueError("数量必须大于0") price_per = crop.seed_price * self._config.sale_multiplier total_points = price_per * quantity return total_points, price_per # endregion # region Maintenance def recover_overdue_plots(self) -> None: cursor = self._db.conn.cursor() cursor.execute( "SELECT user_id, plot_index, mature_at, is_mature FROM garden_plots WHERE is_mature = 0", ) rows = cursor.fetchall() now = _local_now() updated = 0 for row in rows: mature_at = _parse_local_iso(row["mature_at"]) if mature_at <= now: self.mark_mature(row["user_id"], row["plot_index"]) updated += 1 if updated: self._logger.Log( "Info", f"{ConsoleFrontColor.GREEN}同步成熟地块 {updated} 个{ConsoleFrontColor.RESET}", ) # endregion # region Utilities def format_display_time(self, iso_ts: str) -> str: try: dt = _parse_local_iso(iso_ts) return dt.strftime("%Y年%m月%d日 %H时%M分") except Exception: return iso_ts # endregion __all__ = ["GardenService", "GardenConfig", "get_garden_db_models"]