Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions Framework/Core/scripts/hyperloop-server/hyperloop_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,105 @@ async def analysis_trains(analysis_id: int, days: int = 14,
return "\n".join(lines)


@mcp.tool()
async def composition_trend(analysis_ids: str = "21674,50446,50462,50570",
dataset: str = "", days: int = 30,
daily_only: bool = True) -> str:
"""Trend of train composition & splitting over recent releases.

For the given analyses, groups their trains by package date and reports, per
date: number of trains, total wagons, wagons-per-train (mean / max), and how
many trains are in a ``decomposed`` (split-for-submission) state. Rising
wagons-per-train together with a falling train-count / decomposed-count means
more wagons are running together (fewer splits) — the downstream effect of
per-device memory wins, which is exactly what frees room under the per-train
memory budget.

Wagon count comes from the ``wagons_names`` field (comma-separated), so it is
approximate if that field is truncated server-side. Most informative on
production / splitting analyses; fixed-composition daily *test* analyses
(e.g. the benchmark set) never decompose, so they will look flat by design.

Args:
analysis_ids: comma-separated analysis ids (default: the benchmark set).
dataset: if set, ignore analysis_ids and group a single dataset's
trains by release — sub-trains/day = the split factor of that
(cross-analysis, merged) submission. The right lens for
*production* splits (a heavy merged train decomposing).
days: look-back window by package date (default 30).
daily_only: keep only daily builds (default True).
"""
cutoff = (datetime.date.today() - datetime.timedelta(days=days)).strftime("%Y%m%d")
trains: list = []
if dataset:
raw = await _get("trains/all-trains.jsp", {"dataset_name": dataset})
trains = [t for t in (raw or []) if t.get("dataset_name") == dataset]
src = f"dataset '{dataset}'"
else:
aids = [int(x) for x in str(analysis_ids).split(",") if str(x).strip()]
for aid in aids:
try:
raw = await _get("analysis/trains-by-analyses.jsp", {"analysis_ids": aid})
except Exception:
continue
c = raw[0] if isinstance(raw, list) and raw else raw
trains.extend(c.get("trains", []) if isinstance(c, dict) else [])
src = f"analyses {aids}"
# keep only trains within the look-back window
kept = []
for t in trains:
d = _tag_date(t.get("package_tag"))
if not d or d < cutoff:
continue
if daily_only and "daily" not in (t.get("package_tag") or "").lower():
continue
kept.append((d, t))
# Wagon count per train. Analysis-mode trains carry `wagons_names`; the
# dataset-mode (all-trains.jsp) ones do not, so fetch the count per train
# (concurrency-bounded; capped to the most recent trains to bound load —
# uncounted trains contribute to the train/decomp counts but not w/train).
if dataset:
kept.sort(key=lambda dt: (dt[0], dt[1].get("id", 0)), reverse=True)
sem = asyncio.Semaphore(8)

async def _wcount(tid):
async with sem:
try:
tj = await _get("trains/train.jsp", {"train_id": tid})
ts = tj.get("wagons_timestamp") or tj.get("dataset_timestamp")
if not ts:
return None
wd = await _get("trains/wagons_derived_data.jsp",
{"train_id": tid, "wagons_timestamp": ts})
return len(wd) if isinstance(wd, dict) else None
except Exception:
return None
fetched = await asyncio.gather(*[_wcount(t.get("id")) for _, t in kept[:120]])
counts = list(fetched) + [None] * (len(kept) - len(fetched))
else:
counts = [len([x for x in (t.get("wagons_names") or "").split(",") if x.strip()])
for _, t in kept]
per_date: dict = collections.defaultdict(list) # date -> [(nwagons|None, state)]
for (d, t), nw in zip(kept, counts):
per_date[d].append((nw, str(t.get("state") or "").lower()))
if not per_date:
return f"No trains for {src} in last {days}d."
lines = [f"Composition / split trend — {src}, last {days}d"
+ (", daily" if daily_only else "") + ":\n",
f"{'date':>8} {'trains':>6} {'wagons':>8} {'w/train':>8} {'maxw':>5} {'decomp':>7}"]
lines.append("-" * 54)
for d in sorted(per_date, reverse=True):
rows = per_date[d]
ntr = len(rows)
wcs = [w for w, _ in rows if w is not None]
tot = sum(wcs)
mx = max(wcs, default=0)
mean = tot / len(wcs) if wcs else 0.0
dec = sum(1 for _, s in rows if "decompos" in s or "split" in s)
lines.append(f"{d:>8} {ntr:>6} {tot:>8} {mean:>8.1f} {mx:>5} {dec:>7}")
return "\n".join(lines)


@mcp.tool()
async def test_metrics(train_id: int, per_device: bool = False) -> str:
"""Resource metrics for one test train (from performanceMetrics_processed.json).
Expand Down