feat: Add Google Cloud TTS provider

Introduces a new TTS provider using Google Cloud Text-to-Speech (Chirp HD model). This allows for higher quality and more natural-sounding voice synthesis.

The `TTSProviderCog` now includes:
- A `google_cloud_tts` option in the `_synthesize_speech` method.
- Checks for the `google-cloud-texttospeech` library and provides installation instructions if missing.
- Error handling for common Google Cloud TTS issues like quota limits or credential problems.
- A new choice in the `/ttsprovider` slash command for "Google Cloud TTS (Chirp HD)".
- A check for the availability of the `google.cloud.texttospeech` module on startup.

This commit also adds a new `VoiceGatewayCog` (voice connect/disconnect, VAD-gated Whisper transcription, audio playback), the `join_voice_channel`/`leave_voice_channel`/`speak_in_voice_channel` tools, and listeners that route voice transcriptions through the normal AI response pipeline.
Slipstream 2025-05-30 21:25:01 -06:00
parent b2ea6540c0
commit db64d0e790
Signed by: slipstream
GPG Key ID: 13E498CE010AC6FD
6 changed files with 962 additions and 7 deletions
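
For reference, a minimal pre-flight sketch (not part of this commit) can confirm the credential setup the commit message refers to before the provider is enabled. It assumes authentication via a service-account key path in GOOGLE_APPLICATION_CREDENTIALS:

import os
from google.cloud import texttospeech

# Assumes a service-account key path in GOOGLE_APPLICATION_CREDENTIALS.
assert os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"), "credentials env var not set"
client = texttospeech.TextToSpeechClient()
# list_voices is a cheap authenticated call, so it surfaces credential or quota
# problems before any synthesis is attempted.
voices = client.list_voices(language_code="en-US")
print(f"{len(voices.voices)} en-US voices available")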

cogs/VoiceGatewayCog.py (new file, 367 lines)

@@ -0,0 +1,367 @@
import discord
from discord.ext import commands
import asyncio
import os
import tempfile
import wave # For saving audio data
import functools # Binds keyword arguments (e.g. fp16) when dispatching work to run_in_executor
# Attempt to import STT, VAD, and Opus libraries
try:
import whisper
except ImportError:
print("Whisper library not found. Please install with 'pip install openai-whisper'")
whisper = None
try:
import webrtcvad
except ImportError:
print("webrtcvad library not found. Please install with 'pip install webrtc-voice-activity-detector'")
webrtcvad = None
try:
from opuslib import Decoder as OpusDecoder
from opuslib import OPUS_APPLICATION_VOIP, OPUS_SIGNAL_VOICE
except ImportError:
print("opuslib library not found. Please install with 'pip install opuslib' (requires Opus C library).")
OpusDecoder = None
FFMPEG_OPTIONS = {
'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5',
'options': '-vn'
}
# Constants for audio processing
SAMPLE_RATE = 16000 # Whisper prefers 16kHz
CHANNELS = 1 # Mono
SAMPLE_WIDTH = 2 # 16-bit audio (2 bytes per sample)
VAD_MODE = 3 # VAD aggressiveness (0-3, 3 is most aggressive)
FRAME_DURATION_MS = 30 # Duration of a frame in ms for VAD (10, 20, or 30)
BYTES_PER_FRAME = (SAMPLE_RATE // 1000) * FRAME_DURATION_MS * CHANNELS * SAMPLE_WIDTH
OPUS_FRAME_SIZE_MS = 20 # Opus typically uses 20ms frames
OPUS_SAMPLES_PER_FRAME = (SAMPLE_RATE // 1000) * OPUS_FRAME_SIZE_MS # e.g. 16000/1000 * 20 = 320 samples for 16kHz
OPUS_BUFFER_SIZE = OPUS_SAMPLES_PER_FRAME * CHANNELS * SAMPLE_WIDTH # Bytes for PCM buffer for one Opus frame
# Silence detection parameters
SILENCE_THRESHOLD_FRAMES = 25 # Number of consecutive silent VAD frames to consider end of speech (e.g., 25 * 30ms = 750ms)
MAX_SPEECH_DURATION_S = 15 # Max duration of a single speech segment to process
MAX_SPEECH_FRAMES = (MAX_SPEECH_DURATION_S * 1000) // FRAME_DURATION_MS
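# Sanity-check sketch (added for illustration, not in the original commit): the
# constants above imply the frame sizes the VAD/Opus handling below relies on.
#   30 ms VAD frame  @ 16 kHz mono 16-bit: (16000 // 1000) * 30 * 1 * 2 = 960 bytes
#   20 ms Opus frame @ 16 kHz mono 16-bit: 320 samples * 2 bytes        = 640 bytes
assert BYTES_PER_FRAME == 960
assert OPUS_BUFFER_SIZE == 640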
class VoiceAudioSink(discord.AudioSink):
def __init__(self, cog_instance, voice_client: discord.VoiceClient):
super().__init__()
self.cog = cog_instance
self.voice_client = voice_client # Store the voice_client
self.user_audio_data = {} # {ssrc: {'buffer': bytearray, 'speaking': False, 'silent_frames': 0, 'speech_frames': 0, 'decoder': OpusDecoder, 'vad': VAD_instance}}
if not OpusDecoder:
print("OpusDecoder not available. AudioSink will not function correctly.")
if not webrtcvad:
print("VAD library not loaded. STT might be less efficient or not work as intended.")
def write(self, ssrc: int, data: bytes): # data is opus encoded
if not OpusDecoder or not webrtcvad or not self.voice_client:
return
user = self.voice_client.ssrc_map.get(ssrc)
if not user: # Unknown SSRC or user left
# Clean up if user data exists for this SSRC
if ssrc in self.user_audio_data:
del self.user_audio_data[ssrc]
return
user_id = user.id
if ssrc not in self.user_audio_data:
self.user_audio_data[ssrc] = {
'buffer': bytearray(),
'speaking': False,
'silent_frames': 0,
'speech_frames': 0,
'decoder': OpusDecoder(SAMPLE_RATE, CHANNELS), # Decode to 16kHz mono
'vad': webrtcvad.Vad(VAD_MODE) if webrtcvad else None
}
entry = self.user_audio_data[ssrc]
try:
# Decode Opus to PCM. Opus data is typically 20ms frames.
# Max frame size for opuslib decoder is 2 bytes/sample * 1 channel * 120ms * 48kHz = 11520 bytes
# We expect 20ms frames from Discord.
# The decoder needs to know the length of the PCM buffer it can write to.
# For 16kHz, 1 channel, 20ms: 320 samples * 2 bytes/sample = 640 bytes.
pcm_data = entry['decoder'].decode(data, OPUS_SAMPLES_PER_FRAME, decode_fec=False)
except Exception as e:
print(f"Opus decoding error for SSRC {ssrc} (User {user_id}): {e}")
return
# webrtcvad only accepts 10, 20, or 30 ms frames. One decoded Opus frame here is
# 20 ms at 16 kHz mono 16-bit (320 samples = 640 bytes), so it can be fed to the
# VAD directly without re-framing.
frame_length_for_vad_20ms = (SAMPLE_RATE // 1000) * 20 * CHANNELS * SAMPLE_WIDTH # 640 bytes for 20ms @ 16kHz
if len(pcm_data) != frame_length_for_vad_20ms:
# Unexpected frame size (non-20 ms Opus frame or sample-rate mismatch). A robust
# implementation would buffer and re-frame; for now, fall through and rely on
# the VAD error handling below.
pass
if entry['vad']:
try:
is_speech = entry['vad'].is_speech(pcm_data, SAMPLE_RATE)
except Exception as e: # webrtcvad can raise errors on invalid frame length
# print(f"VAD error for SSRC {ssrc} (User {user_id}) with PCM length {len(pcm_data)}: {e}. Defaulting to speech=True for this frame.")
# Fallback: if VAD fails, assume it's speech to avoid losing data, or handle more gracefully.
is_speech = True # Or False, depending on desired behavior on error
else: # No VAD
is_speech = True
if is_speech:
entry['buffer'].extend(pcm_data)
entry['speaking'] = True
entry['silent_frames'] = 0
entry['speech_frames'] += 1
if entry['speech_frames'] >= MAX_SPEECH_FRAMES:
# print(f"Max speech frames reached for SSRC {ssrc}. Processing segment.")
asyncio.create_task(self.cog.process_audio_segment(user_id, bytes(entry['buffer']), self.voice_client.guild))
entry['buffer'].clear()
entry['speaking'] = False
entry['speech_frames'] = 0
elif entry['speaking']: # Was speaking, now silence
entry['buffer'].extend(pcm_data) # Add this last silent frame for context
entry['silent_frames'] += 1
if entry['silent_frames'] >= SILENCE_THRESHOLD_FRAMES:
# print(f"Silence threshold reached for SSRC {ssrc}. Processing segment.")
asyncio.create_task(self.cog.process_audio_segment(user_id, bytes(entry['buffer']), self.voice_client.guild))
entry['buffer'].clear()
entry['speaking'] = False
entry['speech_frames'] = 0
entry['silent_frames'] = 0
# If not is_speech and the user wasn't already speaking, the silence is ignored.
# Clearing stale buffers here would risk cutting off speech, so a segment is
# only processed once silence follows speech (see above).
def cleanup(self):
print("VoiceAudioSink cleanup called.")
for ssrc, data in self.user_audio_data.items():
# If there's buffered audio when cleaning up, process it
if data['buffer']:
user = self.voice_client.ssrc_map.get(ssrc)
if user:
print(f"Processing remaining audio for SSRC {ssrc} (User {user.id}) on cleanup.")
asyncio.create_task(self.cog.process_audio_segment(user.id, bytes(data['buffer']), self.voice_client.guild))
self.user_audio_data.clear()
class VoiceGatewayCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.active_sinks = {} # guild_id: VoiceAudioSink
self.whisper_model = None
if whisper:
try:
# Load a smaller model initially, can be made configurable
self.whisper_model = whisper.load_model("base")
print("Whisper model 'base' loaded successfully.")
except Exception as e:
print(f"Error loading Whisper model: {e}. STT will not be available.")
self.whisper_model = None
else:
print("Whisper library not available. STT functionality will be disabled.")
async def cog_load(self):
print("VoiceGatewayCog loaded!")
async def cog_unload(self):
print("Unloading VoiceGatewayCog...")
# Disconnect from all voice channels and clean up sinks
for vc in list(self.bot.voice_clients): # Iterate over a copy
guild_id = vc.guild.id
if guild_id in self.active_sinks:
if vc.is_connected():
vc.stop_listening() # Stop listening before cleanup
self.active_sinks[guild_id].cleanup()
del self.active_sinks[guild_id]
if vc.is_connected():
await vc.disconnect(force=True)
print("VoiceGatewayCog unloaded and disconnected from voice channels.")
async def connect_to_voice(self, channel: discord.VoiceChannel):
"""Connects the bot to a specified voice channel and starts listening."""
if not channel:
return None, "Channel not provided."
guild = channel.guild
voice_client = guild.voice_client
if voice_client and voice_client.is_connected():
if voice_client.channel == channel:
print(f"Already connected to {channel.name} in {guild.name}.")
# Ensure listening is active if already connected
if guild.id not in self.active_sinks or not voice_client.is_listening():
self.start_listening_for_vc(voice_client)
return voice_client, "Already connected to this channel."
else:
await voice_client.move_to(channel)
print(f"Moved to {channel.name} in {guild.name}.")
# Restart listening in the new channel
self.start_listening_for_vc(voice_client)
else:
try:
voice_client = await channel.connect(timeout=10.0) # Added timeout
print(f"Connected to {channel.name} in {guild.name}.")
self.start_listening_for_vc(voice_client)
except asyncio.TimeoutError:
return None, f"Timeout trying to connect to {channel.name}."
except Exception as e:
return None, f"Error connecting to {channel.name}: {str(e)}"
if not voice_client: # Should not happen if connect succeeded
return None, "Failed to establish voice client after connection."
return voice_client, f"Successfully connected and listening in {channel.name}."
def start_listening_for_vc(self, voice_client: discord.VoiceClient):
"""Starts or restarts listening for a given voice client."""
guild_id = voice_client.guild.id
if guild_id in self.active_sinks:
# If sink exists, ensure it's clean and listening is (re)started
if voice_client.is_listening():
voice_client.stop_listening() # Stop previous listening if any
self.active_sinks[guild_id].cleanup() # Clean old state
# Re-initialize or ensure the sink is fresh for the current VC
self.active_sinks[guild_id] = VoiceAudioSink(self, voice_client)
else:
self.active_sinks[guild_id] = VoiceAudioSink(self, voice_client)
if not voice_client.is_listening():
voice_client.listen(self.active_sinks[guild_id])
print(f"Started listening in {voice_client.channel.name} for guild {guild_id}")
else:
print(f"Already listening in {voice_client.channel.name} for guild {guild_id}")
async def disconnect_from_voice(self, guild: discord.Guild):
"""Disconnects the bot from the voice channel in the given guild."""
voice_client = guild.voice_client
if voice_client and voice_client.is_connected():
if voice_client.is_listening():
voice_client.stop_listening()
guild_id = guild.id
if guild_id in self.active_sinks:
self.active_sinks[guild_id].cleanup()
del self.active_sinks[guild_id]
await voice_client.disconnect(force=True)
print(f"Disconnected from voice in {guild.name}.")
return True, f"Disconnected from voice in {guild.name}."
return False, "Not connected to voice in this guild."
async def play_audio_file(self, voice_client: discord.VoiceClient, audio_file_path: str):
"""Plays an audio file in the voice channel."""
if not voice_client or not voice_client.is_connected():
print("Error: Voice client not connected.")
return False, "Voice client not connected."
if not os.path.exists(audio_file_path):
print(f"Error: Audio file not found at {audio_file_path}")
return False, "Audio file not found."
if voice_client.is_playing():
voice_client.stop() # Stop current audio if any
try:
audio_source = discord.FFmpegPCMAudio(audio_file_path, **FFMPEG_OPTIONS)
voice_client.play(audio_source, after=lambda e: self.after_audio_playback(e, audio_file_path))
print(f"Playing audio: {audio_file_path}")
return True, f"Playing {os.path.basename(audio_file_path)}"
except Exception as e:
print(f"Error creating/playing FFmpegPCMAudio source for {audio_file_path}: {e}")
return False, f"Error playing audio: {str(e)}"
def after_audio_playback(self, error, audio_file_path):
if error:
print(f"Error during audio playback for {audio_file_path}: {error}")
else:
print(f"Finished playing {audio_file_path}")
# TTSProviderCog's cleanup will handle deleting the file.
# Removed start_listening_pipeline as the sink now handles more logic directly or via tasks.
async def process_audio_segment(self, user_id: int, audio_data: bytes, guild: discord.Guild):
"""Processes a segment of audio data using Whisper."""
if not self.whisper_model or not audio_data: # also check if audio_data is empty
if not audio_data: print(f"process_audio_segment called for user {user_id} with empty audio_data.")
return
# Save audio_data (PCM) to a temporary WAV file
# Whisper expects a file path or a NumPy array.
# Using a temporary file is straightforward.
try:
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_wav:
wav_file_path = tmp_wav.name
wf = wave.open(tmp_wav, 'wb')
wf.setnchannels(CHANNELS)
wf.setsampwidth(SAMPLE_WIDTH)
wf.setframerate(SAMPLE_RATE)
wf.writeframes(audio_data)
wf.close()
# Transcribe using Whisper (this can be blocking, run in executor)
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None, # Default ThreadPoolExecutor
# run_in_executor does not forward keyword arguments, so bind fp16 via partial.
# (fp16=True is faster if the GPU supports it.)
functools.partial(self.whisper_model.transcribe, wav_file_path, fp16=False)
)
transcribed_text = result["text"].strip()
if transcribed_text: # Only dispatch if there's actual text
user = guild.get_member(user_id) or await self.bot.fetch_user(user_id)
print(f"Transcription for {user.name} ({user_id}) in {guild.name}: {transcribed_text}")
self.bot.dispatch("voice_transcription_received", guild, user, transcribed_text)
except Exception as e:
print(f"Error processing audio segment for user {user_id}: {e}")
finally:
if 'wav_file_path' in locals() and os.path.exists(wav_file_path):
os.remove(wav_file_path)
async def setup(bot: commands.Bot):
# Check for FFmpeg before adding cog
try:
# Try running ffmpeg -version to check if it's installed and in PATH
# Use create_subprocess_exec so a missing binary raises FileNotFoundError,
# matching the handler below (a shell invocation would swallow it).
process = await asyncio.create_subprocess_exec(
"ffmpeg", "-version",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
print("FFmpeg found. VoiceGatewayCog can be loaded.")
await bot.add_cog(VoiceGatewayCog(bot))
print("VoiceGatewayCog loaded successfully!")
else:
print("FFmpeg not found or not working correctly. VoiceGatewayCog will not be loaded.")
print(f"FFmpeg check stdout: {stdout.decode(errors='ignore')}")
print(f"FFmpeg check stderr: {stderr.decode(errors='ignore')}")
except FileNotFoundError:
print("FFmpeg command not found. VoiceGatewayCog will not be loaded. Please install FFmpeg and ensure it's in your system's PATH.")
except Exception as e:
print(f"An error occurred while checking for FFmpeg: {e}. VoiceGatewayCog will not be loaded.")

@@ -6,6 +6,7 @@ import asyncio
import tempfile
import sys
import importlib.util
try:
from google.cloud import texttospeech
except ImportError: # Optional dependency; availability is re-checked before use
texttospeech = None
class TTSProviderCog(commands.Cog):
def __init__(self, bot):
@@ -118,9 +119,8 @@ class TTSProviderCog(commands.Cog):
import platform
try:
# Check if espeak-ng is available
# On Windows, check whether the command exists on PATH
if platform.system() == "Windows":
result = subprocess.run(["where", "espeak-ng"], capture_output=True, text=True)
espeak_available = result.returncode == 0
else:
@@ -162,6 +162,43 @@
except Exception as e:
return False, f"Error with espeak-ng: {str(e)}"
elif provider == "google_cloud_tts":
# Check if google-cloud-texttospeech is available
if importlib.util.find_spec("google.cloud.texttospeech") is None:
return False, "Google Cloud TTS library is not installed. Run: pip install google-cloud-texttospeech"
try:
client = texttospeech.TextToSpeechClient() # Assumes GOOGLE_APPLICATION_CREDENTIALS is set
input_text = texttospeech.SynthesisInput(text=text)
# Select the requested Chirp 3 HD voice
voice = texttospeech.VoiceSelectionParams(
language_code="en-US",
name="en-US-Chirp3-HD-Autonoe"
)
# Specify audio configuration (MP3 output)
audio_config = texttospeech.AudioConfig(
audio_encoding=texttospeech.AudioEncoding.MP3
)
response = client.synthesize_speech(
request={"input": input_text, "voice": voice, "audio_config": audio_config}
)
# The response's audio_content is binary. Write it to the output file.
with open(output_file, "wb") as out:
out.write(response.audio_content)
return True, output_file
except Exception as e:
error_message = f"Error with Google Cloud TTS: {str(e)}"
if "quota" in str(e).lower():
error_message += " This might be a quota issue with your Google Cloud project."
elif "credentials" in str(e).lower():
error_message += " Please ensure GOOGLE_APPLICATION_CREDENTIALS environment variable is set correctly."
return False, error_message
else:
return False, f"Unknown TTS provider: {provider}"
@@ -174,7 +211,8 @@ class TTSProviderCog(commands.Cog):
app_commands.Choice(name="Google TTS (Online)", value="gtts"),
app_commands.Choice(name="pyttsx3 (Offline)", value="pyttsx3"),
app_commands.Choice(name="Coqui TTS (AI Voice)", value="coqui"),
app_commands.Choice(name="eSpeak-NG (Offline)", value="espeak")
app_commands.Choice(name="eSpeak-NG (Offline)", value="espeak"),
app_commands.Choice(name="Google Cloud TTS (Chirp HD)", value="google_cloud_tts")
])
async def ttsprovider_slash(self, interaction: discord.Interaction,
provider: str,
@@ -257,6 +295,16 @@ except Exception as e:
print(f"Error checking espeak-ng: {{e}}")
ESPEAK_AVAILABLE = False
try:
GCLOUD_TTS_AVAILABLE = importlib.util.find_spec("google.cloud.texttospeech") is not None
print(f"GCLOUD_TTS_AVAILABLE: {{GCLOUD_TTS_AVAILABLE}}")
if GCLOUD_TTS_AVAILABLE:
import google.cloud.texttospeech
print(f"google-cloud-texttospeech version: {{google.cloud.texttospeech.__version__}}")
except Exception as e:
print(f"Error checking google.cloud.texttospeech: {{e}}")
GCLOUD_TTS_AVAILABLE = False
def generate_tts_audio(provider, text, output_file):
print(f"Testing TTS provider: {{provider}}")
print(f"Text: {{text}}")
@@ -335,6 +383,29 @@ def generate_tts_audio(provider, text, output_file):
print(f"Error with espeak-ng: {{e}}")
traceback.print_exc()
return False
elif provider == "google_cloud_tts" and GCLOUD_TTS_AVAILABLE:
try:
from google.cloud import texttospeech as gcloud_tts
client = gcloud_tts.TextToSpeechClient()
input_text = gcloud_tts.SynthesisInput(text=text)
voice = gcloud_tts.VoiceSelectionParams(
language_code="en-US",
name="en-US-Chirp3-HD-Autonoe"
)
audio_config = gcloud_tts.AudioConfig(
audio_encoding=gcloud_tts.AudioEncoding.MP3
)
response = client.synthesize_speech(
request={{"input": input_text, "voice": voice, "audio_config": audio_config}}
)
with open(output_file, "wb") as out:
out.write(response.audio_content)
print(f"Google Cloud TTS audio saved to {{output_file}}")
return True
except Exception as e:
print(f"Error with Google Cloud TTS: {{e}}")
traceback.print_exc()
return False
else:
print(f"TTS provider {{provider}} not available.")
return False
@@ -473,6 +544,9 @@ else:
elif provider == "coqui":
error_message += " - Run: pip install TTS\n"
error_message += " - This may require additional dependencies based on your system\n"
elif provider == "google_cloud_tts":
error_message += " - Run: pip install google-cloud-texttospeech\n"
error_message += " - Ensure GOOGLE_APPLICATION_CREDENTIALS environment variable is set correctly.\n"
error_message += "2. Restart the bot after installing the packages\n"
@@ -553,19 +627,31 @@ else:
except Exception as e:
espeak_version = f"Error checking: {str(e)}"
# Check for Google Cloud TTS
gcloud_tts_available = importlib.util.find_spec("google.cloud.texttospeech") is not None
gcloud_tts_version = "Not installed"
if gcloud_tts_available:
try:
import google.cloud.texttospeech as gcloud_tts_module
gcloud_tts_version = getattr(gcloud_tts_module, "__version__", "Unknown version")
except Exception as e:
gcloud_tts_version = f"Error importing: {str(e)}"
# Create a report
report = "**TTS Libraries Status:**\n"
report += f"- Google TTS (gtts): {gtts_version}\n"
report += f"- pyttsx3: {pyttsx3_version}\n"
report += f"- Coqui TTS: {coqui_version}\n"
report += f"- eSpeak-NG: {espeak_version}\n\n"
report += f"- eSpeak-NG: {espeak_version}\n"
report += f"- Google Cloud TTS: {gcloud_tts_version}\n\n"
# Add installation instructions
report += "**Installation Instructions:**\n"
report += "- Google TTS: `pip install gtts`\n"
report += "- pyttsx3: `pip install pyttsx3`\n"
report += "- Coqui TTS: `pip install TTS`\n"
report += "- eSpeak-NG: Install from https://github.com/espeak-ng/espeak-ng/releases\n\n"
report += "- eSpeak-NG: Install from https://github.com/espeak-ng/espeak-ng/releases\n"
report += "- Google Cloud TTS: `pip install google-cloud-texttospeech` (ensure `GOOGLE_APPLICATION_CREDENTIALS` is set)\n\n"
report += "After installing, restart the bot for the changes to take effect."

@@ -37,7 +37,9 @@ from .commands import setup_commands
from .listeners import (
on_ready_listener, on_message_listener, on_reaction_add_listener,
on_reaction_remove_listener, on_guild_join_listener, # Added on_guild_join_listener
on_guild_emojis_update_listener, on_guild_stickers_update_listener, # Added emoji/sticker update listeners
on_voice_transcription_received_listener, # Added voice transcription listener
on_voice_state_update_listener # Added voice state update listener
)
from . import api # Import api to access generate_image_description
from . import config as GurtConfig
@@ -230,7 +232,17 @@ class GurtCog(commands.Cog, name="Gurt"): # Added explicit Cog name
async def on_guild_stickers_update(guild, before, after):
await on_guild_stickers_update_listener(self, guild, before, after)
print("GurtCog: Additional guild event listeners added.")
# Listener for voice transcriptions
@self.bot.event
async def on_voice_transcription_received(guild: discord.Guild, user: discord.Member, text: str):
# This event is dispatched by VoiceGatewayCog
await on_voice_transcription_received_listener(self, guild, user, text)
@self.bot.event
async def on_voice_state_update(member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
await on_voice_state_update_listener(self, member, before, after)
print("GurtCog: Additional guild, custom, and voice state event listeners added.")
# Start background task
if self.background_task is None or self.background_task.done():

@@ -1719,6 +1719,51 @@ def create_tools_list():
)
)
# --- Voice Channel Tools ---
tool_declarations.append(
FunctionDeclaration(
name="join_voice_channel",
description="Connects GURT to a specified voice channel by its ID. GURT will automatically start listening for speech in this channel once connected. Use get_channel_id to find the ID if you only have the name.",
parameters={
"type": "object",
"properties": {
"channel_id": {"type": "string", "description": "The ID of the voice channel to join."}
},
"required": ["channel_id"]
}
)
)
tool_declarations.append(
FunctionDeclaration(
name="leave_voice_channel",
description="Disconnects GURT from its current voice channel.",
parameters={ # No parameters needed, but schema requires an object
"type": "object",
"properties": {},
"required": []
}
)
)
tool_declarations.append(
FunctionDeclaration(
name="speak_in_voice_channel",
description="Converts the given text to speech and plays it in GURT's current voice channel. If GURT is not in a voice channel, this tool will indicate an error. The bot will choose a suitable TTS provider automatically if none is specified.",
parameters={
"type": "object",
"properties": {
"text_to_speak": {"type": "string", "description": "The text GURT should say."},
"tts_provider": {
"type": "string",
"description": "Optional. Specify a TTS provider. If omitted, a default will be used.",
"enum": ["gtts", "pyttsx3", "coqui", "espeak", "google_cloud_tts"]
}
},
"required": ["text_to_speak"]
}
)
)
# --- End Voice Channel Tools ---
return tool_declarations
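# Illustrative only (not in the original commit): a model tool call against the
# declarations above carries a payload shaped roughly like this; the exact wire
# format depends on the Gemini/Vertex SDK in use.
EXAMPLE_SPEAK_TOOL_CALL = {
    "name": "speak_in_voice_channel",
    "args": {
        "text_to_speak": "hello voice chat",
        "tts_provider": "google_cloud_tts",  # optional; one of the declared enum values
    },
}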
# Initialize TOOLS list, handling potential ImportError if library not installed

@@ -725,3 +725,308 @@ async def on_guild_stickers_update_listener(cog: 'GurtCog', guild: discord.Guild
await asyncio.gather(*tasks, return_exceptions=True)
else:
print(f"No new or significantly changed stickers to process in guild {guild.name}")
async def on_voice_transcription_received_listener(cog: 'GurtCog', guild: discord.Guild, user: discord.Member, text: str):
"""Listener for transcribed voice messages."""
from .api import get_ai_response # For processing the text
from .utils import format_message, simulate_human_typing # For creating pseudo-message and sending response
from .config import IGNORED_CHANNEL_IDS # To respect ignored channels if applicable
print(f"Voice transcription received from {user.name} ({user.id}) in {guild.name}: '{text}'")
# Avoid processing if user is a bot (including GURT itself if its speech gets transcribed)
if user.bot:
print(f"Skipping voice transcription from bot user: {user.name}")
return
# Determine a text channel to use for conversational context and any text
# responses. This is a simplification: a more robust approach would track the
# user's last active text channel, or a text channel associated with GURT's
# current voice channel. For now, prefer the system channel, then a common
# channel name, then the first text channel GURT can post in.
text_channel = None
if guild:
# Prefer system channel or a common channel name
if guild.system_channel and guild.system_channel.permissions_for(guild.me).send_messages:
text_channel = guild.system_channel
else:
for channel in guild.text_channels:
if channel.name.lower() in ["general", "chat", "lounge", "discussion"] and channel.permissions_for(guild.me).send_messages:
text_channel = channel
break
if not text_channel and guild.text_channels: # Fallback to first available text channel
text_channel = guild.text_channels[0]
if not text_channel:
print(f"Could not find a suitable text channel in guild {guild.name} for voice transcription context. Aborting.")
return
# Check if this pseudo-channel context should be ignored
if text_channel.id in IGNORED_CHANNEL_IDS:
print(f"Skipping voice transcription as target context channel {text_channel.name} ({text_channel.id}) is ignored.")
return
# Construct a mock discord.Message (a "pseudo-message") so the transcription can
# flow through the same format_message / get_ai_response path as a normal text
# message. This is a bit hacky, but it lets us reuse the existing pipeline.
class PseudoMessage:
def __init__(self, author, content, channel, guild_obj, created_at, id_val):
self.author = author
self.content = content
self.channel = channel
self.guild = guild_obj
self.created_at = created_at
self.id = id_val # Needs a unique ID, timestamp can work
self.reference = None # No reply context for voice
self.attachments = []
self.embeds = []
self.stickers = []
self.reactions = []
self.mentions = [] # Could parse mentions from text if needed
self.mention_everyone = "@everyone" in content
self.role_mentions = [] # Could parse role mentions
self.channel_mentions = [] # Could parse channel mentions
self.flags = discord.MessageFlags._from_value(0) # Default flags
self.type = discord.MessageType.default
self.pinned = False
self.tts = False
self.system_content = ""
self.activity = None
self.application = None
self.components = []
self.interaction = None
self.webhook_id = None
self.jump_url = f"https://discord.com/channels/{guild.id}/{channel.id}/{id_val}" # Approximate
def to_reference(self, fail_if_not_exists: bool = True): # Add fail_if_not_exists
return discord.MessageReference(message_id=self.id, channel_id=self.channel.id, guild_id=self.guild.id, fail_if_not_exists=fail_if_not_exists)
pseudo_msg_id = int(time.time() * 1000000) # Create a somewhat unique ID
pseudo_message_obj = PseudoMessage(
author=user,
content=text,
channel=text_channel, # Use the determined text channel for context
guild_obj=guild,
created_at=discord.utils.utcnow(),
id_val=pseudo_msg_id
)
# Update cog's current_channel for the context of this interaction
original_current_channel = cog.current_channel
cog.current_channel = text_channel
try:
# Process the transcribed text as if it were a regular message
# The get_ai_response function will handle tool calls, including speak_in_voice_channel
print(f"Processing transcribed text from {user.name} via get_ai_response: '{text}'")
response_dict, sticker_ids_to_send = await get_ai_response(cog, pseudo_message_obj)
final_response_data = response_dict.get("final_response")
error_msg = response_dict.get("error")
if error_msg:
print(f"Error from AI processing voice transcription: {error_msg}")
# Decide if GURT should say something about the error in voice
# For now, just log it.
return
if final_response_data and final_response_data.get("should_respond"):
response_text = final_response_data.get("content", "")
# If GURT is in a voice channel in this guild, it may already have spoken via a
# speak_in_voice_channel tool call inside get_ai_response; letting the AI choose
# that tool is preferred over forcing speech here. Any remaining text content is
# sent to the chosen text channel below.
if response_text: # Only send if there's actual text content
# This part is simplified; a more robust solution would reuse the
# send_response_content helper from on_message_listener if possible,
# or adapt its logic here.
try:
# Simulate typing if sending to text channel
async with text_channel.typing():
await simulate_human_typing(cog, text_channel, response_text)
sent_text_msg = await text_channel.send(response_text)
print(f"Sent text response to {text_channel.name} for voice transcription: '{response_text[:50]}...'")
# Cache GURT's text response
bot_response_cache_entry = format_message(cog, sent_text_msg)
cog.message_cache['by_channel'][text_channel.id].append(bot_response_cache_entry)
cog.message_cache['global_recent'].append(bot_response_cache_entry)
cog.bot_last_spoke[text_channel.id] = time.time()
except Exception as send_err:
print(f"Error sending text response for voice transcription: {send_err}")
# Handle reactions if any (similar to on_message)
emoji_to_react = final_response_data.get("react_with_emoji")
if emoji_to_react and isinstance(emoji_to_react, str):
# React to the pseudo_message or a real message if one was sent?
# For simplicity, let's assume reaction isn't the primary mode for voice.
print(f"Voice transcription AI suggested reaction: {emoji_to_react} (currently not implemented for voice-originated interactions)")
except Exception as e:
print(f"Error in on_voice_transcription_received_listener: {e}")
import traceback
traceback.print_exc()
finally:
cog.current_channel = original_current_channel # Restore original current_channel
async def on_voice_state_update_listener(cog: 'GurtCog', member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
"""Listener for voice state updates (e.g., user joining/leaving VC)."""
from .config import IGNORED_CHANNEL_IDS # To respect ignored channels if applicable
# This listener can trigger GURT to join or leave voice channels autonomously by
# invoking the corresponding tools; the `cog` parameter is the GurtCog instance,
# so its TOOL_MAPPING is accessible directly.
if member.bot: # Ignore bots, including GURT itself
return
guild = member.guild
gurt_vc = guild.voice_client if guild else None
# Scenario 1: User joins a voice channel
if not before.channel and after.channel:
print(f"User {member.name} joined voice channel {after.channel.name} in guild {guild.name}")
# Conditions for GURT to consider auto-joining:
# 1. GURT is not already in a voice channel in this guild OR is in the same channel.
# 2. The user who joined is someone GURT is actively interacting with or has high relationship.
# 3. The target voice channel is not an ignored context.
if after.channel.id in IGNORED_CHANNEL_IDS: # Or some other form of channel permission check
print(f"GURT will not auto-join {after.channel.name} as it's an ignored/restricted context.")
return
# Check if GURT should consider joining this user
# Simple check: is user in recent conversation participants?
is_interacting_user = False
if guild.id in cog.active_conversations:
if member.id in cog.active_conversations[guild.id]['participants']:
is_interacting_user = True
# More advanced: check relationship score
# relationship_score = cog.user_relationships.get(str(min(member.id, cog.bot.user.id)), {}).get(str(max(member.id, cog.bot.user.id)), 0.0)
# if relationship_score > SOME_THRESHOLD: is_interacting_user = True
if not is_interacting_user:
print(f"User {member.name} joined VC, but GURT is not actively interacting with them. No auto-join.")
return
# If GURT is already in a VC in this guild but it's a *different* channel
if gurt_vc and gurt_vc.is_connected() and gurt_vc.channel != after.channel:
print(f"GURT is already in {gurt_vc.channel.name}. Not auto-joining {member.name} in {after.channel.name} for now.")
# Future: Could ask LLM if it should move.
return
# If GURT is not in a VC in this guild, or is in the same one (but not listening perhaps)
if not gurt_vc or not gurt_vc.is_connected() or gurt_vc.channel != after.channel :
print(f"GURT considering auto-joining {member.name} in {after.channel.name}.")
# Here, GURT's "brain" (LLM or simpler logic) would decide.
# For simplicity, let's make it auto-join if the above conditions are met.
# This would use the `join_voice_channel` tool.
# The tool itself is async and defined in gurt/tools.py
# To call a tool, we'd typically go through the AI's tool-using mechanism.
# For an autonomous action, GURT's core logic would invoke the tool.
# This listener is part of that core logic.
# We need the GurtCog instance to call its methods or access tools.
# The `cog` parameter *is* the GurtCog instance.
gurt_tool_cog = cog # The GurtCog instance itself
if hasattr(gurt_tool_cog, 'TOOL_MAPPING') and "join_voice_channel" in gurt_tool_cog.TOOL_MAPPING:
join_tool_func = gurt_tool_cog.TOOL_MAPPING["join_voice_channel"]
print(f"Attempting to auto-join VC {after.channel.id} for user {member.name}")
try:
# The tool function expects `cog` as its first arg, then params.
# We pass `gurt_tool_cog` (which is `self` if this were a cog method)
# and then the arguments for the tool.
tool_result = await join_tool_func(gurt_tool_cog, channel_id=str(after.channel.id))
if tool_result.get("status") == "success":
print(f"GURT successfully auto-joined {member.name} in {after.channel.name}.")
# Optionally, GURT could say "Hey [user], I'm here!"
if "speak_in_voice_channel" in gurt_tool_cog.TOOL_MAPPING:
speak_tool_func = gurt_tool_cog.TOOL_MAPPING["speak_in_voice_channel"]
await speak_tool_func(gurt_tool_cog, text_to_speak=f"Hey {member.display_name}, I saw you joined so I came too!")
else:
print(f"GURT auto-join failed: {tool_result.get('error')}")
except Exception as e:
print(f"Error during GURT auto-join attempt: {e}")
else:
print("join_voice_channel tool not found in GURT's TOOL_MAPPING.")
# Scenario 2: User leaves a voice channel GURT is in
elif before.channel and not after.channel:
# User disconnected from all VCs or was moved out by admin
print(f"User {member.name} left voice channel {before.channel.name} in guild {guild.name}")
if gurt_vc and gurt_vc.is_connected() and gurt_vc.channel == before.channel:
# Check if GURT is now alone in the channel
if len(gurt_vc.channel.members) == 1 and gurt_vc.channel.members[0] == guild.me:
print(f"GURT is now alone in {gurt_vc.channel.name}. Auto-leaving.")
gurt_tool_cog = cog
if hasattr(gurt_tool_cog, 'TOOL_MAPPING') and "leave_voice_channel" in gurt_tool_cog.TOOL_MAPPING:
leave_tool_func = gurt_tool_cog.TOOL_MAPPING["leave_voice_channel"]
try:
tool_result = await leave_tool_func(gurt_tool_cog)
if tool_result.get("status") == "success":
print(f"GURT successfully auto-left {before.channel.name}.")
else:
print(f"GURT auto-leave failed: {tool_result.get('error')}")
except Exception as e:
print(f"Error during GURT auto-leave attempt: {e}")
else:
print("leave_voice_channel tool not found in GURT's TOOL_MAPPING.")
# Scenario 3: User moves between voice channels
elif before.channel and after.channel and before.channel != after.channel:
print(f"User {member.name} moved from {before.channel.name} to {after.channel.name} in guild {guild.name}")
# If GURT was in the `before.channel` with the user, and is now alone, it might leave.
if gurt_vc and gurt_vc.is_connected() and gurt_vc.channel == before.channel:
if len(gurt_vc.channel.members) == 1 and gurt_vc.channel.members[0] == guild.me:
print(f"GURT is now alone in {before.channel.name} after {member.name} moved. Auto-leaving.")
# (Same auto-leave logic as above)
gurt_tool_cog = cog
if hasattr(gurt_tool_cog, 'TOOL_MAPPING') and "leave_voice_channel" in gurt_tool_cog.TOOL_MAPPING:
leave_tool_func = gurt_tool_cog.TOOL_MAPPING["leave_voice_channel"]
await leave_tool_func(gurt_tool_cog) # Result intentionally ignored here
# If GURT is not in a VC, or was not in the user's new VC, and user is interacting, consider joining `after.channel`
# This logic is similar to Scenario 1.
if after.channel.id not in IGNORED_CHANNEL_IDS:
is_interacting_user = False
if guild.id in cog.active_conversations:
if member.id in cog.active_conversations[guild.id]['participants']:
is_interacting_user = True
if is_interacting_user:
if not gurt_vc or not gurt_vc.is_connected() or gurt_vc.channel != after.channel:
print(f"GURT considering auto-joining {member.name} in their new channel {after.channel.name}.")
gurt_tool_cog = cog
if hasattr(gurt_tool_cog, 'TOOL_MAPPING') and "join_voice_channel" in gurt_tool_cog.TOOL_MAPPING:
join_tool_func = gurt_tool_cog.TOOL_MAPPING["join_voice_channel"]
try:
tool_result = await join_tool_func(gurt_tool_cog, channel_id=str(after.channel.id))
if tool_result.get("status") == "success":
print(f"GURT successfully auto-joined {member.name} in {after.channel.name} after they moved.")
if "speak_in_voice_channel" in gurt_tool_cog.TOOL_MAPPING:
speak_tool_func = gurt_tool_cog.TOOL_MAPPING["speak_in_voice_channel"]
await speak_tool_func(gurt_tool_cog, text_to_speak=f"Found you, {member.display_name}!")
else:
print(f"GURT auto-join (move) failed: {tool_result.get('error')}")
except Exception as e:
print(f"Error during GURT auto-join (move) attempt: {e}")
else:
print("join_voice_channel tool not found for auto-join (move).")

@@ -2950,6 +2950,143 @@ TOOL_MAPPING = {
"get_user_highest_role_color": get_user_highest_role_color,
}
# --- Voice Channel Tools ---
async def join_voice_channel(cog: commands.Cog, channel_id: str) -> Dict[str, Any]:
"""Connects GURT to a specified voice channel by its ID. GURT will automatically start listening for speech in this channel once connected. Use get_channel_id to find the ID if you only have the name."""
print(f"Executing join_voice_channel tool for channel ID: {channel_id}.")
voice_gateway_cog = cog.bot.get_cog("VoiceGatewayCog")
if not voice_gateway_cog:
return {"status": "error", "error": "VoiceGatewayCog not loaded."}
if not hasattr(voice_gateway_cog, 'connect_to_voice'):
return {"status": "error", "error": "VoiceGatewayCog is missing 'connect_to_voice' method."}
try:
channel_id_int = int(channel_id)
channel = cog.bot.get_channel(channel_id_int)
if not channel:
# Try fetching if not in cache
channel = await cog.bot.fetch_channel(channel_id_int)
if not channel or not isinstance(channel, discord.VoiceChannel):
return {"status": "error", "error": f"Voice channel {channel_id} not found or is not a voice channel."}
vc, message = await voice_gateway_cog.connect_to_voice(channel)
if vc:
return {"status": "success", "message": message, "channel_id": str(vc.channel.id), "channel_name": vc.channel.name}
else:
return {"status": "error", "error": message, "channel_id": channel_id}
except ValueError:
return {"status": "error", "error": f"Invalid channel ID format: {channel_id}."}
except discord.NotFound:
return {"status": "error", "error": f"Channel {channel_id} not found."}
except Exception as e:
error_message = f"Unexpected error in join_voice_channel: {str(e)}"
print(error_message)
traceback.print_exc()
return {"status": "error", "error": error_message}
async def leave_voice_channel(cog: commands.Cog) -> Dict[str, Any]:
"""Disconnects GURT from its current voice channel."""
print("Executing leave_voice_channel tool.")
voice_gateway_cog = cog.bot.get_cog("VoiceGatewayCog")
if not voice_gateway_cog:
return {"status": "error", "error": "VoiceGatewayCog not loaded."}
if not hasattr(voice_gateway_cog, 'disconnect_from_voice'):
return {"status": "error", "error": "VoiceGatewayCog is missing 'disconnect_from_voice' method."}
if not cog.current_channel or not cog.current_channel.guild:
# This tool implies a guild context for voice_client
# However, GURT might be in a VC without a current_channel if joined autonomously
# Let's try to find a guild GURT is in a VC in.
active_vc_guild = None
for vc in cog.bot.voice_clients:
if vc.is_connected(): # Found one
active_vc_guild = vc.guild
break
if not active_vc_guild:
return {"status": "error", "error": "GURT is not currently in any voice channel or guild context is unclear."}
guild_to_leave = active_vc_guild
else:
guild_to_leave = cog.current_channel.guild
if not guild_to_leave:
return {"status": "error", "error": "Could not determine the guild to leave voice from."}
success, message = await voice_gateway_cog.disconnect_from_voice(guild_to_leave)
if success:
return {"status": "success", "message": message}
else:
return {"status": "error", "error": message}
async def speak_in_voice_channel(cog: commands.Cog, text_to_speak: str, tts_provider: Optional[str] = None) -> Dict[str, Any]:
"""Converts the given text to speech and plays it in GURT's current voice channel. If GURT is not in a voice channel, this tool will indicate an error. The bot will choose a suitable TTS provider automatically if none is specified."""
print(f"Executing speak_in_voice_channel: Text='{text_to_speak[:50]}...', Provider={tts_provider}")
# Determine which voice client to use
# Prefer current_channel's guild if available and bot is in VC there
active_vc = None
if cog.current_channel and cog.current_channel.guild:
if cog.current_channel.guild.voice_client and cog.current_channel.guild.voice_client.is_connected():
active_vc = cog.current_channel.guild.voice_client
# If not found via current_channel, check all bot's voice_clients
if not active_vc:
if cog.bot.voice_clients:
active_vc = cog.bot.voice_clients[0] # Use the first available one
else:
return {"status": "error", "error": "GURT is not currently in any voice channel."}
if not active_vc or not active_vc.is_connected():
return {"status": "error", "error": "GURT is not connected to a voice channel."}
tts_cog = cog.bot.get_cog("TTSProviderCog")
if not tts_cog:
return {"status": "error", "error": "TTSProviderCog not loaded."}
if not hasattr(tts_cog, 'generate_tts_directly'):
return {"status": "error", "error": "TTSProviderCog is missing 'generate_tts_directly' method."}
voice_gateway_cog = cog.bot.get_cog("VoiceGatewayCog")
if not voice_gateway_cog:
return {"status": "error", "error": "VoiceGatewayCog not loaded."}
if not hasattr(voice_gateway_cog, 'play_audio_file'):
return {"status": "error", "error": "VoiceGatewayCog is missing 'play_audio_file' method."}
# Determine TTS provider
chosen_provider = tts_provider
if not chosen_provider:
# No provider specified: prefer google_cloud_tts if installed, then gtts.
# (A configurable default, e.g. in GurtConfig, would be cleaner.)
if importlib.util.find_spec("google.cloud.texttospeech"):
chosen_provider = "google_cloud_tts"
elif importlib.util.find_spec("gtts"):
chosen_provider = "gtts"
else: # Fallback to first available or error
# This logic could be more sophisticated in TTSProviderCog itself
return {"status": "error", "error": "No suitable default TTS provider found or configured."}
print(f"No TTS provider specified, defaulting to: {chosen_provider}")
success, audio_path_or_error = await tts_cog.generate_tts_directly(provider=chosen_provider, text=text_to_speak)
if not success:
return {"status": "error", "error": f"TTS generation failed: {audio_path_or_error}"}
audio_file_path = audio_path_or_error
play_success, play_message = await voice_gateway_cog.play_audio_file(active_vc, audio_file_path)
if play_success:
return {"status": "success", "message": play_message, "text_spoken": text_to_speak, "provider_used": chosen_provider}
else:
# TTSProviderCog's cleanup should handle the audio_file_path if play fails
return {"status": "error", "error": f"Failed to play audio: {play_message}"}
# --- End Voice Channel Tools ---
# --- List Files Tool ---
async def list_files_tool(cog: commands.Cog, path: str, recursive: bool = False) -> Dict[str, Any]:
"""Lists files and directories within a specified path."""
@@ -3252,3 +3389,6 @@ async def send_tenor_gif(cog: commands.Cog, query: str, limit: int = 8) -> Dict[
TOOL_MAPPING["search_tenor_gifs"] = tool_search_tenor_gifs
TOOL_MAPPING["send_tenor_gif"] = send_tenor_gif
TOOL_MAPPING["list_files"] = list_files_tool
TOOL_MAPPING["join_voice_channel"] = join_voice_channel
TOOL_MAPPING["leave_voice_channel"] = leave_voice_channel
TOOL_MAPPING["speak_in_voice_channel"] = speak_in_voice_channel