feat: add worker-side task batching#634

Open
KatantDev wants to merge 4 commits into taskiq-python:master from KatantDev:master

@KatantDev

Allow grouping same-type tasks into batches that are executed together, reducing per-call overhead for tasks like DB writes, external API calls, ML inference, event publishing and indexing.

Usage:

@broker.task(batch=True, batch_size=100, batch_timeout=3)
async def process_items(items: list[Item]) -> None:
    ...

await process_items.kiq(item)  # one item per kiq

Each .kiq sends a single item; the worker buffers messages per task name and invokes the function once with the collected list. A batch is flushed when batch_size is reached or batch_timeout seconds pass since the first buffered item (whichever comes first), and on graceful shutdown.
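
The flush rule described above (size reached, or timeout elapsed since the first buffered item, whichever comes first) can be sketched with plain asyncio. This is a minimal illustration under stated assumptions, not the code from this PR: `SimpleBatcher`, `add`, `flush` and `demo` are hypothetical names, and the sketch ignores acknowledgement, middlewares and per-task-name buffers.

```python
import asyncio
from typing import Awaitable, Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class SimpleBatcher(Generic[T]):
    """Hypothetical buffer: flushes on size, on timeout, or on an explicit flush()."""

    def __init__(
        self,
        handler: Callable[[list[T]], Awaitable[None]],
        batch_size: int,
        batch_timeout: float,
    ) -> None:
        self._handler = handler
        self._batch_size = batch_size
        self._batch_timeout = batch_timeout
        self._items: list[T] = []
        self._timer: Optional[asyncio.Task] = None

    async def add(self, item: T) -> None:
        self._items.append(item)
        if len(self._items) >= self._batch_size:
            await self.flush()
        elif self._timer is None:
            # The timeout is measured from the first buffered item.
            self._timer = asyncio.create_task(self._flush_later())

    async def _flush_later(self) -> None:
        await asyncio.sleep(self._batch_timeout)
        # Clear the reference first so flush() does not cancel this task.
        self._timer = None
        await self.flush()

    async def flush(self) -> None:
        """Hand over everything buffered so far (also usable on shutdown)."""
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        if not self._items:
            return
        batch, self._items = self._items, []
        await self._handler(batch)


async def demo() -> list[list[int]]:
    seen: list[list[int]] = []

    async def handler(batch: list[int]) -> None:
        seen.append(batch)

    batcher = SimpleBatcher(handler, batch_size=3, batch_timeout=0.05)
    for i in range(7):
        await batcher.add(i)
    await asyncio.sleep(0.2)  # let the timer flush the last partial batch
    return seen
```

With seven items and a size of 3, `asyncio.run(demo())` yields two size-triggered batches and one timeout-triggered batch holding the remaining item.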

Details:

  • AsyncBatchedTaskiqDecoratedTask types .kiq to accept a single element while the function body receives list[item].
  • Batch config is validated at registration (TaskiqBatchConfigError).
  • The whole batch shares one result, stored under every task_id, and every message is acked per the configured AcknowledgeType. pre_execute, post_execute, post_save and on_error middlewares run per message.
  • InMemoryBroker supports batching too (flush on size, on wait_all, or immediately when await_inplace=True) for testing.
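
One consequence of the shared-result rule is that every caller reads back the same value, regardless of which item they sent. A minimal sketch of that behaviour, assuming a plain dict in place of a result backend (`run_batch` and the `(task_id, item)` tuples are hypothetical and not part of taskiq):

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_batch(
    func: Callable[[list[Any]], Awaitable[Any]],
    messages: list[tuple[str, Any]],
    results: dict[str, Any],
) -> None:
    """Call func once and store its single return value under every task_id."""
    items = [item for _, item in messages]
    value = await func(items)
    for task_id, _ in messages:
        # Every message in the batch gets the same result object.
        results[task_id] = value
```

If per-item results are needed, the batched function would have to return something indexable by item (for example a list in input order), and callers would need to know their position; the description above does not say the PR does this.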

@codecov

codecov Bot commented Jun 16, 2026

Codecov Report

❌ Patch coverage is 93.41564% with 16 lines in your changes missing coverage. Please review.
✅ Project coverage is 81.14%. Comparing base (9f8db96) to head (61c0c7a).

| Files with missing lines | Patch % | Lines |
|---|---|---|
| taskiq/receiver/receiver.py | 89.51% | 15 Missing ⚠️ |
| taskiq/receiver/batcher.py | 98.43% | 1 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##           master     #634      +/-   ##
==========================================
+ Coverage   79.79%   81.14%   +1.34%     
==========================================
  Files          69       70       +1     
  Lines        2529     2736     +207     
==========================================
+ Hits         2018     2220     +202     
- Misses        511      516       +5     

@s3rius s3rius left a comment

Member

The intent is good, and I like the implementation. However, there are a few issues with it.

First, I found a way to make the worker hang completely with this script:

Script to hang
import asyncio

from taskiq import InMemoryBroker

broker = InMemoryBroker()


@broker.task(batch=True, batch_size=60, batch_timeout=3, task_name="tfunc")
async def tfunc(test: list[int]) -> bool:
    print(len(test))  # noqa: T201
    return True


async def main() -> None:
    await broker.startup()
    # Each `.kiq` sends a single item. They are buffered and run
    # together once the batch is flushed.
    tasks = await asyncio.gather(*[tfunc.kiq(i) for i in range(400)])
    # In tests, `wait_all` flushes any pending batches and waits
    # for them to finish before we read the results.
    await broker.wait_all()
    res = await asyncio.gather(*[task.wait_result(timeout=10) for task in tasks])
    len(res)
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

Also, this implementation does not support:

  • More than one argument.
  • Sync functions.
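
For the sync-function gap, one common approach is to dispatch on whether the handler is a coroutine function and run plain functions in a worker thread. The sketch below is only an illustration of that idea; `call_batch_handler` is a hypothetical helper and does not reflect how this PR or taskiq's receiver is structured.

```python
import asyncio
import inspect
from typing import Any, Callable


async def call_batch_handler(
    func: Callable[[list[Any]], Any],
    items: list[Any],
) -> Any:
    """Invoke a batch handler whether it is declared with def or async def."""
    if inspect.iscoroutinefunction(func):
        return await func(items)
    # Run blocking sync code in a thread so the event loop stays responsive.
    return await asyncio.to_thread(func, items)
```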

I also left some comments.

Comment thread taskiq/decor.py
:param item: one element that becomes a member of the batched list.
:returns: taskiq task for this individual message.
"""
kicker = cast(

Why would you need this cast?

Comment thread taskiq/abc/broker.py
batch_timeout: float | None = None,
**labels: Any,
) -> Callable[
[Callable[[list[_Item]], Awaitable[_ReturnType]]],

This definition ignores possible use of sync functions. Is it by design?

Comment thread taskiq/abc/broker.py
*,
batch: Literal[True],
batch_size: int | None = None,
batch_timeout: float | None = None,

You forgot task_name here.

Comment thread taskiq/abc/broker.py
Comment on lines +288 to +291
) -> Callable[
[Callable[[list[_Item]], Awaitable[_ReturnType]]],
AsyncBatchedTaskiqDecoratedTask[_Item, ..., _ReturnType],
]: # pragma: no cover

I don't think we need this decorator class. Since it's used only for type annotations, there is a more straightforward way: deconstruct the ParamSpec.

You can use Concatenate to deconstruct the ParamSpec and then reconstruct the desired variant.

Suggested change
- ) -> Callable[
-     [Callable[[list[_Item]], Awaitable[_ReturnType]]],
-     AsyncBatchedTaskiqDecoratedTask[_Item, ..., _ReturnType],
- ]:  # pragma: no cover
+ ) -> Callable[
+     [Callable[Concatenate[list[_Item], _FuncParams], Awaitable[_ReturnType]]],
+     AsyncTaskiqDecoratedTask[Concatenate[_Item, _FuncParams], _ReturnType],
+ ]:  # pragma: no cover

This way, we ensure that the first argument of the function is actually a list of items by deconstructing its type signature. It's a bit like pattern matching on types.
After that, we reconstruct the desired ParamSpec by concatenating the item type with the rest of the params.

However, this implementation allows more than one argument, which might not be preferable because it would complicate the batcher's logic a bit. It might still be worth looking into.

If you want to support your original single item type, it can be easily done with the following signature:

Suggested change
- ) -> Callable[
-     [Callable[[list[_Item]], Awaitable[_ReturnType]]],
-     AsyncBatchedTaskiqDecoratedTask[_Item, ..., _ReturnType],
- ]:  # pragma: no cover
+ ) -> Callable[
+     [Callable[[list[_Item]], Awaitable[_ReturnType]]],
+     AsyncTaskiqDecoratedTask[[_Item], _ReturnType],
+ ]:  # pragma: no cover

Comment thread taskiq/abc/broker.py
return_type = sign["return"]

decorator_cls = self.decorator_class
if inner_labels.get("batch"):

No need for a custom decorator class, as shown above.
