Add guild_permissions property and tests (#97)

This commit is contained in:
Slipstream 2025-06-15 18:55:52 -06:00 committed by GitHub
parent f5f8f6908c
commit 80f64c1f73
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 278 additions and 160 deletions

View File

@ -2,12 +2,12 @@
Data models for Discord objects.
"""
import asyncio
import datetime
import io
import json
import os
import re
import asyncio
import datetime
import io
import json
import os
import re
from dataclasses import dataclass
from typing import (
Any,
@ -123,8 +123,8 @@ class Message:
self.guild_id: Optional[str] = data.get("guild_id")
self.author: User = User(data["author"], client_instance)
self.content: str = data["content"]
self.timestamp: str = data["timestamp"]
self.edited_timestamp: Optional[str] = data.get("edited_timestamp")
self.timestamp: str = data["timestamp"]
self.edited_timestamp: Optional[str] = data.get("edited_timestamp")
if data.get("components"):
self.components: Optional[List[ActionRow]] = [
ActionRow.from_dict(c, client_instance)
@ -149,26 +149,26 @@ class Message:
return f"https://discord.com/channels/{guild_or_dm}/{self.channel_id}/{self.id}"
@property
def clean_content(self) -> str:
"""Returns message content without user, role, or channel mentions."""
pattern = re.compile(r"<@!?\d+>|<#\d+>|<@&\d+>")
cleaned = pattern.sub("", self.content)
return " ".join(cleaned.split())
@property
def created_at(self) -> datetime.datetime:
"""Return message timestamp as a :class:`~datetime.datetime`."""
return datetime.datetime.fromisoformat(self.timestamp)
@property
def edited_at(self) -> Optional[datetime.datetime]:
"""Return edited timestamp as :class:`~datetime.datetime` if present."""
if self.edited_timestamp is None:
return None
return datetime.datetime.fromisoformat(self.edited_timestamp)
def clean_content(self) -> str:
"""Returns message content without user, role, or channel mentions."""
pattern = re.compile(r"<@!?\d+>|<#\d+>|<@&\d+>")
cleaned = pattern.sub("", self.content)
return " ".join(cleaned.split())
@property
def created_at(self) -> datetime.datetime:
"""Return message timestamp as a :class:`~datetime.datetime`."""
return datetime.datetime.fromisoformat(self.timestamp)
@property
def edited_at(self) -> Optional[datetime.datetime]:
"""Return edited timestamp as :class:`~datetime.datetime` if present."""
if self.edited_timestamp is None:
return None
return datetime.datetime.fromisoformat(self.edited_timestamp)
async def pin(self) -> None:
"""|coro|
@ -193,17 +193,17 @@ class Message:
HTTPException
Unpinning the message failed.
"""
await self._client._http.unpin_message(self.channel_id, self.id)
self.pinned = False
async def crosspost(self) -> "Message":
"""|coro|
Crossposts this message to all follower channels and return the resulting message.
"""
data = await self._client._http.crosspost_message(self.channel_id, self.id)
return self._client.parse_message(data)
await self._client._http.unpin_message(self.channel_id, self.id)
self.pinned = False
async def crosspost(self) -> "Message":
"""|coro|
Crossposts this message to all follower channels and return the resulting message.
"""
data = await self._client._http.crosspost_message(self.channel_id, self.id)
return self._client.parse_message(data)
async def reply(
self,
@ -833,13 +833,13 @@ class Member(User): # Member inherits from User
self.deaf: bool = data.get("deaf", False)
self.mute: bool = data.get("mute", False)
self.pending: bool = data.get("pending", False)
self.permissions: Optional[str] = data.get(
"permissions"
) # Permissions in the channel, if applicable
self.communication_disabled_until: Optional[str] = data.get(
"communication_disabled_until"
) # ISO8601 timestamp
self.voice_state = data.get("voice_state")
self.permissions: Optional[str] = data.get(
"permissions"
) # Permissions in the channel, if applicable
self.communication_disabled_until: Optional[str] = data.get(
"communication_disabled_until"
) # ISO8601 timestamp
self.voice_state = data.get("voice_state")
# If 'user' object was present, ensure User attributes are from there
if user_data:
@ -918,15 +918,42 @@ class Member(User): # Member inherits from User
if not role_objects:
return None
return max(role_objects, key=lambda r: r.position)
@property
def voice(self) -> Optional["VoiceState"]:
"""Return the member's cached voice state as a :class:`VoiceState`."""
if self.voice_state is None:
return None
return VoiceState.from_dict(self.voice_state)
return max(role_objects, key=lambda r: r.position)
@property
def guild_permissions(self) -> "Permissions":
"""Return the member's guild-level permissions."""
if not self.guild_id or not self._client:
return Permissions(0)
guild = self._client.get_guild(self.guild_id)
if guild is None:
return Permissions(0)
base = Permissions(0)
everyone = guild.get_role(guild.id)
if everyone is not None:
base |= Permissions(int(everyone.permissions))
for rid in self.roles:
role = guild.get_role(rid)
if role is not None:
base |= Permissions(int(role.permissions))
if base & Permissions.ADMINISTRATOR:
return Permissions(~0)
return base
def voice(self) -> Optional["VoiceState"]:
"""Return the member's cached voice state as a :class:`VoiceState`."""
if self.voice_state is None:
return None
return VoiceState.from_dict(self.voice_state)
class PartialEmoji:
@ -1378,30 +1405,30 @@ class Guild:
)
member_data = await asyncio.wait_for(future, timeout=60.0)
return [Member(m, self._client) for m in member_data]
except asyncio.TimeoutError:
if nonce in self._client._gateway._member_chunk_requests:
del self._client._gateway._member_chunk_requests[nonce]
raise
async def prune_members(self, days: int, *, compute_count: bool = True) -> int:
"""|coro| Remove inactive members from the guild.
Parameters
----------
days: int
Number of days of inactivity required to be pruned.
compute_count: bool
Whether to return the number of members pruned.
Returns
-------
int
The number of members pruned.
"""
return await self._client._http.begin_guild_prune(
self.id, days=days, compute_count=compute_count
)
except asyncio.TimeoutError:
if nonce in self._client._gateway._member_chunk_requests:
del self._client._gateway._member_chunk_requests[nonce]
raise
async def prune_members(self, days: int, *, compute_count: bool = True) -> int:
"""|coro| Remove inactive members from the guild.
Parameters
----------
days: int
Number of days of inactivity required to be pruned.
compute_count: bool
Whether to return the number of members pruned.
Returns
-------
int
The number of members pruned.
"""
return await self._client._http.begin_guild_prune(
self.id, days=days, compute_count=compute_count
)
async def create_text_channel(
self,
@ -1688,11 +1715,11 @@ class TextChannel(Channel, Messageable):
messages_data = await self._client._http.get_pinned_messages(self.id)
return [self._client.parse_message(m) for m in messages_data]
async def create_thread(
self,
name: str,
*,
type: ChannelType = ChannelType.PUBLIC_THREAD,
async def create_thread(
self,
name: str,
*,
type: ChannelType = ChannelType.PUBLIC_THREAD,
auto_archive_duration: Optional[AutoArchiveDuration] = None,
invitable: Optional[bool] = None,
rate_limit_per_user: Optional[int] = None,
@ -1735,33 +1762,33 @@ class TextChannel(Channel, Messageable):
if rate_limit_per_user is not None:
payload["rate_limit_per_user"] = rate_limit_per_user
data = await self._client._http.start_thread_without_message(self.id, payload)
return cast("Thread", self._client.parse_channel(data))
async def create_invite(
self,
*,
max_age: Optional[int] = None,
max_uses: Optional[int] = None,
temporary: Optional[bool] = None,
unique: Optional[bool] = None,
reason: Optional[str] = None,
) -> "Invite":
"""|coro| Create an invite to this channel."""
payload: Dict[str, Any] = {}
if max_age is not None:
payload["max_age"] = max_age
if max_uses is not None:
payload["max_uses"] = max_uses
if temporary is not None:
payload["temporary"] = temporary
if unique is not None:
payload["unique"] = unique
return await self._client._http.create_channel_invite(
self.id, payload, reason=reason
)
data = await self._client._http.start_thread_without_message(self.id, payload)
return cast("Thread", self._client.parse_channel(data))
async def create_invite(
self,
*,
max_age: Optional[int] = None,
max_uses: Optional[int] = None,
temporary: Optional[bool] = None,
unique: Optional[bool] = None,
reason: Optional[str] = None,
) -> "Invite":
"""|coro| Create an invite to this channel."""
payload: Dict[str, Any] = {}
if max_age is not None:
payload["max_age"] = max_age
if max_uses is not None:
payload["max_uses"] = max_uses
if temporary is not None:
payload["temporary"] = temporary
if unique is not None:
payload["unique"] = unique
return await self._client._http.create_channel_invite(
self.id, payload, reason=reason
)
class VoiceChannel(Channel):
@ -2769,7 +2796,7 @@ class TypingStart:
return f"<TypingStart channel_id='{self.channel_id}' user_id='{self.user_id}'>"
class VoiceStateUpdate:
class VoiceStateUpdate:
"""Represents a VOICE_STATE_UPDATE event."""
def __init__(
@ -2788,53 +2815,53 @@ class VoiceStateUpdate:
self.self_deaf: bool = data.get("self_deaf", False)
self.self_mute: bool = data.get("self_mute", False)
self.self_stream: Optional[bool] = data.get("self_stream")
self.self_video: bool = data.get("self_video", False)
self.suppress: bool = data.get("suppress", False)
def __repr__(self) -> str:
return (
f"<VoiceStateUpdate guild_id='{self.guild_id}' user_id='{self.user_id}' "
f"channel_id='{self.channel_id}'>"
)
@dataclass
class VoiceState:
"""Represents a cached voice state for a member."""
guild_id: Optional[str]
channel_id: Optional[str]
user_id: Optional[str]
session_id: Optional[str]
deaf: bool = False
mute: bool = False
self_deaf: bool = False
self_mute: bool = False
self_stream: Optional[bool] = None
self_video: bool = False
suppress: bool = False
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "VoiceState":
return cls(
guild_id=data.get("guild_id"),
channel_id=data.get("channel_id"),
user_id=data.get("user_id"),
session_id=data.get("session_id"),
deaf=data.get("deaf", False),
mute=data.get("mute", False),
self_deaf=data.get("self_deaf", False),
self_mute=data.get("self_mute", False),
self_stream=data.get("self_stream"),
self_video=data.get("self_video", False),
suppress=data.get("suppress", False),
)
def __repr__(self) -> str:
return (
f"<VoiceState guild_id='{self.guild_id}' user_id='{self.user_id}' "
f"channel_id='{self.channel_id}'>"
)
self.self_video: bool = data.get("self_video", False)
self.suppress: bool = data.get("suppress", False)
def __repr__(self) -> str:
return (
f"<VoiceStateUpdate guild_id='{self.guild_id}' user_id='{self.user_id}' "
f"channel_id='{self.channel_id}'>"
)
@dataclass
class VoiceState:
"""Represents a cached voice state for a member."""
guild_id: Optional[str]
channel_id: Optional[str]
user_id: Optional[str]
session_id: Optional[str]
deaf: bool = False
mute: bool = False
self_deaf: bool = False
self_mute: bool = False
self_stream: Optional[bool] = None
self_video: bool = False
suppress: bool = False
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "VoiceState":
return cls(
guild_id=data.get("guild_id"),
channel_id=data.get("channel_id"),
user_id=data.get("user_id"),
session_id=data.get("session_id"),
deaf=data.get("deaf", False),
mute=data.get("mute", False),
self_deaf=data.get("self_deaf", False),
self_mute=data.get("self_mute", False),
self_stream=data.get("self_stream"),
self_video=data.get("self_video", False),
suppress=data.get("suppress", False),
)
def __repr__(self) -> str:
return (
f"<VoiceState guild_id='{self.guild_id}' user_id='{self.user_id}' "
f"channel_id='{self.channel_id}'>"
)
class Reaction:

View File

@ -1,4 +1,21 @@
from disagreement.models import Member
import pytest # pylint: disable=E0401
from disagreement.client import Client
from disagreement.enums import (
VerificationLevel,
MessageNotificationLevel,
ExplicitContentFilterLevel,
MFALevel,
GuildNSFWLevel,
PremiumTier,
)
from disagreement.models import Member, Guild, Role
from disagreement.permissions import Permissions
class DummyClient(Client):
def __init__(self):
super().__init__(token="test")
def _make_member(member_id: str, username: str, nick: str | None):
@ -12,6 +29,58 @@ def _make_member(member_id: str, username: str, nick: str | None):
return Member(data, client_instance=None)
def _base_guild(client: Client) -> Guild:
data = {
"id": "1",
"name": "g",
"owner_id": "1",
"afk_timeout": 60,
"verification_level": VerificationLevel.NONE.value,
"default_message_notifications": MessageNotificationLevel.ALL_MESSAGES.value,
"explicit_content_filter": ExplicitContentFilterLevel.DISABLED.value,
"roles": [],
"emojis": [],
"features": [],
"mfa_level": MFALevel.NONE.value,
"system_channel_flags": 0,
"premium_tier": PremiumTier.NONE.value,
"nsfw_level": GuildNSFWLevel.DEFAULT.value,
}
guild = Guild(data, client_instance=client)
client._guilds.set(guild.id, guild)
return guild
def _role(guild: Guild, rid: str, perms: Permissions) -> Role:
role = Role(
{
"id": rid,
"name": f"r{rid}",
"color": 0,
"hoist": False,
"position": 0,
"permissions": str(int(perms)),
"managed": False,
"mentionable": False,
}
)
guild.roles.append(role)
return role
def _member(guild: Guild, client: Client, *roles: Role) -> Member:
data = {
"user": {"id": "10", "username": "u", "discriminator": "0001"},
"joined_at": "t",
"roles": [r.id for r in roles] or [guild.id],
}
member = Member(data, client_instance=client)
member.guild_id = guild.id
member._client = client
guild._members.set(member.id, member)
return member
def test_display_name_prefers_nick():
member = _make_member("1", "u", "nickname")
assert member.display_name == "nickname"
@ -20,3 +89,25 @@ def test_display_name_prefers_nick():
def test_display_name_falls_back_to_username():
member = _make_member("2", "u2", None)
assert member.display_name == "u2"
def test_guild_permissions_from_roles():
client = DummyClient()
guild = _base_guild(client)
everyone = _role(guild, guild.id, Permissions.VIEW_CHANNEL)
mod = _role(guild, "2", Permissions.MANAGE_MESSAGES)
member = _member(guild, client, everyone, mod)
perms = member.guild_permissions
assert perms & Permissions.VIEW_CHANNEL
assert perms & Permissions.MANAGE_MESSAGES
assert not perms & Permissions.BAN_MEMBERS
def test_guild_permissions_administrator_role_grants_all():
client = DummyClient()
guild = _base_guild(client)
admin = _role(guild, "2", Permissions.ADMINISTRATOR)
member = _member(guild, client, admin)
assert member.guild_permissions == Permissions(~0)