diff --git a/packages/pynumaflow/pynumaflow/accumulator/_dtypes.py b/packages/pynumaflow/pynumaflow/accumulator/_dtypes.py index c6899529..1fe65426 100644 --- a/packages/pynumaflow/pynumaflow/accumulator/_dtypes.py +++ b/packages/pynumaflow/pynumaflow/accumulator/_dtypes.py @@ -234,6 +234,7 @@ class AccumulatorResult: "_result_queue", "_consumer_future", "_latest_watermark", + "_close_window", ) _future: Task @@ -242,6 +243,7 @@ class AccumulatorResult: _result_queue: NonBlockingIterator _consumer_future: Task _latest_watermark: datetime + _close_window: KeyedWindow | None @property def future(self) -> Task: @@ -310,6 +312,29 @@ def update_watermark(self, new_watermark: datetime): raise TypeError("new_watermark must be a datetime object") self._latest_watermark = new_watermark + @property + def close_window(self) -> KeyedWindow | None: + """Returns the keyed window from the CLOSE request, if the task was closed. + + Returns: + KeyedWindow | None: The window carried by the CLOSE request, echoed back in + the EOF response; None if the task has not received a CLOSE. + """ + return self._close_window + + def set_close_window(self, window: KeyedWindow): + """Stashes the CLOSE request's keyed window so the EOF response can echo it. + + Args: + window (KeyedWindow): The keyed window from the CLOSE request. + + Raises: + TypeError: If window is not a KeyedWindow object. 
+ """ + if not isinstance(window, KeyedWindow): + raise TypeError("window must be a KeyedWindow object") + self._close_window = window + @dataclass class AccumulatorRequest: diff --git a/packages/pynumaflow/pynumaflow/accumulator/servicer/async_servicer.py b/packages/pynumaflow/pynumaflow/accumulator/servicer/async_servicer.py index 886f0e66..0668a938 100644 --- a/packages/pynumaflow/pynumaflow/accumulator/servicer/async_servicer.py +++ b/packages/pynumaflow/pynumaflow/accumulator/servicer/async_servicer.py @@ -1,5 +1,6 @@ import asyncio from collections.abc import AsyncIterable +from datetime import datetime from google.protobuf import empty_pb2 as _empty_pb2 @@ -16,6 +17,25 @@ from pynumaflow.shared.server import update_context_err from pynumaflow.types import NumaflowServicerContext +# protobuf Timestamp.ToDatetime() is only valid for years 1..9999, i.e. seconds in +# [-62135596800, 253402300799]. Accumulator windows are global per key, so core sends the +# window end as an "infinite" sentinel (chrono DateTime::<Utc>::MAX_UTC, ~year 262137) whose +# seconds exceed that range and make ToDatetime() raise. Clamp out-of-range timestamps to the +# representable datetime bounds instead of crashing the UDF. 
+_TIMESTAMP_MAX_SECONDS = 253402300799 +_TIMESTAMP_MIN_SECONDS = -62135596800 + + +def _to_datetime(ts) -> datetime: + """Convert a protobuf Timestamp to a datetime, clamping values outside the representable + range (year 1..9999) to datetime.max / datetime.min so an infinite window bound does not + crash decoding.""" + if ts.seconds > _TIMESTAMP_MAX_SECONDS: + return datetime.max + if ts.seconds < _TIMESTAMP_MIN_SECONDS: + return datetime.min + return ts.ToDatetime() + async def datum_generator( request_iterator: AsyncIterable[accumulator_pb2.AccumulatorRequest], @@ -24,8 +44,8 @@ async def datum_generator( async for d in request_iterator: # Convert protobuf KeyedWindow to our KeyedWindow dataclass keyed_window = KeyedWindow( - start=d.operation.keyedWindow.start.ToDatetime(), - end=d.operation.keyedWindow.end.ToDatetime(), + start=_to_datetime(d.operation.keyedWindow.start), + end=_to_datetime(d.operation.keyedWindow.end), slot=d.operation.keyedWindow.slot, keys=list(d.operation.keyedWindow.keys), ) diff --git a/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py b/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py index ee14e3ea..8f00f474 100644 --- a/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py +++ b/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py @@ -1,6 +1,6 @@ import asyncio from collections.abc import AsyncIterable -from datetime import datetime +from datetime import datetime, timezone from google.protobuf import timestamp_pb2 from pynumaflow._constants import ( @@ -112,6 +112,10 @@ async def close_task(self, req: AccumulatorRequest): curr_task = self.tasks.get(unified_key, None) if curr_task: + # Stash the CLOSE request's keyed window BEFORE signalling EOF so the task's + # consumer (write_to_global_queue) can echo it back in the EOF response. Core + # uses the echoed window to identify and garbage-collect the closed window. 
+ curr_task.set_close_window(req.keyed_window) await self.tasks[unified_key].iterator.put(STREAM_EOF) await curr_task.future await curr_task.consumer_future @@ -167,7 +171,7 @@ async def create_task(self, req: AccumulatorRequest): # Create a new AccumulatorResult object to store the task information curr_task = AccumulatorResult( - task, niter, keys, res_queue, consumer, datetime.fromtimestamp(-1) + task, niter, keys, res_queue, consumer, datetime.fromtimestamp(-1), None ) # Save the result of the accumulator operation to the task list @@ -318,7 +322,7 @@ async def write_to_global_queue( watermark_pb.FromDatetime(msg.watermark) start_dt_pb = timestamp_pb2.Timestamp() - start_dt_pb.FromDatetime(datetime.fromtimestamp(0)) + start_dt_pb.FromDatetime(datetime.fromtimestamp(0, tz=timezone.utc)) end_dt_pb = timestamp_pb2.Timestamp() end_dt_pb.FromDatetime(wm) @@ -339,17 +343,39 @@ async def write_to_global_queue( tags=msg.tags, ) await output_queue.put(res) - # send EOF - start_eof_pb = timestamp_pb2.Timestamp() - start_eof_pb.FromDatetime(datetime.fromtimestamp(0)) + # Send EOF. Echo the CLOSE request's keyed window (start/end/slot/keys) so core can + # match the EOF to the window it is closing and garbage-collect it. Mirrors the + # numaflow-rs accumulator behavior (PR #177). + close_window = task.close_window + if close_window is not None: + start_eof_pb = timestamp_pb2.Timestamp() + start_eof_pb.FromDatetime(close_window.start) + + end_eof_pb = timestamp_pb2.Timestamp() + end_eof_pb.FromDatetime(close_window.end) + + eof_window = accumulator_pb2.KeyedWindow( + start=start_eof_pb, + end=end_eof_pb, + slot=close_window.slot, + keys=close_window.keys, + ) + else: + # Fallback for the stream-close/shutdown path (no CLOSE request, e.g. + # stream_send_eof on SIGTERM): synthesize the window from epoch(0) and the + # latest watermark, preserving prior behavior. 
+ start_eof_pb = timestamp_pb2.Timestamp() + start_eof_pb.FromDatetime(datetime.fromtimestamp(0, tz=timezone.utc)) - end_eof_pb = timestamp_pb2.Timestamp() - end_eof_pb.FromDatetime(wm) + end_eof_pb = timestamp_pb2.Timestamp() + end_eof_pb.FromDatetime(wm) - res = accumulator_pb2.AccumulatorResponse( - window=accumulator_pb2.KeyedWindow( + eof_window = accumulator_pb2.KeyedWindow( start=start_eof_pb, end=end_eof_pb, slot="slot-0", keys=task.keys - ), + ) + + res = accumulator_pb2.AccumulatorResponse( + window=eof_window, EOF=True, ) await output_queue.put(res) diff --git a/packages/pynumaflow/tests/accumulator/test_async_accumulator.py b/packages/pynumaflow/tests/accumulator/test_async_accumulator.py index eb499cdf..c5a7229b 100644 --- a/packages/pynumaflow/tests/accumulator/test_async_accumulator.py +++ b/packages/pynumaflow/tests/accumulator/test_async_accumulator.py @@ -4,6 +4,7 @@ import grpc import pytest from google.protobuf import empty_pb2 as _empty_pb2 +from google.protobuf import timestamp_pb2 from pynumaflow import setup_logging from pynumaflow.accumulator import ( @@ -87,6 +88,69 @@ def request_generator_mixed(count, request, resetkey: bool = False): yield request +# Distinct, recognizable close-window values used to prove the EOF echoes the +# CLOSE request's window rather than the synthesized fallback window. +CLOSE_WINDOW_START_SECONDS = 1000000000 +CLOSE_WINDOW_END_SECONDS = 2000000000 +CLOSE_WINDOW_SLOT = "slot-7" + +# The accumulator's global window carries an "infinite" end (chrono MAX_UTC, ~year 262137) +# whose seconds exceed Python datetime's range. Core sends this on OPEN/APPEND. 
+GLOBAL_WINDOW_END_SECONDS = 8210266876799 + + +def request_generator_custom_close(count, request): + """Yields OPEN + APPEND requests, then a CLOSE whose keyed window carries + distinct start/end/slot values (mirroring core sending a real + max_event_time + timeout window on close).""" + for i in range(count): + if i == 0: + request.operation.event = accumulator_pb2.AccumulatorRequest.WindowOperation.Event.OPEN + else: + request.operation.event = ( + accumulator_pb2.AccumulatorRequest.WindowOperation.Event.APPEND + ) + yield request + + # CLOSE carrying a distinct keyed window to be echoed back in the EOF response. + request.operation.event = accumulator_pb2.AccumulatorRequest.WindowOperation.Event.CLOSE + request.operation.keyedWindow.start.CopyFrom( + timestamp_pb2.Timestamp(seconds=CLOSE_WINDOW_START_SECONDS) + ) + request.operation.keyedWindow.end.CopyFrom( + timestamp_pb2.Timestamp(seconds=CLOSE_WINDOW_END_SECONDS) + ) + request.operation.keyedWindow.slot = CLOSE_WINDOW_SLOT + yield request + + +def request_generator_infinite_then_close(count, request): + """OPEN + APPEND requests whose window end is the global 'infinite' sentinel + (chrono MAX_UTC, out of Python datetime range), then a CLOSE carrying a concrete, + representable window. 
Mirrors what real core sends to an accumulator.""" + request.operation.keyedWindow.end.CopyFrom( + timestamp_pb2.Timestamp(seconds=GLOBAL_WINDOW_END_SECONDS) + ) + for i in range(count): + if i == 0: + request.operation.event = accumulator_pb2.AccumulatorRequest.WindowOperation.Event.OPEN + else: + request.operation.event = ( + accumulator_pb2.AccumulatorRequest.WindowOperation.Event.APPEND + ) + yield request + + request.operation.event = accumulator_pb2.AccumulatorRequest.WindowOperation.Event.CLOSE + request.operation.keyedWindow.start.CopyFrom( + timestamp_pb2.Timestamp(seconds=CLOSE_WINDOW_START_SECONDS) + ) + request.operation.keyedWindow.end.CopyFrom( + timestamp_pb2.Timestamp(seconds=CLOSE_WINDOW_END_SECONDS) + ) + request.operation.keyedWindow.slot = CLOSE_WINDOW_SLOT + yield request + + def start_request() -> accumulator_pb2.AccumulatorRequest: event_time_timestamp, watermark_timestamp = get_time_args() window = accumulator_pb2.KeyedWindow( @@ -289,6 +353,80 @@ def test_accumulate_with_close(accumulator_stub) -> None: assert 1 == eof_count +def test_accumulate_close_echoes_eof_window(accumulator_stub) -> None: + """The EOF response must echo the exact KeyedWindow from the CLOSE request.""" + request = start_request() + generator_response = None + try: + generator_response = accumulator_stub.AccumulateFn( + request_iterator=request_generator_custom_close(count=5, request=request) + ) + except grpc.RpcError as e: + logging.error(e) + + eof_count = 0 + for r in generator_response: + if r.EOF: + eof_count += 1 + assert r.window.start.seconds == CLOSE_WINDOW_START_SECONDS + assert r.window.end.seconds == CLOSE_WINDOW_END_SECONDS + assert r.window.slot == CLOSE_WINDOW_SLOT + assert list(r.window.keys) == ["test_key"] + + assert 1 == eof_count + + +def test_accumulate_infinite_window_end_does_not_crash(accumulator_stub) -> None: + """A global window with an 'infinite' end (out of Python datetime range) on OPEN/APPEND + must not crash decoding; the stream 
completes and the EOF echoes the CLOSE window.""" + request = start_request() + generator_response = None + try: + generator_response = accumulator_stub.AccumulateFn( + request_iterator=request_generator_infinite_then_close(count=5, request=request) + ) + except grpc.RpcError as e: + logging.error(e) + + count = 0 + eof_count = 0 + for r in generator_response: + if r.EOF: + eof_count += 1 + assert r.window.start.seconds == CLOSE_WINDOW_START_SECONDS + assert r.window.end.seconds == CLOSE_WINDOW_END_SECONDS + assert r.window.slot == CLOSE_WINDOW_SLOT + elif r.payload.value: + count += 1 + + # All 5 datums were processed and exactly one EOF was emitted (no crash). + assert 5 == count + assert 1 == eof_count + + +def test_accumulate_eof_window_fallback_without_close(accumulator_stub) -> None: + """When the stream closes without a CLOSE (e.g. shutdown), the EOF window falls + back to the synthesized window (start=epoch(0), slot='slot-0').""" + request = start_request() + generator_response = None + try: + generator_response = accumulator_stub.AccumulateFn( + request_iterator=request_generator(count=5, request=request, send_close=False) + ) + except grpc.RpcError as e: + logging.error(e) + + eof_count = 0 + for r in generator_response: + if r.EOF: + eof_count += 1 + assert r.window.start.seconds == 0 + assert r.window.slot == "slot-0" + assert list(r.window.keys) == ["test_key"] + + assert 1 == eof_count + + def test_accumulate_append_without_open(accumulator_stub) -> None: request = start_request_without_open() generator_response = None diff --git a/packages/pynumaflow/tests/accumulator/test_datatypes.py b/packages/pynumaflow/tests/accumulator/test_datatypes.py index 0f829024..5561c1e1 100644 --- a/packages/pynumaflow/tests/accumulator/test_datatypes.py +++ b/packages/pynumaflow/tests/accumulator/test_datatypes.py @@ -149,7 +149,9 @@ def test_accumulator_result_create(): consumer_future = None # In real usage, this would be an asyncio.Task watermark = 
datetime.fromtimestamp(1662998400, timezone.utc) - result = AccumulatorResult(future, iterator, keys, result_queue, consumer_future, watermark) + result = AccumulatorResult( + future, iterator, keys, result_queue, consumer_future, watermark, None + ) assert result.future == future assert result.iterator == iterator @@ -161,7 +163,7 @@ def test_accumulator_result_create(): def test_accumulator_result_update_watermark(): result = AccumulatorResult( - None, None, [], None, None, datetime.fromtimestamp(1662998400, timezone.utc) + None, None, [], None, None, datetime.fromtimestamp(1662998400, timezone.utc), None ) new_watermark = datetime.fromtimestamp(1662998460, timezone.utc) result.update_watermark(new_watermark) @@ -170,7 +172,7 @@ def test_accumulator_result_update_watermark(): def test_accumulator_result_update_watermark_invalid_type(): result = AccumulatorResult( - None, None, [], None, None, datetime.fromtimestamp(1662998400, timezone.utc) + None, None, [], None, None, datetime.fromtimestamp(1662998400, timezone.utc), None ) with pytest.raises(TypeError): result.update_watermark("not a datetime")