feat(moq-net): usage stats in the model (BroadcastInfo-carried, origin-attributed)#1895
feat(moq-net): usage stats in the model (BroadcastInfo-carried, origin-attributed)#1895kixelated wants to merge 7 commits into
Conversation
Move per-broadcast usage counting into the transport-agnostic model, the foundation for metering every transport (moq-lite, IETF, and the non-MoQ gateways) uniformly without re-instrumenting each data path. New `model/usage.rs`: a `Usage` struct (groups/frames/bytes payload counters plus opened/closed lifecycle counters, all atomics) and a `BroadcastStats` pair (one `Usage` per direction). `BroadcastInfo` gains a `stats` field, so the immutable broadcast handle carries the sinks down to every track, group, and frame through a shared `Arc<BroadcastInfo>`. The group also carries an `Arc<TrackInfo>`, so `timescale` is read from there instead of threaded separately. Producer-side handles bump the ingress sink (groups at create_group/ append_group, frames+bytes at append_frame); consumer-side handles bump the egress sink as a subscriber receives groups and reads frames. A fetch re-serves an existing group, so it bumps frames/bytes but not groups. Behavior-preserving: the sinks default to unreferenced no-op atomics, so a standalone broadcast is unmetered and the existing stats-layer wiring is untouched. The stats-layer `BroadcastStats` is renamed `BroadcastHandle` to free the name for the new model type. `TrackProducer::new` keeps its signature (standalone tracks get the shared no-op broadcast). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_011tHLbSdh7JuEL8Uyahsd3d
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_011tHLbSdh7JuEL8Uyahsd3d
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_011tHLbSdh7JuEL8Uyahsd3d
Wire the model's usage sinks to the stats layer and meter every transport uniformly, removing the per-handler payload counting. - Stats layer: each per-(tier, role) `Counters` holds an `Arc<Usage>` shared with the model (the same `Arc` baked into the broadcast), and the snapshot reads payload from it. `BroadcastHandle` vends the ingress/egress `Usage` for baking/stamping. The per-track `frame()`/`bytes()`/`group()` guard methods are gone; the track guards now track only the `subscriptions` lifecycle. - Ingress baked at construction: the lite/IETF subscriber loops build the broadcast with `stats.producer` already set, so create_group/create_frame meter ingress. The handler's per-frame/byte/group bumps are removed. - Egress attributed by the origin: `OriginConsumer::with_egress` carries a per-session sink provider; `get_broadcast`, the dynamic `request_broadcast` path, and `AnnounceConsumer` delivery stamp each `BroadcastConsumer`'s `stats.consumer` with the session's egress sink. The lite/IETF sessions attach it. The publisher's per-frame/byte/group bumps are removed; the model meters as the consumer reads (live via next_frame, fetch via get_frame; a fetch counts frames/bytes but not groups). Egress is per-consumer (N viewers count N times) and ingress is single-writer. Per the design, the model counts raw (decompressed) bytes; only the catalog track compresses, so the difference is noise (a follow-up will cache compressed bytes). Tests: model-level ingress-once/egress-per-viewer and origin egress-stamping; all 400 moq-net lib tests pass. Dependent crates compile; clippy/doc/fmt clean. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_011tHLbSdh7JuEL8Uyahsd3d
Move the live viewer/publisher count into the model and retire the stats-layer `SessionBroadcasts` sentinel. The published `broadcasts` / `broadcasts_closed` schema is unchanged; it's now driven from a cleaner place. - `usage.rs`: a `Live` refcount over a `Usage` sink. The `0 -> 1` transition bumps `opened`, the last `1 -> 0` drop bumps `closed`, so `opened - closed` is the live count. Clones of a handle share one `Live` (one logical viewer/publisher). - Consumer side: `BroadcastConsumer` carries a viewers `Live` over its egress sink (re-keyed when the origin stamps the session sink). Each `TrackConsumer` from `track()` holds a token; it propagates to the `TrackSubscriber` so a subscription counts as a viewer for its whole life, not just while the `TrackConsumer` handle exists. N tracks on one consumer is still one viewer. - Producer side: a publishers `Live` over the ingress sink lives on the shared broadcast state, so every track-creation path (`create_track`, `reserve_track`, and the dynamic handler's `requested_track` -> `accept`, which is the relay's ingress path) takes a token. A broadcast with >= 1 live track is one publisher. - `stats.rs`: `Counters` drops its `broadcasts`/`broadcasts_closed` atoms and reads them from the model's `Usage.opened/closed` in the snapshot. `SessionBroadcasts` / `BroadcastSubscription` and the `publisher_broadcasts` / `subscriber_broadcasts` handle methods are removed. - handlers: drop the `SessionBroadcasts` plumbing; the model counts viewers (egress) and publishers (ingress) automatically. Tests: model-level live-count test (publisher per broadcast, viewer dedup across tracks) plus the reworked snapshot-mapping test. 400 moq-net lib tests pass; clippy/doc/fmt clean; dependent crates compile. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_011tHLbSdh7JuEL8Uyahsd3d
| track_stats: std::sync::Arc<crate::PublisherTrack>, | ||
| // Held for the group's lifetime so it counts as one subscription; payload | ||
| // is metered by the model when the frames are read. | ||
| _track_stats: std::sync::Arc<crate::PublisherTrack>, |
There was a problem hiding this comment.
This should be in model now right?
| let publish = { | ||
| let stats = stats.clone(); | ||
| publish.with_egress(std::sync::Arc::new(move |abs: &str| { | ||
| stats.broadcast(abs).egress_usage() |
There was a problem hiding this comment.
Why isn't this done in origin?
|
|
||
| /// Shared no-op broadcast metadata for tracks created outside a broadcast. | ||
| /// | ||
| /// A standalone [`TrackProducer::new`] track has no broadcast to inherit from, so |
There was a problem hiding this comment.
Delete TrackProducer::new and delete this. All tracks must be created/owned by a broadcast.
| state: Default::default(), | ||
| let info = Arc::new(info); | ||
| let state = kio::Producer::<BroadcastState>::default(); | ||
| if let Ok(mut s) = state.write() { |
There was a problem hiding this comment.
Don't make a default state then grab the lock. Set the init state instead
| let track = TrackProducer::new(name, info); | ||
| let mut state = BroadcastState::modify(&self.state)?; | ||
| let publisher = state.publishers.enter(); | ||
| let track = TrackProducer::for_broadcast(name, info, self.info.clone(), Some(publisher)); |
There was a problem hiding this comment.
rename to new and make pub(super)
| } | ||
|
|
||
| /// The parent track's negotiated timescale, or `None` for untimed tracks. | ||
| pub fn timescale(&self) -> Option<Timescale> { |
There was a problem hiding this comment.
Should delete this
| @@ -342,7 +359,18 @@ impl std::ops::Deref for GroupConsumer { | |||
| impl GroupConsumer { | |||
| /// The parent track's negotiated timescale, or `None` for untimed tracks. | |||
| pub fn timescale(&self) -> Option<Timescale> { | |||
| // When set, stamp the resolved (dynamically served) consumer with this | ||
| // session's egress sink for the absolute path. The already-announced path | ||
| // (`Ready`) is stamped by `get_broadcast` instead. | ||
| stamp: Option<(EgressSink, PathOwned)>, |
There was a problem hiding this comment.
Maybe we have Arc instead? IDK
|
|
||
| impl Usage { | ||
| /// Record one group. | ||
| pub(crate) fn add_group(&self) { |
There was a problem hiding this comment.
Maybe all should be pub for custom stats.
| pub(crate) struct Live { | ||
| usage: Arc<Usage>, | ||
| count: AtomicUsize, | ||
| } |
There was a problem hiding this comment.
Maybe use Weak? If it fails to upgrade, we make a new LiveToken?
- `BroadcastProducer::new`: build `BroadcastState` with its publisher count set up front instead of default-then-lock. - Drop `GroupProducer::timescale` (unused) and `GroupConsumer::timescale`; the lite fetch path now reads the timescale from the track's `TrackInfo` (the same `info` it already fetches for compression), so the group carries `Arc<TrackInfo>` purely for the model's own use. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_011tHLbSdh7JuEL8Uyahsd3d
|
Thanks for the thorough pass. Pushed
On the bigger ones I'd like your call before I sink the effort, since a couple ripple beyond moq-net:
Exploratory ones I'll treat as follow-ups unless you want them now: make (Written by Claude) Generated by Claude Code |
A frame writer needs the track's timescale to decide whether to attach a per-frame timestamp; `hang::container::Frame::encode` relies on it. Only `GroupConsumer::timescale` was redundant (its lone caller now reads the timescale from `TrackInfo`). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_011tHLbSdh7JuEL8Uyahsd3d
|
Correction on the timescale deletions: CI caught that (Written by Claude) Generated by Claude Code |
Implements the design in
rs/moq-net/DESIGN-stats.md(PR #1894), which supersedes thewith_meter/set_meterapproach on #1873. Per-broadcast usage sinks live inBroadcastInfo, are set at construction, and ride the immutableArc<BroadcastInfo>down to every track, group, and frame.Usageis atomics, so the model bumps through a shared&Arc<Usage>with no mutation, no setter, and noArc::make_mut.This takes over #1873 (which should be closed in favor of this).
What changed
The design's three phases, each landed as its own commit:
The model carries usage stats. New
model/usage.rs:Usage(groups/frames/bytes payload + opened/closed lifecycle atomics) andBroadcastStats(oneUsageper direction).BroadcastInfogains astatsfield;Arc<BroadcastInfo>+Arc<TrackInfo>are threaded through Broadcast/Track/Group; payload bumps move into the model (ingress at create_group/append_frame, egress as the consumer reads). The stats-layerBroadcastStatsis renamedBroadcastHandle.Origin attributes egress, ingress baked at construction. The lite/IETF subscriber loops build the broadcast with
stats.produceralready set. The per-sessionOriginConsumer::with_egressprovider stamps eachBroadcastConsumerit yields (announce stream,request_broadcast, dynamic accept) with that session's egress sink, so per-tier egress survives with zero mutation. The handler/gateway per-frame/byte/group bumps are deleted; the stats layer reads the sharedArc<Usage>instead of vending payload guards.Model-tracked live viewer/publisher counts. A
Liverefcount over each sink: aBroadcastConsumeris one live viewer while it has ≥1 outstandingTrackConsumer(the token rides into theTrackSubscriber, so a subscription counts for its whole life); a broadcast with ≥1 liveTrackProduceris one publisher (the publisher token rides the shared broadcast state, so the relay's dynamic-accept ingress path is covered).SessionBroadcastsis gone; the publish loop mapsopened - closedonto the existingbroadcasts/broadcasts_closedfields, so the published schema is unchanged.Semantics
groups.compress, so the difference is noise; a follow-up will cache compressed bytes so the count is the wire size again. (Confirmed acceptable.)Testing
cargo test -p moq-netpasses (400 lib + 4 integration), including new model tests for ingress-once/egress-per-viewer, origin egress-stamping, and live viewer/publisher counts.clippy/doc/fmtclean; dependent crates (moq-mux,hang,moq-relay,moq-native,moq-srt,moq-json,moq-ffi) compile.Cross-package sync
js/net: not mirrored. The stats aggregator / usage sinks are relay-side Rust only; the browser client has no equivalent. Nodoc/conceptwire change (this is API, not wire).Breaking changes (target
dev)Public API in
rs/moq-net:BroadcastInfogainsstats;Broadcast{Producer,Consumer,Dynamic}carryArc<BroadcastInfo>;GroupProducer::newtakesArc<TrackInfo>+Arc<BroadcastInfo>(wasOption<Timescale>); the stats-layerBroadcastStatsis renamedBroadcastHandle;SessionBroadcasts/BroadcastSubscriptionand thepublisher_broadcasts/subscriber_broadcasts/ per-trackframe/bytes/groupmethods are removed;Countersswaps its payload + broadcasts atoms for a sharedArc<Usage>.TrackProducer::new(name, info)keeps its signature (standalone tracks get a shared no-op broadcast).Follow-ups
with_egressand an ingress-baked broadcast).🤖 Generated with Claude Code
(Written by Claude)