Service for managing webhook subscriptions.
Note: Limited to 3 subscriptions per Affinity instance.
Source code in affinity/services/v1_only.py
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
class WebhookService:
    """
    Manage webhook subscriptions through the injected HTTP client.

    Every request is issued with ``v1=True``.

    Note: Limited to 3 subscriptions per Affinity instance.
    """

    def __init__(self, client: HTTPClient):
        # Keep a reference to the client; all methods delegate to it.
        self._client = client

    def list(self) -> builtins.list[WebhookSubscription]:
        """Fetch all webhook subscriptions.

        Returns:
            One validated ``WebhookSubscription`` per entry under the
            ``"data"`` key of the response; an empty list when that key
            is absent.
        """
        response = self._client.get("/webhook", v1=True)
        subscriptions = []
        for raw_item in response.get("data", []):
            subscriptions.append(WebhookSubscription.model_validate(raw_item))
        return subscriptions

    def get(self, webhook_id: WebhookId) -> WebhookSubscription:
        """Fetch one webhook subscription by its identifier.

        Args:
            webhook_id: Identifier interpolated into the request path.

        Returns:
            The response validated as a ``WebhookSubscription``.
        """
        endpoint = f"/webhook/{webhook_id}"
        response = self._client.get(endpoint, v1=True)
        return WebhookSubscription.model_validate(response)

    def create(self, data: WebhookCreate) -> WebhookSubscription:
        """
        Register a new webhook subscription.

        The webhook URL will receive a validation request.

        Args:
            data: Creation model; dumped by alias with ``None`` fields
                excluded to build the request body.

        Returns:
            The response validated as a ``WebhookSubscription``.
        """
        body = data.model_dump(mode="python", by_alias=True, exclude_none=True)
        # The helper's return value is unused, so it presumably rewrites the
        # "date" entry of ``body`` in place -- confirm against its definition.
        _coerce_isoformat(body, ("date",))
        if not data.subscriptions:
            # Leave the key out entirely when no subscriptions were supplied.
            body.pop("subscriptions", None)
        response = self._client.post("/webhook/subscribe", json=body, v1=True)
        return WebhookSubscription.model_validate(response)

    def update(self, webhook_id: WebhookId, data: WebhookUpdate) -> WebhookSubscription:
        """Modify an existing webhook subscription.

        Args:
            webhook_id: Identifier interpolated into the request path.
            data: Update model; only fields that were explicitly set and
                are not ``None`` are sent.

        Returns:
            The response validated as a ``WebhookSubscription``.
        """
        endpoint = f"/webhook/{webhook_id}"
        body = data.model_dump(
            mode="json",
            by_alias=True,
            exclude_none=True,
            exclude_unset=True,
        )
        response = self._client.put(endpoint, json=body, v1=True)
        return WebhookSubscription.model_validate(response)

    def delete(self, webhook_id: WebhookId) -> bool:
        """Remove a webhook subscription.

        Args:
            webhook_id: Identifier interpolated into the request path.

        Returns:
            The truthiness of the response's ``"success"`` entry;
            ``False`` when the entry is missing.
        """
        response = self._client.delete(f"/webhook/{webhook_id}", v1=True)
        succeeded = response.get("success", False)
        return bool(succeeded)
|
create(data: WebhookCreate) -> WebhookSubscription
Create a webhook subscription.
The webhook URL will receive a validation request.
Source code in affinity/services/v1_only.py
479
480
481
482
483
484
485
486
487
488
489
490
def create(self, data: WebhookCreate) -> WebhookSubscription:
    """
    Register a new webhook subscription.

    The webhook URL will receive a validation request.

    Args:
        data: Creation model; dumped by alias with ``None`` fields
            excluded to build the request body.

    Returns:
        The response validated as a ``WebhookSubscription``.
    """
    body = data.model_dump(mode="python", by_alias=True, exclude_none=True)
    # The helper's return value is unused, so it presumably rewrites the
    # "date" entry of ``body`` in place -- confirm against its definition.
    _coerce_isoformat(body, ("date",))
    if not data.subscriptions:
        # Leave the key out entirely when no subscriptions were supplied.
        body.pop("subscriptions", None)
    response = self._client.post("/webhook/subscribe", json=body, v1=True)
    return WebhookSubscription.model_validate(response)
|
delete(webhook_id: WebhookId) -> bool
Delete a webhook subscription.
Source code in affinity/services/v1_only.py
def delete(self, webhook_id: WebhookId) -> bool:
    """Remove a webhook subscription.

    Args:
        webhook_id: Identifier interpolated into the request path.

    Returns:
        The truthiness of the response's ``"success"`` entry;
        ``False`` when the entry is missing.
    """
    response = self._client.delete(f"/webhook/{webhook_id}", v1=True)
    succeeded = response.get("success", False)
    return bool(succeeded)
|
get(webhook_id: WebhookId) -> WebhookSubscription
Get a single webhook subscription.
Source code in affinity/services/v1_only.py
def get(self, webhook_id: WebhookId) -> WebhookSubscription:
    """Fetch one webhook subscription by its identifier.

    Args:
        webhook_id: Identifier interpolated into the request path.

    Returns:
        The response validated as a ``WebhookSubscription``.
    """
    endpoint = f"/webhook/{webhook_id}"
    response = self._client.get(endpoint, v1=True)
    return WebhookSubscription.model_validate(response)
|
list() -> builtins.list[WebhookSubscription]
Get all webhook subscriptions.
Source code in affinity/services/v1_only.py
def list(self) -> builtins.list[WebhookSubscription]:
    """Fetch all webhook subscriptions.

    Returns:
        One validated ``WebhookSubscription`` per entry under the
        ``"data"`` key of the response; an empty list when that key
        is absent.
    """
    response = self._client.get("/webhook", v1=True)
    subscriptions = []
    for raw_item in response.get("data", []):
        subscriptions.append(WebhookSubscription.model_validate(raw_item))
    return subscriptions
|
update(webhook_id: WebhookId, data: WebhookUpdate) -> WebhookSubscription
Update a webhook subscription.
Source code in affinity/services/v1_only.py
493
494
495
496
497
498
499
500
501
502
def update(self, webhook_id: WebhookId, data: WebhookUpdate) -> WebhookSubscription:
    """Modify an existing webhook subscription.

    Args:
        webhook_id: Identifier interpolated into the request path.
        data: Update model; only fields that were explicitly set and
            are not ``None`` are sent.

    Returns:
        The response validated as a ``WebhookSubscription``.
    """
    endpoint = f"/webhook/{webhook_id}"
    body = data.model_dump(
        mode="json",
        by_alias=True,
        exclude_none=True,
        exclude_unset=True,
    )
    response = self._client.put(endpoint, json=body, v1=True)
    return WebhookSubscription.model_validate(response)
|