feat: add worker-side task batching#634
Conversation
Allow grouping same-type tasks into batches that are executed together,
reducing per-call overhead for tasks like DB writes, external API calls,
ML inference, event publishing and indexing.
Usage:
@broker.task(batch=True, batch_size=100, batch_timeout=3)
async def process_items(items: list[Item]) -> None:
...
await process_items.kiq(item) # one item per kiq
Each .kiq sends a single item; the worker buffers messages per task name
and invokes the function once with the collected list. A batch is flushed
when batch_size is reached or batch_timeout seconds pass since the first
buffered item (whichever comes first), and on graceful shutdown.
Details:
- AsyncBatchedTaskiqDecoratedTask types .kiq to accept a single element
while the function body receives list[item].
- Batch config is validated at registration (TaskiqBatchConfigError).
- The whole batch shares one result, stored under every task_id, and every
message is acked per the configured AcknowledgeType. pre_execute,
post_execute, post_save and on_error middlewares run per message.
- InMemoryBroker supports batching too (flush on size, on wait_all, or
immediately when await_inplace=True) for testing.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #634 +/- ##
==========================================
+ Coverage 79.79% 81.14% +1.34%
==========================================
Files 69 70 +1
Lines 2529 2736 +207
==========================================
+ Hits 2018 2220 +202
- Misses 511 516 +5 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
s3rius
left a comment
There was a problem hiding this comment.
That's a good intention. And I like the implementation. However, there are few things about it.
First, I managed to find a way to make worker hang completely with this script:
Script to hang
import asyncio
from taskiq import InMemoryBroker
broker = InMemoryBroker()
@broker.task(batch=True, batch_size=60, batch_timeout=3, task_name="tfunc")
async def tfunc(test: list[int]) -> bool:
print(len(test)) # noqa: T201
return True
async def main() -> None:
await broker.startup()
# Each `.kiq` sends a single item. They are buffered and run
# together once the batch is flushed.
tasks = await asyncio.gather(*[tfunc.kiq(i) for i in range(400)])
# In tests, `wait_all` flushes any pending batches and waits
# for them to finish before we read the results.
await broker.wait_all()
res = await asyncio.gather(*[task.wait_result(timeout=10) for task in tasks])
len(res)
await broker.shutdown()
if __name__ == "__main__":
asyncio.run(main())Also, this implementation does not support:
- More than one argument.
- Sync functions.
I also left some comments.
| :param item: one element that becomes a member of the batched list. | ||
| :returns: taskiq task for this individual message. | ||
| """ | ||
| kicker = cast( |
| batch_timeout: float | None = None, | ||
| **labels: Any, | ||
| ) -> Callable[ | ||
| [Callable[[list[_Item]], Awaitable[_ReturnType]]], |
There was a problem hiding this comment.
This definition ignores possible use of sync functions. Is it by design?
| *, | ||
| batch: Literal[True], | ||
| batch_size: int | None = None, | ||
| batch_timeout: float | None = None, |
| ) -> Callable[ | ||
| [Callable[[list[_Item]], Awaitable[_ReturnType]]], | ||
| AsyncBatchedTaskiqDecoratedTask[_Item, ..., _ReturnType], | ||
| ]: # pragma: no cover |
There was a problem hiding this comment.
I don't think that we need this decorator class. Since it's used only for type annotations, I see that there is a more straightforward way for deconstructing ParamSpec.
You can leverage the power of Concatenate and actually deconstruct ParamSpec to later reconstruct a desired variant.
| ) -> Callable[ | |
| [Callable[[list[_Item]], Awaitable[_ReturnType]]], | |
| AsyncBatchedTaskiqDecoratedTask[_Item, ..., _ReturnType], | |
| ]: # pragma: no cover | |
| ) -> Callable[ | |
| [Callable[Concatenate[list[_Item], _FuncParams], Awaitable[_ReturnType]]], | |
| AsyncTaskiqDecoratedTask[Concatenate[_Item, _FuncParams], _ReturnType], | |
| ]: # pragma: no cover | |
This way, we ensure that first argument of the function is actually a list of items by deconstructing its type signature. It's kinda like pattern matching on types.
And after that we reconstruct a desired form of ParamSpec by concatenating the rest of params with a first argument.
However, this implementation allows for more than one argument. Which is might be not as preferable. Because it would complicate the batcher's logic a bit. But might be worth looking into.
If you want to support your original single item type, it can be easily done with the following signature:
| ) -> Callable[ | |
| [Callable[[list[_Item]], Awaitable[_ReturnType]]], | |
| AsyncBatchedTaskiqDecoratedTask[_Item, ..., _ReturnType], | |
| ]: # pragma: no cover | |
| ) -> Callable[ | |
| [Callable[[list[_Item]], Awaitable[_ReturnType]]], | |
| AsyncTaskiqDecoratedTask[[_Item], _ReturnType], | |
| ]: # pragma: no cover |
| return_type = sign["return"] | ||
|
|
||
| decorator_cls = self.decorator_class | ||
| if inner_labels.get("batch"): |
There was a problem hiding this comment.
No need for custom decorator class as shown above.
Allow grouping same-type tasks into batches that are executed together, reducing per-call overhead for tasks like DB writes, external API calls, ML inference, event publishing and indexing.
Usage:
Each .kiq sends a single item; the worker buffers messages per task name and invokes the function once with the collected list. A batch is flushed when batch_size is reached or batch_timeout seconds pass since the first buffered item (whichever comes first), and on graceful shutdown.
Details: