Skip to content

Tasks

Service for managing long-running operations (tasks).

Provides utilities to poll and wait for async operations like merges.

Example

Start a merge operation

task_url = client.companies.merge(primary_id, duplicate_id)

Wait for completion with timeout

task = client.tasks.wait(task_url, timeout=60.0) if task.status == "success": print("Merge completed!")

Source code in affinity/services/tasks.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
class TaskService:
    """
    Service for managing long-running operations (tasks).

    Provides utilities to poll and wait for async operations like merges.

    Example:
        # Start a merge operation
        task_url = client.companies.merge(primary_id, duplicate_id)

        # Wait for completion with timeout
        task = client.tasks.wait(task_url, timeout=60.0)
        if task.status == "success":
            print("Merge completed!")
    """

    def __init__(self, client: HTTPClient):
        self._client = client

    def get(self, task_url: str) -> MergeTask:
        """
        Get the current status of a task.

        Args:
            task_url: The task URL returned from an async operation

        Returns:
            MergeTask with current status
        """
        # Extract task path from full URL if needed
        data = self._client.get_url(task_url)
        return MergeTask.model_validate(data)

    def wait(
        self,
        task_url: str,
        *,
        timeout: float = 300.0,
        poll_interval: float = 2.0,
        max_poll_interval: float = 30.0,
    ) -> MergeTask:
        """
        Wait for a task to complete with exponential backoff.

        Args:
            task_url: The task URL returned from an async operation
            timeout: Maximum time to wait in seconds (default: 5 minutes)
            poll_interval: Initial polling interval in seconds
            max_poll_interval: Maximum polling interval after backoff

        Returns:
            MergeTask with final status

        Raises:
            TimeoutError: If task doesn't complete within timeout
            AffinityError: If task fails
        """
        start_time = time.monotonic()
        current_interval = poll_interval

        while True:
            task = self.get(task_url)

            if task.status in (TaskStatus.SUCCESS, TaskStatus.FAILED):
                if task.status == TaskStatus.FAILED:
                    raise AffinityError(
                        f"Task failed: {task_url}",
                        status_code=None,
                        response_body={"task": task.model_dump()},
                    )
                return task

            # Check timeout
            elapsed = time.monotonic() - start_time
            if elapsed >= timeout:
                raise AffinityTimeoutError(f"Task did not complete within {timeout}s: {task_url}")

            # Wait with jitter before next poll
            jitter = random.uniform(0, current_interval * 0.1)
            time.sleep(current_interval + jitter)

            # Exponential backoff, capped at max
            current_interval = min(current_interval * 1.5, max_poll_interval)

get(task_url: str) -> MergeTask

Get the current status of a task.

Parameters:

Name Type Description Default
task_url str

The task URL returned from an async operation

required

Returns:

Type Description
MergeTask

MergeTask with current status

Source code in affinity/services/tasks.py
50
51
52
53
54
55
56
57
58
59
60
61
62
def get(self, task_url: str) -> MergeTask:
    """
    Get the current status of a task.

    Args:
        task_url: The task URL returned from an async operation

    Returns:
        MergeTask with current status
    """
    # Extract task path from full URL if needed
    data = self._client.get_url(task_url)
    return MergeTask.model_validate(data)

wait(task_url: str, *, timeout: float = 300.0, poll_interval: float = 2.0, max_poll_interval: float = 30.0) -> MergeTask

Wait for a task to complete with exponential backoff.

Parameters:

Name Type Description Default
task_url str

The task URL returned from an async operation

required
timeout float

Maximum time to wait in seconds (default: 5 minutes)

300.0
poll_interval float

Initial polling interval in seconds

2.0
max_poll_interval float

Maximum polling interval after backoff

30.0

Returns:

Type Description
MergeTask

MergeTask with final status

Raises:

Type Description
TimeoutError

If task doesn't complete within timeout

AffinityError

If task fails

Source code in affinity/services/tasks.py
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def wait(
    self,
    task_url: str,
    *,
    timeout: float = 300.0,
    poll_interval: float = 2.0,
    max_poll_interval: float = 30.0,
) -> MergeTask:
    """
    Wait for a task to complete with exponential backoff.

    Args:
        task_url: The task URL returned from an async operation
        timeout: Maximum time to wait in seconds (default: 5 minutes)
        poll_interval: Initial polling interval in seconds
        max_poll_interval: Maximum polling interval after backoff

    Returns:
        MergeTask with final status

    Raises:
        TimeoutError: If task doesn't complete within timeout
        AffinityError: If task fails
    """
    start_time = time.monotonic()
    current_interval = poll_interval

    while True:
        task = self.get(task_url)

        if task.status in (TaskStatus.SUCCESS, TaskStatus.FAILED):
            if task.status == TaskStatus.FAILED:
                raise AffinityError(
                    f"Task failed: {task_url}",
                    status_code=None,
                    response_body={"task": task.model_dump()},
                )
            return task

        # Check timeout
        elapsed = time.monotonic() - start_time
        if elapsed >= timeout:
            raise AffinityTimeoutError(f"Task did not complete within {timeout}s: {task_url}")

        # Wait with jitter before next poll
        jitter = random.uniform(0, current_interval * 0.1)
        time.sleep(current_interval + jitter)

        # Exponential backoff, capped at max
        current_interval = min(current_interval * 1.5, max_poll_interval)

Async version of TaskService.

Example

Start a merge operation

task_url = await client.companies.merge(primary_id, duplicate_id)

Wait for completion with timeout

task = await client.tasks.wait(task_url, timeout=60.0)

Source code in affinity/services/tasks.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
class AsyncTaskService:
    """
    Async version of TaskService.

    Example:
        # Start a merge operation
        task_url = await client.companies.merge(primary_id, duplicate_id)

        # Wait for completion with timeout
        task = await client.tasks.wait(task_url, timeout=60.0)
    """

    def __init__(self, client: AsyncHTTPClient):
        self._client = client

    async def get(self, task_url: str) -> MergeTask:
        """
        Get the current status of a task.

        Args:
            task_url: The task URL returned from an async operation

        Returns:
            MergeTask with current status
        """
        data = await self._client.get_url(task_url)
        return MergeTask.model_validate(data)

    async def wait(
        self,
        task_url: str,
        *,
        timeout: float = 300.0,
        poll_interval: float = 2.0,
        max_poll_interval: float = 30.0,
    ) -> MergeTask:
        """
        Wait for a task to complete with exponential backoff.

        Args:
            task_url: The task URL returned from an async operation
            timeout: Maximum time to wait in seconds (default: 5 minutes)
            poll_interval: Initial polling interval in seconds
            max_poll_interval: Maximum polling interval after backoff

        Returns:
            MergeTask with final status

        Raises:
            TimeoutError: If task doesn't complete within timeout
            AffinityError: If task fails
        """
        start_time = time.monotonic()
        current_interval = poll_interval

        while True:
            task = await self.get(task_url)

            if task.status in (TaskStatus.SUCCESS, TaskStatus.FAILED):
                if task.status == TaskStatus.FAILED:
                    raise AffinityError(
                        f"Task failed: {task_url}",
                        status_code=None,
                        response_body={"task": task.model_dump()},
                    )
                return task

            # Check timeout
            elapsed = time.monotonic() - start_time
            if elapsed >= timeout:
                raise AffinityTimeoutError(f"Task did not complete within {timeout}s: {task_url}")

            # Wait with jitter before next poll
            jitter = random.uniform(0, current_interval * 0.1)
            await asyncio.sleep(current_interval + jitter)

            # Exponential backoff, capped at max
            current_interval = min(current_interval * 1.5, max_poll_interval)

get(task_url: str) -> MergeTask async

Get the current status of a task.

Parameters:

Name Type Description Default
task_url str

The task URL returned from an async operation

required

Returns:

Type Description
MergeTask

MergeTask with current status

Source code in affinity/services/tasks.py
131
132
133
134
135
136
137
138
139
140
141
142
async def get(self, task_url: str) -> MergeTask:
    """
    Get the current status of a task.

    Args:
        task_url: The task URL returned from an async operation

    Returns:
        MergeTask with current status
    """
    data = await self._client.get_url(task_url)
    return MergeTask.model_validate(data)

wait(task_url: str, *, timeout: float = 300.0, poll_interval: float = 2.0, max_poll_interval: float = 30.0) -> MergeTask async

Wait for a task to complete with exponential backoff.

Parameters:

Name Type Description Default
task_url str

The task URL returned from an async operation

required
timeout float

Maximum time to wait in seconds (default: 5 minutes)

300.0
poll_interval float

Initial polling interval in seconds

2.0
max_poll_interval float

Maximum polling interval after backoff

30.0

Returns:

Type Description
MergeTask

MergeTask with final status

Raises:

Type Description
TimeoutError

If task doesn't complete within timeout

AffinityError

If task fails

Source code in affinity/services/tasks.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
async def wait(
    self,
    task_url: str,
    *,
    timeout: float = 300.0,
    poll_interval: float = 2.0,
    max_poll_interval: float = 30.0,
) -> MergeTask:
    """
    Wait for a task to complete with exponential backoff.

    Args:
        task_url: The task URL returned from an async operation
        timeout: Maximum time to wait in seconds (default: 5 minutes)
        poll_interval: Initial polling interval in seconds
        max_poll_interval: Maximum polling interval after backoff

    Returns:
        MergeTask with final status

    Raises:
        TimeoutError: If task doesn't complete within timeout
        AffinityError: If task fails
    """
    start_time = time.monotonic()
    current_interval = poll_interval

    while True:
        task = await self.get(task_url)

        if task.status in (TaskStatus.SUCCESS, TaskStatus.FAILED):
            if task.status == TaskStatus.FAILED:
                raise AffinityError(
                    f"Task failed: {task_url}",
                    status_code=None,
                    response_body={"task": task.model_dump()},
                )
            return task

        # Check timeout
        elapsed = time.monotonic() - start_time
        if elapsed >= timeout:
            raise AffinityTimeoutError(f"Task did not complete within {timeout}s: {task_url}")

        # Wait with jitter before next poll
        jitter = random.uniform(0, current_interval * 0.1)
        await asyncio.sleep(current_interval + jitter)

        # Exponential backoff, capped at max
        current_interval = min(current_interval * 1.5, max_poll_interval)