From cf6d3e7bcd4119f632749862081aa6fda305e6bc Mon Sep 17 00:00:00 2001 From: iHsin Date: Fri, 12 Jun 2026 01:01:07 +0800 Subject: [PATCH 1/3] fix(tuic): make Ctrl-C actually stop the quiche backend The quiche inbound created its own root CancellationToken and its accept loop never checked it, so the binary's ctrl-c handler had nothing to cancel: the listen task and every live connection kept running until main's 10s drain timeout force-aborted them ('Server did not drain within 10s of Ctrl-C'). - wind-tuic (quiche): add TuicheInboundBuilder::cancel_token; the accept loop now selects on the token and per-connection tokens are children of it, so one cancel() stops accepting and closes every connection. - tuic-server: wire ctx.cancel into the quiche builder, mirroring the quinn backend. - wind-quic (quiche): the accept-forwarding task exits once all QuicheAcceptors are dropped, releasing the listener socket/router. - wind-tuic (quinn): close the endpoint and wait_idle() after the listen loop exits so CONNECTION_CLOSE frames flush before the runtime drops. Verified on Windows: pre-fix the quiche backend hung the full 10s after Ctrl-C; post-fix both backends exit gracefully within a second. Co-Authored-By: Claude Fable 5 --- crates/tuic-server/src/wind_adapter.rs | 6 ++++- crates/wind-quic/src/quiche/mod.rs | 12 ++++++++- crates/wind-tuic/src/quiche/inbound.rs | 37 +++++++++++++++++++++++--- crates/wind-tuic/src/quinn/inbound.rs | 7 +++++ 4 files changed, 56 insertions(+), 6 deletions(-) diff --git a/crates/tuic-server/src/wind_adapter.rs b/crates/tuic-server/src/wind_adapter.rs index 8723bdc..0f793ba 100644 --- a/crates/tuic-server/src/wind_adapter.rs +++ b/crates/tuic-server/src/wind_adapter.rs @@ -318,11 +318,15 @@ async fn create_quiche_inbound(ctx: &Arc) -> eyre::Result break, + item = stream.next() => match item { + Some(item) => item, + None => break, + }, + }; match item { Ok(conn) => { let peer = conn.peer_addr(); diff --git a/crates/wind-tuic/src/quiche/inbound.rs b/crates/wind-tuic/src/quiche/inbound.rs index d59f1c7..97a803d 100644 --- a/crates/wind-tuic/src/quiche/inbound.rs +++ b/crates/wind-tuic/src/quiche/inbound.rs @@ -37,6 +37,9 @@ pub struct TuicheInbound { /// Hot-swappable certificate served to every handshake; update via /// [`TuicheInbound::cert_store`] for live rotation (e.g. ACME renewal). cert_store: CertStore, + /// Root cancellation token: cancelling it stops the accept loop and tears + /// down every live connection (each gets a child token). + cancel: CancellationToken, } impl TuicheInbound { @@ -62,13 +65,28 @@ impl AbstractInbound for TuicheInbound { let mut acceptor = bind_server(self.listen_addr, &tls, &transport, Some(&self.cert_store)).await?; let users = Arc::new(self.users.clone()); - // quiche has no external cancellation source here; each connection runs - // until the peer disconnects. - let root_cancel = CancellationToken::new(); + // Root of every per-connection token: cancelling `self.cancel` (e.g. from + // a ctrl-c handler via `TuicheInboundBuilder::cancel_token`) stops the + // accept loop *and* winds down every spawned connection handler, whose + // `serve_connection` closes its QUIC connection on cancellation. + let root_cancel = self.cancel.clone(); info!("wind-tuic (quiche) listening loop started"); - while let Some(conn) = acceptor.accept().await { + loop { + let conn = tokio::select! { + _ = root_cancel.cancelled() => { + info!("wind-tuic (quiche) inbound shutting down"); + break; + } + maybe_conn = acceptor.accept() => { + let Some(conn) = maybe_conn else { + info!("wind-tuic (quiche) acceptor closed; shutting down listen loop"); + break; + }; + conn + } + }; let remote = conn.peer_addr().unwrap_or_else(|| SocketAddr::from(([0, 0, 0, 0], 0))); let span = tracing::info_span!("conn", peer = %remote); let users = users.clone(); @@ -88,6 +106,7 @@ pub struct TuicheInboundBuilder { cert_path: Option, private_key_path: Option, opts: ConnectionOpts, + cancel: Option, } impl TuicheInboundBuilder { @@ -99,9 +118,18 @@ impl TuicheInboundBuilder { cert_path: None, private_key_path: None, opts: ConnectionOpts::default(), + cancel: None, } } + /// Set the cancellation token driving graceful shutdown. Cancelling it stops + /// the accept loop and closes every live connection. Defaults to a fresh + /// token (i.e. the server only stops when the acceptor closes). + pub fn cancel_token(mut self, cancel: CancellationToken) -> Self { + self.cancel = Some(cancel); + self + } + /// Set the listen address. pub fn listen_addr(mut self, addr: SocketAddr) -> Self { self.listen_addr = Some(addr); @@ -161,6 +189,7 @@ impl TuicheInboundBuilder { cert_path, private_key_path, cert_store, + cancel: self.cancel.unwrap_or_default(), }) } } diff --git a/crates/wind-tuic/src/quinn/inbound.rs b/crates/wind-tuic/src/quinn/inbound.rs index 11c74c9..fabb340 100644 --- a/crates/wind-tuic/src/quinn/inbound.rs +++ b/crates/wind-tuic/src/quinn/inbound.rs @@ -234,6 +234,13 @@ impl AbstractInbound for TuicInbound { } } + // Close every remaining connection (CONNECTION_CLOSE, code 0) and wait + // for the close packets to flush. Without this, returning here lets the + // caller drop the runtime while close frames are still queued, so peers + // only learn about the shutdown via idle timeout. + endpoint.close(VarInt::from_u32(0), b"server shutdown"); + endpoint.wait_idle().await; + Ok(()) } } From b0360f7f2c2a828d115c1e549e5b58a17d16176f Mon Sep 17 00:00:00 2001 From: iHsin Date: Fri, 12 Jun 2026 01:03:40 +0800 Subject: [PATCH 2/3] style: rustfmt --- crates/wind-tuic/src/quiche/inbound.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/wind-tuic/src/quiche/inbound.rs b/crates/wind-tuic/src/quiche/inbound.rs index 97a803d..9e076bf 100644 --- a/crates/wind-tuic/src/quiche/inbound.rs +++ b/crates/wind-tuic/src/quiche/inbound.rs @@ -122,9 +122,9 @@ impl TuicheInboundBuilder { } } - /// Set the cancellation token driving graceful shutdown. Cancelling it stops - /// the accept loop and closes every live connection. Defaults to a fresh - /// token (i.e. the server only stops when the acceptor closes). + /// Set the cancellation token driving graceful shutdown. Cancelling it + /// stops the accept loop and closes every live connection. Defaults to a + /// fresh token (i.e. the server only stops when the acceptor closes). pub fn cancel_token(mut self, cancel: CancellationToken) -> Self { self.cancel = Some(cancel); self From 83190cffe214ee0ee48563144e57b7c26e912df2 Mon Sep 17 00:00:00 2001 From: iHsin Date: Fri, 12 Jun 2026 01:28:11 +0800 Subject: [PATCH 3/3] fix: complete the cancellation chain across all binaries and crates MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to the quiche ctrl-c fix: a full audit of the cancellation mechanism found several more components that ignored the cancel token, leaked detached tasks past shutdown, or risked blocking a runtime worker. - tuic-client: had no shutdown path at all (ctrl-c hard-killed the process; the server only noticed via idle timeout). Add run_with_cancel(), wire ctrl-c -> cancel -> 10s drain in main (same structure as tuic-server), select the token in the SOCKS5 accept loop and TCP/UDP forwarders, and track per-session tasks in TaskTrackers so shutdown waits for them. - wind-tuic (quinn outbound): on cancellation the heartbeat poll now closes the QUIC connection ("client shutdown") so the server reaps it immediately instead of waiting out its idle timeout. - wind-tuic (quiche inbound): track serve_connection tasks and wait for them after the accept loop exits so CONNECTION_CLOSE frames flush before the caller drops the runtime. - wind-tuic (quinn inbound): spawn connection handlers into the shared ctx.tasks TaskTracker (previously created but unused) so context owners can drain them. - wind-socks: per-connection tasks get a child token and a TaskTracker; the detached UDP association task is cancel-guarded; listen waits for in-flight sessions on shutdown. - wind-naive: stop join()ing the bridge I/O thread from async — a parked blocking read would pin a tokio worker indefinitely. Drop the handle instead (same strategy as the UoT relay). - tuic-server: the CertResolver file watcher now takes a cancel token instead of looping forever. Verified end-to-end on Windows: client/server pair relays traffic, then ctrl-c on the client exits in <1s with the server logging the connection close immediately; ctrl-c on the server exits in <1s on both the quinn and quiche backends. 164 unit tests pass. Co-Authored-By: Claude Fable 5 --- crates/tuic-client/Cargo.toml | 4 +-- crates/tuic-client/src/forward.rs | 49 +++++++++++++++++++------- crates/tuic-client/src/lib.rs | 40 +++++++++++++++++---- crates/tuic-client/src/main.rs | 44 +++++++++++++++++++++-- crates/tuic-client/src/socks5/mod.rs | 36 +++++++++++++++---- crates/tuic-server/src/tls.rs | 36 +++++++++++++------ crates/wind-naive/src/lib.rs | 8 ++++- crates/wind-socks/Cargo.toml | 2 +- crates/wind-socks/src/inbound.rs | 34 +++++++++++++++--- crates/wind-tuic/Cargo.toml | 2 +- crates/wind-tuic/src/quiche/inbound.rs | 10 +++++- crates/wind-tuic/src/quinn/inbound.rs | 5 ++- crates/wind-tuic/src/quinn/outbound.rs | 6 +++- 13 files changed, 226 insertions(+), 50 deletions(-) diff --git a/crates/tuic-client/Cargo.toml b/crates/tuic-client/Cargo.toml index 027a8cf..6b7ee57 100644 --- a/crates/tuic-client/Cargo.toml +++ b/crates/tuic-client/Cargo.toml @@ -48,8 +48,8 @@ quinn-congestions = { workspace = true } # Tokio/Async crossbeam-utils = { version = "0.8", default-features = false, features = ["std"] } -tokio = { version = "1", default-features = false, features = ["io-util", "macros", "net", "parking_lot", "rt-multi-thread", "time"] } -tokio-util = { version = "0.7", default-features = false, features = ["compat"] } +tokio = { version = "1", default-features = false, features = ["io-util", "macros", "net", "parking_lot", "rt-multi-thread", "signal", "time"] } +tokio-util = { version = "0.7", default-features = false, features = ["compat", "rt"] } # TLS rustls = { version = "0.23", default-features = false } diff --git a/crates/tuic-client/src/forward.rs b/crates/tuic-client/src/forward.rs index 206dc59..e96e138 100644 --- a/crates/tuic-client/src/forward.rs +++ b/crates/tuic-client/src/forward.rs @@ -10,9 +10,10 @@ use std::{ use bytes::Bytes; use socket2::{Domain, Protocol, SockAddr, Socket, Type}; use tokio::net::{TcpListener, UdpSocket}; +use tokio_util::sync::CancellationToken; use tracing::{Instrument, debug, info, warn}; use wind_core::{ - AbstractOutbound, + AbstractOutbound, AppContext, types::TargetAddr, udp::{UdpPacket, UdpStream}, }; @@ -36,16 +37,19 @@ fn next_assoc_id() -> u16 { // hot path. The local `sessions: HashMap` // in `run_udp_forwarder` is the only routing table in use. -pub async fn start(tcp: Vec, udp: Vec) { +/// Spawn the configured TCP/UDP forwarders into `ctx.tasks`, each driven by a +/// child of `ctx.token` so shutdown stops the accept/recv loops and aborts +/// in-flight per-connection tasks. +pub async fn start(tcp: Vec, udp: Vec, ctx: &Arc) { for entry in tcp { - tokio::spawn(run_tcp_forwarder(entry)); + ctx.tasks.spawn(run_tcp_forwarder(entry, ctx.token.child_token())); } for entry in udp { - tokio::spawn(run_udp_forwarder(entry)); + ctx.tasks.spawn(run_udp_forwarder(entry, ctx.token.child_token())); } } -async fn run_tcp_forwarder(entry: TcpForward) { +async fn run_tcp_forwarder(entry: TcpForward, cancel: CancellationToken) { let listener = match create_tcp_listener(entry.listen) { Ok(l) => l, Err(err) => { @@ -60,13 +64,28 @@ async fn run_tcp_forwarder(entry: TcpForward) { remote = entry.remote ); loop { - match listener.accept().await { - Ok((inbound, peer)) => { - let remote = entry.remote.clone(); - let span = tracing::info_span!("forward_tcp", peer = %peer); - tokio::spawn(handle_tcp_conn(inbound, remote).instrument(span)); + tokio::select! { + _ = cancel.cancelled() => { + info!("[forward-tcp] cancellation received, shutting down"); + break; + } + res = listener.accept() => match res { + Ok((inbound, peer)) => { + let remote = entry.remote.clone(); + let span = tracing::info_span!("forward_tcp", peer = %peer); + let conn_cancel = cancel.child_token(); + tokio::spawn( + async move { + tokio::select! { + _ = conn_cancel.cancelled() => {} + _ = handle_tcp_conn(inbound, remote) => {} + } + } + .instrument(span), + ); + } + Err(err) => warn!("[forward-tcp] accept error: {err}"), } - Err(err) => warn!("[forward-tcp] accept error: {err}"), } } } @@ -121,7 +140,7 @@ struct UdpForwardSession { last_seen: std::time::Instant, } -async fn run_udp_forwarder(entry: UdpForward) { +async fn run_udp_forwarder(entry: UdpForward, cancel: CancellationToken) { let socket = match UdpSocket::bind(entry.listen).await { Ok(s) => s, Err(err) => { @@ -153,6 +172,12 @@ async fn run_udp_forwarder(entry: UdpForward) { loop { tokio::select! { + _ = cancel.cancelled() => { + info!("[forward-udp] cancellation received, shutting down"); + // Dropping `sessions` drops every `tx_to_out`, which closes the + // per-session relay tasks' inbound channels so they exit cleanly. + break; + } recv = socket.recv_from(&mut buf) => match recv { Ok((n, src_addr)) => { let pkt = Bytes::copy_from_slice(&buf[..n]); diff --git a/crates/tuic-client/src/lib.rs b/crates/tuic-client/src/lib.rs index f48d756..c240008 100644 --- a/crates/tuic-client/src/lib.rs +++ b/crates/tuic-client/src/lib.rs @@ -3,6 +3,7 @@ use std::sync::Arc; +use tokio_util::sync::CancellationToken; use wind_core::AppContext; pub mod config; @@ -14,16 +15,35 @@ pub mod wind_adapter; pub use config::Config; -/// Run the TUIC client with the given configuration (using wind-tuic) +/// Run the TUIC client with the given configuration (using wind-tuic). +/// +/// Constructs its own [`CancellationToken`] internally; callers that want to +/// drive a graceful shutdown from outside should use [`run_with_cancel`]. pub async fn run(cfg: Config) -> eyre::Result<()> { - // Initialize wind-tuic connection - let ctx = Arc::new(AppContext::default()); - wind_adapter::create_connection(ctx, cfg.relay).await?; + run_with_cancel(cfg, CancellationToken::new()).await +} + +/// Run the TUIC client with a caller-owned cancel token. +/// +/// Cancelling `cancel` stops the SOCKS5 accept loop and the TCP/UDP +/// forwarders, closes the TUIC connection (so the server sees the client go +/// away immediately instead of waiting out its idle timeout), and waits for +/// tracked background tasks to drain. Pair with `tokio::select!` on `ctrl_c()` +/// so signal-triggered shutdown is graceful instead of relying on runtime drop. +pub async fn run_with_cancel(cfg: Config, cancel: CancellationToken) -> eyre::Result<()> { + // The context token is the caller's token, so the outbound's heartbeat poll + // task (which closes the QUIC connection on cancellation) and every UDP + // session task wind down from the same `cancel()`. + let ctx = Arc::new(AppContext { + tasks: tokio_util::task::TaskTracker::new(), + token: cancel.clone(), + }); + wind_adapter::create_connection(ctx.clone(), cfg.relay).await?; tracing::info!("TUIC client initialized with wind-tuic backend"); - // Start forwarders - forward::start(cfg.local.tcp_forward.clone(), cfg.local.udp_forward.clone()).await; + // Start forwarders (tracked in ctx.tasks, cancelled via ctx.token). + forward::start(cfg.local.tcp_forward.clone(), cfg.local.udp_forward.clone(), &ctx).await; // Start SOCKS5 server match socks5::Server::set_config(cfg.local) { @@ -33,6 +53,12 @@ pub async fn run(cfg: Config) -> eyre::Result<()> { } } - socks5::Server::start().await; + socks5::Server::start(cancel.clone()).await; + + // `start` only returns once cancelled; drain the tracked background tasks + // (heartbeat poll, forwarder loops, UDP sessions) before returning so the + // QUIC close frames flush while the runtime is still alive. + ctx.tasks.close(); + ctx.tasks.wait().await; Ok(()) } diff --git a/crates/tuic-client/src/main.rs b/crates/tuic-client/src/main.rs index a127dd5..6582ed2 100644 --- a/crates/tuic-client/src/main.rs +++ b/crates/tuic-client/src/main.rs @@ -1,9 +1,10 @@ -use std::{process, str::FromStr}; +use std::{process, str::FromStr, time::Duration}; use chrono::{Offset, TimeZone}; use clap::Parser; #[cfg(feature = "jemallocator")] use tikv_jemallocator::Jemalloc; +use tokio_util::sync::CancellationToken; use tracing::level_filters::LevelFilter; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use tuic_client::config::{Cli, Config, EnvState}; @@ -51,5 +52,44 @@ async fn main() -> eyre::Result<()> { )), ) .try_init()?; - tuic_client::run(cfg).await + // Own the cancel token here so the ctrl-c branch can trigger a graceful + // shutdown: stop the SOCKS5/forwarder accept loops, close the TUIC + // connection (the server learns we left instead of waiting out its idle + // timeout), and drain background tasks — same structure as tuic-server. + let cancel = CancellationToken::new(); + let mut client = tokio::spawn(tuic_client::run_with_cancel(cfg, cancel.clone())); + + tokio::select! { + res = &mut client => { + match res { + Ok(Ok(())) => {} + Ok(Err(err)) => { + tracing::error!("Client exited with error: {err}"); + return Err(err); + } + Err(join_err) => { + tracing::error!("Client task panicked or was cancelled: {join_err}"); + return Err(eyre::eyre!("Client task panicked or was cancelled: {join_err}")); + } + } + } + res = tokio::signal::ctrl_c() => { + if let Err(err) = res { + tracing::error!("Failed to listen for Ctrl-C: {err}"); + return Err(eyre::eyre!("Failed to listen for Ctrl-C: {err}")); + } + tracing::info!("Received Ctrl-C, shutting down."); + cancel.cancel(); + + // Give in-flight sessions up to 10 seconds to drain before dropping + // out of main and letting runtime teardown abort the rest. + match tokio::time::timeout(Duration::from_secs(10), client).await { + Ok(Ok(Ok(()))) => {} + Ok(Ok(Err(err))) => tracing::warn!("Client drained with error: {err}"), + Ok(Err(join_err)) => tracing::warn!("Client task drain join error: {join_err}"), + Err(_) => tracing::warn!("Client did not drain within 10s of Ctrl-C; aborting outstanding tasks"), + } + } + } + Ok(()) } diff --git a/crates/tuic-client/src/socks5/mod.rs b/crates/tuic-client/src/socks5/mod.rs index 186114a..f840ae4 100644 --- a/crates/tuic-client/src/socks5/mod.rs +++ b/crates/tuic-client/src/socks5/mod.rs @@ -14,6 +14,7 @@ use socks5_server::{ auth::{NoAuth, Password}, }; use tokio::{net::TcpListener, sync::RwLock as AsyncRwLock}; +use tokio_util::{sync::CancellationToken, task::TaskTracker}; use tracing::{Instrument, debug, info, warn}; use crate::{config::Local, error::Error}; @@ -111,20 +112,43 @@ impl Server { }) } - pub async fn start() { + /// Accept SOCKS5 connections until `cancel` fires, then wait for in-flight + /// session tasks to wind down (each gets a child token, so cancellation + /// aborts handshakes and relays promptly). + pub async fn start(cancel: CancellationToken) { let server = SERVER.get().unwrap(); warn!("[socks5] server started, listening on {}", server.inner.local_addr().unwrap()); + let conn_tasks = TaskTracker::new(); loop { - match server.inner.accept().await { - Ok((conn, addr)) => { - let span = tracing::info_span!("socks5", peer = %addr); - tokio::spawn(Self::handle_socks5_conn(server, conn).instrument(span)); + tokio::select! { + _ = cancel.cancelled() => { + info!("[socks5] cancellation received, shutting down"); + break; + } + res = server.inner.accept() => match res { + Ok((conn, addr)) => { + let span = tracing::info_span!("socks5", peer = %addr); + let conn_cancel = cancel.child_token(); + conn_tasks.spawn( + async move { + tokio::select! { + _ = conn_cancel.cancelled() => { + debug!("session aborted by shutdown"); + } + _ = Self::handle_socks5_conn(server, conn) => {} + } + } + .instrument(span), + ); + } + Err(err) => warn!("[socks5] failed to establish connection: {err}"), } - Err(err) => warn!("[socks5] failed to establish connection: {err}"), } } + conn_tasks.close(); + conn_tasks.wait().await; } async fn handle_socks5_conn(server: &Server, conn: socks5_server::IncomingConnection) { diff --git a/crates/tuic-server/src/tls.rs b/crates/tuic-server/src/tls.rs index 9ab6fcb..c1c7ffe 100644 --- a/crates/tuic-server/src/tls.rs +++ b/crates/tuic-server/src/tls.rs @@ -14,6 +14,7 @@ use rustls::{ }; use sha2::{Digest, Sha256}; use tokio::fs; +use tokio_util::sync::CancellationToken; use tracing::warn; #[derive(Debug)] @@ -24,7 +25,7 @@ pub struct CertResolver { hash: ArcSwap<[u8; 32]>, } impl CertResolver { - pub async fn new(cert_path: &Path, key_path: &Path, interval: Duration) -> Result> { + pub async fn new(cert_path: &Path, key_path: &Path, interval: Duration, cancel: CancellationToken) -> Result> { let cert_key = load_cert_key(cert_path, key_path).await?; let hash = Self::calc_hash(cert_path, key_path).await?; let resolver = Arc::new(Self { @@ -33,20 +34,24 @@ impl CertResolver { cert_key: ArcSwap::new(cert_key), hash: ArcSwap::new(Arc::new(hash)), }); - // Start file watcher in background + // Start file watcher in background; exits on `cancel` so the task does + // not outlive the server when used as a library. let resolver_clone = resolver.clone(); tokio::spawn(async move { - if let Err(e) = resolver_clone.start_watch(interval).await { + if let Err(e) = resolver_clone.start_watch(interval, cancel).await { warn!("Certificate watcher exited with error: {e}"); } }); Ok(resolver) } - async fn start_watch(&self, interval: Duration) -> Result<()> { + async fn start_watch(&self, interval: Duration, cancel: CancellationToken) -> Result<()> { let mut interval = tokio::time::interval(interval); loop { - interval.tick().await; + tokio::select! { + _ = cancel.cancelled() => return Ok(()), + _ = interval.tick() => {} + } // Treat I/O errors here as transient (the cert file may be in the // middle of an ACME-driven `rename`, the directory may be missing @@ -322,9 +327,14 @@ mod tests { let (cert_der, key_der) = generate_test_cert_der()?; let (cert_file, key_file) = create_temp_cert_file(&cert_der, &key_der).await; - let resolver = CertResolver::new(cert_file.path(), key_file.path(), Duration::from_secs(10)) - .await - .unwrap(); + let resolver = CertResolver::new( + cert_file.path(), + key_file.path(), + Duration::from_secs(10), + CancellationToken::new(), + ) + .await + .unwrap(); let certified_key = resolver.cert_key.load_full(); assert!(!certified_key.cert.is_empty()); @@ -341,7 +351,7 @@ mod tests { tokio::fs::write(&cert_path, &cert_pem.as_bytes()).await.unwrap(); tokio::fs::write(&key_path, &key_pem.as_bytes()).await.unwrap(); - let resolver = CertResolver::new(&cert_path, &key_path, Duration::from_micros(100)) + let resolver = CertResolver::new(&cert_path, &key_path, Duration::from_micros(100), CancellationToken::new()) .await .unwrap(); @@ -372,7 +382,13 @@ mod tests { let load_result = load_cert_key(cert_file.path(), key_file.path()).await; assert!(load_result.is_err()); - let resolver_result = CertResolver::new(cert_file.path(), key_file.path(), Duration::from_secs(10)).await; + let resolver_result = CertResolver::new( + cert_file.path(), + key_file.path(), + Duration::from_secs(10), + CancellationToken::new(), + ) + .await; assert!(resolver_result.is_err()); } diff --git a/crates/wind-naive/src/lib.rs b/crates/wind-naive/src/lib.rs index f7cee92..2e236e2 100644 --- a/crates/wind-naive/src/lib.rs +++ b/crates/wind-naive/src/lib.rs @@ -465,7 +465,13 @@ async fn naive_async_bridge( } } - let _ = io_handle.join(); + // Do not `join` the I/O thread from this async task: it may be parked in + // a blocking `naive.read()` with no traffic, and `join()` would pin a + // tokio worker thread until the remote finally speaks or times out. + // Dropping the channels instead makes the thread's next send/recv fail, + // and the `NaiveConn` is cancelled when the thread unwinds and drops it + // (same strategy as the UoT relay above). + drop(io_handle); Ok(()) } .instrument(span) diff --git a/crates/wind-socks/Cargo.toml b/crates/wind-socks/Cargo.toml index a3509c5..59b65cf 100644 --- a/crates/wind-socks/Cargo.toml +++ b/crates/wind-socks/Cargo.toml @@ -10,7 +10,7 @@ license = "MIT OR Apache-2.0" wind-core = { version = "0.1.1", path = "../wind-core"} # Async tokio = { version = "1", default-features = false, features = ["net"] } -tokio-util = { version = "0.7", features = ["codec"] } +tokio-util = { version = "0.7", features = ["codec", "rt"] } tokio-stream = "0.1" futures-util = { version = "0.3", default-features = false, features = ["sink"] } fast-socks5 = "1.0.0-rc.0" diff --git a/crates/wind-socks/src/inbound.rs b/crates/wind-socks/src/inbound.rs index e107e7a..33a3368 100644 --- a/crates/wind-socks/src/inbound.rs +++ b/crates/wind-socks/src/inbound.rs @@ -45,6 +45,10 @@ pub struct SocksInbound { impl AbstractInbound for SocksInbound { async fn listen(&self, cb: &impl InboundCallback) -> eyre::Result<()> { let listener = TcpListener::bind(self.opts.listen_addr).await?; + // Track per-connection tasks so shutdown can wait for them instead of + // leaving in-flight sessions to be killed by runtime teardown. Each task + // also gets a child token so cancellation aborts the session promptly. + let conn_tasks = tokio_util::task::TaskTracker::new(); loop { tokio::select! { _ = self.cancel.cancelled() => { @@ -62,10 +66,19 @@ impl AbstractInbound for SocksInbound { let opts = self.opts.clone(); let cb = cb.clone(); - tokio::spawn( + let conn_cancel = self.cancel.child_token(); + conn_tasks.spawn( async move { - if let Err(err) = handle_income(opts, stream, client_addr, cb).await { - error!(target: "socks_in_handler" , "{:}", err); + let handler_cancel = conn_cancel.clone(); + tokio::select! { + _ = conn_cancel.cancelled() => { + info!(target: "socks_in_handler", "session aborted by shutdown"); + } + res = handle_income(opts, stream, client_addr, cb, handler_cancel) => { + if let Err(err) = res { + error!(target: "socks_in_handler" , "{:}", err); + } + } } } .in_current_span(), @@ -73,6 +86,8 @@ impl AbstractInbound for SocksInbound { } }; } + conn_tasks.close(); + conn_tasks.wait().await; Ok(()) } } @@ -93,6 +108,7 @@ async fn handle_income( stream: TcpStream, client_addr: SocketAddr, cb: impl InboundCallback, + cancel: CancellationToken, ) -> Result<(), Error> { let proto = match &opts.auth { AuthMode::NoAuth => Socks5ServerProtocol::accept_no_auth(stream).await.context(SocksSnafu)?, @@ -155,10 +171,18 @@ async fn handle_income( }; let cb = cb.clone(); + // Detached from the session task, so it needs its own cancel + // guard — otherwise it would outlive shutdown until runtime drop. + let udp_cancel = cancel.clone(); tokio::spawn( async move { - if let Err(e) = cb.handle_udpstream(udp_stream).await { - error!(target: "socks_in_handler", "UDP association error: {}", e); + tokio::select! { + _ = udp_cancel.cancelled() => {} + res = cb.handle_udpstream(udp_stream) => { + if let Err(e) = res { + error!(target: "socks_in_handler", "UDP association error: {}", e); + } + } } } .in_current_span(), diff --git a/crates/wind-tuic/Cargo.toml b/crates/wind-tuic/Cargo.toml index 7d09e3a..a5bf2c9 100644 --- a/crates/wind-tuic/Cargo.toml +++ b/crates/wind-tuic/Cargo.toml @@ -49,7 +49,7 @@ wind-quic = { version = "0.1.1", path = "../wind-quic", default-features = false # Async tokio = { version = "1", default-features = false, features = ["net", "time", "rt", "sync", "io-util", "macros"] } -tokio-util = { version = "0.7", features = ["codec"] } +tokio-util = { version = "0.7", features = ["codec", "rt"] } tokio-stream = "0.1" futures-util = { version = "0.3", default-features = false, features = ["sink"] } diff --git a/crates/wind-tuic/src/quiche/inbound.rs b/crates/wind-tuic/src/quiche/inbound.rs index 9e076bf..eefe3b5 100644 --- a/crates/wind-tuic/src/quiche/inbound.rs +++ b/crates/wind-tuic/src/quiche/inbound.rs @@ -70,6 +70,11 @@ impl AbstractInbound for TuicheInbound { // accept loop *and* winds down every spawned connection handler, whose // `serve_connection` closes its QUIC connection on cancellation. let root_cancel = self.cancel.clone(); + // Track connection handlers so shutdown can wait for them to finish + // closing — `serve_connection` returns right after issuing `conn.close`, + // and waiting here keeps the tokio-quiche workers alive long enough to + // flush the CONNECTION_CLOSE frames before the caller drops the runtime. + let conn_tasks = tokio_util::task::TaskTracker::new(); info!("wind-tuic (quiche) listening loop started"); @@ -92,9 +97,12 @@ impl AbstractInbound for TuicheInbound { let users = users.clone(); let cb = cb.clone(); let cancel = root_cancel.child_token(); - tokio::spawn(crate::server::serve_connection(conn, remote, users, AUTH_TIMEOUT, cb, cancel).instrument(span)); + conn_tasks.spawn(crate::server::serve_connection(conn, remote, users, AUTH_TIMEOUT, cb, cancel).instrument(span)); } + conn_tasks.close(); + conn_tasks.wait().await; + Ok(()) } } diff --git a/crates/wind-tuic/src/quinn/inbound.rs b/crates/wind-tuic/src/quinn/inbound.rs index fabb340..bdd7a4e 100644 --- a/crates/wind-tuic/src/quinn/inbound.rs +++ b/crates/wind-tuic/src/quinn/inbound.rs @@ -222,7 +222,10 @@ impl AbstractInbound for TuicInbound { let remote = incoming.remote_address(); let span = tracing::info_span!("conn", peer = %remote); - tokio::spawn(spawn_logged( + // Spawn into the shared TaskTracker so the context owner can + // drain connection handlers on shutdown (e.g. wind's + // `tasks.close()` + `tasks.wait()` after cancelling). + self.ctx.tasks.spawn(spawn_logged( "Connection handler", handle_connection(incoming, users, auth_timeout, zero_rtt, cb, conn_cancel), ).instrument(span)); diff --git a/crates/wind-tuic/src/quinn/outbound.rs b/crates/wind-tuic/src/quinn/outbound.rs index 38a2d23..4e3ebd0 100644 --- a/crates/wind-tuic/src/quinn/outbound.rs +++ b/crates/wind-tuic/src/quinn/outbound.rs @@ -12,7 +12,7 @@ use tokio_util::sync::CancellationToken; use tracing::{Instrument as _, info, warn}; use uuid::Uuid; use wind_core::{AbstractOutbound, AppContext, tcp::AbstractTcpStream, types::TargetAddr}; -use wind_quic::quinn::QuinnConnection; +use wind_quic::{QuicConnection as _, quinn::QuinnConnection}; use crate::{ Error, @@ -129,6 +129,10 @@ impl TuicOutbound { tokio::select! { _ = cancel_token.cancelled() => { info!(target: "tuic_out", "Heartbeat poll cancelled"); + // Tell the server we are going away so it can reap the + // connection immediately instead of waiting out its idle + // timeout. + connection.close(0, b"client shutdown"); return Ok(()); } _ = hb_interval.tick() => {