Service for managing webhook subscriptions.
Note: Limited to 3 subscriptions per Affinity instance.
Source code in affinity/services/v1_only.py
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
class WebhookService:
    """
    Thin wrapper over the webhook subscription endpoints.

    Every request is sent through the injected HTTP client with ``v1=True``.

    Note: Limited to 3 subscriptions per Affinity instance.
    """

    def __init__(self, client: HTTPClient):
        # All methods below delegate their HTTP traffic to this client.
        self._client = client

    def list(self) -> builtins.list[WebhookSubscription]:
        """Return all webhook subscriptions.

        Entries are read from the ``"data"`` key of the response; a response
        without that key yields an empty list.
        """
        response = self._client.get("/webhook", v1=True)
        subscriptions = []
        for entry in response.get("data", []):
            subscriptions.append(WebhookSubscription.model_validate(entry))
        return subscriptions

    def get(self, webhook_id: WebhookId) -> WebhookSubscription:
        """Fetch one webhook subscription by its id."""
        path = f"/webhook/{webhook_id}"
        response = self._client.get(path, v1=True)
        return WebhookSubscription.model_validate(response)

    def create(self, data: WebhookCreate) -> WebhookSubscription:
        """
        Register a new webhook subscription.

        The webhook URL will receive a validation request.
        """
        body = data.model_dump(by_alias=True, mode="python", exclude_none=True)
        # NOTE(review): presumably turns the "date" value into an ISO-format
        # string in place -- confirm against the helper's definition.
        _coerce_isoformat(body, ("date",))
        # A falsy subscription list is left out of the request entirely.
        if not data.subscriptions:
            body.pop("subscriptions", None)
        response = self._client.post("/webhook/subscribe", json=body, v1=True)
        return WebhookSubscription.model_validate(response)

    def update(self, webhook_id: WebhookId, data: WebhookUpdate) -> WebhookSubscription:
        """Modify an existing webhook subscription.

        Only fields that were explicitly set and are not ``None`` are sent.
        """
        dump_options = {
            "by_alias": True,
            "mode": "json",
            "exclude_unset": True,
            "exclude_none": True,
        }
        body = data.model_dump(**dump_options)
        path = f"/webhook/{webhook_id}"
        response = self._client.put(path, json=body, v1=True)
        return WebhookSubscription.model_validate(response)

    def delete(self, webhook_id: WebhookId) -> bool:
        """Remove a webhook subscription.

        Returns the truthiness of the response's ``"success"`` entry
        (``False`` when the entry is missing).
        """
        response = self._client.delete(f"/webhook/{webhook_id}", v1=True)
        succeeded = response.get("success", False)
        return bool(succeeded)
|
create(data: WebhookCreate) -> WebhookSubscription
Create a webhook subscription.
The webhook URL will receive a validation request.
Source code in affinity/services/v1_only.py
338
339
340
341
342
343
344
345
346
347
348
349
def create(self, data: WebhookCreate) -> WebhookSubscription:
    """
    Register a new webhook subscription.

    The webhook URL will receive a validation request.
    """
    body = data.model_dump(by_alias=True, mode="python", exclude_none=True)
    # NOTE(review): presumably turns the "date" value into an ISO-format
    # string in place -- confirm against the helper's definition.
    _coerce_isoformat(body, ("date",))
    # A falsy subscription list is left out of the request entirely.
    if not data.subscriptions:
        body.pop("subscriptions", None)
    response = self._client.post("/webhook/subscribe", json=body, v1=True)
    return WebhookSubscription.model_validate(response)
|
delete(webhook_id: WebhookId) -> bool
Delete a webhook subscription.
Source code in affinity/services/v1_only.py
def delete(self, webhook_id: WebhookId) -> bool:
    """Remove a webhook subscription.

    Returns the truthiness of the response's ``"success"`` entry
    (``False`` when the entry is missing).
    """
    response = self._client.delete(f"/webhook/{webhook_id}", v1=True)
    succeeded = response.get("success", False)
    return bool(succeeded)
|
get(webhook_id: WebhookId) -> WebhookSubscription
Get a single webhook subscription.
Source code in affinity/services/v1_only.py
def get(self, webhook_id: WebhookId) -> WebhookSubscription:
    """Fetch one webhook subscription by its id."""
    path = f"/webhook/{webhook_id}"
    response = self._client.get(path, v1=True)
    return WebhookSubscription.model_validate(response)
|
list() -> builtins.list[WebhookSubscription]
Get all webhook subscriptions.
Source code in affinity/services/v1_only.py
def list(self) -> builtins.list[WebhookSubscription]:
    """Return all webhook subscriptions.

    Entries are read from the ``"data"`` key of the response; a response
    without that key yields an empty list.
    """
    response = self._client.get("/webhook", v1=True)
    subscriptions = []
    for entry in response.get("data", []):
        subscriptions.append(WebhookSubscription.model_validate(entry))
    return subscriptions
|
update(webhook_id: WebhookId, data: WebhookUpdate) -> WebhookSubscription
Update a webhook subscription.
Source code in affinity/services/v1_only.py
352
353
354
355
356
357
358
359
360
361
def update(self, webhook_id: WebhookId, data: WebhookUpdate) -> WebhookSubscription:
    """Modify an existing webhook subscription.

    Only fields that were explicitly set and are not ``None`` are sent.
    """
    dump_options = {
        "by_alias": True,
        "mode": "json",
        "exclude_unset": True,
        "exclude_none": True,
    }
    body = data.model_dump(**dump_options)
    path = f"/webhook/{webhook_id}"
    response = self._client.put(path, json=body, v1=True)
    return WebhookSubscription.model_validate(response)
|