Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion peri-tui/src/app/agent_shell_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ impl ShellExecutor for AgentShellExecutor {
// output_rx 全程写磁盘(agent 路径不显示在 UI 输出流,仅写磁盘供详情面板 / 通知读取)。
// 与 !command 路径不同:那条路径 output_rx 由 App drain 丢弃;本路径交给 DiskOutput。
peri_agent::task_output::DiskOutput::spawn_writer(output_path.clone(), execution.output_rx);
let started_instant = execution.started_instant;
let process_abort = execution.abort.clone();

// 真正的进程退出信号在 execution.result(peri-tui 的 oneshot)。
Expand Down Expand Up @@ -241,7 +242,7 @@ impl ShellExecutor for AgentShellExecutor {
Some(background_tx)
},
kill: process_abort.clone(),
started_instant: std::time::Instant::now(),
started_instant,
direct_background: run_in_background,
};
if run_in_background {
Expand Down
37 changes: 37 additions & 0 deletions peri-tui/src/app/message_pipeline/message_pipeline_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,43 @@ fn test_handle_event_tool_lifecycle() {
assert!(matches!(actions[0], PipelineAction::None));
}

#[test]
fn test_bash_tool_start_uses_spawn_started_at() {
let mut pipeline = MessagePipeline::new("/tmp".to_string());
let _ = pipeline.handle_event(AgentEvent::ToolStart {
tool_call_id: "bash1".into(),
name: "Bash".into(),
display: "Bash".into(),
args: "sleep 10".into(),
input: json!({"command": "sleep 10"}),
source_agent_id: None,
});
let tail_vms = pipeline.build_tail_vms();
let Some(MessageViewModel::ToolBlock { started_at, .. }) = tail_vms.last() else {
panic!("Bash ToolStart 应生成 pending ToolBlock");
};
assert!(
started_at.is_none(),
"收到 shell 注册前不应开始 Ctrl+B 计时"
);

let spawned_at = std::time::Instant::now() - std::time::Duration::from_secs(3);
assert!(
pipeline.set_bash_tool_started_at("sleep 10", spawned_at),
"应能按 Bash command 回填真实启动时间"
);
let tail_vms = pipeline.build_tail_vms();
let Some(MessageViewModel::ToolBlock { started_at, .. }) = tail_vms.last() else {
panic!("Bash ToolStart 应仍生成 pending ToolBlock");
};
assert!(
started_at
.as_ref()
.is_some_and(|t| t.elapsed() >= std::time::Duration::from_secs(2)),
"Ctrl+B 提示计时应基于真实 spawn 时间"
);
}

/// 测试:handle_event StateSnapshot 更新 completed
#[test]
fn test_handle_event_state_snapshot() {
Expand Down
21 changes: 21 additions & 0 deletions peri-tui/src/app/message_pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ pub(crate) struct PendingTool {
tool_call_id: String,
name: String,
input: serde_json::Value,
/// Bash 子进程真实启动时刻;ToolStart 早于 spawn 时保持 None。
started_at: Option<Instant>,
}

/// ToolEnd 后、StateSnapshot 前的工具结果(用于在 reconcile gap 期间显示)
Expand Down Expand Up @@ -322,6 +324,23 @@ impl MessagePipeline {
&self.cwd
}

pub(crate) fn set_bash_tool_started_at(&mut self, command: &str, started_at: Instant) -> bool {
for pending in self.pending_tools.values_mut() {
if pending.name != "Bash" || pending.started_at.is_some() {
continue;
}
let Some(pending_command) = pending.input.get("command").and_then(|v| v.as_str())
else {
continue;
};
if pending_command == command {
pending.started_at = Some(started_at);
return true;
}
}
false
}

/// 获取当前流式渲染模式
pub(crate) fn streaming_mode(&self) -> StreamingMode {
self.streaming_mode
Expand Down Expand Up @@ -438,6 +457,7 @@ impl MessagePipeline {
tool_call_id: tool_call_id.to_string(),
name: name.to_string(),
input,
started_at: None,
},
);
} else {
Expand Down Expand Up @@ -692,6 +712,7 @@ impl MessagePipeline {
tool_call_id: tool_call_id.to_string(),
name: name.to_string(),
input,
started_at: None,
},
);
}
Expand Down
7 changes: 6 additions & 1 deletion peri-tui/src/app/message_pipeline/reconcile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,12 @@ impl MessagePipeline {
for tc in &self.current_ai_tool_calls {
if let Some(pending) = self.pending_tools.get(&tc.id) {
if pending.name != "Agent" {
tail_vms.push(self.build_tool_start_vm(&tc.id, &pending.name, &pending.input));
tail_vms.push(self.build_tool_start_vm(
&tc.id,
&pending.name,
&pending.input,
pending.started_at,
));
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion peri-tui/src/app/message_pipeline/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ impl MessagePipeline {
tool_call_id: &str,
name: &str,
input: &serde_json::Value,
started_at: Option<std::time::Instant>,
) -> MessageViewModel {
let display_name = tool_display::format_tool_name(name);
let args_display = tool_display::format_tool_args(name, input, Some(&self.cwd));
Expand All @@ -139,7 +140,7 @@ impl MessagePipeline {
collapsed: true,
color: tool_color(name),
diff_input: None,
started_at: Some(std::time::Instant::now()),
started_at,
content_hash: 0,
};
vm.recompute_hash();
Expand Down
69 changes: 67 additions & 2 deletions peri-tui/src/app/shell_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,50 @@ impl ShellCommandPool {
}
}

fn set_pending_bash_tool_started_at_in_view(
view_messages: &mut [MessageViewModel],
command: &str,
started_at: std::time::Instant,
) -> bool {
for vm in view_messages.iter_mut().rev() {
let matched = match vm {
MessageViewModel::ToolBlock {
tool_name,
args_display,
content,
is_error,
started_at: vm_started_at,
..
} => {
tool_name == "Bash"
&& content.is_empty()
&& !*is_error
&& vm_started_at.is_none()
&& args_display
.as_deref()
.is_some_and(|args| bash_args_display_matches_command(args, command))
}
_ => false,
};
if matched {
if let MessageViewModel::ToolBlock {
started_at: vm_started_at,
..
} = vm
{
*vm_started_at = Some(started_at);
}
vm.recompute_hash();
return true;
}
}
false
}

fn bash_args_display_matches_command(args_display: &str, command: &str) -> bool {
args_display == command || args_display == super::tool_display::truncate(command, 400)
}

impl App {
pub(crate) fn running_background_shell_task_count(&self) -> usize {
let session = self.session_mgr.current();
Expand Down Expand Up @@ -161,7 +205,7 @@ impl App {
},
output_rx: Some(execution.output_rx),
result_rx: Some(execution.result),
started_instant: std::time::Instant::now(),
started_instant: execution.started_instant,
};
}

Expand Down Expand Up @@ -493,6 +537,9 @@ impl App {
pub fn register_agent_shell(&mut self, reg: super::AgentShellRegistration) {
let direct_background = reg.direct_background;
let mut slot = super::AgentShellSlot::from_registration(reg);
if !direct_background {
self.set_agent_bash_tool_started_at(&slot.command, slot.started_instant);
}
// 直接后台命令启动 stall watchdog(检测卡在等待输入)
if direct_background {
let watchdog = super::background_shell::spawn_stall_watchdog(
Expand All @@ -507,6 +554,24 @@ impl App {
self.render_rebuild();
}

fn set_agent_bash_tool_started_at(
&mut self,
command: &str,
started_at: std::time::Instant,
) -> bool {
let session = self.session_mgr.current_mut();
let pipeline_changed = session
.messages
.pipeline
.set_bash_tool_started_at(command, started_at);
let view_changed = set_pending_bash_tool_started_at_in_view(
&mut session.messages.view_messages,
command,
started_at,
);
pipeline_changed || view_changed
}

/// 把当前会话中所有前台 agent shell 后台化(Ctrl+B 触发)。
///
/// 与 [`Self::background_foreground`](!command 路径)协同:Ctrl+B 时先尝试
Expand Down Expand Up @@ -672,7 +737,7 @@ impl App {
output_path,
execution.result,
execution.abort,
std::time::Instant::now(),
execution.started_instant,
);
bg.stall_watchdog = Some(watchdog);
self.session_mgr.current_mut().background_shells.push(bg);
Expand Down
27 changes: 27 additions & 0 deletions peri-tui/src/app/shell_command_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,33 @@ fn make_agent_shell_slot(
(AgentShellSlot::from_registration(reg), exit_signal)
}

#[test]
fn test_set_pending_bash_tool_started_at_in_view() {
let command = "echo hello";
let mut view_messages = vec![MessageViewModel::tool_block(
"Bash".to_string(),
"Bash".to_string(),
Some(command.to_string()),
false,
)];
let spawned_at = std::time::Instant::now() - std::time::Duration::from_secs(3);

assert!(
set_pending_bash_tool_started_at_in_view(&mut view_messages, command, spawned_at),
"应能回填当前 view 中的 pending Bash ToolBlock"
);

let MessageViewModel::ToolBlock { started_at, .. } = &view_messages[0] else {
panic!("测试数据应为 ToolBlock");
};
assert!(
started_at
.as_ref()
.is_some_and(|t| t.elapsed() >= std::time::Duration::from_secs(2)),
"回填后 Ctrl+B 提示应按真实 spawn 时间计时"
);
}

#[tokio::test]
async fn test_merge_shell_records_inserts_after_anchor_without_origin_messages() {
let (app, _handle) = App::new_headless(80, 24).await;
Expand Down
67 changes: 45 additions & 22 deletions peri-tui/src/shell_exec.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::process::Stdio;
use std::{process::Stdio, time::Instant};

use anyhow::{Context, Result};
use peri_agent::encoding::decode_output_bytes;
Expand Down Expand Up @@ -108,6 +108,8 @@ pub struct ShellExecution {
pub abort: ShellAbortHandle,
/// 流式输出 channel(stdout + stderr 合并推送)
pub output_rx: mpsc::Receiver<Vec<u8>>,
/// 子进程成功 spawn 后的真实启动时刻。
pub started_instant: Instant,
}

/// 流式执行 shell 命令:stdout/stderr 通过 `output_rx` 流式推送,进程退出时
Expand All @@ -126,38 +128,48 @@ pub fn execute_shell_command_streaming(
let (output_tx, output_rx) = mpsc::channel::<Vec<u8>>(256);
let (result_tx, result_rx) = oneshot::channel::<Result<CommandOutput>>();

let cmd_str = command.to_string();
let cwd_str = cwd.to_string();
let handle = tokio::spawn(async move {
let result = run_streaming(&cmd_str, &cwd_str, stdin_rx, output_tx).await;
// task 被 abort 时 result_tx drop,result_rx 收到 Canceled,调用方需处理
let _ = result_tx.send(result);
});
let abort = ShellAbortHandle::from_tokio_abort(handle.abort_handle());
drop(handle);
match spawn_streaming_child(command, cwd, stdin_rx.is_some()) {
Ok((child, started_instant)) => {
let handle = tokio::spawn(async move {
let result = run_streaming_child(child, stdin_rx, output_tx).await;
// task 被 abort 时 result_tx drop,result_rx 收到 Canceled,调用方需处理
let _ = result_tx.send(result);
});
let abort = ShellAbortHandle::from_tokio_abort(handle.abort_handle());
drop(handle);

ShellExecution {
result: result_rx,
abort,
output_rx,
ShellExecution {
result: result_rx,
abort,
output_rx,
started_instant,
}
}
Err(error) => {
drop(output_tx);
let _ = result_tx.send(Err(error));
ShellExecution {
result: result_rx,
abort: ShellAbortHandle::noop(),
output_rx,
started_instant: Instant::now(),
}
}
}
}

/// 流式执行的实际逻辑:spawn 子进程,stdout/stderr 流式读取推送 + 累积,
/// 进程退出后返回累积的 CommandOutput。
async fn run_streaming(
fn spawn_streaming_child(
command: &str,
cwd: &str,
mut stdin_rx: Option<mpsc::Receiver<String>>,
output_tx: mpsc::Sender<Vec<u8>>,
) -> Result<CommandOutput> {
has_stdin: bool,
) -> Result<(tokio::process::Child, Instant)> {
let command = streaming_command_with_unbuffered_interpreters(command);
let mut cmd = peri_middlewares::process::shell_command(&command, &[]);
apply_streaming_unbuffered_env(&mut cmd);
if !cwd.trim().is_empty() {
cmd.current_dir(cwd);
}
if stdin_rx.is_some() {
if has_stdin {
cmd.stdin(Stdio::piped());
} else {
cmd.stdin(Stdio::null());
Expand All @@ -166,10 +178,21 @@ async fn run_streaming(
.stderr(Stdio::piped())
.kill_on_drop(true);

let mut child = cmd
let child = cmd
.spawn()
.with_context(|| format!("Failed to spawn shell command: {}", command))?;
let started_instant = Instant::now();

Ok((child, started_instant))
}

/// 流式执行的实际逻辑:stdout/stderr 流式读取推送 + 累积,
/// 进程退出后返回累积的 CommandOutput。
async fn run_streaming_child(
mut child: tokio::process::Child,
mut stdin_rx: Option<mpsc::Receiver<String>>,
output_tx: mpsc::Sender<Vec<u8>>,
) -> Result<CommandOutput> {
// stdin 写入 task(与 execute_shell_command_with_stdin 一致)
if let Some(mut rx) = stdin_rx.take() {
if let Some(mut stdin) = child.stdin.take() {
Expand Down
Loading
Loading