107 lines
3.4 KiB
Python
107 lines
3.4 KiB
Python
"""
|
|
Code verifier store for the API service.
|
|
|
|
This module provides a simple in-memory store for code verifiers used in the OAuth flow.
|
|
It also includes a file-based backup to ensure code verifiers persist across restarts.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import time
|
|
from typing import Dict, Optional, Any
|
|
|
|
# In-memory storage for code verifiers
|
|
code_verifiers: Dict[str, Dict[str, Any]] = {}
|
|
|
|
# File path for persistent storage
|
|
STORAGE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data")
|
|
STORAGE_FILE = os.path.join(STORAGE_DIR, "code_verifiers.json")
|
|
|
|
# Ensure the storage directory exists
|
|
os.makedirs(STORAGE_DIR, exist_ok=True)
|
|
|
|
|
|
def _load_from_file() -> None:
|
|
"""Load code verifiers from file."""
|
|
try:
|
|
if os.path.exists(STORAGE_FILE):
|
|
with open(STORAGE_FILE, "r") as f:
|
|
stored_data = json.load(f)
|
|
|
|
# Filter out expired entries (older than 10 minutes)
|
|
current_time = time.time()
|
|
for state, data in stored_data.items():
|
|
if (
|
|
data.get("timestamp", 0) + 600 > current_time
|
|
): # 10 minutes = 600 seconds
|
|
code_verifiers[state] = data
|
|
|
|
print(f"Loaded {len(code_verifiers)} valid code verifiers from file")
|
|
except Exception as e:
|
|
print(f"Error loading code verifiers from file: {e}")
|
|
|
|
|
|
def _save_to_file() -> None:
|
|
"""Save code verifiers to file."""
|
|
try:
|
|
with open(STORAGE_FILE, "w") as f:
|
|
json.dump(code_verifiers, f)
|
|
print(f"Saved {len(code_verifiers)} code verifiers to file")
|
|
except Exception as e:
|
|
print(f"Error saving code verifiers to file: {e}")
|
|
|
|
|
|
# Load existing code verifiers on module import
|
|
_load_from_file()
|
|
|
|
|
|
def store_code_verifier(state: str, code_verifier: str) -> None:
|
|
"""Store a code verifier for a state."""
|
|
# Store with timestamp for expiration
|
|
code_verifiers[state] = {"code_verifier": code_verifier, "timestamp": time.time()}
|
|
print(f"Stored code verifier for state {state}: {code_verifier[:10]}...")
|
|
|
|
# Save to file for persistence
|
|
_save_to_file()
|
|
|
|
|
|
def get_code_verifier(state: str) -> Optional[str]:
|
|
"""Get the code verifier for a state."""
|
|
# Check if state exists and is not expired
|
|
data = code_verifiers.get(state)
|
|
if data:
|
|
# Check if expired (older than 10 minutes)
|
|
if data.get("timestamp", 0) + 600 > time.time():
|
|
return data.get("code_verifier")
|
|
else:
|
|
# Remove expired entry
|
|
remove_code_verifier(state)
|
|
print(f"Code verifier for state {state} has expired")
|
|
return None
|
|
|
|
|
|
def remove_code_verifier(state: str) -> None:
|
|
"""Remove a code verifier for a state."""
|
|
if state in code_verifiers:
|
|
del code_verifiers[state]
|
|
print(f"Removed code verifier for state {state}")
|
|
# Update the file
|
|
_save_to_file()
|
|
|
|
|
|
def cleanup_expired() -> None:
|
|
"""Remove all expired code verifiers."""
|
|
current_time = time.time()
|
|
expired_states = []
|
|
|
|
for state, data in code_verifiers.items():
|
|
if data.get("timestamp", 0) + 600 <= current_time: # 10 minutes = 600 seconds
|
|
expired_states.append(state)
|
|
|
|
for state in expired_states:
|
|
del code_verifiers[state]
|
|
|
|
if expired_states:
|
|
print(f"Cleaned up {len(expired_states)} expired code verifiers")
|
|
_save_to_file()
|