Add webhook send capability (#33)
This commit is contained in:
parent
71097c6fbe
commit
92b0bc5804
@ -421,6 +421,92 @@ class HTTPClient:
|
|||||||
|
|
||||||
await self.request("DELETE", f"/webhooks/{webhook_id}")
|
await self.request("DELETE", f"/webhooks/{webhook_id}")
|
||||||
|
|
||||||
|
async def execute_webhook(
|
||||||
|
self,
|
||||||
|
webhook_id: "Snowflake",
|
||||||
|
token: str,
|
||||||
|
*,
|
||||||
|
content: Optional[str] = None,
|
||||||
|
tts: bool = False,
|
||||||
|
embeds: Optional[List[Dict[str, Any]]] = None,
|
||||||
|
components: Optional[List[Dict[str, Any]]] = None,
|
||||||
|
allowed_mentions: Optional[dict] = None,
|
||||||
|
attachments: Optional[List[Any]] = None,
|
||||||
|
files: Optional[List[Any]] = None,
|
||||||
|
flags: Optional[int] = None,
|
||||||
|
username: Optional[str] = None,
|
||||||
|
avatar_url: Optional[str] = None,
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""Executes a webhook and returns the created message."""
|
||||||
|
|
||||||
|
payload: Dict[str, Any] = {}
|
||||||
|
if content is not None:
|
||||||
|
payload["content"] = content
|
||||||
|
if tts:
|
||||||
|
payload["tts"] = True
|
||||||
|
if embeds:
|
||||||
|
payload["embeds"] = embeds
|
||||||
|
if components:
|
||||||
|
payload["components"] = components
|
||||||
|
if allowed_mentions:
|
||||||
|
payload["allowed_mentions"] = allowed_mentions
|
||||||
|
if username:
|
||||||
|
payload["username"] = username
|
||||||
|
if avatar_url:
|
||||||
|
payload["avatar_url"] = avatar_url
|
||||||
|
|
||||||
|
all_files: List["File"] = []
|
||||||
|
if attachments is not None:
|
||||||
|
payload["attachments"] = []
|
||||||
|
for a in attachments:
|
||||||
|
if hasattr(a, "data") and hasattr(a, "filename"):
|
||||||
|
idx = len(all_files)
|
||||||
|
all_files.append(a)
|
||||||
|
payload["attachments"].append({"id": idx, "filename": a.filename})
|
||||||
|
else:
|
||||||
|
payload["attachments"].append(
|
||||||
|
a.to_dict() if hasattr(a, "to_dict") else a
|
||||||
|
)
|
||||||
|
if files is not None:
|
||||||
|
for f in files:
|
||||||
|
if hasattr(f, "data") and hasattr(f, "filename"):
|
||||||
|
idx = len(all_files)
|
||||||
|
all_files.append(f)
|
||||||
|
if "attachments" not in payload:
|
||||||
|
payload["attachments"] = []
|
||||||
|
payload["attachments"].append({"id": idx, "filename": f.filename})
|
||||||
|
else:
|
||||||
|
raise TypeError("files must be File objects")
|
||||||
|
if flags:
|
||||||
|
payload["flags"] = flags
|
||||||
|
|
||||||
|
if all_files:
|
||||||
|
form = aiohttp.FormData()
|
||||||
|
form.add_field(
|
||||||
|
"payload_json", json.dumps(payload), content_type="application/json"
|
||||||
|
)
|
||||||
|
for idx, f in enumerate(all_files):
|
||||||
|
form.add_field(
|
||||||
|
f"files[{idx}]",
|
||||||
|
f.data,
|
||||||
|
filename=f.filename,
|
||||||
|
content_type="application/octet-stream",
|
||||||
|
)
|
||||||
|
return await self.request(
|
||||||
|
"POST",
|
||||||
|
f"/webhooks/{webhook_id}/{token}",
|
||||||
|
payload=form,
|
||||||
|
is_json=False,
|
||||||
|
use_auth_header=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
return await self.request(
|
||||||
|
"POST",
|
||||||
|
f"/webhooks/{webhook_id}/{token}",
|
||||||
|
payload=payload,
|
||||||
|
use_auth_header=False,
|
||||||
|
)
|
||||||
|
|
||||||
async def get_user(self, user_id: "Snowflake") -> Dict[str, Any]:
|
async def get_user(self, user_id: "Snowflake") -> Dict[str, Any]:
|
||||||
"""Fetches a user object for a given user ID."""
|
"""Fetches a user object for a given user ID."""
|
||||||
return await self.request("GET", f"/users/{user_id}")
|
return await self.request("GET", f"/users/{user_id}")
|
||||||
|
@ -506,6 +506,7 @@ class InteractionCallbackData:
|
|||||||
self.content: Optional[str] = data.get("content")
|
self.content: Optional[str] = data.get("content")
|
||||||
self.embeds: Optional[List[Embed]] = (
|
self.embeds: Optional[List[Embed]] = (
|
||||||
[Embed(e) for e in data.get("embeds", [])] if data.get("embeds") else None
|
[Embed(e) for e in data.get("embeds", [])] if data.get("embeds") else None
|
||||||
|
[Embed(e) for e in data.get("embeds", [])] if data.get("embeds") else None
|
||||||
)
|
)
|
||||||
self.allowed_mentions: Optional[AllowedMentions] = (
|
self.allowed_mentions: Optional[AllowedMentions] = (
|
||||||
AllowedMentions(data["allowed_mentions"])
|
AllowedMentions(data["allowed_mentions"])
|
||||||
@ -573,5 +574,5 @@ class InteractionResponsePayload:
|
|||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
return f"<InteractionResponsePayload type={self.type!r}>"
|
return f"<InteractionResponsePayload type={self.type!r}>"
|
||||||
|
|
||||||
def __getitem__(self, item: str) -> Any:
|
def __getitem__(self, key: str) -> Any:
|
||||||
return self.to_dict()[item]
|
return self.to_dict()[key]
|
||||||
|
@ -1412,6 +1412,57 @@ class Webhook:
|
|||||||
|
|
||||||
return cls({"id": webhook_id, "token": token, "url": url})
|
return cls({"id": webhook_id, "token": token, "url": url})
|
||||||
|
|
||||||
|
async def send(
|
||||||
|
self,
|
||||||
|
content: Optional[str] = None,
|
||||||
|
*,
|
||||||
|
username: Optional[str] = None,
|
||||||
|
avatar_url: Optional[str] = None,
|
||||||
|
tts: bool = False,
|
||||||
|
embed: Optional["Embed"] = None,
|
||||||
|
embeds: Optional[List["Embed"]] = None,
|
||||||
|
components: Optional[List["ActionRow"]] = None,
|
||||||
|
allowed_mentions: Optional[Dict[str, Any]] = None,
|
||||||
|
attachments: Optional[List[Any]] = None,
|
||||||
|
files: Optional[List[Any]] = None,
|
||||||
|
flags: Optional[int] = None,
|
||||||
|
) -> "Message":
|
||||||
|
"""Send a message using this webhook."""
|
||||||
|
|
||||||
|
if not self._client:
|
||||||
|
raise DisagreementException("Webhook is not bound to a Client")
|
||||||
|
assert self.token is not None, "Webhook token missing"
|
||||||
|
|
||||||
|
if embed and embeds:
|
||||||
|
raise ValueError("Cannot provide both embed and embeds.")
|
||||||
|
|
||||||
|
final_embeds_payload: Optional[List[Dict[str, Any]]] = None
|
||||||
|
if embed:
|
||||||
|
final_embeds_payload = [embed.to_dict()]
|
||||||
|
elif embeds:
|
||||||
|
final_embeds_payload = [e.to_dict() for e in embeds]
|
||||||
|
|
||||||
|
components_payload: Optional[List[Dict[str, Any]]] = None
|
||||||
|
if components:
|
||||||
|
components_payload = [c.to_dict() for c in components]
|
||||||
|
|
||||||
|
message_data = await self._client._http.execute_webhook(
|
||||||
|
self.id,
|
||||||
|
self.token,
|
||||||
|
content=content,
|
||||||
|
tts=tts,
|
||||||
|
embeds=final_embeds_payload,
|
||||||
|
components=components_payload,
|
||||||
|
allowed_mentions=allowed_mentions,
|
||||||
|
attachments=attachments,
|
||||||
|
files=files,
|
||||||
|
flags=flags,
|
||||||
|
username=username,
|
||||||
|
avatar_url=avatar_url,
|
||||||
|
)
|
||||||
|
|
||||||
|
return self._client.parse_message(message_data)
|
||||||
|
|
||||||
|
|
||||||
# --- Message Components ---
|
# --- Message Components ---
|
||||||
|
|
||||||
|
@ -42,3 +42,12 @@ from disagreement.models import Webhook
|
|||||||
webhook = Webhook.from_url("https://discord.com/api/webhooks/123/token")
|
webhook = Webhook.from_url("https://discord.com/api/webhooks/123/token")
|
||||||
print(webhook.id, webhook.token)
|
print(webhook.id, webhook.token)
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## Send a message through a Webhook
|
||||||
|
|
||||||
|
Once you have a `Webhook` instance bound to a :class:`Client`, you can send messages using it:
|
||||||
|
|
||||||
|
```python
|
||||||
|
webhook = await client.create_webhook("123", {"name": "Bot Webhook"})
|
||||||
|
await webhook.send(content="Hello from my webhook!", username="Bot")
|
||||||
|
```
|
||||||
|
@ -149,3 +149,46 @@ def test_webhook_from_url_parses_id_and_token():
|
|||||||
assert webhook.id == "123"
|
assert webhook.id == "123"
|
||||||
assert webhook.token == "token"
|
assert webhook.token == "token"
|
||||||
assert webhook.url == url
|
assert webhook.url == url
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_execute_webhook_calls_request():
|
||||||
|
http = HTTPClient(token="t")
|
||||||
|
http.request = AsyncMock(return_value={"id": "1"})
|
||||||
|
await http.execute_webhook("1", "tok", content="hi")
|
||||||
|
http.request.assert_called_once_with(
|
||||||
|
"POST",
|
||||||
|
"/webhooks/1/tok",
|
||||||
|
payload={"content": "hi"},
|
||||||
|
use_auth_header=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_webhook_send_uses_http():
|
||||||
|
from types import SimpleNamespace
|
||||||
|
from disagreement.client import Client
|
||||||
|
from disagreement.models import Webhook, Message
|
||||||
|
|
||||||
|
http = SimpleNamespace(
|
||||||
|
execute_webhook=AsyncMock(
|
||||||
|
return_value={
|
||||||
|
"id": "2",
|
||||||
|
"channel_id": "c",
|
||||||
|
"author": {"id": "1", "username": "u", "discriminator": "0001"},
|
||||||
|
"content": "hi",
|
||||||
|
"timestamp": "t",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
)
|
||||||
|
client = Client.__new__(Client)
|
||||||
|
client._http = http
|
||||||
|
client._messages = {}
|
||||||
|
client._webhooks = {}
|
||||||
|
|
||||||
|
webhook = Webhook({"id": "1", "token": "tok"}, client_instance=client)
|
||||||
|
|
||||||
|
msg = await webhook.send(content="hi")
|
||||||
|
|
||||||
|
http.execute_webhook.assert_awaited_once()
|
||||||
|
assert isinstance(msg, Message)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user