This commit is contained in:
Slipstream 2025-05-02 19:32:50 -06:00
parent 709885a01c
commit 215f8b49be
Signed by: slipstream
GPG Key ID: 13E498CE010AC6FD
7 changed files with 1314 additions and 194 deletions

345
cogs/economy/database.py Normal file
View File

@ -0,0 +1,345 @@
import aiosqlite
import os
import datetime
import logging
from typing import Optional
# Configure logging
log = logging.getLogger(__name__)
# Database path (adjust relative path)
DB_DIR = os.path.join(os.path.dirname(__file__), '..', '..', 'data')
DB_PATH = os.path.join(DB_DIR, 'economy.db')
# Ensure the data directory exists
os.makedirs(DB_DIR, exist_ok=True)
# --- Database Setup ---
async def init_db():
"""Initializes the database and creates tables if they don't exist."""
try:
async with aiosqlite.connect(DB_PATH) as db:
# Create economy table
await db.execute("""
CREATE TABLE IF NOT EXISTS economy (
user_id INTEGER PRIMARY KEY,
balance INTEGER NOT NULL DEFAULT 0
)
""")
log.info("Checked/created 'economy' table.")
# Create command_cooldowns table
await db.execute("""
CREATE TABLE IF NOT EXISTS command_cooldowns (
user_id INTEGER NOT NULL,
command_name TEXT NOT NULL,
last_used TIMESTAMP NOT NULL,
PRIMARY KEY (user_id, command_name)
)
""")
log.info("Checked/created 'command_cooldowns' table.")
# Create user_jobs table
await db.execute("""
CREATE TABLE IF NOT EXISTS user_jobs (
user_id INTEGER PRIMARY KEY,
job_name TEXT,
job_level INTEGER NOT NULL DEFAULT 1,
job_xp INTEGER NOT NULL DEFAULT 0,
last_job_action TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES economy(user_id) ON DELETE CASCADE
)
""")
log.info("Checked/created 'user_jobs' table.")
# Create items table
await db.execute("""
CREATE TABLE IF NOT EXISTS items (
item_key TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
sell_price INTEGER NOT NULL DEFAULT 0
)
""")
log.info("Checked/created 'items' table.")
# --- Add some basic items ---
initial_items = [
('raw_iron', 'Raw Iron Ore', 'Basic metal ore.', 5),
('coal', 'Coal', 'A lump of fossil fuel.', 3),
('shiny_gem', 'Shiny Gem', 'A pretty, potentially valuable gem.', 50),
('common_fish', 'Common Fish', 'A standard fish.', 4),
('rare_fish', 'Rare Fish', 'An uncommon fish.', 15),
('treasure_chest', 'Treasure Chest', 'Might contain goodies!', 0), # Sell price 0, opened via command?
('iron_ingot', 'Iron Ingot', 'Refined iron, ready for crafting.', 12),
('basic_tool', 'Basic Tool', 'A simple tool.', 25)
]
await db.executemany("""
INSERT OR IGNORE INTO items (item_key, name, description, sell_price)
VALUES (?, ?, ?, ?)
""", initial_items)
log.info("Ensured initial items exist.")
# Create user_inventory table
await db.execute("""
CREATE TABLE IF NOT EXISTS user_inventory (
user_id INTEGER NOT NULL,
item_key TEXT NOT NULL,
quantity INTEGER NOT NULL DEFAULT 1,
PRIMARY KEY (user_id, item_key),
FOREIGN KEY (user_id) REFERENCES economy(user_id) ON DELETE CASCADE,
FOREIGN KEY (item_key) REFERENCES items(item_key) ON DELETE CASCADE
)
""")
log.info("Checked/created 'user_inventory' table.")
await db.commit()
log.info(f"Economy database initialized successfully at {DB_PATH}")
except Exception as e:
log.error(f"Failed to initialize economy database at {DB_PATH}: {e}", exc_info=True)
raise # Re-raise the exception
# --- Database Helper Functions ---
async def get_balance(user_id: int) -> int:
"""Gets the balance for a user, creating an entry if needed."""
async with aiosqlite.connect(DB_PATH) as db:
async with db.execute("SELECT balance FROM economy WHERE user_id = ?", (user_id,)) as cursor:
result = await cursor.fetchone()
if result:
return result[0]
else:
try:
await db.execute("INSERT INTO economy (user_id, balance) VALUES (?, ?)", (user_id, 0))
await db.commit()
log.info(f"Created new economy entry for user_id: {user_id}")
return 0
except aiosqlite.IntegrityError:
log.warning(f"Race condition handled for user_id: {user_id} during balance fetch.")
async with db.execute("SELECT balance FROM economy WHERE user_id = ?", (user_id,)) as cursor_retry:
result_retry = await cursor_retry.fetchone()
return result_retry[0] if result_retry else 0
async def update_balance(user_id: int, amount: int):
"""Updates a user's balance by adding the specified amount (can be negative)."""
async with aiosqlite.connect(DB_PATH) as db:
# Ensure user exists first by trying to get balance
await get_balance(user_id)
await db.execute("UPDATE economy SET balance = balance + ? WHERE user_id = ?", (amount, user_id))
await db.commit()
log.debug(f"Updated balance for user_id {user_id} by {amount}.")
async def check_cooldown(user_id: int, command_name: str) -> Optional[datetime.datetime]:
"""Checks if a command is on cooldown for a user. Returns the last used time if on cooldown, else None."""
async with aiosqlite.connect(DB_PATH) as db:
async with db.execute("SELECT last_used FROM command_cooldowns WHERE user_id = ? AND command_name = ?", (user_id, command_name)) as cursor:
result = await cursor.fetchone()
if result:
try:
# Timestamps are stored in ISO format
last_used_dt = datetime.datetime.fromisoformat(result[0])
# Ensure it's timezone-aware (UTC) if it's not already
if last_used_dt.tzinfo is None:
last_used_dt = last_used_dt.replace(tzinfo=datetime.timezone.utc)
return last_used_dt
except ValueError:
log.error(f"Could not parse timestamp '{result[0]}' for user {user_id}, command {command_name}")
return None
else:
return None
async def set_cooldown(user_id: int, command_name: str):
"""Sets or updates the cooldown timestamp for a command."""
now = datetime.datetime.now(datetime.timezone.utc)
now_iso = now.isoformat()
async with aiosqlite.connect(DB_PATH) as db:
await db.execute("""
INSERT INTO command_cooldowns (user_id, command_name, last_used)
VALUES (?, ?, ?)
ON CONFLICT(user_id, command_name) DO UPDATE SET last_used = excluded.last_used
""", (user_id, command_name, now_iso))
await db.commit()
log.debug(f"Set cooldown for user_id {user_id}, command {command_name} to {now_iso}")
async def get_leaderboard(count: int = 10) -> list[tuple[int, int]]:
"""Retrieves the top users by balance."""
async with aiosqlite.connect(DB_PATH) as db:
async with db.execute("SELECT user_id, balance FROM economy ORDER BY balance DESC LIMIT ?", (count,)) as cursor:
results = await cursor.fetchall()
return results if results else []
# --- Job Functions ---
async def get_user_job(user_id: int) -> Optional[dict]:
"""Gets the user's job details (name, level, xp, last_action). Creates entry if needed."""
async with aiosqlite.connect(DB_PATH) as db:
# Ensure user exists in economy table first
await get_balance(user_id)
# Try to fetch job
async with db.execute("SELECT job_name, job_level, job_xp, last_job_action FROM user_jobs WHERE user_id = ?", (user_id,)) as cursor:
result = await cursor.fetchone()
if result:
last_action = None
if result[3]:
try:
last_action = datetime.datetime.fromisoformat(result[3])
if last_action.tzinfo is None:
last_action = last_action.replace(tzinfo=datetime.timezone.utc)
except ValueError:
log.error(f"Could not parse job timestamp '{result[3]}' for user {user_id}")
return {"name": result[0], "level": result[1], "xp": result[2], "last_action": last_action}
else:
# Create job entry if it doesn't exist (defaults to no job)
try:
await db.execute("INSERT INTO user_jobs (user_id, job_name, job_level, job_xp, last_job_action) VALUES (?, NULL, 1, 0, NULL)", (user_id,))
await db.commit()
log.info(f"Created default job entry for user_id: {user_id}")
return {"name": None, "level": 1, "xp": 0, "last_action": None}
except aiosqlite.IntegrityError:
log.warning(f"Race condition handled for user_id: {user_id} during job fetch.")
# Retry fetch
async with db.execute("SELECT job_name, job_level, job_xp, last_job_action FROM user_jobs WHERE user_id = ?", (user_id,)) as cursor_retry:
result_retry = await cursor_retry.fetchone()
if result_retry:
last_action_retry = None
if result_retry[3]:
try:
last_action_retry = datetime.datetime.fromisoformat(result_retry[3])
if last_action_retry.tzinfo is None:
last_action_retry = last_action_retry.replace(tzinfo=datetime.timezone.utc)
except ValueError: pass
return {"name": result_retry[0], "level": result_retry[1], "xp": result_retry[2], "last_action": last_action_retry}
else: # Should not happen after insert attempt, but handle defensively
return {"name": None, "level": 1, "xp": 0, "last_action": None}
async def set_user_job(user_id: int, job_name: Optional[str]):
"""Sets or clears a user's job. Resets level/xp if changing/leaving."""
async with aiosqlite.connect(DB_PATH) as db:
# Ensure job entry exists
await get_user_job(user_id)
# Update job, resetting level/xp
await db.execute("UPDATE user_jobs SET job_name = ?, job_level = 1, job_xp = 0 WHERE user_id = ?", (job_name, user_id))
await db.commit()
log.info(f"Set job for user_id {user_id} to {job_name}. Level/XP reset.")
async def add_job_xp(user_id: int, xp_amount: int) -> tuple[int, int, bool]:
"""Adds XP to the user's job, handles level ups. Returns (new_level, new_xp, did_level_up)."""
async with aiosqlite.connect(DB_PATH) as db:
job_info = await get_user_job(user_id)
if not job_info or not job_info.get("name"):
log.warning(f"Attempted to add XP to user {user_id} with no job.")
return (1, 0, False) # Return default values if no job
current_level = job_info["level"]
current_xp = job_info["xp"]
new_xp = current_xp + xp_amount
did_level_up = False
# --- Leveling Logic ---
xp_needed = current_level * 100 # Example: Level 1 needs 100 XP, Level 2 needs 200 XP
while new_xp >= xp_needed:
new_xp -= xp_needed
current_level += 1
xp_needed = current_level * 100 # Update for next potential level
did_level_up = True
log.info(f"User {user_id} leveled up their job to {current_level}!")
# Update database
await db.execute("UPDATE user_jobs SET job_level = ?, job_xp = ? WHERE user_id = ?", (current_level, new_xp, user_id))
await db.commit()
log.debug(f"Updated job XP for user {user_id}. New Level: {current_level}, New XP: {new_xp}")
return (current_level, new_xp, did_level_up)
async def set_job_cooldown(user_id: int):
"""Sets the job cooldown timestamp."""
now = datetime.datetime.now(datetime.timezone.utc)
now_iso = now.isoformat()
async with aiosqlite.connect(DB_PATH) as db:
await db.execute("UPDATE user_jobs SET last_job_action = ? WHERE user_id = ?", (now_iso, user_id))
await db.commit()
log.debug(f"Set job cooldown for user_id {user_id} to {now_iso}")
# --- Item/Inventory Functions ---
async def get_item_details(item_key: str) -> Optional[dict]:
"""Gets details for a specific item."""
async with aiosqlite.connect(DB_PATH) as db:
async with db.execute("SELECT name, description, sell_price FROM items WHERE item_key = ?", (item_key,)) as cursor:
result = await cursor.fetchone()
if result:
return {"key": item_key, "name": result[0], "description": result[1], "sell_price": result[2]}
else:
return None
async def get_inventory(user_id: int) -> list[dict]:
"""Gets a user's inventory."""
inventory = []
async with aiosqlite.connect(DB_PATH) as db:
async with db.execute("""
SELECT inv.item_key, inv.quantity, i.name, i.description, i.sell_price
FROM user_inventory inv
JOIN items i ON inv.item_key = i.item_key
WHERE inv.user_id = ?
ORDER BY i.name
""", (user_id,)) as cursor:
results = await cursor.fetchall()
for row in results:
inventory.append({
"key": row[0],
"quantity": row[1],
"name": row[2],
"description": row[3],
"sell_price": row[4]
})
return inventory
async def add_item_to_inventory(user_id: int, item_key: str, quantity: int = 1):
"""Adds an item to the user's inventory."""
if quantity <= 0:
log.warning(f"Attempted to add non-positive quantity ({quantity}) of item {item_key} for user {user_id}")
return
# Check if item exists
item_details = await get_item_details(item_key)
if not item_details:
log.error(f"Attempted to add non-existent item '{item_key}' to inventory for user {user_id}")
return
async with aiosqlite.connect(DB_PATH) as db:
# Ensure user exists in economy table
await get_balance(user_id)
await db.execute("""
INSERT INTO user_inventory (user_id, item_key, quantity)
VALUES (?, ?, ?)
ON CONFLICT(user_id, item_key) DO UPDATE SET quantity = quantity + excluded.quantity
""", (user_id, item_key, quantity))
await db.commit()
log.debug(f"Added {quantity} of {item_key} to user {user_id}'s inventory.")
async def remove_item_from_inventory(user_id: int, item_key: str, quantity: int = 1) -> bool:
"""Removes an item from the user's inventory. Returns True if successful, False otherwise."""
if quantity <= 0:
log.warning(f"Attempted to remove non-positive quantity ({quantity}) of item {item_key} for user {user_id}")
return False
async with aiosqlite.connect(DB_PATH) as db:
# Check current quantity first
async with db.execute("SELECT quantity FROM user_inventory WHERE user_id = ? AND item_key = ?", (user_id, item_key)) as cursor:
result = await cursor.fetchone()
if not result or result[0] < quantity:
log.debug(f"User {user_id} does not have enough {item_key} (needs {quantity}, has {result[0] if result else 0})")
return False # Not enough items
current_quantity = result[0]
if current_quantity == quantity:
# Delete the row if quantity becomes zero
await db.execute("DELETE FROM user_inventory WHERE user_id = ? AND item_key = ?", (user_id, item_key))
else:
# Otherwise, just decrease the quantity
await db.execute("UPDATE user_inventory SET quantity = quantity - ? WHERE user_id = ? AND item_key = ?", (quantity, user_id, item_key))
await db.commit()
log.debug(f"Removed {quantity} of {item_key} from user {user_id}'s inventory.")
return True

179
cogs/economy/earning.py Normal file
View File

@ -0,0 +1,179 @@
import discord
from discord.ext import commands
import datetime
import random
import logging
from typing import Optional
# Import database functions from the sibling module
from . import database
log = logging.getLogger(__name__)
class EarningCommands(commands.Cog):
"""Cog containing currency earning commands."""
def __init__(self, bot: commands.Bot):
self.bot = bot
@commands.hybrid_command(name="daily", description="Claim your daily reward.")
async def daily(self, ctx: commands.Context):
"""Allows users to claim a daily currency reward."""
user_id = ctx.author.id
command_name = "daily"
cooldown_duration = datetime.timedelta(hours=24)
reward_amount = 100 # Example daily reward
last_used = await database.check_cooldown(user_id, command_name)
if last_used:
now_utc = datetime.datetime.now(datetime.timezone.utc)
# Ensure last_used is timezone-aware for comparison
if last_used.tzinfo is None:
last_used = last_used.replace(tzinfo=datetime.timezone.utc)
time_since_last_used = now_utc - last_used
if time_since_last_used < cooldown_duration:
time_left = cooldown_duration - time_since_last_used
hours, remainder = divmod(int(time_left.total_seconds()), 3600)
minutes, seconds = divmod(remainder, 60)
await ctx.send(f"You've already claimed your daily reward. Try again in **{hours}h {minutes}m {seconds}s**.", ephemeral=True)
return
# Not on cooldown or cooldown expired
await database.update_balance(user_id, reward_amount)
await database.set_cooldown(user_id, command_name)
current_balance = await database.get_balance(user_id)
await ctx.send(f"🎉 You claimed your daily reward of **${reward_amount:,}**! Your new balance is **${current_balance:,}**.")
@commands.hybrid_command(name="beg", description="Beg for some spare change.")
async def beg(self, ctx: commands.Context):
"""Allows users to beg for a small amount of currency with a chance of success."""
user_id = ctx.author.id
command_name = "beg"
cooldown_duration = datetime.timedelta(minutes=5) # 5-minute cooldown
success_chance = 0.4 # 40% chance of success
min_reward = 1
max_reward = 20
last_used = await database.check_cooldown(user_id, command_name)
if last_used:
now_utc = datetime.datetime.now(datetime.timezone.utc)
if last_used.tzinfo is None:
last_used = last_used.replace(tzinfo=datetime.timezone.utc)
time_since_last_used = now_utc - last_used
if time_since_last_used < cooldown_duration:
time_left = cooldown_duration - time_since_last_used
minutes, seconds = divmod(int(time_left.total_seconds()), 60)
await ctx.send(f"You can't beg again so soon. Try again in **{minutes}m {seconds}s**.", ephemeral=True)
return
# Set cooldown regardless of success/failure
await database.set_cooldown(user_id, command_name)
# Determine success
if random.random() < success_chance:
reward_amount = random.randint(min_reward, max_reward)
await database.update_balance(user_id, reward_amount)
current_balance = await database.get_balance(user_id)
await ctx.send(f"🙏 Someone took pity on you! You received **${reward_amount:,}**. Your new balance is **${current_balance:,}**.")
else:
await ctx.send("🤷 Nobody gave you anything. Better luck next time!")
@commands.hybrid_command(name="work", description="Do some work for a guaranteed reward.")
async def work(self, ctx: commands.Context):
"""Allows users to perform work for a small, guaranteed reward."""
user_id = ctx.author.id
command_name = "work"
cooldown_duration = datetime.timedelta(hours=1) # 1-hour cooldown
reward_amount = random.randint(15, 35) # Small reward range - This is now fallback if no job
# --- Check if user has a job ---
job_info = await database.get_user_job(user_id)
if job_info and job_info.get("name"):
job_key = job_info["name"]
# Dynamically get job details if possible (assuming JOB_DEFINITIONS might be accessible or refactored)
# For simplicity here, just use the job key. A better approach might involve a shared config or helper.
# from .jobs import JOB_DEFINITIONS # Avoid circular import if possible
# job_details = JOB_DEFINITIONS.get(job_key)
# command_to_use = job_details['command'] if job_details else f"your job command (`/{job_key}`)" # Fallback
command_to_use = f"`/{job_key}`" # Simple fallback
await ctx.send(f"You have a job! Use {command_to_use} instead of the generic `/work` command.", ephemeral=True)
return
# --- End Job Check ---
# Proceed with generic /work only if no job
last_used = await database.check_cooldown(user_id, command_name)
if last_used:
now_utc = datetime.datetime.now(datetime.timezone.utc)
if last_used.tzinfo is None:
last_used = last_used.replace(tzinfo=datetime.timezone.utc)
time_since_last_used = now_utc - last_used
if time_since_last_used < cooldown_duration:
time_left = cooldown_duration - time_since_last_used
hours, remainder = divmod(int(time_left.total_seconds()), 3600)
minutes, seconds = divmod(remainder, 60)
await ctx.send(f"You need to rest after working. Try again in **{hours}h {minutes}m {seconds}s**.", ephemeral=True)
return
# Set cooldown and give reward
await database.set_cooldown(user_id, command_name)
await database.update_balance(user_id, reward_amount)
# Add some flavor text
work_messages = [
f"You worked hard and earned **${reward_amount:,}**!",
f"After a solid hour of work, you got **${reward_amount:,}**.",
f"Your efforts paid off! You received **${reward_amount:,}**.",
]
current_balance = await database.get_balance(user_id)
await ctx.send(f"{random.choice(work_messages)} Your new balance is **${current_balance:,}**.")
@commands.hybrid_command(name="search", description="Search around for some spare change.")
async def search(self, ctx: commands.Context):
"""Allows users to search for a small chance of finding money."""
user_id = ctx.author.id
command_name = "search"
cooldown_duration = datetime.timedelta(minutes=30) # 30-minute cooldown
success_chance = 0.25 # 25% chance to find something
min_reward = 1
max_reward = 10
last_used = await database.check_cooldown(user_id, command_name)
if last_used:
now_utc = datetime.datetime.now(datetime.timezone.utc)
if last_used.tzinfo is None:
last_used = last_used.replace(tzinfo=datetime.timezone.utc)
time_since_last_used = now_utc - last_used
if time_since_last_used < cooldown_duration:
time_left = cooldown_duration - time_since_last_used
minutes, seconds = divmod(int(time_left.total_seconds()), 60)
await ctx.send(f"You've searched recently. Try again in **{minutes}m {seconds}s**.", ephemeral=True)
return
# Set cooldown regardless of success
await database.set_cooldown(user_id, command_name)
# Flavor text for searching
search_locations = [
"under the sofa cushions", "in an old coat pocket", "behind the dumpster",
"in a dusty corner", "on the sidewalk", "in a forgotten drawer"
]
location = random.choice(search_locations)
if random.random() < success_chance:
reward_amount = random.randint(min_reward, max_reward)
await database.update_balance(user_id, reward_amount)
current_balance = await database.get_balance(user_id)
await ctx.send(f"🔍 You searched {location} and found **${reward_amount:,}**! Your new balance is **${current_balance:,}**.")
else:
await ctx.send(f"🔍 You searched {location} but found nothing but lint.")
# No setup function needed here, it will be in __init__.py

70
cogs/economy/gambling.py Normal file
View File

@ -0,0 +1,70 @@
import discord
from discord.ext import commands
import datetime
import random
import logging
from typing import Optional
# Import database functions from the sibling module
from . import database
log = logging.getLogger(__name__)
class GamblingCommands(commands.Cog):
"""Cog containing gambling-related economy commands."""
def __init__(self, bot: commands.Bot):
self.bot = bot
@commands.hybrid_command(name="coinflip", aliases=["cf"], description="Gamble your money on a coin flip.")
async def coinflip(self, ctx: commands.Context, amount: int, choice: str):
"""Bets a certain amount on a coin flip (heads or tails)."""
user_id = ctx.author.id
command_name = "coinflip" # Cooldown specific to coinflip
cooldown_duration = datetime.timedelta(seconds=10) # Short cooldown
choice = choice.lower()
if choice not in ["heads", "tails", "h", "t"]:
await ctx.send("Invalid choice. Please choose 'heads' or 'tails'.", ephemeral=True)
return
if amount <= 0:
await ctx.send("Please enter a positive amount to bet.", ephemeral=True)
return
# Check cooldown
last_used = await database.check_cooldown(user_id, command_name)
if last_used:
now_utc = datetime.datetime.now(datetime.timezone.utc)
if last_used.tzinfo is None:
last_used = last_used.replace(tzinfo=datetime.timezone.utc)
time_since_last_used = now_utc - last_used
if time_since_last_used < cooldown_duration:
time_left = cooldown_duration - time_since_last_used
await ctx.send(f"You're flipping too fast! Try again in **{int(time_left.total_seconds())}s**.", ephemeral=True)
return
# Check balance
user_balance = await database.get_balance(user_id)
if user_balance < amount:
await ctx.send(f"You don't have enough money to bet that much! Your balance is **${user_balance:,}**.", ephemeral=True)
return
# Set cooldown before proceeding
await database.set_cooldown(user_id, command_name)
# Perform the coin flip
result = random.choice(["heads", "tails"])
win = (choice.startswith(result[0])) # True if choice matches result
if win:
await database.update_balance(user_id, amount) # Win the amount bet
current_balance = await database.get_balance(user_id)
await ctx.send(f"🪙 The coin landed on **{result}**! You won **${amount:,}**! Your new balance is **${current_balance:,}**.")
else:
await database.update_balance(user_id, -amount) # Lose the amount bet
current_balance = await database.get_balance(user_id)
await ctx.send(f"🪙 The coin landed on **{result}**. You lost **${amount:,}**. Your new balance is **${current_balance:,}**.")
# No setup function needed here

488
cogs/economy/jobs.py Normal file
View File

@ -0,0 +1,488 @@
import discord
from discord.ext import commands
from discord import app_commands # Required for choices/autocomplete
import datetime
import random
import logging
from typing import Optional, List
# Import database functions from the sibling module
from . import database
log = logging.getLogger(__name__)
# --- Job Definitions ---
# Store job details centrally for easier management
JOB_DEFINITIONS = {
"miner": {
"name": "Miner",
"description": "Mine for ores and gems.",
"command": "/mine",
"cooldown": datetime.timedelta(hours=1),
"base_currency": (15, 30), # Min/Max currency per action
"base_xp": 15,
"drops": { # Item Key: Chance (0.0 to 1.0)
"raw_iron": 0.6,
"coal": 0.4,
"shiny_gem": 0.05 # Lower chance for rarer items
},
"level_bonus": { # Applied per level
"currency_increase": 1, # Add +1 to min/max currency range per level
"rare_find_increase": 0.005 # Increase shiny_gem chance by 0.5% per level
}
},
"fisher": {
"name": "Fisher",
"description": "Catch fish and maybe find treasure.",
"command": "/fish",
"cooldown": datetime.timedelta(minutes=45),
"base_currency": (5, 15),
"base_xp": 10,
"drops": {
"common_fish": 0.8, # High chance for common
"rare_fish": 0.15,
"treasure_chest": 0.02
},
"level_bonus": {
"currency_increase": 0.5, # Smaller increase
"rare_find_increase": 0.003 # Increase rare_fish/treasure chance
}
},
"crafter": {
"name": "Crafter",
"description": "Use materials to craft valuable items.",
"command": "/craft",
"cooldown": datetime.timedelta(minutes=15), # Cooldown per craft action
"base_currency": (0, 0), # No direct currency
"base_xp": 20, # Higher XP for crafting
"recipes": { # Output Item Key: {Input Item Key: Quantity Required}
"iron_ingot": {"raw_iron": 2, "coal": 1},
"basic_tool": {"iron_ingot": 3}
},
"level_bonus": {
"unlock_recipe_level": { # Level required to unlock recipe
"basic_tool": 5
},
# Could add reduced material cost later
}
}
}
# Helper function to format time delta
def format_timedelta(delta: datetime.timedelta) -> str:
"""Formats a timedelta into a human-readable string (e.g., 1h 30m 15s)."""
total_seconds = int(delta.total_seconds())
if total_seconds < 0:
return "now"
hours, remainder = divmod(total_seconds, 3600)
minutes, seconds = divmod(remainder, 60)
parts = []
if hours > 0:
parts.append(f"{hours}h")
if minutes > 0:
parts.append(f"{minutes}m")
if seconds > 0 or not parts: # Show seconds if it's the only unit or > 0
parts.append(f"{seconds}s")
return " ".join(parts)
class JobsCommands(commands.Cog):
"""Cog containing job-related economy commands."""
def __init__(self, bot: commands.Bot):
self.bot = bot
# --- Job Management Commands ---
@commands.hybrid_command(name="jobs", description="List available jobs.")
async def list_jobs(self, ctx: commands.Context):
"""Displays available jobs and their basic information."""
embed = discord.Embed(title="Available Jobs", color=discord.Color.blue())
description = "Choose a job to specialize your earning potential!\n\n"
for key, details in JOB_DEFINITIONS.items():
description += f"**{details['name']} (`{key}`)**\n"
description += f"- {details['description']}\n"
description += f"- Command: `{details['command']}`\n"
description += f"- Cooldown: {format_timedelta(details['cooldown'])}\n\n"
embed.description = description
embed.set_footer(text="Use /choosejob <job_name> to select a job.")
await ctx.send(embed=embed)
@commands.hybrid_command(name="myjob", description="Show your current job status.")
async def my_job(self, ctx: commands.Context):
"""Displays the user's current job, level, and XP."""
user_id = ctx.author.id
job_info = await database.get_user_job(user_id)
if not job_info or not job_info.get("name"):
await ctx.send("You don't currently have a job. Use `/jobs` to see available options and `/choosejob <job_name>` to pick one.", ephemeral=True)
return
job_key = job_info["name"]
level = job_info["level"]
xp = job_info["xp"]
job_details = JOB_DEFINITIONS.get(job_key)
if not job_details:
await ctx.send(f"Error: Your job '{job_key}' is not recognized. Please contact an admin.", ephemeral=True)
log.error(f"User {user_id} has unrecognized job '{job_key}' in database.")
return
xp_needed = level * 100 # Matches logic in database.py
embed = discord.Embed(title=f"{ctx.author.display_name}'s Job: {job_details['name']}", color=discord.Color.green())
embed.add_field(name="Level", value=level, inline=True)
embed.add_field(name="XP", value=f"{xp} / {xp_needed}", inline=True)
# Cooldown check
last_action = job_info.get("last_action")
cooldown = job_details['cooldown']
if last_action:
now_utc = datetime.datetime.now(datetime.timezone.utc)
time_since = now_utc - last_action
if time_since < cooldown:
time_left = cooldown - time_since
embed.add_field(name="Cooldown", value=f"Ready in: {format_timedelta(time_left)}", inline=False)
else:
embed.add_field(name="Cooldown", value="Ready!", inline=False)
else:
embed.add_field(name="Cooldown", value="Ready!", inline=False)
embed.set_footer(text=f"Use {job_details['command']} to perform your job action.")
await ctx.send(embed=embed)
# Autocomplete for choosejob and leavejob
async def job_autocomplete(self, interaction: discord.Interaction, current: str) -> List[app_commands.Choice[str]]:
return [
app_commands.Choice(name=details["name"], value=key)
for key, details in JOB_DEFINITIONS.items() if current.lower() in key.lower() or current.lower() in details["name"].lower()
][:25] # Limit to 25 choices
@commands.hybrid_command(name="choosejob", description="Select a job to pursue.")
@app_commands.autocomplete(job_name=job_autocomplete)
async def choose_job(self, ctx: commands.Context, job_name: str):
"""Sets the user's active job."""
user_id = ctx.author.id
job_key = job_name.lower()
if job_key not in JOB_DEFINITIONS:
await ctx.send(f"Invalid job name '{job_name}'. Use `/jobs` to see available options.", ephemeral=True)
return
current_job_info = await database.get_user_job(user_id)
if current_job_info and current_job_info.get("name") == job_key:
await ctx.send(f"You are already a {JOB_DEFINITIONS[job_key]['name']}.", ephemeral=True)
return
# Implement switching cost/cooldown here if desired
# For now, allow free switching, resetting progress
await database.set_user_job(user_id, job_key)
await ctx.send(f"Congratulations! You are now a **{JOB_DEFINITIONS[job_key]['name']}**. Your previous job progress (if any) has been reset.")
@commands.hybrid_command(name="leavejob", description="Leave your current job.")
async def leave_job(self, ctx: commands.Context):
"""Abandons the user's current job, resetting progress."""
user_id = ctx.author.id
current_job_info = await database.get_user_job(user_id)
if not current_job_info or not current_job_info.get("name"):
await ctx.send("You don't have a job to leave.", ephemeral=True)
return
job_key = current_job_info["name"]
job_name = JOB_DEFINITIONS.get(job_key, {}).get("name", "Unknown Job")
await database.set_user_job(user_id, None) # Set job to NULL
await ctx.send(f"You have left your job as a **{job_name}**. Your level and XP for this job have been reset. You can choose a new job with `/choosejob`.")
# --- Job Action Commands ---
async def _handle_job_action(self, ctx: commands.Context, job_key: str):
"""Internal handler for job actions to check cooldowns, grant rewards, etc."""
user_id = ctx.author.id
job_info = await database.get_user_job(user_id)
# 1. Check if user has the correct job
if not job_info or job_info.get("name") != job_key:
correct_job_info = await database.get_user_job(user_id)
if correct_job_info and correct_job_info.get("name"):
correct_job_details = JOB_DEFINITIONS.get(correct_job_info["name"])
await ctx.send(f"You need to be a {JOB_DEFINITIONS[job_key]['name']} to use this command. Your current job is {correct_job_details['name']}. Use `{correct_job_details['command']}` instead, or change jobs with `/choosejob`.", ephemeral=True)
else:
await ctx.send(f"You need to be a {JOB_DEFINITIONS[job_key]['name']} to use this command. You don't have a job. Use `/choosejob {job_key}` first.", ephemeral=True)
return None # Indicate failure
job_details = JOB_DEFINITIONS[job_key]
level = job_info["level"]
# 2. Check Cooldown
last_action = job_info.get("last_action")
cooldown = job_details['cooldown']
if last_action:
now_utc = datetime.datetime.now(datetime.timezone.utc)
time_since = now_utc - last_action
if time_since < cooldown:
time_left = cooldown - time_since
await ctx.send(f"You need to wait **{format_timedelta(time_left)}** before you can {job_key} again.", ephemeral=True)
return None # Indicate failure
# 3. Set Cooldown Immediately
await database.set_job_cooldown(user_id)
# 4. Calculate Rewards
level_bonus = job_details.get("level_bonus", {})
currency_bonus = level * level_bonus.get("currency_increase", 0)
min_curr, max_curr = job_details["base_currency"]
currency_earned = random.randint(int(min_curr + currency_bonus), int(max_curr + currency_bonus))
items_found = {}
if "drops" in job_details:
rare_find_bonus = level * level_bonus.get("rare_find_increase", 0)
for item_key, base_chance in job_details["drops"].items():
# Apply level bonus to specific rare items if configured (e.g., gems for miner)
current_chance = base_chance
if item_key == 'shiny_gem' and job_key == 'miner':
current_chance += rare_find_bonus
elif (item_key == 'rare_fish' or item_key == 'treasure_chest') and job_key == 'fisher':
current_chance += rare_find_bonus
if random.random() < current_chance:
items_found[item_key] = items_found.get(item_key, 0) + 1
# 5. Grant Rewards (DB updates)
if currency_earned > 0:
await database.update_balance(user_id, currency_earned)
for item_key, quantity in items_found.items():
await database.add_item_to_inventory(user_id, item_key, quantity)
# 6. Grant XP & Handle Level Up
xp_earned = job_details["base_xp"] # Could add level bonus to XP later
new_level, new_xp, did_level_up = await database.add_job_xp(user_id, xp_earned)
# 7. Construct Response Message
response_parts = []
if currency_earned > 0:
response_parts.append(f"earned **${currency_earned:,}**")
if items_found:
item_strings = []
for item_key, quantity in items_found.items():
item_details = await database.get_item_details(item_key)
item_name = item_details['name'] if item_details else item_key
item_strings.append(f"{quantity}x **{item_name}**")
response_parts.append(f"found {', '.join(item_strings)}")
response_parts.append(f"gained **{xp_earned} XP**")
action_verb = job_key.capitalize() # "Mine", "Fish"
message = f"⛏️ You {action_verb} and {', '.join(response_parts)}." # Default message
# Customize message based on job
if job_key == "miner":
message = f"⛏️ You mined and {', '.join(response_parts)}."
elif job_key == "fisher":
message = f"🎣 You fished and {', '.join(response_parts)}."
# Crafter handled separately
if did_level_up:
message += f"\n**Congratulations! You reached Level {new_level} in {job_details['name']}!** 🎉"
current_balance = await database.get_balance(user_id)
message += f"\nYour current balance is **${current_balance:,}**."
return message # Indicate success and return message
@commands.hybrid_command(name="mine", description="Mine for ores and gems (Miner job).")
async def mine(self, ctx: commands.Context):
"""Performs the Miner job action."""
result_message = await self._handle_job_action(ctx, "miner")
if result_message:
await ctx.send(result_message)
@commands.hybrid_command(name="fish", description="Catch fish and maybe find treasure (Fisher job).")
async def fish(self, ctx: commands.Context):
"""Performs the Fisher job action."""
result_message = await self._handle_job_action(ctx, "fisher")
if result_message:
await ctx.send(result_message)
# --- Crafter Specific ---
async def craft_autocomplete(self, interaction: discord.Interaction, current: str) -> List[app_commands.Choice[str]]:
user_id = interaction.user.id
job_info = await database.get_user_job(user_id)
choices = []
if job_info and job_info.get("name") == "crafter":
crafter_details = JOB_DEFINITIONS["crafter"]
level = job_info["level"]
for item_key, recipe in crafter_details.get("recipes", {}).items():
# Check level requirement
required_level = crafter_details.get("level_bonus", {}).get("unlock_recipe_level", {}).get(item_key, 1)
if level < required_level:
continue
item_details = await database.get_item_details(item_key)
item_name = item_details['name'] if item_details else item_key
if current.lower() in item_key.lower() or current.lower() in item_name.lower():
choices.append(app_commands.Choice(name=item_name, value=item_key))
return choices[:25]
@commands.hybrid_command(name="craft", description="Craft items using materials (Crafter job).")
@app_commands.autocomplete(item_to_craft=craft_autocomplete)
async def craft(self, ctx: commands.Context, item_to_craft: str):
"""Performs the Crafter job action."""
user_id = ctx.author.id
job_key = "crafter"
job_info = await database.get_user_job(user_id)
# 1. Check if user has the correct job
if not job_info or job_info.get("name") != job_key:
await ctx.send("You need to be a Crafter to use this command. Use `/choosejob crafter` first.", ephemeral=True)
return
job_details = JOB_DEFINITIONS[job_key]
level = job_info["level"]
recipe_key = item_to_craft.lower()
# 2. Check if recipe exists
recipes = job_details.get("recipes", {})
if recipe_key not in recipes:
await ctx.send(f"Unknown recipe: '{item_to_craft}'. Check available recipes.", ephemeral=True) # TODO: Add /recipes command?
return
# 3. Check Level Requirement
required_level = job_details.get("level_bonus", {}).get("unlock_recipe_level", {}).get(recipe_key, 1)
if level < required_level:
await ctx.send(f"You need to be Level {required_level} to craft this item. You are currently Level {level}.", ephemeral=True)
return
# 4. Check Cooldown
last_action = job_info.get("last_action")
cooldown = job_details['cooldown']
if last_action:
now_utc = datetime.datetime.now(datetime.timezone.utc)
time_since = now_utc - last_action
if time_since < cooldown:
time_left = cooldown - time_since
await ctx.send(f"You need to wait **{format_timedelta(time_left)}** before you can craft again.", ephemeral=True)
return
# 5. Check Materials
required_materials = recipes[recipe_key]
inventory = await database.get_inventory(user_id)
inventory_map = {item['key']: item['quantity'] for item in inventory}
missing_materials = []
can_craft = True
for mat_key, mat_qty in required_materials.items():
if inventory_map.get(mat_key, 0) < mat_qty:
can_craft = False
mat_details = await database.get_item_details(mat_key)
mat_name = mat_details['name'] if mat_details else mat_key
missing_materials.append(f"{mat_qty - inventory_map.get(mat_key, 0)}x {mat_name}")
if not can_craft:
await ctx.send(f"You don't have the required materials. You still need: {', '.join(missing_materials)}.", ephemeral=True)
return
# 6. Set Cooldown Immediately
await database.set_job_cooldown(user_id)
# 7. Consume Materials & Grant Item
success = True
for mat_key, mat_qty in required_materials.items():
if not await database.remove_item_from_inventory(user_id, mat_key, mat_qty):
success = False
log.error(f"Failed to remove material {mat_key} x{mat_qty} for user {user_id} during crafting, despite check.")
await ctx.send("An error occurred while consuming materials. Please try again.", ephemeral=True)
# Should ideally revert cooldown here, but that's complex.
return
if success:
await database.add_item_to_inventory(user_id, recipe_key, 1)
# 8. Grant XP & Handle Level Up
xp_earned = job_details["base_xp"]
new_level, new_xp, did_level_up = await database.add_job_xp(user_id, xp_earned)
# 9. Construct Response
crafted_item_details = await database.get_item_details(recipe_key)
crafted_item_name = crafted_item_details['name'] if crafted_item_details else recipe_key
message = f"🛠️ You successfully crafted 1x **{crafted_item_name}** and gained **{xp_earned} XP**."
if did_level_up:
message += f"\n**Congratulations! You reached Level {new_level} in {job_details['name']}!** 🎉"
await ctx.send(message)
# --- Inventory Commands ---
@commands.hybrid_command(name="inventory", aliases=["inv"], description="View your items.")
async def inventory(self, ctx: commands.Context):
"""Displays the items in the user's inventory."""
user_id = ctx.author.id
inventory_items = await database.get_inventory(user_id)
if not inventory_items:
await ctx.send("Your inventory is empty.", ephemeral=True)
return
embed = discord.Embed(title=f"{ctx.author.display_name}'s Inventory", color=discord.Color.orange())
description = ""
for item in inventory_items:
sell_info = f" (Sell: ${item['sell_price']:,})" if item['sell_price'] > 0 else ""
description += f"- **{item['name']}** x{item['quantity']}{sell_info}\n"
if item['description']:
description += f" *({item['description']})*\n" # Add description if available
# Handle potential description length limit
if len(description) > 4000: # Embed description limit is 4096
description = description[:4000] + "\n... (Inventory too large to display fully)"
embed.description = description
await ctx.send(embed=embed)
# Autocomplete for sell command
async def inventory_autocomplete(self, interaction: discord.Interaction, current: str) -> List[app_commands.Choice[str]]:
user_id = interaction.user.id
inventory = await database.get_inventory(user_id)
return [
app_commands.Choice(name=f"{item['name']} (Have: {item['quantity']})", value=item['key'])
for item in inventory if item['sell_price'] > 0 and (current.lower() in item['key'].lower() or current.lower() in item['name'].lower())
][:25]
@commands.hybrid_command(name="sell", description="Sell items from your inventory.")
@app_commands.autocomplete(item_key=inventory_autocomplete)
async def sell(self, ctx: commands.Context, item_key: str, quantity: Optional[int] = 1):
"""Sells a specified quantity of an item from the inventory."""
user_id = ctx.author.id
if quantity <= 0:
await ctx.send("Please enter a positive quantity to sell.", ephemeral=True)
return
item_details = await database.get_item_details(item_key)
if not item_details:
await ctx.send(f"Invalid item key '{item_key}'. Check your `/inventory`.", ephemeral=True)
return
if item_details['sell_price'] <= 0:
await ctx.send(f"You cannot sell **{item_details['name']}**.", ephemeral=True)
return
# Try to remove items first
removed = await database.remove_item_from_inventory(user_id, item_key, quantity)
if not removed:
# Get current quantity to show in error message
inventory = await database.get_inventory(user_id)
current_quantity = 0
for item in inventory:
if item['key'] == item_key:
current_quantity = item['quantity']
break
await ctx.send(f"You don't have {quantity}x **{item_details['name']}** to sell. You only have {current_quantity}.", ephemeral=True)
return
# Grant money if removal was successful
total_earnings = item_details['sell_price'] * quantity
await database.update_balance(user_id, total_earnings)
current_balance = await database.get_balance(user_id)
await ctx.send(f"💰 You sold {quantity}x **{item_details['name']}** for **${total_earnings:,}**. Your new balance is **${current_balance:,}**.")

101
cogs/economy/risky.py Normal file
View File

@ -0,0 +1,101 @@
import discord
from discord.ext import commands
import datetime
import random
import logging
from typing import Optional
# Import database functions from the sibling module
from . import database
log = logging.getLogger(__name__)
class RiskyCommands(commands.Cog):
"""Cog containing risky economy commands like robbing."""
def __init__(self, bot: commands.Bot):
self.bot = bot
@commands.hybrid_command(name="rob", description="Attempt to rob another user (risky!).")
async def rob(self, ctx: commands.Context, target: discord.User):
"""Attempts to steal money from another user."""
robber_id = ctx.author.id
target_id = target.id
command_name = "rob"
cooldown_duration = datetime.timedelta(hours=6) # 6-hour cooldown
success_chance = 0.30 # 30% base chance of success
min_target_balance = 100 # Target must have at least this much to be robbed
fine_multiplier = 0.5 # Fine is 50% of what you tried to steal if caught
steal_percentage_min = 0.05 # Steal between 5%
steal_percentage_max = 0.20 # and 20% of target's balance
if robber_id == target_id:
await ctx.send("You can't rob yourself!", ephemeral=True)
return
# Check cooldown
last_used = await database.check_cooldown(robber_id, command_name)
if last_used:
now_utc = datetime.datetime.now(datetime.timezone.utc)
if last_used.tzinfo is None:
last_used = last_used.replace(tzinfo=datetime.timezone.utc)
time_since_last_used = now_utc - last_used
if time_since_last_used < cooldown_duration:
time_left = cooldown_duration - time_since_last_used
hours, remainder = divmod(int(time_left.total_seconds()), 3600)
minutes, seconds = divmod(remainder, 60)
await ctx.send(f"You need to lay low after your last attempt. Try again in **{hours}h {minutes}m {seconds}s**.", ephemeral=True)
return
# Check target balance
target_balance = await database.get_balance(target_id)
if target_balance < min_target_balance:
await ctx.send(f"{target.display_name} doesn't have enough money to be worth robbing (minimum ${min_target_balance:,}).", ephemeral=True)
# Don't apply cooldown if target wasn't viable
return
# Set cooldown now that a valid attempt is being made
await database.set_cooldown(robber_id, command_name)
# Check robber balance (needed for potential fine)
robber_balance = await database.get_balance(robber_id)
# Determine success
if random.random() < success_chance:
# Success!
steal_percentage = random.uniform(steal_percentage_min, steal_percentage_max)
stolen_amount = int(target_balance * steal_percentage)
if stolen_amount <= 0: # Ensure at least 1 is stolen if percentage is too low
stolen_amount = 1
await database.update_balance(robber_id, stolen_amount)
await database.update_balance(target_id, -stolen_amount)
current_robber_balance = await database.get_balance(robber_id)
await ctx.send(f"🚨 Success! You skillfully robbed **${stolen_amount:,}** from {target.mention}! Your new balance is **${current_robber_balance:,}**.")
try:
await target.send(f"🚨 Oh no! {ctx.author.mention} robbed you for **${stolen_amount:,}**!")
except discord.Forbidden:
pass # Ignore if DMs are closed
else:
# Failure! Calculate potential fine
# Fine based on what they *could* have stolen (using average percentage for calculation)
potential_steal_amount = int(target_balance * ((steal_percentage_min + steal_percentage_max) / 2))
if potential_steal_amount <= 0: potential_steal_amount = 1
fine_amount = int(potential_steal_amount * fine_multiplier)
# Ensure fine doesn't exceed robber's balance
fine_amount = min(fine_amount, robber_balance)
if fine_amount > 0:
await database.update_balance(robber_id, -fine_amount)
# Optional: Give the fine to the target? Or just remove it? Let's remove it.
# await database.update_balance(target_id, fine_amount)
current_robber_balance = await database.get_balance(robber_id)
await ctx.send(f"👮‍♂️ You were caught trying to rob {target.mention}! You paid a fine of **${fine_amount:,}**. Your new balance is **${current_robber_balance:,}**.")
else:
# Robber is broke, can't pay fine
await ctx.send(f"👮‍♂️ You were caught trying to rob {target.mention}, but you're too broke to pay the fine!")
# No setup function needed here

95
cogs/economy/utility.py Normal file
View File

@ -0,0 +1,95 @@
import discord
from discord.ext import commands
import datetime
import logging
from typing import Optional
# Import database functions from the sibling module
from . import database
log = logging.getLogger(__name__)
class UtilityCommands(commands.Cog):
"""Cog containing utility-related economy commands."""
def __init__(self, bot: commands.Bot):
self.bot = bot
@commands.hybrid_command(name="balance", description="Check your or another user's balance.")
@commands.cooldown(1, 5, commands.BucketType.user) # Basic discord.py cooldown
async def balance(self, ctx: commands.Context, user: Optional[discord.User] = None):
"""Displays the economy balance for a user."""
target_user = user or ctx.author
balance_amount = await database.get_balance(target_user.id)
await ctx.send(f"{target_user.display_name} has a balance of **${balance_amount:,}**.", ephemeral=True)
@commands.hybrid_command(name="leaderboard", aliases=["lb", "top"], description="Show the richest users.")
@commands.cooldown(1, 30, commands.BucketType.user) # Prevent spam
async def leaderboard(self, ctx: commands.Context, count: int = 10):
"""Displays the top users by balance."""
if not 1 <= count <= 25:
await ctx.send("Please provide a count between 1 and 25.", ephemeral=True)
return
results = await database.get_leaderboard(count)
if not results:
await ctx.send("The leaderboard is empty!", ephemeral=True)
return
embed = discord.Embed(title="💰 Economy Leaderboard", color=discord.Color.gold())
description = ""
rank = 1
for user_id, balance in results:
user = self.bot.get_user(user_id) # Try to get user object for display name
# Fetch user if not in cache - might be slow for large leaderboards
if user is None:
try:
user = await self.bot.fetch_user(user_id)
except discord.NotFound:
user = None # User might have left all shared servers
except discord.HTTPException:
user = None # Other Discord API error
log.warning(f"Failed to fetch user {user_id} for leaderboard.")
user_name = user.display_name if user else f"User ID: {user_id}"
description += f"{rank}. {user_name} - **${balance:,}**\n"
rank += 1
embed.description = description
await ctx.send(embed=embed)
@commands.hybrid_command(name="pay", description="Transfer money to another user.")
async def pay(self, ctx: commands.Context, recipient: discord.User, amount: int):
"""Transfers currency from the command author to another user."""
sender_id = ctx.author.id
recipient_id = recipient.id
if sender_id == recipient_id:
await ctx.send("You cannot pay yourself!", ephemeral=True)
return
if amount <= 0:
await ctx.send("Please enter a positive amount to pay.", ephemeral=True)
return
sender_balance = await database.get_balance(sender_id)
if sender_balance < amount:
await ctx.send(f"You don't have enough money! Your balance is **${sender_balance:,}**.", ephemeral=True)
return
# Perform the transfer
await database.update_balance(sender_id, -amount) # Decrease sender's balance
await database.update_balance(recipient_id, amount) # Increase recipient's balance
current_sender_balance = await database.get_balance(sender_id)
await ctx.send(f"💸 You successfully paid **${amount:,}** to {recipient.mention}. Your new balance is **${current_sender_balance:,}**.")
try:
# Optionally DM the recipient
await recipient.send(f"💸 You received a payment of **${amount:,}** from {ctx.author.mention}!")
except discord.Forbidden:
log.warning(f"Could not DM recipient {recipient_id} about payment.") # User might have DMs closed
# No setup function needed here

View File

@ -1,210 +1,52 @@
import discord
from discord.ext import commands, tasks
import aiosqlite
import os
import datetime
from discord.ext import commands
import logging
import random
from typing import Optional
# Configure logging
# Import command classes from submodules
from .economy.database import init_db
from .economy.earning import EarningCommands
from .economy.gambling import GamblingCommands
from .economy.utility import UtilityCommands
from .economy.risky import RiskyCommands
from .economy.jobs import JobsCommands # Import the new JobsCommands
log = logging.getLogger(__name__)
# Database path (within the discordbot/data directory)
DB_DIR = os.path.join(os.path.dirname(__file__), '..', 'data')
DB_PATH = os.path.join(DB_DIR, 'economy.db')
# --- Main Cog Implementation ---
# Ensure the data directory exists
os.makedirs(DB_DIR, exist_ok=True)
# --- Database Setup ---
async def init_db():
"""Initializes the database and creates tables if they don't exist."""
try:
async with aiosqlite.connect(DB_PATH) as db:
# Create economy table
await db.execute("""
CREATE TABLE IF NOT EXISTS economy (
user_id INTEGER PRIMARY KEY,
balance INTEGER NOT NULL DEFAULT 0
)
""")
log.info("Checked/created 'economy' table.")
# Create command_cooldowns table
await db.execute("""
CREATE TABLE IF NOT EXISTS command_cooldowns (
user_id INTEGER NOT NULL,
command_name TEXT NOT NULL,
last_used TIMESTAMP NOT NULL,
PRIMARY KEY (user_id, command_name)
)
""")
log.info("Checked/created 'command_cooldowns' table.")
await db.commit()
log.info(f"Database initialized successfully at {DB_PATH}")
except Exception as e:
log.error(f"Failed to initialize economy database at {DB_PATH}: {e}", exc_info=True)
raise # Re-raise the exception to prevent the cog from loading incorrectly
# --- Cog Implementation ---
class EconomyCog(commands.Cog):
"""Cog for handling economy commands like balance, daily, and beg."""
# Inherit from commands.Cog and all the command classes
class EconomyCog(
EarningCommands,
GamblingCommands,
UtilityCommands,
RiskyCommands,
JobsCommands, # Add JobsCommands to the inheritance list
commands.Cog # Ensure commands.Cog is included
):
"""Main cog for the economy system, combining all command groups."""
def __init__(self, bot: commands.Bot):
# Initialize all parent cogs (important!)
super().__init__(bot) # Calls __init__ of the first parent in MRO (EarningCommands)
# If other parent cogs had complex __init__, we might need to call them explicitly,
# but in this case, they only store the bot instance, which super() handles.
self.bot = bot
log.info("EconomyCog initialized.")
log.info("EconomyCog initialized (combined).")
async def cog_load(self):
"""Called when the cog is loaded, ensures DB is initialized."""
log.info("Loading EconomyCog...")
await init_db()
log.info("EconomyCog database initialization complete.")
# --- Database Helper Methods ---
async def _get_balance(self, user_id: int) -> int:
"""Gets the balance for a user, creating an entry if needed."""
async with aiosqlite.connect(DB_PATH) as db:
# Try to fetch existing balance
async with db.execute("SELECT balance FROM economy WHERE user_id = ?", (user_id,)) as cursor:
result = await cursor.fetchone()
if result:
return result[0]
else:
# User doesn't exist, create entry and return default balance (0)
try:
await db.execute("INSERT INTO economy (user_id, balance) VALUES (?, ?)", (user_id, 0))
await db.commit()
log.info(f"Created new economy entry for user_id: {user_id}")
return 0
except aiosqlite.IntegrityError:
# Handle rare race condition where another process inserted the user just now
log.warning(f"Race condition handled for user_id: {user_id} during balance fetch.")
async with db.execute("SELECT balance FROM economy WHERE user_id = ?", (user_id,)) as cursor_retry:
result_retry = await cursor_retry.fetchone()
return result_retry[0] if result_retry else 0 # Should exist now
async def _update_balance(self, user_id: int, amount: int):
"""Updates a user's balance by adding the specified amount (can be negative)."""
async with aiosqlite.connect(DB_PATH) as db:
# Ensure user exists first
await self._get_balance(user_id)
# Update balance
await db.execute("UPDATE economy SET balance = balance + ? WHERE user_id = ?", (amount, user_id))
await db.commit()
log.debug(f"Updated balance for user_id {user_id} by {amount}.")
async def _check_cooldown(self, user_id: int, command_name: str) -> Optional[datetime.datetime]:
"""Checks if a command is on cooldown for a user. Returns the last used time if on cooldown, else None."""
async with aiosqlite.connect(DB_PATH) as db:
async with db.execute("SELECT last_used FROM command_cooldowns WHERE user_id = ? AND command_name = ?", (user_id, command_name)) as cursor:
result = await cursor.fetchone()
if result:
# Parse the timestamp string back into a datetime object
# Assuming timestamps are stored in ISO 8601 format (YYYY-MM-DD HH:MM:SS.ffffff)
try:
last_used_dt = datetime.datetime.fromisoformat(result[0])
return last_used_dt
except ValueError:
log.error(f"Could not parse timestamp '{result[0]}' for user {user_id}, command {command_name}")
# Fallback: treat as if not on cooldown or handle error appropriately
return None
else:
return None # Not on cooldown
async def _set_cooldown(self, user_id: int, command_name: str):
"""Sets or updates the cooldown timestamp for a command."""
now = datetime.datetime.now(datetime.timezone.utc)
now_iso = now.isoformat() # Store in ISO format for consistency
async with aiosqlite.connect(DB_PATH) as db:
await db.execute("""
INSERT INTO command_cooldowns (user_id, command_name, last_used)
VALUES (?, ?, ?)
ON CONFLICT(user_id, command_name) DO UPDATE SET last_used = excluded.last_used
""", (user_id, command_name, now_iso))
await db.commit()
log.debug(f"Set cooldown for user_id {user_id}, command {command_name} to {now_iso}")
# --- Commands ---
@commands.hybrid_command(name="balance", description="Check your or another user's balance.")
@commands.cooldown(1, 5, commands.BucketType.user) # Basic discord.py cooldown to prevent spamming the check itself
async def balance(self, ctx: commands.Context, user: Optional[discord.User] = None):
"""Displays the economy balance for a user."""
target_user = user or ctx.author
balance_amount = await self._get_balance(target_user.id)
await ctx.send(f"{target_user.display_name} has a balance of **${balance_amount:,}**.", ephemeral=True)
@commands.hybrid_command(name="daily", description="Claim your daily reward.")
async def daily(self, ctx: commands.Context):
"""Allows users to claim a daily currency reward."""
user_id = ctx.author.id
command_name = "daily"
cooldown_duration = datetime.timedelta(hours=24)
reward_amount = 100 # Example daily reward
last_used = await self._check_cooldown(user_id, command_name)
if last_used:
time_since_last_used = datetime.datetime.now(datetime.timezone.utc) - last_used
if time_since_last_used < cooldown_duration:
time_left = cooldown_duration - time_since_last_used
# Format timedelta nicely
hours, remainder = divmod(int(time_left.total_seconds()), 3600)
minutes, seconds = divmod(remainder, 60)
await ctx.send(f"You've already claimed your daily reward. Try again in **{hours}h {minutes}m {seconds}s**.", ephemeral=True)
return
else:
# Cooldown expired but entry exists, proceed to claim
pass
# Not on cooldown or cooldown expired
await self._update_balance(user_id, reward_amount)
await self._set_cooldown(user_id, command_name)
await ctx.send(f"🎉 You claimed your daily reward of **${reward_amount:,}**! Your new balance is **${await self._get_balance(user_id):,}**.")
@commands.hybrid_command(name="beg", description="Beg for some spare change.")
async def beg(self, ctx: commands.Context):
"""Allows users to beg for a small amount of currency with a chance of success."""
user_id = ctx.author.id
command_name = "beg"
cooldown_duration = datetime.timedelta(minutes=5) # 5-minute cooldown
success_chance = 0.4 # 40% chance of success
min_reward = 1
max_reward = 20
last_used = await self._check_cooldown(user_id, command_name)
if last_used:
time_since_last_used = datetime.datetime.now(datetime.timezone.utc) - last_used
if time_since_last_used < cooldown_duration:
time_left = cooldown_duration - time_since_last_used
minutes, seconds = divmod(int(time_left.total_seconds()), 60)
await ctx.send(f"You can't beg again so soon. Try again in **{minutes}m {seconds}s**.", ephemeral=True)
return
else:
# Cooldown expired
pass
# Set cooldown regardless of success/failure
await self._set_cooldown(user_id, command_name)
# Determine success
if random.random() < success_chance:
reward_amount = random.randint(min_reward, max_reward)
await self._update_balance(user_id, reward_amount)
await ctx.send(f"🙏 Someone took pity on you! You received **${reward_amount:,}**. Your new balance is **${await self._get_balance(user_id):,}**.")
else:
await ctx.send("🤷 Nobody gave you anything. Better luck next time!")
log.info("Loading EconomyCog (combined)...")
try:
await init_db()
log.info("EconomyCog database initialization complete.")
except Exception as e:
log.error(f"EconomyCog failed to initialize database during load: {e}", exc_info=True)
# Prevent the cog from loading if DB init fails
raise commands.ExtensionFailed(self.qualified_name, e) from e
# --- Setup Function ---
async def setup(bot: commands.Bot):
"""Sets up the EconomyCog."""
"""Sets up the combined EconomyCog."""
await bot.add_cog(EconomyCog(bot))
log.info("EconomyCog added to bot.")
log.info("Combined EconomyCog added to bot.")