feat(moq-net): shared RAM LRU group cache#1898
Conversation
Add a `Cache` handle that retains old groups in RAM beyond a track's latest one, bounded by a shared byte budget and wall-clock age. Attach it at the origin, broadcast, or track level (cascading: track over broadcast over origin); clone the handle to share one budget across many tracks. Eviction is LRU by wall-clock last-access time (when a group was last served), not by media timestamp or arrival order. A group is dropped once it exceeds `max_age` since its last access, or once the shared total exceeds `max_bytes` (least-recently-accessed first). A track's current max_sequence group is never handed to the cache, so a live subscriber can always grab it. Default behavior changes to latest-group-only: with no `Cache` attached a track keeps only its latest group, dropping each superseded group at once. Tracks that need history attach a `Cache`. The wire `Track` shape is unchanged; the cache is a local-only policy object. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01EviJwrDw3XZ9ZgESJGua28
WalkthroughThis pull request introduces a shared in-memory LRU 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches✨ Simplify code
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rs/moq-net/src/model/track.rs (1)
116-120: 🎯 Functional Correctness | 🟠 Major | ⚡ Quick winTouch cached groups when
read_frame()returns a frame.
poll_read_framereturns data from a cached group without callingself.touch(cached), so frame-level reads do not update LRU recency whilerecv_groupandget_groupdo. That lets actively read groups age out or lose byte-pressure priority incorrectly.Proposed fix
let mut consumer = group.consume(); match consumer.poll_read_frame(waiter) { Poll::Ready(Ok(Some(frame))) => { + self.touch(cached); return Poll::Ready(Ok(Some((frame, self.offset + i, group.sequence)))); }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-net/src/model/track.rs` around lines 116 - 120, The Poll::Ready(Ok(Some(frame))) match arm in the consumer.poll_read_frame call does not update LRU recency for the cached group, causing actively read groups to potentially age out incorrectly. After successfully polling a frame from the group consumer, call self.touch(group) to update the LRU recency before returning the Poll::Ready result, matching the behavior implemented in recv_group and get_group methods.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rs/moq-net/src/model/cache.rs`:
- Around line 77-79: The public `Cache` struct is missing the
`#[non_exhaustive]` attribute which is required by the repository's public API
guidelines to allow for future extensions without breaking changes. Add the
`#[non_exhaustive]` attribute above the `#[derive(Clone)]` attribute on the
`Cache` struct definition to mark it as non-exhaustive.
In `@rs/moq-net/src/model/track.rs`:
- Around line 79-84: The cache read paths in the slot access logic are updating
the recency timestamp via the touch method without first performing age-based
eviction, allowing expired entries older than max_age to be revived instead of
being properly evicted. Modify the read path checks to run age eviction logic
before calling touch on the cached entry, and only return the group and offset
when touch returns true to confirm the entry is still valid after eviction
checks have been applied. This applies to all three read path locations
mentioned (lines 79-84, 143-148, and 236-241).
- Around line 201-206: The byte accounting in the cache is measured only once
when a group is first superseded (captured via cached_size() in the block where
cached.token is assigned), but the group can continue to grow as frames are
appended through the returned GroupProducer after that point. This causes the
cache to undercount memory usage. Make the byte accounting dynamic by either
refreshing the entry size before cache eviction by querying the current
cached_size() at eviction time, or by wiring frame append operations on the
GroupProducer to update the cached token's tracked byte count dynamically as the
group grows.
---
Outside diff comments:
In `@rs/moq-net/src/model/track.rs`:
- Around line 116-120: The Poll::Ready(Ok(Some(frame))) match arm in the
consumer.poll_read_frame call does not update LRU recency for the cached group,
causing actively read groups to potentially age out incorrectly. After
successfully polling a frame from the group consumer, call self.touch(group) to
update the LRU recency before returning the Poll::Ready result, matching the
behavior implemented in recv_group and get_group methods.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 565ae45e-87ed-4b70-bee1-694b47d546d5
📒 Files selected for processing (6)
rs/moq-net/src/model/broadcast.rsrs/moq-net/src/model/cache.rsrs/moq-net/src/model/group.rsrs/moq-net/src/model/mod.rsrs/moq-net/src/model/origin.rsrs/moq-net/src/model/track.rs
| #[derive(Clone)] | ||
| pub struct Cache { | ||
| state: Arc<Mutex<State>>, |
There was a problem hiding this comment.
📐 Maintainability & Code Quality | 🟠 Major | ⚡ Quick win
Mark the public Cache type as non-exhaustive.
Cache is a new public API type but is not marked #[non_exhaustive]. Add the attribute to match the repository public-API rule.
As per coding guidelines, "Public APIs must be marked #[non_exhaustive] to allow for future extension without breaking changes."
Proposed fix
#[derive(Clone)]
+#[non_exhaustive]
pub struct Cache {
state: Arc<Mutex<State>>,
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| #[derive(Clone)] | |
| pub struct Cache { | |
| state: Arc<Mutex<State>>, | |
| #[derive(Clone)] | |
| #[non_exhaustive] | |
| pub struct Cache { | |
| state: Arc<Mutex<State>>, |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rs/moq-net/src/model/cache.rs` around lines 77 - 79, The public `Cache`
struct is missing the `#[non_exhaustive]` attribute which is required by the
repository's public API guidelines to allow for future extensions without
breaking changes. Add the `#[non_exhaustive]` attribute above the
`#[derive(Clone)]` attribute on the `Cache` struct definition to mark it as
non-exhaustive.
Source: Coding guidelines
| if let Some(cached) = slot | ||
| && !cached.group.is_aborted() | ||
| && cached.group.sequence >= min_sequence | ||
| { | ||
| return Poll::Ready(Ok(Some((group.consume(), self.offset + i)))); | ||
| self.touch(cached); | ||
| return Poll::Ready(Ok(Some((cached.group.consume(), self.offset + i)))); |
There was a problem hiding this comment.
🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
Evict expired cache entries before refreshing their recency.
The read paths check !is_aborted() and then call touch, but touch updates last_access without first running age eviction. A group older than max_age can therefore be read and revived instead of being evicted by the configured age limit.
Proposed direction
- fn touch(&self, cached: &Cached) {
+ fn touch(&self, cached: &Cached) -> bool {
+ let now = tokio::time::Instant::now();
+ if let Some(cache) = &self.cache {
+ cache.evict(now);
+ if cached.group.is_aborted() {
+ return false;
+ }
+ if let Some(token) = cached.token {
+ cache.touch(token, now);
+ }
+ }
- if let (Some(cache), Some(token)) = (&self.cache, cached.token) {
- cache.touch(token, tokio::time::Instant::now());
- }
+ !cached.group.is_aborted()
}Then only return the group/frame from each read path when touch(cached) still returns true.
Also applies to: 143-148, 236-241
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rs/moq-net/src/model/track.rs` around lines 79 - 84, The cache read paths in
the slot access logic are updating the recency timestamp via the touch method
without first performing age-based eviction, allowing expired entries older than
max_age to be revived instead of being properly evicted. Modify the read path
checks to run age eviction logic before calling touch on the cached entry, and
only return the group and offset when touch returns true to confirm the entry is
still valid after eviction checks have been applied. This applies to all three
read path locations mentioned (lines 79-84, 143-148, and 236-241).
| Some(cache) => { | ||
| // Hand the group to the shared budget the first time it is superseded. | ||
| if cached.token.is_none() { | ||
| let bytes = cached.group.cached_size(); | ||
| cached.token = Some(cache.insert(cached.group.clone(), bytes, now)); | ||
| } |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟠 Major | 🏗️ Heavy lift
Keep cache byte accounting in sync as cached groups grow.
cached_size() is captured only when a group is first superseded. A caller can still append frames through the returned GroupProducer after that point, so the shared cache can undercount RAM and keep entries well past max_bytes.
Consider making cached byte accounting dynamic, for example by refreshing entry sizes from the stored GroupProducer before byte eviction, or by wiring frame writes to update the cache token’s byte count.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rs/moq-net/src/model/track.rs` around lines 201 - 206, The byte accounting in
the cache is measured only once when a group is first superseded (captured via
cached_size() in the block where cached.token is assigned), but the group can
continue to grow as frames are appended through the returned GroupProducer after
that point. This causes the cache to undercount memory usage. Make the byte
accounting dynamic by either refreshing the entry size before cache eviction by
querying the current cached_size() at eviction time, or by wiring frame append
operations on the GroupProducer to update the cached token's tracked byte count
dynamically as the group grows.
|
Superseded by #1899, which reimplements this against The maintainer decided the shared RAM LRU cache should land on Closing in favor of #1899. (Written by Claude) Generated by Claude Code |
Summary
Adds a shared RAM LRU cache for groups, configurable at the origin, broadcast, or track level (cascading: track over broadcast over origin). It lets a track retain history beyond its latest group, bounded by a shared byte budget and a wall-clock age.
Cacheis a cheap,Arc-backed clone. Everything attached with the same handle draws from onemax_bytes/max_agebudget; two distinctCacheinstances are independent. Clone the handle to share a budget across many tracks/broadcasts/origins.max_agesince last access, or when the shared total exceedsmax_bytes(least-recently-accessed first). Usestokio::time::Instantsotokio::time::pause()drives it deterministically in tests.Cacheattached a track keeps only its currentmax_sequencegroup; every superseded group is dropped at once. This is the floor: even with aCache, the current max_sequence group is never handed to the cache, so a live subscriber can always grab it.max_bytes, withmax_ageas the time bound.How it works
GroupProducerclone (keeping its frame buffers in RAM) and aborts it withError::Oldon eviction, so a parked reader unblocks instead of hanging. The owning track lazily tombstones aborted slots and skips them on read.GroupProducer::abort(the group's own lock). No path takes those in the reverse order.recv_group/get_grouphanding out aGroupConsumer) bumps the group's last-access time in the cache.Public API additions (all in
rs/moq-net)cache::Config—#[non_exhaustive],Default(empty/no-op budget:max_bytes = 0,max_age = Duration::MAX), withwith_max_bytes/with_max_agesetters. Fieldsmax_bytes: u64,max_age: Duration.Cache(re-exported flat) —Cache::new(Config),Cache::is_clone(&self, &Self). Cheaply cloneableArc-backed handle.TrackProducer::with_cache(self, Cache) -> SelfBroadcastProducer::with_cache(self, Cache) -> Self(cascades to created and dynamically-served tracks)OriginProducer::with_cache(self, Cache) -> Self(cascades tocreate_broadcastand thence its tracks)GroupProducer::cached_size(&self) -> u64andGroupProducer::is_aborted(&self) -> boolBreaking / behavior change: the no-cache default is now latest-group-only retention. Previously a track retained groups for a short arrival window. The wire
Trackshape is unchanged;Cacheis a local-only policy object (nothing new on the wire). Per the Branch Targeting rules this is the kind of change that normally targetsdev— see the note below.Branch targeting note
This worktree was based on
origin/main, and the change is implemented againstmain's moq-net model (Track/Broadcast/MAX_GROUP_AGE,tokio::time::Instant).origin/devcurrently carries a divergent moq-net refactor (TrackInfowith a wirecache: Durationfield,BroadcastInfo,web_async::time::Instant,TrackDynamic), so this commit does not rebase cleanly ontodevand would need reimplementing against that shape. The PR is opened againstmainto keep the diff clean and reviewable; please retarget or request adevport as preferred. (CLAUDE.md: "when in doubt, targetmain; reviewers will redirect todevif needed.")Cross-package sync
The Cross-Package Sync table maps
rs/moq-netwire/API changes tojs/netanddoc/concept. This cache is local-only policy with no wire change (theTrackshape is untouched, no new framing), sojs/netanddoc/conceptneed nothing. No other rows apply.Test plan
cargo test -p moq-net --lib— 358 passedcargo clippy -p moq-net --all-targets— cleancargo fmt -p moq-net --check— cleancargo check -p moq-net --no-default-features— cleancargo check -p moq-relay— buildscargo check -p moq-cli— not run: pullsmoq-video→ a git dependency (nvidia-video-codec-sdk) blocked by this environment's egress policy. Unrelated to this change (moq-net is additive).New tests cover: default keeps only the latest group; a
Cacheretains history up tomax_bytes; byte pressure evicts the oldest;max_ageevicts by wall-clock; accessing an old group keeps it alive; a shared handle enforces one budget across two tracks (filling one evicts from the other); the max_sequence group is never evicted even under a zero budget; and the cache cascades origin → broadcast → track (including dynamically-served tracks). Existing track tests that relied on multiple retained groups now attach aCacheto reflect the latest-only default.🤖 Generated with Claude Code
https://claude.ai/code/session_01EviJwrDw3XZ9ZgESJGua28
(Written by Claude)
Generated by Claude Code