diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java index e0db5a9c4..2ff3026d1 100755 --- a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java @@ -95,10 +95,12 @@ import java.net.InetSocketAddress; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -135,6 +137,13 @@ public class ChannelManager { private final ChannelPool channelPool; private final ChannelGroup openChannels; private final ConcurrentHashMap http2Connections = new ConcurrentHashMap<>(); + // Requests that could not acquire a connection permit and are waiting — off the event loop, WITHOUT + // blocking their caller thread — for a sibling HTTP/2 connection to the same origin to be registered so + // they can multiplex onto it. Keyed by the same partition key {@link #registerHttp2Connection} uses. + // Each waiter is invoked with the registered channel when one appears, or with {@code null} when the + // client is closing so it can fail its request rather than hang (its request-timeout is not scheduled + // yet at this point). See NettyRequestSender's HTTP/2 deferral. 
+ private final ConcurrentHashMap>> http2ConnectionWaiters = new ConcurrentHashMap<>(); private AsyncHttpClientHandler wsHandler; private Http2Handler http2Handler; @@ -395,6 +404,52 @@ public void registerHttp2Connection(Object partitionKey, Channel channel) { "HTTP/2 connection closed before a stream could be opened")); } }); + + // Wake any requests parked waiting for an HTTP/2 connection to this origin (they failed to acquire a + // connection permit and can multiplex onto this one without one). Wake with the currently-registered + // canonical connection — which may be an already-registered one this call lost the race to — so a + // "redundant" duplicate still lets the waiters resume onto the live connection. + Channel registered = http2Connections.get(partitionKey); + if (registered != null && registered.isActive()) { + wakeHttp2ConnectionWaiters(partitionKey, registered); + } + } + + /** + * Registers a one-shot waiter to be invoked when an HTTP/2 connection is registered under + * {@code partitionKey} (or with {@code null} on client close). See the {@link #http2ConnectionWaiters} + * field and NettyRequestSender's HTTP/2 deferral. The waiter must be idempotent — it may be invoked by + * a registration, by the client-close sweep, or removed and invoked by its own timeout concurrently. 
+ */ + public void addHttp2ConnectionWaiter(Object partitionKey, Consumer onConnection) { + http2ConnectionWaiters.computeIfAbsent(partitionKey, k -> ConcurrentHashMap.newKeySet()).add(onConnection); + } + + public void removeHttp2ConnectionWaiter(Object partitionKey, Consumer onConnection) { + Set> waiters = http2ConnectionWaiters.get(partitionKey); + if (waiters != null) { + waiters.remove(onConnection); + } + } + + private void wakeHttp2ConnectionWaiters(Object partitionKey, Channel channel) { + Set> waiters = http2ConnectionWaiters.remove(partitionKey); + if (waiters != null) { + for (Consumer waiter : waiters) { + waiter.accept(channel); + } + } + } + + private void failHttp2ConnectionWaiters() { + for (Object key : http2ConnectionWaiters.keySet()) { + Set> waiters = http2ConnectionWaiters.remove(key); + if (waiters != null) { + for (Consumer waiter : waiters) { + waiter.accept(null); + } + } + } } /** @@ -469,6 +524,12 @@ private void doClose() { } public void close() { + // Fail any requests parked waiting for a sibling HTTP/2 connection to register (see the + // http2ConnectionWaiters field): the client is closing, so no connection will arrive and their + // request-timeout backstop is not scheduled yet. Do this synchronously up front — doClose() only + // runs after the (possibly long) graceful EventLoopGroup shutdown, and the nettyTimer that would + // otherwise fire their deadline is being stopped in parallel. + failHttp2ConnectionWaiters(); // Close the resolver group first while the EventLoopGroup is still active, // since Netty DNS resolvers may need a live EventLoop for clean shutdown. 
if (addressResolverGroup != null) { diff --git a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java index a92789fb7..bad16e27a 100755 --- a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java @@ -38,6 +38,7 @@ import io.netty.resolver.AddressResolver; import io.netty.resolver.AddressResolverGroup; import io.netty.util.AsciiString; +import io.netty.util.Timeout; import io.netty.util.Timer; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; @@ -91,6 +92,8 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import static io.netty.handler.codec.http.HttpHeaderNames.EXPECT; @@ -430,12 +433,14 @@ private ListenableFuture sendRequestWithNewChannel(Request request, Proxy try { future.acquirePartitionLockLazily(); } catch (IOException semaphoreException) { - // If HTTP/2 is enabled, another thread may be establishing an H2 connection. - // Poll the H2 registry with brief retries before giving up. + // The per-host permit is exhausted, but a sibling request may be establishing an HTTP/2 + // connection to this origin we can multiplex onto (stream reuse needs no permit). Reuse one + // if it exists; otherwise wait for one WITHOUT blocking the caller thread. Returns the + // pending future when handled; null means we should fail with the permit exception. 
if (config.isHttp2Enabled()) { - Channel h2Channel = waitForHttp2Connection(request, proxy, future); - if (h2Channel != null) { - return sendRequestWithOpenChannel(future, asyncHandler, h2Channel); + ListenableFuture handled = reuseOrDeferHttp2Connection(request, proxy, future, asyncHandler, semaphoreException); + if (handled != null) { + return handled; } } throw semaphoreException; @@ -1098,60 +1103,132 @@ private static void validateWebSocketRequest(Request request, AsyncHandler as } /** - * Waits briefly for an HTTP/2 connection to appear in the registry, so a request that just failed to - * acquire a connection permit can still multiplex onto an HTTP/2 connection another thread is - * establishing to the same origin. + * Tries to reuse — or, failing that, wait for — an HTTP/2 connection to this origin that a sibling + * request is establishing, after this request failed to acquire a connection permit. HTTP/2 stream reuse + * multiplexes onto an existing connection and needs no permit, so this lets an over-cap request proceed. * - *

<p>{@link org.asynchttpclient.LoadBalance#ROUND_ROBIN} limitation. The HTTP/2 registry is an - * exact-key map, and in round-robin mode each connection is registered under its per-IP key - * ({@code RoundRobinPartitionKey(base, IP)}; see {@link org.asynchttpclient.netty.channel.NettyConnectListener}). - * A request pinned to {@code IP_B} therefore polls only {@code (base, IP_B)} and never discovers a - * sibling HTTP/2 connection already open on {@code IP_A}. The connection permit, however, is per host - * ({@code maxConnectionsPerHost}), so once the host is at its cap such a request can neither open a new - * connection nor reuse the sibling one: off the event loop it spins here for the full - * {@code connectTimeout} and then fails with the original permit exception. This only bites when - * {@code maxConnectionsPerHost} is configured (the default is unlimited). Note that falling back to a - * poll on the per-host base key would be a no-op — nothing is ever registered under it — so reusing a - * sibling-IP connection requires indexing the registry by base key (tracked in issue #2214). + *
<p>
If a connection is already registered it is used immediately. Otherwise, off the event loop, a + * one-shot {@link Http2ConnectionWaiter} is registered that resumes the send when a matching connection is + * registered ({@link ChannelManager#registerHttp2Connection}) — WITHOUT parking the caller thread — bounded + * by a {@code connectTimeout} deadline that fails the request. The previous implementation instead + * {@code Thread.sleep}-polled the registry here, blocking the caller thread (the synchronous part of + * {@code execute()}) for up to the full {@code connectTimeout} and burning CPU. + * + *
<p>
On the event loop we can neither block nor usefully defer: a redirect / 401 / 407 retry + * re-enters here on the loop, and the connection we would wait for is being established on that SAME loop, + * so waiting could self-deadlock. There we do the single immediate poll and give up. + * + *
<p>
{@link org.asynchttpclient.LoadBalance#ROUND_ROBIN} limitation (#2214). The registry is keyed + * per-IP in round-robin mode, so a request pinned to {@code IP_B} is only woken by a connection registered + * for {@code IP_B}, never a sibling on {@code IP_A}; such a request waits out the deadline and then fails + * with the permit exception — the same outcome as before, but now without occupying the caller thread. + * + * @return the (pending) future when the request was reused or deferred; {@code null} if it should be + * failed with {@code semaphoreException} (a WebSocket request, or on the event loop with no + * connection available) */ - private Channel waitForHttp2Connection(Request request, ProxyServer proxy, NettyResponseFuture future) { - Uri uri = request.getUri(); + private ListenableFuture reuseOrDeferHttp2Connection(Request request, ProxyServer proxy, + NettyResponseFuture future, AsyncHandler asyncHandler, IOException semaphoreException) { // WebSocket requests must never multiplex onto an HTTP/2 connection (no RFC 8441 support). See #2160. - if (uri.isWebSocket()) { + if (request.getUri().isWebSocket()) { return null; } - String virtualHost = request.getVirtualHost(); // In round-robin mode, only multiplex onto the H2 connection for the IP this request is pinned to. - Object override = future != null ? future.getPartitionKeyOverride() : null; - - Channel h2Channel = pollHttp2(override, uri, virtualHost, proxy, request); + Object override = future.getPartitionKeyOverride(); + Channel h2Channel = pollHttp2(override, request.getUri(), request.getVirtualHost(), proxy, request); if (h2Channel != null) { - return h2Channel; + return sendRequestWithOpenChannel(future, asyncHandler, h2Channel); } - - // NEVER block an event-loop thread here. 
A redirect / 401 / 407 retry re-enters sendRequest ON the - // event loop, and the HTTP/2 connection we would wait for is being established on that SAME loop — - // a Thread.sleep would freeze the loop and can self-deadlock (the connection never finishes because - // its loop is parked here). On the loop, do the single non-blocking poll above and give up; the - // caller then proceeds as if no poolable connection was found. if (isOnEventLoop()) { return null; } + new Http2ConnectionWaiter<>(request, proxy, future, asyncHandler, override, semaphoreException).arm(); + return future; + } - long deadline = System.nanoTime() + config.getConnectTimeout().toNanos(); - while (System.nanoTime() < deadline) { - h2Channel = pollHttp2(override, uri, virtualHost, proxy, request); - if (h2Channel != null) { - return h2Channel; + /** + * A one-shot, off-event-loop waiter for a sibling HTTP/2 connection to the request's origin, used when a + * request could not acquire a connection permit. Instead of blocking the caller thread polling the + * registry, it registers itself with the {@link ChannelManager} and returns; it fires exactly once — + * whichever of these happens first: + *

<ul>
+ *   <li>a matching connection is registered → resume the send onto it via {@link #sendRequestWithOpenChannel};</li>
+ *   <li>the {@code connectTimeout} deadline elapses → fail the request with the original permit exception;</li>
+ *   <li>the client closes → {@link ChannelManager} invokes it with {@code null} → fail the request.</li>
+ * </ul>
+ * The {@link #claimed} CAS makes those sources mutually exclusive. + */ + private final class Http2ConnectionWaiter implements Consumer { + + private final Request request; + private final ProxyServer proxy; + private final NettyResponseFuture future; + private final AsyncHandler asyncHandler; + private final Object override; + private final IOException semaphoreException; + // The key a matching connection registers under (registerHttp2Connection uses future.getPartitionKey()) + // — the same key pollHttp2 effectively polls, so a registration wakes exactly the right waiters. + private final Object waitKey; + private final AtomicBoolean claimed = new AtomicBoolean(); + private volatile Timeout deadline; + + Http2ConnectionWaiter(Request request, ProxyServer proxy, NettyResponseFuture future, + AsyncHandler asyncHandler, Object override, IOException semaphoreException) { + this.request = request; + this.proxy = proxy; + this.future = future; + this.asyncHandler = asyncHandler; + this.override = override; + this.semaphoreException = semaphoreException; + this.waitKey = future.getPartitionKey(); + } + + void arm() { + channelManager.addHttp2ConnectionWaiter(waitKey, this); + // Assign the deadline before the recheck so it exists (and is cancellable) if a wake races in. + deadline = nettyTimer.newTimeout(t -> fireTimeout(), + config.getConnectTimeout().toMillis(), TimeUnit.MILLISECONDS); + if (claimed.get()) { + // A registration woke us between addHttp2ConnectionWaiter and assigning `deadline`; cancel + // the now-orphaned timeout (accept() could not, as `deadline` was still null then). + deadline.cancel(); + return; } - try { - Thread.sleep(10); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return null; + // A connection may have registered between the caller's poll (in reuseOrDeferHttp2Connection) and + // addHttp2ConnectionWaiter above — a lost wakeup. Re-poll now that the waiter is registered. 
+ Channel raced = pollHttp2(override, request.getUri(), request.getVirtualHost(), proxy, request); + if (raced != null) { + accept(raced); + } + } + + private void fireTimeout() { + if (claimed.compareAndSet(false, true)) { + channelManager.removeHttp2ConnectionWaiter(waitKey, this); + abort(null, future, semaphoreException); + } + } + + // Invoked with the registered connection to resume onto, or with null when the client is closing. + @Override + public void accept(Channel channel) { + if (!claimed.compareAndSet(false, true)) { + return; + } + channelManager.removeHttp2ConnectionWaiter(waitKey, this); + Timeout d = deadline; + if (d != null) { + d.cancel(); + } + if (future.isDone()) { + return; + } + if (channel == null) { + abort(null, future, semaphoreException); + } else { + sendRequestWithOpenChannel(future, asyncHandler, channel); } } - return null; } // Polls the HTTP/2 registry, using the IP-aware key in round-robin mode and the regular key otherwise. diff --git a/client/src/test/java/org/asynchttpclient/netty/channel/ChannelManagerHttp2WaiterTest.java b/client/src/test/java/org/asynchttpclient/netty/channel/ChannelManagerHttp2WaiterTest.java new file mode 100644 index 000000000..ebd6a9b98 --- /dev/null +++ b/client/src/test/java/org/asynchttpclient/netty/channel/ChannelManagerHttp2WaiterTest.java @@ -0,0 +1,136 @@ +/* + * Copyright (c) 2026 AsyncHttpClient Project. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.asynchttpclient.netty.channel; + +import io.netty.channel.Channel; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timer; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Covers the HTTP/2 connection-waiter registry {@link ChannelManager} exposes for NettyRequestSender's + * non-blocking deferral: a request parked (off the event loop, without blocking its caller thread) after + * failing to acquire a connection permit is woken when a matching HTTP/2 connection registers, and is failed + * (invoked with {@code null}) when the client closes. 
+ */ +public class ChannelManagerHttp2WaiterTest { + + private static final Object KEY = "partition-key"; + + private static ChannelManager newChannelManager(Timer timer) { + return new ChannelManager(new DefaultAsyncHttpClientConfig.Builder().build(), timer); + } + + @Test + @Timeout(unit = TimeUnit.SECONDS, value = 30) + public void waiterWokenWithChannelOnRegistration() { + Timer timer = new HashedWheelTimer(); + ChannelManager cm = newChannelManager(timer); + Channel channel = new EmbeddedChannel(); + try { + AtomicReference woken = new AtomicReference<>(); + AtomicBoolean called = new AtomicBoolean(); + Consumer waiter = c -> { + woken.set(c); + called.set(true); + }; + + cm.addHttp2ConnectionWaiter(KEY, waiter); + cm.registerHttp2Connection(KEY, channel); + + assertTrue(called.get(), "waiter must be woken when a connection registers for its key"); + assertSame(channel, woken.get(), "waiter must be woken with the registered connection"); + } finally { + channel.close(); + cm.close(); + timer.stop(); + } + } + + @Test + @Timeout(unit = TimeUnit.SECONDS, value = 30) + public void waiterForOtherKeyNotWoken() { + Timer timer = new HashedWheelTimer(); + ChannelManager cm = newChannelManager(timer); + Channel channel = new EmbeddedChannel(); + try { + AtomicBoolean called = new AtomicBoolean(); + cm.addHttp2ConnectionWaiter(KEY, c -> called.set(true)); + cm.registerHttp2Connection("a-different-key", channel); + assertFalse(called.get(), "a waiter must not be woken by a connection for a different key"); + } finally { + channel.close(); + cm.close(); + timer.stop(); + } + } + + @Test + @Timeout(unit = TimeUnit.SECONDS, value = 30) + public void removedWaiterNotWoken() { + Timer timer = new HashedWheelTimer(); + ChannelManager cm = newChannelManager(timer); + Channel channel = new EmbeddedChannel(); + try { + AtomicBoolean called = new AtomicBoolean(); + Consumer waiter = c -> called.set(true); + cm.addHttp2ConnectionWaiter(KEY, waiter); + 
cm.removeHttp2ConnectionWaiter(KEY, waiter); + cm.registerHttp2Connection(KEY, channel); + assertFalse(called.get(), "a removed waiter must not be woken"); + } finally { + channel.close(); + cm.close(); + timer.stop(); + } + } + + @Test + @Timeout(unit = TimeUnit.SECONDS, value = 30) + public void waiterFailedWithNullOnClose() { + Timer timer = new HashedWheelTimer(); + ChannelManager cm = newChannelManager(timer); + try { + AtomicBoolean called = new AtomicBoolean(); + AtomicReference arg = new AtomicReference<>(); + Consumer waiter = c -> { + arg.set(c); + called.set(true); + }; + cm.addHttp2ConnectionWaiter(KEY, waiter); + + cm.close(); // must fail pending waiters synchronously, before the async EventLoopGroup shutdown + + assertTrue(called.get(), "close must invoke pending waiters"); + assertNull(arg.get(), "close must invoke pending waiters with null (the abort signal)"); + } finally { + timer.stop(); + } + } +}