discordbot/api_service/code_verifier_store.py
2025-05-03 15:33:40 -06:00

101 lines
3.3 KiB
Python

"""
Code verifier store for the API service.
This module provides a simple in-memory store for code verifiers used in the OAuth flow.
It also includes a file-based backup to ensure code verifiers persist across restarts.
"""
import os
import json
import time
from typing import Dict, Optional, Any
# Live store: maps an OAuth ``state`` value to a record holding the
# "code_verifier" string and the "timestamp" at which it was stored.
code_verifiers: Dict[str, Dict[str, Any]] = {}

# The JSON backup lives in a "data" directory that sits next to this module.
_MODULE_DIR = os.path.split(os.path.abspath(__file__))[0]
STORAGE_DIR = os.path.join(_MODULE_DIR, "data")
STORAGE_FILE = os.path.join(STORAGE_DIR, "code_verifiers.json")

# Create the backup directory up front (no error if it is already there).
os.makedirs(STORAGE_DIR, exist_ok=True)
def _load_from_file() -> None:
    """Load non-expired code verifiers from the backup file into memory.

    Reads ``STORAGE_FILE`` (if it exists) and copies every well-formed entry
    whose timestamp is less than 600 seconds (10 minutes) old into the
    module-level ``code_verifiers`` dict.

    Malformed entries (non-dict records, a non-string code verifier, or a
    non-numeric timestamp) are skipped individually so that one bad record
    cannot prevent the remaining valid records from being loaded.

    Any error (unreadable file, invalid JSON, unexpected top-level type) is
    printed rather than raised.
    """
    try:
        if not os.path.exists(STORAGE_FILE):
            return

        with open(STORAGE_FILE, 'r', encoding="utf-8") as f:
            stored_data = json.load(f)

        if not isinstance(stored_data, dict):
            raise ValueError("expected a JSON object mapping state to entry")

        current_time = time.time()
        for state, data in stored_data.items():
            # Skip records that do not have the shape store_code_verifier writes.
            if not isinstance(data, dict):
                continue
            if not isinstance(data.get("code_verifier"), str):
                continue
            timestamp = data.get("timestamp", 0)
            if not isinstance(timestamp, (int, float)):
                continue
            # Keep only entries younger than 10 minutes (600 seconds).
            if timestamp + 600 > current_time:
                code_verifiers[state] = data

        print(f"Loaded {len(code_verifiers)} valid code verifiers from file")
    except Exception as e:
        # Best-effort restore: a broken backup must not stop the module import.
        print(f"Error loading code verifiers from file: {e}")
def _save_to_file() -> None:
    """Persist the in-memory code verifiers to ``STORAGE_FILE`` as JSON.

    The data is first written to a temporary file next to ``STORAGE_FILE``
    and then moved over it with ``os.replace``. Opening the real file in
    ``'w'`` mode would truncate it before serialization starts, so a failure
    part-way through the dump would leave a corrupt or empty backup; with the
    temporary file the previous backup stays intact until the new one has
    been fully written.

    Any error is printed rather than raised.
    """
    tmp_path = f"{STORAGE_FILE}.tmp"
    try:
        with open(tmp_path, 'w', encoding="utf-8") as f:
            json.dump(code_verifiers, f)
        # Swap the finished file into place in a single rename step.
        os.replace(tmp_path, STORAGE_FILE)
        print(f"Saved {len(code_verifiers)} code verifiers to file")
    except Exception as e:
        print(f"Error saving code verifiers to file: {e}")
        # Best-effort cleanup of a partially written temporary file; it may
        # not exist (e.g. the failure happened before it was created).
        try:
            os.remove(tmp_path)
        except OSError:
            pass
# Populate the in-memory store from the backup file as soon as this module is
# imported; _load_from_file() only keeps entries that have not yet expired.
_load_from_file()
def store_code_verifier(state: str, code_verifier: str) -> None:
    """Store a code verifier for a state and persist the store to disk.

    Args:
        state: Key under which the verifier is stored.
        code_verifier: The code verifier to remember for ``state``.

    The entry is stamped with the current ``time.time()`` value so that the
    expiry checks elsewhere in this module can age it out. An existing entry
    for the same ``state`` is overwritten.
    """
    code_verifiers[state] = {
        "code_verifier": code_verifier,
        "timestamp": time.time(),
    }
    # The code verifier is a secret: do not echo any part of it to the log.
    print(f"Stored code verifier for state {state}")
    # Save to file for persistence
    _save_to_file()
def get_code_verifier(state: str) -> Optional[str]:
    """Return the stored code verifier for ``state`` if it is still valid.

    Returns ``None`` when no (non-empty) entry exists for ``state`` or when
    the entry is 600 seconds (10 minutes) old or older. An expired entry is
    removed via ``remove_code_verifier`` before ``None`` is returned.
    """
    entry = code_verifiers.get(state)
    if not entry:
        return None

    # Entries stay valid for 10 minutes (600 seconds) after being stored.
    expires_at = entry.get("timestamp", 0) + 600
    if expires_at > time.time():
        return entry.get("code_verifier")

    # Too old: drop it from the store and report the expiry.
    remove_code_verifier(state)
    print(f"Code verifier for state {state} has expired")
    return None
def remove_code_verifier(state: str) -> None:
    """Delete the entry for ``state`` and rewrite the backup file.

    Does nothing (no log line, no file write) when ``state`` is not present
    in the store.
    """
    if state not in code_verifiers:
        return

    code_verifiers.pop(state)
    print(f"Removed code verifier for state {state}")
    # Keep the on-disk copy in sync with the in-memory store.
    _save_to_file()
def cleanup_expired() -> None:
    """Drop every entry that is 600 seconds (10 minutes) old or older.

    The backup file is rewritten only when at least one entry was removed.
    """
    now = time.time()
    # Collect the keys first so the dict is not mutated while it is iterated.
    stale_states = [
        state
        for state, entry in code_verifiers.items()
        if entry.get("timestamp", 0) + 600 <= now  # 10 minutes = 600 seconds
    ]
    if not stale_states:
        return

    for state in stale_states:
        code_verifiers.pop(state)

    print(f"Cleaned up {len(stale_states)} expired code verifiers")
    _save_to_file()