
feat(moq-net): shared RAM LRU group cache #1898

Closed
kixelated wants to merge 1 commit into main from claude/moq-net-cache-lru

@kixelated

Collaborator

Summary

Adds a shared RAM LRU cache for groups, configurable at the origin, broadcast, or track level (cascading: track over broadcast over origin). It lets a track retain history beyond its latest group, bounded by a shared byte budget and a wall-clock age.

  • Shared handle. Cache is a cheap, Arc-backed clone. Everything attached with the same handle draws from one max_bytes/max_age budget; two distinct Cache instances are independent. Clone the handle to share a budget across many tracks/broadcasts/origins.
  • LRU by wall-clock last-access. The eviction key is the wall-clock instant a group was last served (read), not media timestamps or arrival order. A group is evicted when it exceeds max_age since last access, or when the shared total exceeds max_bytes (least-recently-accessed first). Uses tokio::time::Instant so tokio::time::pause() drives it deterministically in tests.
  • Default = latest group only. With no Cache attached a track keeps only its current max_sequence group; every superseded group is dropped at once. This is the floor: even with a Cache, the current max_sequence group is never handed to the cache, so a live subscriber can always grab it.
  • Byte budget, not group count. The per-handle limit is max_bytes, with max_age as the time bound.
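
The eviction rule described above can be sketched as a small standalone type. This is an illustration only, not the moq-net implementation: `LruSketch`, its integer tokens, and the explicit `now` parameter are assumptions made for the example, and `std::time::Instant` stands in for `tokio::time::Instant`. It runs the age check first and then the byte budget, least-recently-accessed first.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for one cached group: only size and recency matter here.
struct Entry {
    bytes: u64,
    last_access: Instant,
}

/// Illustrative LRU bounded by a byte budget and an age limit.
pub struct LruSketch {
    max_bytes: u64,
    max_age: Duration,
    total: u64,
    next_token: u64,
    entries: HashMap<u64, Entry>,
}

impl LruSketch {
    pub fn new(max_bytes: u64, max_age: Duration) -> Self {
        Self { max_bytes, max_age, total: 0, next_token: 0, entries: HashMap::new() }
    }

    /// Register a group, then enforce both bounds. Returns (token, evicted tokens).
    pub fn insert(&mut self, bytes: u64, now: Instant) -> (u64, Vec<u64>) {
        let token = self.next_token;
        self.next_token += 1;
        self.total += bytes;
        self.entries.insert(token, Entry { bytes, last_access: now });
        (token, self.evict(now))
    }

    /// Record a read. Returns false if the entry was already evicted.
    pub fn touch(&mut self, token: u64, now: Instant) -> bool {
        match self.entries.get_mut(&token) {
            Some(entry) => {
                entry.last_access = now;
                true
            }
            None => false,
        }
    }

    /// Age pass first, then byte pass (least-recently-accessed first).
    pub fn evict(&mut self, now: Instant) -> Vec<u64> {
        let mut evicted: Vec<u64> = self
            .entries
            .iter()
            .filter(|(_, e)| now.saturating_duration_since(e.last_access) > self.max_age)
            .map(|(t, _)| *t)
            .collect();
        evicted.sort_unstable(); // HashMap order is arbitrary; sort for stable output
        for token in &evicted {
            self.remove(*token);
        }
        while self.total > self.max_bytes {
            let oldest = self
                .entries
                .iter()
                .min_by_key(|(t, e)| (e.last_access, **t))
                .map(|(t, _)| *t);
            match oldest {
                Some(token) => {
                    self.remove(token);
                    evicted.push(token);
                }
                None => break,
            }
        }
        evicted
    }

    fn remove(&mut self, token: u64) {
        if let Some(entry) = self.entries.remove(&token) {
            self.total -= entry.bytes;
        }
    }
}
```

Passing `now` explicitly keeps the sketch deterministic without a paused runtime clock; the PR gets the same effect in tests through `tokio::time::pause()`.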

How it works

  • A group is registered with the shared cache only once it stops being the latest (a newer group supersedes it). The cache holds a GroupProducer clone (keeping its frame buffers in RAM) and aborts it with Error::Old on eviction, so a parked reader unblocks instead of hanging. The owning track lazily tombstones aborted slots and skips them on read.
  • Cross-track eviction is safe re: lock ordering: the track holds its own state lock, then the cache's internal mutex, then GroupProducer::abort (the group's own lock). No path takes those in the reverse order.
  • A read (recv_group/get_group handing out a GroupConsumer) bumps the group's last-access time in the cache.
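
The lock ordering above (track state, then cache mutex, then the group's own lock) can be illustrated with plain `std::sync::Mutex` stand-ins. Everything here is hypothetical: `GroupSketch`, `CacheSketch`, and `TrackSketch` are not the moq-net types, and the cache is bounded by a group count only to keep the sketch short, whereas the PR bounds it by bytes and age.

```rust
use std::sync::{Arc, Mutex};

/// Stand-in for a group: its own lock guards an "aborted" flag.
#[derive(Default)]
pub struct GroupSketch {
    aborted: Mutex<bool>,
}

impl GroupSketch {
    pub fn abort(&self) {
        *self.aborted.lock().unwrap() = true; // lock 3: the group's own lock
    }

    pub fn is_aborted(&self) -> bool {
        *self.aborted.lock().unwrap()
    }
}

/// Stand-in for the shared cache: one mutex around the registered groups.
#[derive(Default)]
pub struct CacheSketch {
    groups: Mutex<Vec<Arc<GroupSketch>>>, // oldest first
}

impl CacheSketch {
    /// Takes the cache lock, then each evicted group's lock inside abort().
    pub fn insert_and_evict(&self, group: Arc<GroupSketch>, max_groups: usize) {
        let mut groups = self.groups.lock().unwrap(); // lock 2: cache mutex
        groups.push(group);
        while groups.len() > max_groups {
            groups.remove(0).abort();
        }
    }
}

/// Stand-in for a track: its state lock is always taken first.
pub struct TrackSketch {
    state: Mutex<Option<Arc<GroupSketch>>>,
    cache: Arc<CacheSketch>,
}

impl TrackSketch {
    pub fn new(cache: Arc<CacheSketch>) -> Self {
        Self { state: Mutex::new(None), cache }
    }

    /// The latest group stays with the track; only the superseded one is cached.
    pub fn publish(&self, group: Arc<GroupSketch>, max_groups: usize) {
        let mut latest = self.state.lock().unwrap(); // lock 1: track state
        if let Some(previous) = latest.replace(group) {
            self.cache.insert_and_evict(previous, max_groups);
        }
    }
}
```

Because every path acquires the locks in the same order, publishing on one track can abort a group that belongs to another track sharing the same cache without risking a lock-order inversion.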

Public API additions (all in rs/moq-net)

  • cache::Config: #[non_exhaustive], Default (empty/no-op budget: max_bytes = 0, max_age = Duration::MAX), with with_max_bytes / with_max_age setters. Fields max_bytes: u64, max_age: Duration.
  • Cache (re-exported flat) — Cache::new(Config), Cache::is_clone(&self, &Self). Cheaply cloneable Arc-backed handle.
  • TrackProducer::with_cache(self, Cache) -> Self
  • BroadcastProducer::with_cache(self, Cache) -> Self (cascades to created and dynamically-served tracks)
  • OriginProducer::with_cache(self, Cache) -> Self (cascades to create_broadcast and thence its tracks)
  • GroupProducer::cached_size(&self) -> u64 and GroupProducer::is_aborted(&self) -> bool
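
As a rough picture of how these pieces fit together, the sketch below mirrors the described `Config` builder and `Cache` handle with local stand-ins. None of this is the moq-net code: the struct bodies, the use of `Arc::ptr_eq` for `is_clone`, and the `effective_cache` helper are assumptions added to illustrate the "track over broadcast over origin" cascade.

```rust
use std::sync::Arc;
use std::time::Duration;

/// Stand-in mirroring the described cache::Config (fields and defaults per the PR text).
#[derive(Clone, Debug, PartialEq)]
#[non_exhaustive]
pub struct Config {
    pub max_bytes: u64,
    pub max_age: Duration,
}

impl Default for Config {
    fn default() -> Self {
        // Empty/no-op budget: zero bytes, unbounded age.
        Self { max_bytes: 0, max_age: Duration::MAX }
    }
}

impl Config {
    pub fn with_max_bytes(mut self, max_bytes: u64) -> Self {
        self.max_bytes = max_bytes;
        self
    }

    pub fn with_max_age(mut self, max_age: Duration) -> Self {
        self.max_age = max_age;
        self
    }
}

/// Stand-in for the cheaply cloneable, Arc-backed handle.
#[derive(Clone)]
pub struct Cache {
    config: Arc<Config>,
}

impl Cache {
    pub fn new(config: Config) -> Self {
        Self { config: Arc::new(config) }
    }

    /// Assumption: two handles share a budget iff they point at the same allocation.
    pub fn is_clone(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.config, &other.config)
    }
}

/// Hypothetical helper: the most specific level that has a cache wins.
pub fn effective_cache(
    track: Option<&Cache>,
    broadcast: Option<&Cache>,
    origin: Option<&Cache>,
) -> Option<Cache> {
    track.or(broadcast).or(origin).cloned()
}
```

With these stand-ins, a handle built as `Cache::new(Config::default().with_max_bytes(1 << 20))` and attached at the origin would apply to every track that does not carry its own handle or inherit one from its broadcast.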

Breaking / behavior change: the no-cache default is now latest-group-only retention. Previously a track retained groups for a short arrival window. The wire Track shape is unchanged; Cache is a local-only policy object (nothing new on the wire). Per the Branch Targeting rules this is the kind of change that normally targets dev — see the note below.

Branch targeting note

This worktree was based on origin/main, and the change is implemented against main's moq-net model (Track / Broadcast / MAX_GROUP_AGE, tokio::time::Instant). origin/dev currently carries a divergent moq-net refactor (TrackInfo with a wire cache: Duration field, BroadcastInfo, web_async::time::Instant, TrackDynamic), so this commit does not rebase cleanly onto dev and would need reimplementing against that shape. The PR is opened against main to keep the diff clean and reviewable; please retarget or request a dev port as preferred. (CLAUDE.md: "when in doubt, target main; reviewers will redirect to dev if needed.")

Cross-package sync

The Cross-Package Sync table maps rs/moq-net wire/API changes to js/net and doc/concept. This cache is local-only policy with no wire change (the Track shape is untouched, no new framing), so js/net and doc/concept need nothing. No other rows apply.

Test plan

  • cargo test -p moq-net --lib — 358 passed
  • cargo clippy -p moq-net --all-targets — clean
  • cargo fmt -p moq-net --check — clean
  • cargo check -p moq-net --no-default-features — clean
  • cargo check -p moq-relay — builds
  • cargo check -p moq-cli — not run: pulls moq-video → a git dependency (nvidia-video-codec-sdk) blocked by this environment's egress policy. Unrelated to this change (moq-net is additive).

New tests cover: default keeps only the latest group; a Cache retains history up to max_bytes; byte pressure evicts the oldest; max_age evicts by wall-clock; accessing an old group keeps it alive; a shared handle enforces one budget across two tracks (filling one evicts from the other); the max_sequence group is never evicted even under a zero budget; and the cache cascades origin → broadcast → track (including dynamically-served tracks). Existing track tests that relied on multiple retained groups now attach a Cache to reflect the latest-only default.

🤖 Generated with Claude Code

https://claude.ai/code/session_01EviJwrDw3XZ9ZgESJGua28

(Written by Claude)


Generated by Claude Code

Add a `Cache` handle that retains old groups in RAM beyond a track's latest
one, bounded by a shared byte budget and wall-clock age. Attach it at the
origin, broadcast, or track level (cascading: track over broadcast over
origin); clone the handle to share one budget across many tracks.

Eviction is LRU by wall-clock last-access time (when a group was last served),
not by media timestamp or arrival order. A group is dropped once it exceeds
`max_age` since its last access, or once the shared total exceeds `max_bytes`
(least-recently-accessed first). A track's current max_sequence group is never
handed to the cache, so a live subscriber can always grab it.

Default behavior changes to latest-group-only: with no `Cache` attached a track
keeps only its latest group, dropping each superseded group at once. Tracks
that need history attach a `Cache`. The wire `Track` shape is unchanged; the
cache is a local-only policy object.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01EviJwrDw3XZ9ZgESJGua28
@coderabbitai

coderabbitai Bot commented Jun 24, 2026

Contributor

Walkthrough

This pull request introduces a shared in-memory LRU Cache for GroupProducer entries in the moq-net model layer. A new cache.rs module defines Config (with max_bytes and max_age) and a Cache handle backed by a shared Arc<Mutex<State>> that performs age-first then byte-budget eviction, aborting displaced producers with Error::Old. GroupProducer gains cached_size() and is_aborted() introspection helpers. Track-level storage is restructured from a timestamped deque to a VecDeque<Option<Cached>>, replacing the old evict_expired path with a retain(now) method that registers superseded groups into the shared cache and tombstones entries aborted by it. TrackProducer gains with_cache. BroadcastProducer cascades a configured cache to both statically created and dynamically served tracks. OriginProducer gains with_cache and propagates it through create_broadcast, scope, and with_root.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title 'feat(moq-net): shared RAM LRU group cache' clearly and concisely summarizes the main change: adding a shared RAM LRU cache for groups in moq-net. It is specific, relates directly to the changeset, and highlights the primary feature being added. |
| Description check | ✅ Passed | The description is comprehensive and directly related to the changeset, detailing the shared RAM LRU cache implementation, public API additions, behavior changes, testing, and cross-package considerations. It provides sufficient context for understanding the PR's intent and impact. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00%, which is sufficient. The required threshold is 80.00%. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
rs/moq-net/src/model/track.rs (1)

116-120: 🎯 Functional Correctness | 🟠 Major | ⚡ Quick win

Touch cached groups when read_frame() returns a frame.

poll_read_frame returns data from a cached group without calling self.touch(cached), so frame-level reads do not update LRU recency while recv_group and get_group do. That lets actively read groups age out or lose byte-pressure priority incorrectly.

Proposed fix
 			let mut consumer = group.consume();
 			match consumer.poll_read_frame(waiter) {
 				Poll::Ready(Ok(Some(frame))) => {
+					self.touch(cached);
 					return Poll::Ready(Ok(Some((frame, self.offset + i, group.sequence))));
 				}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-net/src/model/track.rs` around lines 116 - 120, The
Poll::Ready(Ok(Some(frame))) match arm in the consumer.poll_read_frame call does
not update LRU recency for the cached group, causing actively read groups to
potentially age out incorrectly. After successfully polling a frame from the
group consumer, call self.touch(group) to update the LRU recency before
returning the Poll::Ready result, matching the behavior implemented in
recv_group and get_group methods.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 565ae45e-87ed-4b70-bee1-694b47d546d5

📥 Commits

Reviewing files that changed from the base of the PR and between 8469563 and edc3e92.

📒 Files selected for processing (6)
  • rs/moq-net/src/model/broadcast.rs
  • rs/moq-net/src/model/cache.rs
  • rs/moq-net/src/model/group.rs
  • rs/moq-net/src/model/mod.rs
  • rs/moq-net/src/model/origin.rs
  • rs/moq-net/src/model/track.rs

Comment on lines +77 to +79
#[derive(Clone)]
pub struct Cache {
	state: Arc<Mutex<State>>,


📐 Maintainability & Code Quality | 🟠 Major | ⚡ Quick win

Mark the public Cache type as non-exhaustive.

Cache is a new public API type but is not marked #[non_exhaustive]. Add the attribute to match the repository public-API rule.

As per coding guidelines, "Public APIs must be marked #[non_exhaustive] to allow for future extension without breaking changes."

Proposed fix
 #[derive(Clone)]
+#[non_exhaustive]
 pub struct Cache {
 	state: Arc<Mutex<State>>,
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-net/src/model/cache.rs` around lines 77 - 79, The public `Cache`
struct is missing the `#[non_exhaustive]` attribute which is required by the
repository's public API guidelines to allow for future extensions without
breaking changes. Add the `#[non_exhaustive]` attribute above the
`#[derive(Clone)]` attribute on the `Cache` struct definition to mark it as
non-exhaustive.

Source: Coding guidelines

Comment on lines +79 to +84
 if let Some(cached) = slot
 	&& !cached.group.is_aborted()
 	&& cached.group.sequence >= min_sequence
 {
-	return Poll::Ready(Ok(Some((group.consume(), self.offset + i))));
+	self.touch(cached);
+	return Poll::Ready(Ok(Some((cached.group.consume(), self.offset + i))));


🎯 Functional Correctness | 🟠 Major | ⚡ Quick win

Evict expired cache entries before refreshing their recency.

The read paths check !is_aborted() and then call touch, but touch updates last_access without first running age eviction. A group older than max_age can therefore be read and revived instead of being evicted by the configured age limit.

Proposed direction
-	fn touch(&self, cached: &Cached) {
+	fn touch(&self, cached: &Cached) -> bool {
+		let now = tokio::time::Instant::now();
+		if let Some(cache) = &self.cache {
+			cache.evict(now);
+			if cached.group.is_aborted() {
+				return false;
+			}
+			if let Some(token) = cached.token {
+				cache.touch(token, now);
+			}
+		}
-		if let (Some(cache), Some(token)) = (&self.cache, cached.token) {
-			cache.touch(token, tokio::time::Instant::now());
-		}
+		!cached.group.is_aborted()
 	}

Then only return the group/frame from each read path when touch(cached) still returns true.

Also applies to: 143-148, 236-241
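
The ordering problem can be seen in isolation with a minimal sketch. The names below (`EntrySketch`, `read_touch_only`, `read_evict_first`) are hypothetical and use `std::time::Instant` with an explicit `now`; they contrast a read path that only touches with one that applies the age limit before refreshing recency.

```rust
use std::time::{Duration, Instant};

/// Hypothetical cached entry: recency plus an "evicted" (aborted) marker.
pub struct EntrySketch {
    pub last_access: Instant,
    pub evicted: bool,
}

/// Touch-only read path: an entry past max_age gets a fresh timestamp and survives.
pub fn read_touch_only(entry: &mut EntrySketch, now: Instant) -> bool {
    if entry.evicted {
        return false;
    }
    entry.last_access = now;
    true
}

/// Evict-then-touch read path: the age limit is applied before recency is refreshed.
pub fn read_evict_first(entry: &mut EntrySketch, max_age: Duration, now: Instant) -> bool {
    if now.saturating_duration_since(entry.last_access) > max_age {
        entry.evicted = true;
    }
    if entry.evicted {
        return false;
    }
    entry.last_access = now;
    true
}
```

With a 10 second `max_age`, a read 11 seconds after the last access succeeds under the first function and is refused under the second, which is the behavior the review comment asks for.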

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-net/src/model/track.rs` around lines 79 - 84, The cache read paths in
the slot access logic are updating the recency timestamp via the touch method
without first performing age-based eviction, allowing expired entries older than
max_age to be revived instead of being properly evicted. Modify the read path
checks to run age eviction logic before calling touch on the cached entry, and
only return the group and offset when touch returns true to confirm the entry is
still valid after eviction checks have been applied. This applies to all three
read path locations mentioned (lines 79-84, 143-148, and 236-241).

Comment on lines +201 to +206
Some(cache) => {
	// Hand the group to the shared budget the first time it is superseded.
	if cached.token.is_none() {
		let bytes = cached.group.cached_size();
		cached.token = Some(cache.insert(cached.group.clone(), bytes, now));
	}


🩺 Stability & Availability | 🟠 Major | 🏗️ Heavy lift

Keep cache byte accounting in sync as cached groups grow.

cached_size() is captured only when a group is first superseded. A caller can still append frames through the returned GroupProducer after that point, so the shared cache can undercount RAM and keep entries well past max_bytes.

Consider making cached byte accounting dynamic, for example by refreshing entry sizes from the stored GroupProducer before byte eviction, or by wiring frame writes to update the cache token’s byte count.
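
One of the two suggested directions, re-measuring entries before byte eviction, might look roughly like the sketch below. `GrowingGroup` and `BudgetSketch` are hypothetical stand-ins, not moq-net types; only the `cached_size` name is borrowed from the PR, and eviction here is oldest-first by insertion order rather than by last access.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Stand-in for a group whose buffered size can grow after it has been cached.
#[derive(Default)]
pub struct GrowingGroup {
    bytes: AtomicU64,
}

impl GrowingGroup {
    pub fn append(&self, frame_len: u64) {
        self.bytes.fetch_add(frame_len, Ordering::Relaxed);
    }

    pub fn cached_size(&self) -> u64 {
        self.bytes.load(Ordering::Relaxed)
    }
}

/// Hypothetical byte budget that re-measures every entry at eviction time.
pub struct BudgetSketch {
    max_bytes: u64,
    entries: Vec<Arc<GrowingGroup>>, // oldest first
}

impl BudgetSketch {
    pub fn new(max_bytes: u64) -> Self {
        Self { max_bytes, entries: Vec::new() }
    }

    pub fn insert(&mut self, group: Arc<GrowingGroup>) {
        self.entries.push(group);
    }

    /// Sum current sizes (not the sizes captured at insert), then evict oldest-first.
    /// Returns how many entries were evicted.
    pub fn evict(&mut self) -> usize {
        let mut total: u64 = self.entries.iter().map(|g| g.cached_size()).sum();
        let mut evicted = 0;
        while total > self.max_bytes && !self.entries.is_empty() {
            let removed = self.entries.remove(0);
            total = total.saturating_sub(removed.cached_size());
            evicted += 1;
        }
        evicted
    }
}
```

Re-measuring costs a pass over the entries on each eviction; the alternative mentioned in the comment, updating the tracked byte count on every frame write, trades that pass for extra bookkeeping on the write path.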

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-net/src/model/track.rs` around lines 201 - 206, The byte accounting in
the cache is measured only once when a group is first superseded (captured via
cached_size() in the block where cached.token is assigned), but the group can
continue to grow as frames are appended through the returned GroupProducer after
that point. This causes the cache to undercount memory usage. Make the byte
accounting dynamic by either refreshing the entry size before cache eviction by
querying the current cached_size() at eviction time, or by wiring frame append
operations on the GroupProducer to update the cached token's tracked byte count
dynamically as the group grows.

Collaborator Author

Superseded by #1899, which reimplements this against dev.

The maintainer decided the shared RAM LRU cache should land on dev rather than main: it carries a breaking behavior change (the local retention default becomes latest-group-only) plus new public API, and dev carries a divergent moq-net model refactor (TrackInfo/TrackDynamic/OriginDynamic, web_async::time::Instant) that this branch doesn't rebase onto cleanly. #1899 ports the same logic onto dev's shape and also folds in this PR's CI fix (the unresolved TrackProducer::with_cache rustdoc link) and the four CodeRabbit findings (#[non_exhaustive] on Cache, touch-before-age-evict ordering, read_frame LRU touch, and dynamic byte accounting for groups that grow after supersession).

Closing in favor of #1899. (Written by Claude)


Generated by Claude Code

@kixelated kixelated closed this Jun 24, 2026