diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/Rfc6724AddressSelectingDnsResolver.java b/httpclient5/src/main/java/org/apache/hc/client5/http/Rfc6724AddressSelectingDnsResolver.java new file mode 100644 index 0000000000..0257306811 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/Rfc6724AddressSelectingDnsResolver.java @@ -0,0 +1,602 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http; + +import java.net.DatagramSocket; +import java.net.Inet4Address; +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; + +import org.apache.hc.client5.http.config.ProtocolFamilyPreference; +import org.apache.hc.core5.annotation.Contract; +import org.apache.hc.core5.annotation.ThreadingBehavior; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@code Rfc6724AddressSelectingDnsResolver} wraps a delegate {@link DnsResolver} + * and applies RFC 6724 destination address selection rules to the returned + * addresses. It can also enforce or bias a protocol family preference. + * + *

The canonical hostname lookup is delegated unchanged.

+ * + * @since 5.6 + */ +@Contract(threading = ThreadingBehavior.IMMUTABLE) +public final class Rfc6724AddressSelectingDnsResolver implements DnsResolver { + + private static final Logger LOG = LoggerFactory.getLogger(Rfc6724AddressSelectingDnsResolver.class); + + + private static final int PROBE_PORT = 53; // UDP connect trick; no packets sent + + private final DnsResolver delegate; + private final ProtocolFamilyPreference familyPreference; + + /** + * Creates a new resolver that applies RFC 6724 ordering with no family bias (INTERLEAVE). + * + * @param delegate underlying resolver to use. + */ + public Rfc6724AddressSelectingDnsResolver(final DnsResolver delegate) { + this(delegate, ProtocolFamilyPreference.INTERLEAVE); + } + + /** + * Creates a new resolver that applies RFC 6724 ordering and a specific protocol family preference. + * + * @param delegate underlying resolver to use. + * @param familyPreference family preference to apply (e.g. PREFER_IPV6, IPV4_ONLY). + */ + public Rfc6724AddressSelectingDnsResolver( + final DnsResolver delegate, + final ProtocolFamilyPreference familyPreference) { + this.delegate = java.util.Objects.requireNonNull(delegate, "delegate"); + this.familyPreference = familyPreference != null ? familyPreference : ProtocolFamilyPreference.INTERLEAVE; + } + + @Override + public InetAddress[] resolve(final String host) throws UnknownHostException { + if (LOG.isDebugEnabled()) { + LOG.debug("{} resolving host '{}' via delegate {}", simpleName(), host, delegate.getClass().getName()); + LOG.debug("{} familyPreference={}", simpleName(), familyPreference); + } + + final InetAddress[] resolved = delegate.resolve(host); + + if (resolved == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("{} delegate returned null for '{}'", simpleName(), host); + } + return null; + } + if (LOG.isDebugEnabled()) { + LOG.debug("{} delegate returned {} addresses for '{}': {}", simpleName(), resolved.length, host, fmt(resolved)); + } + if (resolved.length <= 1) { + if (LOG.isDebugEnabled()) { + LOG.debug("{} nothing to sort/filter (<=1 address). Returning as-is.", simpleName()); + } + return resolved; + } + + // 1) Filter by family if forced + final List candidates = new ArrayList<>(resolved.length); + switch (familyPreference) { + case IPV4_ONLY: + for (final InetAddress a : resolved) { + if (a instanceof Inet4Address) { + candidates.add(a); + } + } + break; + case IPV6_ONLY: + for (final InetAddress a : resolved) { + if (a instanceof Inet6Address) { + candidates.add(a); + } + } + break; + default: + candidates.addAll(Arrays.asList(resolved)); + } + if (LOG.isDebugEnabled()) { + LOG.debug("{} after family filter {} -> {} candidate(s): {}", simpleName(), familyPreference, candidates.size(), fmt(candidates)); + } + + if (candidates.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("{} no address of requested family; returning empty for '{}'", simpleName(), host); + } + return new InetAddress[0]; + } + + // 2) RFC 6724 sort (uses UDP connect to infer source addresses; no packets sent) + final List rfcSorted = sortByRfc6724(candidates); + + // 3) Apply family preference ordering where applicable + final List ordered = applyFamilyPreference(rfcSorted, familyPreference); + + if (LOG.isDebugEnabled()) { + LOG.debug("{} final ordered list for '{}': {}", simpleName(), host, fmt(ordered)); + } + + return ordered.toArray(new InetAddress[0]); + } + + @Override + public String resolveCanonicalHostname(final String host) throws UnknownHostException { + if (LOG.isDebugEnabled()) { + LOG.debug("{} resolveCanonicalHostname('{}') via delegate {}", simpleName(), host, delegate.getClass().getName()); + } + return delegate.resolveCanonicalHostname(host); + } + + // --- RFC 6724 helpers (minimal port of the algo you used in the operator) --- + + private static List sortByRfc6724(final List addrs) { + if (addrs.size() < 2) { + return addrs; + } + if (LOG.isDebugEnabled()) { + LOG.debug("RFC6724 input candidates: {}", fmt(addrs)); + } + + final List sockAddrs = new ArrayList<>(addrs.size()); + for (final InetAddress a : addrs) { + sockAddrs.add(new InetSocketAddress(a, PROBE_PORT)); + } + final List srcs = srcAddrs(sockAddrs); + + final List infos = new ArrayList<>(addrs.size()); + for (int i = 0; i < addrs.size(); i++) { + final InetAddress dst = addrs.get(i); + final InetAddress src = srcs.get(i); + infos.add(new Info(dst, src, ipAttrOf(dst), ipAttrOf(src))); + } + + if (LOG.isDebugEnabled()) { + for (final Info info : infos) { + LOG.debug("RFC6724 candidate dst={} src={} dst[scope={},prec={},label={}] src[scope={},prec={},label={}]", + addr(info.dst), addr(info.src), + info.dstAttr.scope, info.dstAttr.precedence, info.dstAttr.label, + info.srcAttr.scope, info.srcAttr.precedence, info.srcAttr.label); + } + } + + infos.sort(RFC6724_COMPARATOR); + + final List out = new ArrayList<>(infos.size()); + for (final Info info : infos) { + out.add(info.dst); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("RFC6724 output order: {}", fmt(out)); + } + return out; + } + + private static List applyFamilyPreference( + final List rfcSorted, + final ProtocolFamilyPreference pref) { + if (rfcSorted.size() <= 1) { + return rfcSorted; + } + switch (pref) { + case PREFER_IPV6: + case PREFER_IPV4: { + final boolean preferV6 = pref == ProtocolFamilyPreference.PREFER_IPV6; + final List first = new ArrayList<>(); + final List second = new ArrayList<>(); + for (final InetAddress a : rfcSorted) { + final boolean isV6 = a instanceof Inet6Address; + if (preferV6 && isV6 || !preferV6 && !isV6) { + first.add(a); + } else { + second.add(a); + } + } + final List merged = new ArrayList<>(rfcSorted.size()); + merged.addAll(first); + merged.addAll(second); + if (LOG.isDebugEnabled()) { + LOG.debug("Family preference {} applied. First bucket={}, second bucket={}", + pref, fmt(first), fmt(second)); + LOG.debug("Family preference output: {}", fmt(merged)); + } + return merged; + } + case IPV4_ONLY: + case IPV6_ONLY: + // already filtered earlier + if (LOG.isDebugEnabled()) { + LOG.debug("Family preference {} enforced earlier. Order unchanged: {}", pref, fmt(rfcSorted)); + } + return rfcSorted; + case INTERLEAVE: + default: { + final List v6 = new LinkedList<>(); + final List v4 = new LinkedList<>(); + for (final InetAddress a : rfcSorted) { + if (a instanceof Inet6Address) { + v6.add(a); + } else { + v4.add(a); + } + } + if (v6.isEmpty() || v4.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("INTERLEAVE requested but only one family present. Order unchanged: {}", fmt(rfcSorted)); + } + return rfcSorted; + } + final boolean startV6 = rfcSorted.get(0) instanceof Inet6Address; + final List merged = new ArrayList<>(rfcSorted.size()); + while (!v6.isEmpty() || !v4.isEmpty()) { + if (startV6) { + if (!v6.isEmpty()) { + merged.add(v6.remove(0)); + } + if (!v4.isEmpty()) { + merged.add(v4.remove(0)); + } + } else { + if (!v4.isEmpty()) { + merged.add(v4.remove(0)); + } + if (!v6.isEmpty()) { + merged.add(v6.remove(0)); + } + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("INTERLEAVE starting family={} -> {}", startV6 ? "IPv6" : "IPv4", fmt(merged)); + } + return merged; + } + } + } + + private static List srcAddrs(final List addrs) { + final List srcs = new ArrayList<>(addrs.size()); + for (final InetSocketAddress dest : addrs) { + InetAddress src = null; + try (final DatagramSocket s = new DatagramSocket()) { + s.connect(dest); // does not send packets; OS picks source addr/if + src = s.getLocalAddress(); + } catch (final SocketException ignore) { + if (LOG.isDebugEnabled()) { + LOG.debug("RFC6724 could not infer source address for {}: {}", dest, ignore.toString()); + } + } + srcs.add(src); + } + if (LOG.isDebugEnabled()) { + final List printable = new ArrayList<>(srcs.size()); + for (final InetAddress a : srcs) { + printable.add(addr(a)); + } + LOG.debug("RFC6724 inferred source addresses: {}", printable); + } + return srcs; + } + + // --- RFC 6724 score structs --- + + private static final class Info { + final InetAddress dst; + final InetAddress src; + final Attr dstAttr; + final Attr srcAttr; + + Info(final InetAddress dst, final InetAddress src, final Attr dstAttr, final Attr srcAttr) { + this.dst = dst; + this.src = src; + this.dstAttr = dstAttr; + this.srcAttr = srcAttr; + } + } + + private static final class Attr { + final Scope scope; + final int precedence; + final int label; + + Attr(final Scope scope, final int precedence, final int label) { + this.scope = scope; + this.precedence = precedence; + this.label = label; + } + } + + private enum Scope { + INTERFACE_LOCAL(0x1), + LINK_LOCAL(0x2), + ADMIN_LOCAL(0x4), + SITE_LOCAL(0x5), + ORG_LOCAL(0x8), + GLOBAL(0xe); + + final int value; + + Scope(final int v) { + this.value = v; + } + + static Scope fromValue(final int v) { + switch (v) { + case 0x1: + return INTERFACE_LOCAL; + case 0x2: + return LINK_LOCAL; + case 0x4: + return ADMIN_LOCAL; + case 0x5: + return SITE_LOCAL; + case 0x8: + return ORG_LOCAL; + default: + return GLOBAL; + } + } + } + + private static Attr ipAttrOf(final InetAddress ip) { + if (ip == null) { + return new Attr(Scope.GLOBAL, 0, 0); + } + final PolicyEntry e = classify(ip); + return new Attr(classifyScope(ip), e.precedence, e.label); + } + + private static Scope classifyScope(final InetAddress ip) { + if (ip.isLoopbackAddress() || ip.isLinkLocalAddress()) { + return Scope.LINK_LOCAL; + } + if (ip.isMulticastAddress()) { + if (ip instanceof Inet6Address) { + return Scope.fromValue(ip.getAddress()[1] & 0x0f); + } + return Scope.GLOBAL; + } + if (ip.isSiteLocalAddress()) { + return Scope.SITE_LOCAL; + } + return Scope.GLOBAL; + } + + private static final class PolicyEntry { + final Network prefix; + final int precedence; + final int label; + + PolicyEntry(final Network prefix, final int precedence, final int label) { + this.prefix = prefix; + this.precedence = precedence; + this.label = label; + } + } + + private static final class Network { + final byte[] ip; + final int bits; + + Network(final byte[] ip, final int bits) { + this.ip = ip; + this.bits = bits; + } + + boolean contains(final InetAddress addr) { + final byte[] a = (addr instanceof Inet4Address) ? v4toMapped(addr.getAddress()) : addr.getAddress(); + if (a.length != ip.length) { + return false; + } + final int fullBytes = bits / 8; + for (int i = 0; i < fullBytes; i++) { + if (a[i] != ip[i]) { + return false; + } + } + final int rem = bits % 8; + if (rem == 0) { + return true; + } + final int mask = 0xff << (8 - rem); + return (a[fullBytes] & mask) == (ip[fullBytes] & mask); + } + + private static byte[] v4toMapped(final byte[] v4) { + final byte[] mapped = new byte[16]; + mapped[10] = mapped[11] = (byte) 0xff; + System.arraycopy(v4, 0, mapped, 12, 4); + return mapped; + } + } + + private static Network toPrefix(final String text, final int bits) { + try { + return new Network(InetAddress.getByName(text).getAddress(), bits); + } catch (final UnknownHostException ex) { + throw new IllegalArgumentException(ex); + } + } + + private static final List POLICY_TABLE = + Collections.unmodifiableList(Arrays.asList( + new PolicyEntry(toPrefix("::1", 128), 50, 0), + new PolicyEntry(toPrefix("::ffff:0:0", 96), 35, 4), + new PolicyEntry(toPrefix("::", 96), 1, 3), + new PolicyEntry(toPrefix("2001::", 32), 5, 5), + new PolicyEntry(toPrefix("2002::", 16), 30, 2), + new PolicyEntry(toPrefix("3ffe::", 16), 1, 12), + new PolicyEntry(toPrefix("fec0::", 10), 1, 11), + new PolicyEntry(toPrefix("fc00::", 7), 3, 13), + new PolicyEntry(toPrefix("::", 0), 40, 1) + )); + + private static PolicyEntry classify(final InetAddress ip) { + for (final PolicyEntry e : POLICY_TABLE) { + if (e.prefix.contains(ip)) { + return e; + } + } + return new PolicyEntry(null, 40, 1); + } + + private static final Comparator RFC6724_COMPARATOR = (a, b) -> { + final InetAddress DA = a.dst, DB = b.dst; + final InetAddress SourceDA = a.src, SourceDB = b.src; + final Attr attrDA = a.dstAttr, attrDB = b.dstAttr; + final Attr attrSourceDA = a.srcAttr, attrSourceDB = b.srcAttr; + + final int preferDA = -1; + final int preferDB = 1; + + // Rule 1: Avoid unusable destinations. + final boolean validA = SourceDA != null && !SourceDA.isAnyLocalAddress(); + final boolean validB = SourceDB != null && !SourceDB.isAnyLocalAddress(); + if (!validA && !validB) { + return 0; + } + if (!validB) { + return preferDA; + } + if (!validA) { + return preferDB; + } + + // Rule 2: Prefer matching scope. + if (attrDA.scope == attrSourceDA.scope && attrDB.scope != attrSourceDB.scope) { + return preferDA; + } + if (attrDA.scope != attrSourceDA.scope && attrDB.scope == attrSourceDB.scope) { + return preferDB; + } + + // TODO Rule 3 & 4: skipped + + // Rule 5: Prefer matching label. + if (attrSourceDA.label == attrDA.label && attrSourceDB.label != attrDB.label) { + return preferDA; + } + if (attrSourceDA.label != attrDA.label && attrSourceDB.label == attrDB.label) { + return preferDB; + } + + // Rule 6: Prefer higher precedence. + if (attrDA.precedence > attrDB.precedence) { + return preferDA; + } + if (attrDA.precedence < attrDB.precedence) { + return preferDB; + } + + // TODO Rule 7: skipped + + // Rule 8: Prefer smaller scope. + if (attrDA.scope.value < attrDB.scope.value) { + return preferDA; + } + if (attrDA.scope.value > attrDB.scope.value) { + return preferDB; + } + + // Rule 9: Longest common prefix (IPv6 only). + if (DA instanceof Inet6Address && DB instanceof Inet6Address) { + final int commonA = commonPrefixLen(SourceDA, DA); + final int commonB = commonPrefixLen(SourceDB, DB); + if (commonA > commonB) { + return preferDA; + } + if (commonA < commonB) { + return preferDB; + } + } + + // Rule 10: Otherwise equal (original order preserved by stable sort). + return 0; + }; + + private static int commonPrefixLen(final InetAddress a, final InetAddress b) { + if (a == null || b == null || a.getClass() != b.getClass()) { + return 0; + } + final byte[] aa = a.getAddress(); + final byte[] bb = b.getAddress(); + final int len = Math.min(aa.length, bb.length); + int bits = 0; + for (int i = 0; i < len; i++) { + final int x = (aa[i] ^ bb[i]) & 0xFF; + if (x == 0) { + bits += 8; + } else { + for (int j = 7; j >= 0; j--) { + if ((x & (1 << j)) != 0) { + return bits; + } + bits++; + } + return bits; + } + } + return bits; + } + + private static String addr(final InetAddress a) { + if (a == null) return "null"; + final boolean v6 = a instanceof Inet6Address; + return v6 ? "IPv6" : "IPv4" + "(" + a.getHostAddress() + ")"; + } + + private static List fmt(final InetAddress[] arr) { + final List out = new ArrayList<>(arr.length); + for (final InetAddress a : arr) out.add(addr(a)); + return out; + } + + private static List fmt(final List arr) { + final List out = new ArrayList<>(arr.size()); + for (final InetAddress a : arr) { + out.add(addr(a)); + } + return out; + } + + private static String simpleName() { + return "Rfc6724Resolver"; + } +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/config/ConnectionConfig.java b/httpclient5/src/main/java/org/apache/hc/client5/http/config/ConnectionConfig.java index 5c2f654985..a54de443d0 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/config/ConnectionConfig.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/config/ConnectionConfig.java @@ -44,6 +44,15 @@ public class ConnectionConfig implements Cloneable { private static final Timeout DEFAULT_CONNECT_TIMEOUT = Timeout.ofMinutes(3); + /** + * @since 5.6 + */ + private static final TimeValue DEFAULT_HE_ATTEMPT_DELAY = TimeValue.ofMilliseconds(250); + /** + * @since 5.6 + */ + private static final TimeValue DEFAULT_HE_OTHER_FAMILY_DELAY = TimeValue.ofMilliseconds(50); + public static final ConnectionConfig DEFAULT = new Builder().build(); private final Timeout connectTimeout; @@ -52,11 +61,28 @@ public class ConnectionConfig implements Cloneable { private final TimeValue validateAfterInactivity; private final TimeValue timeToLive; + /** + * @since 5.6 + */ + private final boolean staggeredConnectEnabled; + /** + * @since 5.6 + */ + private final TimeValue happyEyeballsAttemptDelay; + /** + * @since 5.6 + */ + private final TimeValue happyEyeballsOtherFamilyDelay; + /** + * @since 5.6 + */ + private final ProtocolFamilyPreference protocolFamilyPreference; + /** * Intended for CDI compatibility */ protected ConnectionConfig() { - this(DEFAULT_CONNECT_TIMEOUT, null, null, null, null); + this(DEFAULT_CONNECT_TIMEOUT, null, null, null, null, false, DEFAULT_HE_ATTEMPT_DELAY, DEFAULT_HE_OTHER_FAMILY_DELAY, ProtocolFamilyPreference.INTERLEAVE); } ConnectionConfig( @@ -64,13 +90,21 @@ protected ConnectionConfig() { final Timeout socketTimeout, final Timeout idleTimeout, final TimeValue validateAfterInactivity, - final TimeValue timeToLive) { + final TimeValue timeToLive, + final boolean staggeredConnectEnabled, + final TimeValue happyEyeballsAttemptDelay, + final TimeValue happyEyeballsOtherFamilyDelay, + final ProtocolFamilyPreference protocolFamilyPreference) { super(); this.connectTimeout = connectTimeout; this.socketTimeout = socketTimeout; this.idleTimeout = idleTimeout; this.validateAfterInactivity = validateAfterInactivity; this.timeToLive = timeToLive; + this.staggeredConnectEnabled = staggeredConnectEnabled; + this.happyEyeballsAttemptDelay = happyEyeballsAttemptDelay != null ? happyEyeballsAttemptDelay : DEFAULT_HE_ATTEMPT_DELAY; + this.happyEyeballsOtherFamilyDelay = happyEyeballsOtherFamilyDelay != null ? happyEyeballsOtherFamilyDelay : DEFAULT_HE_OTHER_FAMILY_DELAY; + this.protocolFamilyPreference = protocolFamilyPreference != null ? protocolFamilyPreference : ProtocolFamilyPreference.INTERLEAVE; } /** @@ -108,6 +142,46 @@ public TimeValue getTimeToLive() { return timeToLive; } + /** + * Whether staggered (Happy Eyeballs–style) connection attempts are enabled. + * + * @see Builder#setStaggeredConnectEnabled(boolean) + * @since 5.6 + */ + public boolean isStaggeredConnectEnabled() { + return staggeredConnectEnabled; + } + + /** + * Delay between subsequent staggered connection attempts. + * + * @see Builder#setHappyEyeballsAttemptDelay(TimeValue) + * @since 5.6 + */ + public TimeValue getHappyEyeballsAttemptDelay() { + return happyEyeballsAttemptDelay; + } + + /** + * Initial delay before launching the first address of the other protocol family. + * + * @see Builder#setHappyEyeballsOtherFamilyDelay(TimeValue) + * @since 5.6 + */ + public TimeValue getHappyEyeballsOtherFamilyDelay() { + return happyEyeballsOtherFamilyDelay; + } + + /** + * Protocol family preference controlling address selection and ordering. + * + * @see Builder#setProtocolFamilyPreference(ProtocolFamilyPreference) + * @since 5.6 + */ + public ProtocolFamilyPreference getProtocolFamilyPreference() { + return protocolFamilyPreference; + } + @Override protected ConnectionConfig clone() throws CloneNotSupportedException { return (ConnectionConfig) super.clone(); @@ -122,6 +196,10 @@ public String toString() { builder.append(", idleTimeout=").append(idleTimeout); builder.append(", validateAfterInactivity=").append(validateAfterInactivity); builder.append(", timeToLive=").append(timeToLive); + builder.append(", staggeredConnectEnabled=").append(staggeredConnectEnabled); + builder.append(", happyEyeballsAttemptDelay=").append(happyEyeballsAttemptDelay); + builder.append(", happyEyeballsOtherFamilyDelay=").append(happyEyeballsOtherFamilyDelay); + builder.append(", protocolFamilyPreference=").append(protocolFamilyPreference); builder.append("]"); return builder.toString(); } @@ -135,7 +213,11 @@ public static ConnectionConfig.Builder copy(final ConnectionConfig config) { .setConnectTimeout(config.getConnectTimeout()) .setSocketTimeout(config.getSocketTimeout()) .setValidateAfterInactivity(config.getValidateAfterInactivity()) - .setTimeToLive(config.getTimeToLive()); + .setTimeToLive(config.getTimeToLive()) + .setStaggeredConnectEnabled(config.isStaggeredConnectEnabled()) + .setHappyEyeballsAttemptDelay(config.getHappyEyeballsAttemptDelay()) + .setHappyEyeballsOtherFamilyDelay(config.getHappyEyeballsOtherFamilyDelay()) + .setProtocolFamilyPreference(config.getProtocolFamilyPreference()); } public static class Builder { @@ -146,6 +228,12 @@ public static class Builder { private TimeValue validateAfterInactivity; private TimeValue timeToLive; + // New fields (defaults) + private boolean staggeredConnectEnabled = false; // disabled by default + private TimeValue happyEyeballsAttemptDelay = DEFAULT_HE_ATTEMPT_DELAY; + private TimeValue happyEyeballsOtherFamilyDelay = DEFAULT_HE_OTHER_FAMILY_DELAY; + private ProtocolFamilyPreference protocolFamilyPreference = ProtocolFamilyPreference.INTERLEAVE; + Builder() { super(); this.connectTimeout = DEFAULT_CONNECT_TIMEOUT; @@ -281,13 +369,63 @@ public Builder setTimeToLive(final long timeToLive, final TimeUnit timeUnit) { return this; } + /** + * Enables or disables staggered (Happy Eyeballs–style) connection attempts. + * + * @since 5.6 + * @return this instance. + */ + public Builder setStaggeredConnectEnabled(final boolean enabled) { + this.staggeredConnectEnabled = enabled; + return this; + } + + /** + * Sets the delay between staggered connection attempts. + * + * @since 5.6 + * @return this instance. + */ + public Builder setHappyEyeballsAttemptDelay(final TimeValue delay) { + this.happyEyeballsAttemptDelay = delay; + return this; + } + + /** + * Sets the initial delay before launching the first address of the other + * protocol family (IPv6 vs IPv4) when interleaving attempts. + * + * @since 5.6 + * @return this instance. + */ + public Builder setHappyEyeballsOtherFamilyDelay(final TimeValue delay) { + this.happyEyeballsOtherFamilyDelay = delay; + return this; + } + + /** + * Sets the protocol family preference that guides address selection and ordering. + * + * @since 5.6 + * @return this instance. + */ + public Builder setProtocolFamilyPreference(final ProtocolFamilyPreference preference) { + this.protocolFamilyPreference = preference; + return this; + } + public ConnectionConfig build() { return new ConnectionConfig( connectTimeout != null ? connectTimeout : DEFAULT_CONNECT_TIMEOUT, socketTimeout, idleTimeout, validateAfterInactivity, - timeToLive); + timeToLive, + staggeredConnectEnabled, + happyEyeballsAttemptDelay != null ? happyEyeballsAttemptDelay : DEFAULT_HE_ATTEMPT_DELAY, + happyEyeballsOtherFamilyDelay != null ? happyEyeballsOtherFamilyDelay : DEFAULT_HE_OTHER_FAMILY_DELAY, + protocolFamilyPreference != null ? protocolFamilyPreference : ProtocolFamilyPreference.INTERLEAVE + ); } } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/config/ProtocolFamilyPreference.java b/httpclient5/src/main/java/org/apache/hc/client5/http/config/ProtocolFamilyPreference.java new file mode 100644 index 0000000000..d8026e2977 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/config/ProtocolFamilyPreference.java @@ -0,0 +1,67 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.config; + +/** + * Protocol family preference for outbound connections. + * + *

Used by connection initiation code to filter or order destination + * addresses and, when enabled, to interleave families during staggered attempts. + * + * @since 5.6 + */ +public enum ProtocolFamilyPreference { + /** Keep families as returned (or RFC 6724 ordered). */ + DEFAULT, + /** + * Prefer IPv4 addresses but allow IPv6 as a fallback. + */ + PREFER_IPV4, + + /** + * Prefer IPv6 addresses but allow IPv4 as a fallback. + */ + PREFER_IPV6, + + /** + * Use only IPv4 addresses. + */ + IPV4_ONLY, + + /** + * Use only IPv6 addresses. + */ + IPV6_ONLY, + + /** + * Interleave address families (v6, then v4, then v6, …) when multiple + * addresses are available. When staggered connects are enabled, the first + * address of the other family is delayed by a small offset. + */ + INTERLEAVE +} + diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/DefaultAsyncClientConnectionOperator.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/DefaultAsyncClientConnectionOperator.java index d8ee3b90a2..342451013e 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/DefaultAsyncClientConnectionOperator.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/DefaultAsyncClientConnectionOperator.java @@ -37,6 +37,7 @@ import org.apache.hc.client5.http.DnsResolver; import org.apache.hc.client5.http.SchemePortResolver; import org.apache.hc.client5.http.UnsupportedSchemeException; +import org.apache.hc.client5.http.config.ConnectionConfig; import org.apache.hc.client5.http.config.TlsConfig; import org.apache.hc.client5.http.impl.ConnPoolSupport; import org.apache.hc.client5.http.impl.DefaultSchemePortResolver; @@ -71,21 +72,14 @@ public class DefaultAsyncClientConnectionOperator implements AsyncClientConnecti private final MultihomeIOSessionRequester sessionRequester; private final Lookup tlsStrategyLookup; - /** - * Constructs a new {@code DefaultAsyncClientConnectionOperator}. - * - *

Note: this class is marked {@code @Internal}; rely on it - * only if you are prepared for incompatible changes in a future major - * release. Typical client code should use the high-level builders in - * {@code HttpAsyncClients} instead.

- */ - protected DefaultAsyncClientConnectionOperator( + DefaultAsyncClientConnectionOperator( final Lookup tlsStrategyLookup, final SchemePortResolver schemePortResolver, - final DnsResolver dnsResolver) { + final DnsResolver dnsResolver, + final ConnectionConfig defaultConnectionConfig) { this.tlsStrategyLookup = Args.notNull(tlsStrategyLookup, "TLS strategy lookup"); this.schemePortResolver = schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE; - this.sessionRequester = new MultihomeIOSessionRequester(dnsResolver); + this.sessionRequester = new MultihomeIOSessionRequester(dnsResolver, defaultConnectionConfig); } @Override @@ -279,4 +273,8 @@ protected void onBeforeTlsHandshake(final HttpContext httpContext, final HttpHos protected void onAfterTlsHandshake(final HttpContext httpContext, final HttpHost endpointHost) { } + public void shutdown() { + sessionRequester.shutdown(); + } + } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/MultihomeConnectionInitiator.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/MultihomeConnectionInitiator.java index 2e8e00bbf0..8ba66074c0 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/MultihomeConnectionInitiator.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/MultihomeConnectionInitiator.java @@ -31,6 +31,8 @@ import java.util.concurrent.Future; import org.apache.hc.client5.http.DnsResolver; +import org.apache.hc.client5.http.SystemDefaultDnsResolver; +import org.apache.hc.client5.http.config.ConnectionConfig; import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.ThreadingBehavior; import org.apache.hc.core5.concurrent.FutureCallback; @@ -54,8 +56,19 @@ public final class MultihomeConnectionInitiator implements ConnectionInitiator { public MultihomeConnectionInitiator( final ConnectionInitiator connectionInitiator, final DnsResolver dnsResolver) { + this(connectionInitiator, dnsResolver, null); + } + + /** + * @since 5.6 + */ + public MultihomeConnectionInitiator( + final ConnectionInitiator connectionInitiator, + final DnsResolver dnsResolver, + final ConnectionConfig connectionConfig) { this.connectionInitiator = Args.notNull(connectionInitiator, "Connection initiator"); - this.sessionRequester = new MultihomeIOSessionRequester(dnsResolver); + final DnsResolver effectiveResolver = dnsResolver != null ? dnsResolver : SystemDefaultDnsResolver.INSTANCE; + this.sessionRequester = new MultihomeIOSessionRequester(effectiveResolver, connectionConfig); } @Override diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/MultihomeIOSessionRequester.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/MultihomeIOSessionRequester.java index 4b8172c4ce..252dbd013e 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/MultihomeIOSessionRequester.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/MultihomeIOSessionRequester.java @@ -28,33 +28,81 @@ package org.apache.hc.client5.http.impl.nio; import java.io.IOException; +import java.net.ConnectException; +import java.net.Inet6Address; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.UnknownHostException; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hc.client5.http.ConnectExceptionSupport; import org.apache.hc.client5.http.DnsResolver; import org.apache.hc.client5.http.SystemDefaultDnsResolver; +import org.apache.hc.client5.http.config.ConnectionConfig; +import org.apache.hc.client5.http.config.ProtocolFamilyPreference; import org.apache.hc.core5.concurrent.ComplexFuture; import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.net.NamedEndpoint; import org.apache.hc.core5.reactor.ConnectionInitiator; import org.apache.hc.core5.reactor.IOSession; +import org.apache.hc.core5.util.TimeValue; import org.apache.hc.core5.util.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Multi-home dialing strategy for {@link ConnectionInitiator}. + *

+ * If {@link ConnectionConfig#isStaggeredConnectEnabled()} is {@code false} (or config is null), + * attempts addresses sequentially (legacy behaviour). If enabled, performs staggered, + * interleaved connection attempts across protocol families (Happy Eyeballs–style). + * + * @since 5.6 + */ final class MultihomeIOSessionRequester { private static final Logger LOG = LoggerFactory.getLogger(MultihomeIOSessionRequester.class); + + private static final long DEFAULT_ATTEMPT_DELAY_MS = 250L; + private static final long DEFAULT_OTHER_FAMILY_DELAY_MS = 50L; + private final DnsResolver dnsResolver; + private final ConnectionConfig connectionConfig; + + // Stays alive for the lifetime of this requester (no premature shutdown) + private final ScheduledExecutorService scheduler; // created only when staggered is enabled MultihomeIOSessionRequester(final DnsResolver dnsResolver) { + this(dnsResolver, null); + } + + MultihomeIOSessionRequester(final DnsResolver dnsResolver, final ConnectionConfig connectionConfig) { this.dnsResolver = dnsResolver != null ? dnsResolver : SystemDefaultDnsResolver.INSTANCE; + this.connectionConfig = connectionConfig != null ? connectionConfig : ConnectionConfig.DEFAULT; + + if (connectionConfig != null && connectionConfig.isStaggeredConnectEnabled()) { + final ThreadFactory tf = r -> { + final Thread t = new Thread(r, "hc-hev2-mh-scheduler"); + t.setDaemon(true); + return t; + }; + this.scheduler = Executors.newSingleThreadScheduledExecutor(tf); + } else { + this.scheduler = null; + } } public Future connect( @@ -67,47 +115,54 @@ public Future connect( final FutureCallback callback) { final ComplexFuture future = new ComplexFuture<>(callback); + + // If a specific address is given, dial it directly (no multi-home logic). if (remoteAddress != null) { if (LOG.isDebugEnabled()) { - LOG.debug("{}:{} connecting {} to {} ({})", + LOG.debug("{}:{} connecting {}->{} ({})", remoteEndpoint.getHostName(), remoteEndpoint.getPort(), localAddress, remoteAddress, connectTimeout); } - final Future sessionFuture = connectionInitiator.connect(remoteEndpoint, remoteAddress, localAddress, connectTimeout, attachment, new FutureCallback() { - @Override - public void completed(final IOSession session) { - future.completed(session); - } - - @Override - public void failed(final Exception cause) { - if (LOG.isDebugEnabled()) { - LOG.debug("{}:{} connection to {} failed ({}); terminating operation", - remoteEndpoint.getHostName(), remoteEndpoint.getPort(), remoteAddress, cause.getClass()); - } - if (cause instanceof IOException) { - future.failed(ConnectExceptionSupport.enhance((IOException) cause, remoteEndpoint, - (remoteAddress instanceof InetSocketAddress) ? - new InetAddress[] { ((InetSocketAddress) remoteAddress).getAddress() } : - new InetAddress[] {})); - } else { - future.failed(cause); - } - } + final Future sessionFuture = connectionInitiator.connect( + remoteEndpoint, remoteAddress, localAddress, connectTimeout, attachment, + new FutureCallback() { + @Override + public void completed(final IOSession session) { + if (LOG.isDebugEnabled()) { + LOG.debug("{}:{} connected {}->{} as {}", + remoteEndpoint.getHostName(), remoteEndpoint.getPort(), + localAddress, remoteAddress, session.getId()); + } + future.completed(session); + } - @Override - public void cancelled() { - future.cancel(); - } + @Override + public void failed(final Exception cause) { + if (LOG.isDebugEnabled()) { + LOG.debug("{}:{} connection to {} failed ({}); terminating", + remoteEndpoint.getHostName(), remoteEndpoint.getPort(), + remoteAddress, cause.getClass()); + } + if (cause instanceof IOException) { + final InetAddress[] addrs = new InetAddress[]{ + (remoteAddress instanceof InetSocketAddress) + ? ((InetSocketAddress) remoteAddress).getAddress() : null + }; + future.failed(ConnectExceptionSupport.enhance((IOException) cause, remoteEndpoint, addrs)); + } else { + future.failed(cause); + } + } - }); + @Override + public void cancelled() { + future.cancel(); + } + }); future.setDependency(sessionFuture); return future; } - if (LOG.isDebugEnabled()) { - LOG.debug("{} resolving remote address", remoteEndpoint.getHostName()); - } - + // Resolve all addresses final List remoteAddresses; try { remoteAddresses = dnsResolver.resolve(remoteEndpoint.getHostName(), remoteEndpoint.getPort()); @@ -115,6 +170,9 @@ public void cancelled() { throw new UnknownHostException(remoteEndpoint.getHostName()); } } catch (final UnknownHostException ex) { + if (LOG.isDebugEnabled()) { + LOG.debug("{} DNS resolution failed: {}", remoteEndpoint.getHostName(), ex.getMessage()); + } future.failed(ex); return future; } @@ -123,32 +181,64 @@ public void cancelled() { LOG.debug("{} resolved to {}", remoteEndpoint.getHostName(), remoteAddresses); } - final Runnable runnable = new Runnable() { + final boolean staggerEnabled = connectionConfig != null && connectionConfig.isStaggeredConnectEnabled(); + + if (!staggerEnabled || remoteAddresses.size() == 1) { + // Legacy sequential behaviour + runSequential(connectionInitiator, remoteEndpoint, remoteAddresses, localAddress, + connectTimeout, attachment, future); + } else { + runStaggered(connectionInitiator, remoteEndpoint, remoteAddresses, localAddress, + connectTimeout, attachment, future); + } + return future; + } + + public Future connect( + final ConnectionInitiator connectionInitiator, + final NamedEndpoint remoteEndpoint, + final SocketAddress localAddress, + final Timeout connectTimeout, + final Object attachment, + final FutureCallback callback) { + return connect(connectionInitiator, remoteEndpoint, null, localAddress, connectTimeout, attachment, callback); + } + + // ----------------- legacy sequential ----------------- + + private void runSequential( + final ConnectionInitiator connectionInitiator, + final NamedEndpoint remoteEndpoint, + final List remoteAddresses, + final SocketAddress localAddress, + final Timeout connectTimeout, + final Object attachment, + final ComplexFuture future) { + + final Runnable r = new Runnable() { private final AtomicInteger attempt = new AtomicInteger(0); void executeNext() { final int index = attempt.getAndIncrement(); - final InetSocketAddress remoteAddress = remoteAddresses.get(index); + final InetSocketAddress nextRemote = remoteAddresses.get(index); if (LOG.isDebugEnabled()) { - LOG.debug("{}:{} connecting {}->{} ({})", - remoteEndpoint.getHostName(), remoteEndpoint.getPort(), localAddress, remoteAddress, connectTimeout); + LOG.debug("{}:{} connecting {}->{} ({}) [sequential attempt {}/{}]", + remoteEndpoint.getHostName(), remoteEndpoint.getPort(), + localAddress, nextRemote, connectTimeout, + index + 1, remoteAddresses.size()); } final Future sessionFuture = connectionInitiator.connect( - remoteEndpoint, - remoteAddress, - localAddress, - connectTimeout, - attachment, + remoteEndpoint, nextRemote, localAddress, connectTimeout, attachment, new FutureCallback() { - @Override public void completed(final IOSession session) { if (LOG.isDebugEnabled()) { LOG.debug("{}:{} connected {}->{} as {}", - remoteEndpoint.getHostName(), remoteEndpoint.getPort(), localAddress, remoteAddress, session.getId()); + remoteEndpoint.getHostName(), remoteEndpoint.getPort(), + localAddress, nextRemote, session.getId()); } future.completed(session); } @@ -156,23 +246,17 @@ public void completed(final IOSession session) { @Override public void failed(final Exception cause) { if (attempt.get() >= remoteAddresses.size()) { - if (LOG.isDebugEnabled()) { - LOG.debug("{}:{} connection to {} failed ({}); terminating operation", - remoteEndpoint.getHostName(), remoteEndpoint.getPort(), remoteAddress, cause.getClass()); - } if (cause instanceof IOException) { - final InetAddress[] addresses = remoteAddresses.stream() - .filter(addr -> addr instanceof InetSocketAddress) - .map(addr -> ((InetSocketAddress) addr).getAddress()) - .toArray(InetAddress[]::new); - future.failed(ConnectExceptionSupport.enhance((IOException) cause, remoteEndpoint, addresses)); + final InetAddress[] addrs = toInetAddrs(remoteAddresses); + future.failed(ConnectExceptionSupport.enhance((IOException) cause, remoteEndpoint, addrs)); } else { future.failed(cause); } } else { if (LOG.isDebugEnabled()) { - LOG.debug("{}:{} connection to {} failed ({}); retrying connection to the next address", - remoteEndpoint.getHostName(), remoteEndpoint.getPort(), remoteAddress, cause.getClass()); + LOG.debug("{}:{} connection to {} failed ({}); trying next address", + remoteEndpoint.getHostName(), remoteEndpoint.getPort(), + nextRemote, cause.getClass()); } executeNext(); } @@ -182,7 +266,6 @@ public void failed(final Exception cause) { public void cancelled() { future.cancel(); } - }); future.setDependency(sessionFuture); } @@ -191,20 +274,313 @@ public void cancelled() { public void run() { executeNext(); } - }; - runnable.run(); - return future; + + r.run(); } - public Future connect( + private void runStaggered( final ConnectionInitiator connectionInitiator, final NamedEndpoint remoteEndpoint, + final List remoteAddresses, final SocketAddress localAddress, final Timeout connectTimeout, final Object attachment, - final FutureCallback callback) { - return connect(connectionInitiator, remoteEndpoint, null, localAddress, connectTimeout, attachment, callback); + final ComplexFuture future) { + + // Defensive: scheduler must exist if we are here + if (scheduler == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("{} Happy Eyeballs requested but scheduler missing; falling back to sequential", remoteEndpoint.getHostName()); + } + runSequential(connectionInitiator, remoteEndpoint, remoteAddresses, localAddress, + connectTimeout, attachment, future); + return; + } + + // Split by family + final List v6 = new ArrayList<>(); + final List v4 = new ArrayList<>(); + for (int i = 0; i < remoteAddresses.size(); i++) { + final InetSocketAddress a = remoteAddresses.get(i); + if (a.getAddress() instanceof Inet6Address) { + v6.add(a); + } else { + v4.add(a); + } + } + + // Apply family preference (filter-only for *_ONLY) + final ProtocolFamilyPreference pref = connectionConfig.getProtocolFamilyPreference() != null + ? connectionConfig.getProtocolFamilyPreference() + : ProtocolFamilyPreference.INTERLEAVE; + + if (pref == ProtocolFamilyPreference.IPV6_ONLY && v6.isEmpty() + || pref == ProtocolFamilyPreference.IPV4_ONLY && v4.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("{} no addresses for {}", remoteEndpoint.getHostName(), pref); + } + future.failed(new UnknownHostException(remoteEndpoint.getHostName())); + return; + } + + List v6Try = v6; + List v4Try = v4; + + if (pref == ProtocolFamilyPreference.IPV6_ONLY) { + v4Try = new ArrayList<>(0); + } else if (pref == ProtocolFamilyPreference.IPV4_ONLY) { + v6Try = new ArrayList<>(0); + } + + // Determine starting family: + // - PREFER_IPV6 -> v6 first + // - PREFER_IPV4 -> v4 first + // - INTERLEAVE/DEFAULT -> start with family of first resolver address (keeps RFC6724 order) + final boolean startWithV6; + if (pref == ProtocolFamilyPreference.PREFER_IPV6) { + startWithV6 = true; + } else if (pref == ProtocolFamilyPreference.PREFER_IPV4) { + startWithV6 = false; + } else { + startWithV6 = !remoteAddresses.isEmpty() + && remoteAddresses.get(0).getAddress() instanceof Inet6Address; + } + + // Delays + final long attemptDelayMs = toMillisOrDefault(connectionConfig.getHappyEyeballsAttemptDelay(), DEFAULT_ATTEMPT_DELAY_MS); + final long otherFamilyDelayMs = Math.min( + toMillisOrDefault(connectionConfig.getHappyEyeballsOtherFamilyDelay(), DEFAULT_OTHER_FAMILY_DELAY_MS), + attemptDelayMs); + + if (LOG.isDebugEnabled()) { + LOG.debug("{} using Happy Eyeballs: attemptDelay={}ms, otherFamilyDelay={}ms, pref={}", + remoteEndpoint.getHostName(), attemptDelayMs, otherFamilyDelayMs, pref); + } + + // Shared state + final AtomicBoolean done = new AtomicBoolean(false); + final CopyOnWriteArrayList> ioFutures = new CopyOnWriteArrayList<>(); + final CopyOnWriteArrayList> scheduled = new CopyOnWriteArrayList<>(); + final AtomicReference lastFailure = new AtomicReference<>(null); + final AtomicInteger finishedCount = new AtomicInteger(0); + final AtomicInteger totalAttempts = new AtomicInteger(0); + + // Helper to cancel everything + final Runnable cancelAll = () -> { + int cancelledIO = 0, cancelledTimers = 0; + for (int i = 0; i < ioFutures.size(); i++) { + try { + if (ioFutures.get(i).cancel(true)) { + cancelledIO++; + } + } catch (final RuntimeException ignore) { + } + } + for (int i = 0; i < scheduled.size(); i++) { + try { + if (scheduled.get(i).cancel(true)) { + cancelledTimers++; + } + } catch (final RuntimeException ignore) { + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("{} cancelled pending attempts: {} I/O futures, {} timers", + remoteEndpoint.getHostName(), cancelledIO, cancelledTimers); + } + }; + + // Attempt runner + final Attempt attempt = new Attempt( + connectionInitiator, + remoteEndpoint, + localAddress, + connectTimeout, + attachment, + done, + ioFutures, + lastFailure, + finishedCount, + totalAttempts, + future, + cancelAll); + + // ---- HEv2 schedule: two independent clocks ---- + final List A = startWithV6 ? v6Try : v4Try; // starting family + final List B = startWithV6 ? v4Try : v6Try; // other family + + long tA = 0L; + long tB = A.isEmpty() ? 0L : otherFamilyDelayMs; // if A empty, start B at t=0 + + int iA = 0, iB = 0; + + // Kick off first attempts (if present) + if (iA < A.size()) { + scheduled.add(attempt.schedule(A.get(iA++), tA)); + tA += attemptDelayMs; + } + if (iB < B.size()) { + scheduled.add(attempt.schedule(B.get(iB++), tB)); + tB += attemptDelayMs; + } + + // Continue interleaving in time (A at 0,Δ,2Δ… ; B at δ,δ+Δ,δ+2Δ…) + while (iA < A.size() || iB < B.size()) { + if (iA < A.size()) { + scheduled.add(attempt.schedule(A.get(iA++), tA)); + tA += attemptDelayMs; + } + if (iB < B.size()) { + scheduled.add(attempt.schedule(B.get(iB++), tB)); + tB += attemptDelayMs; + } + } + } + + private static long toMillisOrDefault(final TimeValue tv, final long defMs) { + return tv != null ? tv.toMilliseconds() : defMs; } + private static InetAddress[] toInetAddrs(final List sockAddrs) { + final InetAddress[] out = new InetAddress[sockAddrs.size()]; + for (int i = 0; i < sockAddrs.size(); i++) { + out[i] = sockAddrs.get(i).getAddress(); + } + return out; + } + + // Schedules and runs a single connect attempt + private final class Attempt { + + private final ConnectionInitiator initiator; + private final NamedEndpoint host; + private final SocketAddress local; + private final Timeout timeout; + private final Object attachment; + + private final AtomicBoolean done; + private final CopyOnWriteArrayList> ioFutures; + private final AtomicReference lastFailure; + private final AtomicInteger finishedCount; + private final AtomicInteger totalAttempts; + private final ComplexFuture promise; + private final Runnable cancelAll; + + Attempt(final ConnectionInitiator initiator, + final NamedEndpoint host, + final SocketAddress local, + final Timeout timeout, + final Object attachment, + final AtomicBoolean done, + final CopyOnWriteArrayList> ioFutures, + final AtomicReference lastFailure, + final AtomicInteger finishedCount, + final AtomicInteger totalAttempts, + final ComplexFuture promise, + final Runnable cancelAll) { + this.initiator = initiator; + this.host = host; + this.local = local; + this.timeout = timeout; + this.attachment = attachment; + this.done = done; + this.ioFutures = ioFutures; + this.lastFailure = lastFailure; + this.finishedCount = finishedCount; + this.totalAttempts = totalAttempts; + this.promise = promise; + this.cancelAll = cancelAll; + } + + ScheduledFuture schedule(final InetSocketAddress dest, final long delayMs) { + totalAttempts.incrementAndGet(); + if (LOG.isDebugEnabled()) { + LOG.debug("{} scheduling connect to {} in {} ms", host.getHostName(), dest, delayMs); + } + return scheduler.schedule(() -> { + if (done.get()) { + if (LOG.isDebugEnabled()) { + LOG.debug("{} skipping connect to {} (already satisfied)", host.getHostName(), dest); + } + return; + } + final Future ioFuture = initiator.connect( + host, dest, local, timeout, attachment, + new FutureCallback() { + @Override + public void completed(final IOSession session) { + if (done.compareAndSet(false, true)) { + if (LOG.isDebugEnabled()) { + LOG.debug("{} winner: connected to {} ({} total attempts scheduled)", + host.getHostName(), dest, totalAttempts.get()); + } + promise.completed(session); + if (LOG.isDebugEnabled()) { + LOG.debug("{} cancelling losing attempts", host.getHostName()); + } + cancelAll.run(); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("{} late success to {} discarded (already have winner)", host.getHostName(), dest); + } + try { + session.close(); + } catch (final RuntimeException ignore) { + } + } + } + + @Override + public void failed(final Exception ex) { + lastFailure.set(ex); + final int finished = finishedCount.incrementAndGet(); + final int total = totalAttempts.get(); + if (LOG.isDebugEnabled()) { + LOG.debug("{} failed to connect to {} ({}/{}) : {}", + host.getHostName(), dest, finished, total, + ex.getClass().getSimpleName()); + } + if (!done.get() && finished == total && done.compareAndSet(false, true)) { + final Exception last = lastFailure.get(); + if (LOG.isDebugEnabled()) { + LOG.debug("{} all {} attempts exhausted; failing with {}", + host.getHostName(), total, last != null ? last.getClass().getSimpleName() : "unknown"); + } + if (last instanceof IOException) { + promise.failed(ConnectExceptionSupport.enhance((IOException) last, host, (InetAddress) null)); + } else { + promise.failed(last != null ? last + : new ConnectException("All connection attempts failed")); + } + cancelAll.run(); + } + } + + @Override + public void cancelled() { + lastFailure.compareAndSet(null, new CancellationException("Cancelled")); + final int finished = finishedCount.incrementAndGet(); + final int total = totalAttempts.get(); + if (LOG.isDebugEnabled()) { + LOG.debug("{} connect attempt to {} was CANCELLED ({}/{})", + host.getHostName(), dest, finished, total); + } + if (!done.get() && finished == total && done.compareAndSet(false, true)) { + final Exception last = lastFailure.get(); + promise.failed(last != null ? last + : new ConnectException("All connection attempts failed")); + cancelAll.run(); + } + } + }); + ioFutures.add(ioFuture); + }, delayMs, TimeUnit.MILLISECONDS); + } + } + void shutdown() { + if (scheduler != null) { + scheduler.shutdownNow(); + } + } } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java index abae03ffd1..7b79edde56 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java @@ -164,7 +164,18 @@ public PoolingAsyncClientConnectionManager( final TimeValue timeToLive, final SchemePortResolver schemePortResolver, final DnsResolver dnsResolver) { - this(new DefaultAsyncClientConnectionOperator(tlsStrategyLookup, schemePortResolver, dnsResolver), + this(tlsStrategyLookup,poolConcurrencyPolicy,poolReusePolicy,timeToLive,schemePortResolver,dnsResolver,ConnectionConfig.DEFAULT); + } + + public PoolingAsyncClientConnectionManager( + final Lookup tlsStrategyLookup, + final PoolConcurrencyPolicy poolConcurrencyPolicy, + final PoolReusePolicy poolReusePolicy, + final TimeValue timeToLive, + final SchemePortResolver schemePortResolver, + final DnsResolver dnsResolver, + final ConnectionConfig connectionConfig) { + this(new DefaultAsyncClientConnectionOperator(tlsStrategyLookup, schemePortResolver, dnsResolver, connectionConfig), poolConcurrencyPolicy, poolReusePolicy, timeToLive, false); } @@ -237,6 +248,17 @@ public void close(final CloseMode closeMode) { } this.pool.close(closeMode); LOG.debug("Connection pool shut down"); + + if (connectionOperator instanceof DefaultAsyncClientConnectionOperator) { + try { + ((DefaultAsyncClientConnectionOperator) connectionOperator).shutdown(); + if (LOG.isDebugEnabled()) { + LOG.debug("Connection operator shut down"); + } + } catch (final RuntimeException ex) { + LOG.warn("Connection operator shutdown failed", ex); + } + } } } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManagerBuilder.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManagerBuilder.java index f6c93457d9..62d2e0db0a 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManagerBuilder.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManagerBuilder.java @@ -90,6 +90,10 @@ public class PoolingAsyncClientConnectionManagerBuilder { private Resolver tlsConfigResolver; private boolean messageMultiplexing; + private AsyncClientConnectionOperator connectionOperator; + + private ConnectionConfig defaultConnectionConfig; + public static PoolingAsyncClientConnectionManagerBuilder create() { return new PoolingAsyncClientConnectionManagerBuilder(); } @@ -178,6 +182,7 @@ public final PoolingAsyncClientConnectionManagerBuilder setMaxConnPerRoute(final */ public final PoolingAsyncClientConnectionManagerBuilder setDefaultConnectionConfig(final ConnectionConfig config) { this.connectionConfigResolver = route -> config; + this.defaultConnectionConfig = config; return this; } @@ -274,17 +279,33 @@ public final PoolingAsyncClientConnectionManagerBuilder setMessageMultiplexing(f return this; } + /** + * Sets custom {@link AsyncClientConnectionOperator} instance. + * + * @param connectionOperator the custom connection operator. + * @return this instance. + * @since 5.6 + */ + @Experimental + public final PoolingAsyncClientConnectionManagerBuilder setConnectionOperator( + final AsyncClientConnectionOperator connectionOperator) { + this.connectionOperator = connectionOperator; + return this; + } + @Internal protected AsyncClientConnectionOperator createConnectionOperator( final TlsStrategy tlsStrategy, final SchemePortResolver schemePortResolver, - final DnsResolver dnsResolver) { + final DnsResolver dnsResolver, + final ConnectionConfig defaultConnectionConfig) { return new DefaultAsyncClientConnectionOperator( RegistryBuilder.create() .register(URIScheme.HTTPS.getId(), tlsStrategy) .build(), schemePortResolver, - dnsResolver); + dnsResolver, + defaultConnectionConfig); } public PoolingAsyncClientConnectionManager build() { @@ -307,7 +328,7 @@ public PoolingAsyncClientConnectionManager build() { } } final PoolingAsyncClientConnectionManager poolingmgr = new PoolingAsyncClientConnectionManager( - createConnectionOperator(tlsStrategyCopy, schemePortResolver, dnsResolver), + createConnectionOperator(tlsStrategyCopy, schemePortResolver, dnsResolver, defaultConnectionConfig), poolConcurrencyPolicy, poolReusePolicy, null, @@ -323,4 +344,4 @@ public PoolingAsyncClientConnectionManager build() { return poolingmgr; } -} +} \ No newline at end of file diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/Rfc6724AddressSelectingDnsResolverTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/Rfc6724AddressSelectingDnsResolverTest.java new file mode 100644 index 0000000000..c37d6d4245 --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/Rfc6724AddressSelectingDnsResolverTest.java @@ -0,0 +1,181 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.mockito.Mockito.when; + +import java.net.Inet4Address; +import java.net.Inet6Address; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hc.client5.http.config.ProtocolFamilyPreference; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +class Rfc6724AddressSelectingDnsResolverTest { + + private DnsResolver delegate; + + @BeforeEach + void setUp() { + delegate = Mockito.mock(DnsResolver.class); + } + + @Test + void ipv4Only_filtersOutIPv6() throws Exception { + final InetAddress v4 = InetAddress.getByName("203.0.113.10"); // TEST-NET-3 + final InetAddress v6 = InetAddress.getByName("2001:db8::10"); // documentation prefix + + when(delegate.resolve("dual.example")).thenReturn(new InetAddress[]{v6, v4}); + + final Rfc6724AddressSelectingDnsResolver r = + new Rfc6724AddressSelectingDnsResolver(delegate, ProtocolFamilyPreference.IPV4_ONLY); + + final InetAddress[] ordered = r.resolve("dual.example"); + Assertions.assertNotNull(ordered); + assertEquals(1, ordered.length); + assertInstanceOf(Inet4Address.class, ordered[0]); + assertEquals(v4, ordered[0]); + } + + @Test + void ipv6Only_filtersOutIPv4() throws Exception { + final InetAddress v4 = InetAddress.getByName("192.0.2.1"); // TEST-NET-1 + final InetAddress v6 = InetAddress.getByName("2001:db8::1"); + + when(delegate.resolve("dual.example")).thenReturn(new InetAddress[]{v4, v6}); + + final Rfc6724AddressSelectingDnsResolver r = + new Rfc6724AddressSelectingDnsResolver(delegate, ProtocolFamilyPreference.IPV6_ONLY); + + final InetAddress[] ordered = r.resolve("dual.example"); + Assertions.assertNotNull(ordered); + assertEquals(1, ordered.length); + assertInstanceOf(Inet6Address.class, ordered[0]); + assertEquals(v6, ordered[0]); + } + + @Test + void ipv4Only_emptyWhenNoIPv4Candidates() throws Exception { + final InetAddress v6a = InetAddress.getByName("2001:db8::1"); + final InetAddress v6b = InetAddress.getByName("2001:db8::2"); + + when(delegate.resolve("v6only.example")).thenReturn(new InetAddress[]{v6a, v6b}); + + final Rfc6724AddressSelectingDnsResolver r = + new Rfc6724AddressSelectingDnsResolver(delegate, ProtocolFamilyPreference.IPV4_ONLY); + + final InetAddress[] ordered = r.resolve("v6only.example"); + Assertions.assertNotNull(ordered); + assertEquals(0, ordered.length); + } + + @Test + void preferIpv6_groupsAllV6First_preservingRelativeOrder() throws Exception { + final InetAddress v4a = InetAddress.getByName("192.0.2.1"); + final InetAddress v6a = InetAddress.getByName("2001:db8::1"); + final InetAddress v4b = InetAddress.getByName("203.0.113.10"); + final InetAddress v6b = InetAddress.getByName("2001:db8::2"); + + when(delegate.resolve("dual.example")).thenReturn(new InetAddress[]{v4a, v6a, v4b, v6b}); + + final Rfc6724AddressSelectingDnsResolver r = + new Rfc6724AddressSelectingDnsResolver(delegate, ProtocolFamilyPreference.PREFER_IPV6); + + final InetAddress[] out = r.resolve("dual.example"); + + // all v6 first, in original relative order + final List v6Seen = new ArrayList<>(); + final List v4Seen = new ArrayList<>(); + Assertions.assertNotNull(out); + for (final InetAddress a : out) { + if (a instanceof Inet6Address) { + v6Seen.add(a); + } else { + v4Seen.add(a); + } + } + assertEquals(Arrays.asList(v6a, v6b), v6Seen); + assertEquals(Arrays.asList(v4a, v4b), v4Seen); + + // ensure first element is IPv6 (grouping actually happened) + assertInstanceOf(Inet6Address.class, out[0]); + } + + @Test + void interleave_alternatesFamilies_and_preservesRelativeOrder() throws Exception { + final InetAddress v6a = InetAddress.getByName("2001:db8::1"); + final InetAddress v6b = InetAddress.getByName("2001:db8::2"); + final InetAddress v4a = InetAddress.getByName("192.0.2.1"); + final InetAddress v4b = InetAddress.getByName("203.0.113.10"); + + when(delegate.resolve("dual.example")).thenReturn(new InetAddress[]{v6a, v6b, v4a, v4b}); + + final Rfc6724AddressSelectingDnsResolver r = + new Rfc6724AddressSelectingDnsResolver(delegate, ProtocolFamilyPreference.INTERLEAVE); + + final InetAddress[] out = r.resolve("dual.example"); + + // Preserve per-family relative order + final List v6Seen = new ArrayList<>(); + final List v4Seen = new ArrayList<>(); + Assertions.assertNotNull(out); + for (final InetAddress a : out) { + if (a instanceof Inet6Address) { + v6Seen.add(a); + } else { + v4Seen.add(a); + } + } + assertEquals(Arrays.asList(v6a, v6b), v6Seen); + assertEquals(Arrays.asList(v4a, v4b), v4Seen); + + // Alternation (as far as both families have remaining items) + final int pairs = Math.min(v6Seen.size(), v4Seen.size()); + for (int i = 1; i < pairs * 2; i++) { + assertNotEquals(out[i - 1] instanceof Inet6Address, out[i] instanceof Inet6Address, + "adjacent entries should alternate family under INTERLEAVE"); + } + } + + @Test + void canonicalHostname_delegates() throws Exception { + when(delegate.resolveCanonicalHostname("example.org")).thenReturn("canon.example.org"); + final Rfc6724AddressSelectingDnsResolver r = + new Rfc6724AddressSelectingDnsResolver(delegate, ProtocolFamilyPreference.INTERLEAVE); + assertEquals("canon.example.org", r.resolveCanonicalHostname("example.org")); + Mockito.verify(delegate).resolveCanonicalHostname("example.org"); + } +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientHappyEyeballs.java b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientHappyEyeballs.java new file mode 100644 index 0000000000..1cbb9f68db --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientHappyEyeballs.java @@ -0,0 +1,309 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.examples; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Future; + +import org.apache.hc.client5.http.Rfc6724AddressSelectingDnsResolver; +import org.apache.hc.client5.http.SystemDefaultDnsResolver; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.client5.http.async.methods.SimpleRequestProducer; +import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer; +import org.apache.hc.client5.http.config.ConnectionConfig; +import org.apache.hc.client5.http.config.ProtocolFamilyPreference; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; +import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.message.StatusLine; +import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; + +/** + *

Example: RFC 6724 DNS ordering + Happy Eyeballs (with console output)

+ * + *

This example shows how to:

+ *
    + *
  • Wrap the system DNS resolver with {@link org.apache.hc.client5.http.Rfc6724AddressSelectingDnsResolver} + * to apply RFC 6724 destination address selection (IPv6/IPv4 ordering).
  • + *
  • Use {@link org.apache.hc.client5.http.config.ConnectionConfig} to enable Happy Eyeballs v2 pacing + * and set a protocol family preference (e.g., {@code IPV4_ONLY}, {@code IPV6_ONLY}, {@code PREFER_IPV6}, + * {@code PREFER_IPV4}, {@code INTERLEAVE}).
  • + *
  • Control the connect timeout so demos don’t stall on slow/broken networks.
  • + *
+ * + *

How to run with the example runner

+ *
+ * # Default (no args): hits http://ipv6-test.com/ and https://ipv6-test.com/
+ * ./run-example.sh AsyncClientHappyEyeballs
+ *
+ * # Pass one URI (runner supports command-line args)
+ * ./run-example.sh AsyncClientHappyEyeballs http://neverssl.com/
+ *
+ * # Pass multiple URIs
+ * ./run-example.sh AsyncClientHappyEyeballs http://neverssl.com/ https://example.org/
+ *
+ * # Optional system properties (the runner forwards -D...):
+ * #   -Dhc.he.pref=INTERLEAVE|PREFER_IPV4|PREFER_IPV6|IPV4_ONLY|IPV6_ONLY  (default: INTERLEAVE)
+ * #   -Dhc.he.delay.ms=250        (Happy Eyeballs attempt spacing; default 250)
+ * #   -Dhc.he.other.ms=50         (first other-family offset; default 50; clamped ≤ attempt delay)
+ * #   -Dhc.connect.ms=10000       (TCP connect timeout; default 10000)
+ *
+ * ./run-example.sh AsyncClientHappyEyeballs http://neverssl.com/ \
+ *   -Dhc.he.pref=INTERLEAVE -Dhc.he.delay.ms=250 -Dhc.he.other.ms=50 -Dhc.connect.ms=8000
+ * 
+ * + *

What to expect

+ *
    + *
  • For dual-stack hosts, the client schedules interleaved IPv6/IPv4 connects per the preference and delays.
  • + *
  • On networks without working IPv6, the IPv6 attempt will likely fail quickly while IPv4 succeeds.
  • + *
  • If you force {@code IPV6_ONLY} on a network without IPv6 routing, you’ll get + * {@code java.net.SocketException: Network is unreachable} — that’s expected.
  • + *
+ * + *

Tip

+ *

For the clearest behavior, align the resolver bias and the connection preference: + * construct the resolver with the same {@link ProtocolFamilyPreference} that you set in + * {@link ConnectionConfig}.

+ */ +public final class AsyncClientHappyEyeballs { + + private AsyncClientHappyEyeballs() { + } + + public static void main(final String[] args) throws Exception { + // --- Read settings from system properties (with sensible defaults) --- + final ProtocolFamilyPreference pref = parsePref(System.getProperty("hc.he.pref"), ProtocolFamilyPreference.INTERLEAVE); + final long attemptDelayMs = parseLong(System.getProperty("hc.he.delay.ms"), 250L); + final long otherFamilyDelayMs = Math.min(parseLong(System.getProperty("hc.he.other.ms"), 50L), attemptDelayMs); + final long connectMs = parseLong(System.getProperty("hc.connect.ms"), 10000L); // 10s default + + // --- Resolve targets from CLI args (or fall back to ipv6-test.com pair) --- + final List targets = new ArrayList(); + if (args != null && args.length > 0) { + for (int i = 0; i < args.length; i++) { + final URI u = safeParse(args[i]); + if (u != null) { + targets.add(u); + } else { + System.out.println("Skipping invalid URI: " + args[i]); + } + } + } else { + try { + targets.add(new URI("http://ipv6-test.com/")); + targets.add(new URI("https://ipv6-test.com/")); + } catch (final URISyntaxException ignore) { + } + } + + // --- Print banner so the runner shows the configuration up front --- + System.out.println("Happy Eyeballs: pref=" + pref + + ", attemptDelay=" + attemptDelayMs + "ms" + + ", otherFamilyDelay=" + otherFamilyDelayMs + "ms" + + ", connectTimeout=" + connectMs + "ms"); + + // --- DNS resolver with RFC 6724 selection (biased using the same pref for clarity) --- + final Rfc6724AddressSelectingDnsResolver dnsResolver = + new Rfc6724AddressSelectingDnsResolver(SystemDefaultDnsResolver.INSTANCE, pref); + + // --- Connection config enabling HEv2 pacing and family preference --- + final ConnectionConfig connectionConfig = ConnectionConfig.custom() + .setStaggeredConnectEnabled(true) + .setHappyEyeballsAttemptDelay(TimeValue.ofMilliseconds(attemptDelayMs)) + .setHappyEyeballsOtherFamilyDelay(TimeValue.ofMilliseconds(otherFamilyDelayMs)) + .setProtocolFamilyPreference(pref).setConnectTimeout(Timeout.ofMilliseconds(connectMs)) + + .build(); + + final RequestConfig requestConfig = RequestConfig.custom() + .build(); + + // --- TLS strategy (uses system properties for trust/key stores, ALPN, etc.) --- + final TlsStrategy tls = ClientTlsStrategyBuilder.create() + .useSystemProperties() + .buildAsync(); + + // --- Connection manager wires in DNS + ConnectionConfig + TLS --- + final PoolingAsyncClientConnectionManager cm = + PoolingAsyncClientConnectionManagerBuilder.create() + .setDnsResolver(dnsResolver) + .setDefaultConnectionConfig(connectionConfig) + .setTlsStrategy(tls) + .build(); + + final CloseableHttpAsyncClient client = HttpAsyncClients.custom() + .setConnectionManager(cm) + .setDefaultRequestConfig(requestConfig) + .build(); + + client.start(); + + // --- Execute each target once --- + for (int i = 0; i < targets.size(); i++) { + final URI uri = targets.get(i); + final HttpHost host = new HttpHost( + uri.getScheme(), + uri.getHost(), + computePort(uri) + ); + final String path = buildPathAndQuery(uri); + + final SimpleHttpRequest request = SimpleRequestBuilder.get() + .setHttpHost(host) + .setPath(path) + .build(); + + System.out.println("Executing request " + request); + final Future future = client.execute( + SimpleRequestProducer.create(request), + SimpleResponseConsumer.create(), + new FutureCallback() { + @Override + public void completed(final SimpleHttpResponse response) { + System.out.println(request + " -> " + new StatusLine(response)); + System.out.println(response.getBody()); + } + + @Override + public void failed(final Exception ex) { + System.out.println(request + " -> " + ex); + } + + @Override + public void cancelled() { + System.out.println(request + " cancelled"); + } + }); + + try { + future.get(); + } catch (final java.util.concurrent.ExecutionException ex) { + // Show the root cause without a giant stack trace in the example + System.out.println(request + " -> " + ex.getCause()); + } + } + + System.out.println("Shutting down"); + client.close(CloseMode.GRACEFUL); + cm.close(CloseMode.GRACEFUL); + } + + // ------------ helpers (Java 8 friendly) ------------ + + private static int computePort(final URI uri) { + final int p = uri.getPort(); + if (p >= 0) { + return p; + } + final String scheme = uri.getScheme(); + if ("http".equalsIgnoreCase(scheme)) { + return 80; + } + if ("https".equalsIgnoreCase(scheme)) { + return 443; + } + return -1; + } + + private static String buildPathAndQuery(final URI uri) { + String path = uri.getRawPath(); + if (path == null || path.isEmpty()) { + path = "/"; + } + final String query = uri.getRawQuery(); + if (query != null && !query.isEmpty()) { + return path + "?" + query; + } + return path; + } + + private static long parseLong(final String s, final long defVal) { + if (s == null) { + return defVal; + } + try { + return Long.parseLong(s.trim()); + } catch (final NumberFormatException ignore) { + return defVal; + } + } + + private static ProtocolFamilyPreference parsePref(final String s, final ProtocolFamilyPreference defVal) { + if (s == null) { + return defVal; + } + final String u = s.trim().toUpperCase(java.util.Locale.ROOT); + if ("IPV6_ONLY".equals(u)) { + return ProtocolFamilyPreference.IPV6_ONLY; + } + if ("IPV4_ONLY".equals(u)) { + return ProtocolFamilyPreference.IPV4_ONLY; + } + if ("PREFER_IPV6".equals(u)) { + return ProtocolFamilyPreference.PREFER_IPV6; + } + if ("PREFER_IPV4".equals(u)) { + return ProtocolFamilyPreference.PREFER_IPV4; + } + if ("INTERLEAVE".equals(u)) { + return ProtocolFamilyPreference.INTERLEAVE; + } + return defVal; + } + + private static URI safeParse(final String s) { + try { + final URI u = new URI(s); + final String scheme = u.getScheme(); + if (!URIScheme.HTTP.same(scheme) && !URIScheme.HTTPS.same(scheme)) { + System.out.println("Unsupported scheme (only http/https): " + s); + return null; + } + if (u.getHost() == null) { + System.out.println("Missing host in URI: " + s); + return null; + } + return u; + } catch (final URISyntaxException ex) { + return null; + } + } +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/nio/MultihomeIOSessionRequesterTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/nio/MultihomeIOSessionRequesterTest.java index e2ec5696b8..bd2b8e2def 100644 --- a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/nio/MultihomeIOSessionRequesterTest.java +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/nio/MultihomeIOSessionRequesterTest.java @@ -24,15 +24,18 @@ * . * */ - package org.apache.hc.client5.http.impl.nio; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.IOException; import java.net.InetAddress; @@ -41,14 +44,21 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.hc.client5.http.DnsResolver; +import org.apache.hc.client5.http.config.ConnectionConfig; +import org.apache.hc.client5.http.config.ProtocolFamilyPreference; import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.net.NamedEndpoint; import org.apache.hc.core5.reactor.ConnectionInitiator; import org.apache.hc.core5.reactor.IOSession; +import org.apache.hc.core5.util.TimeValue; import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -57,19 +67,38 @@ class MultihomeIOSessionRequesterTest { private DnsResolver dnsResolver; private ConnectionInitiator connectionInitiator; - private MultihomeIOSessionRequester sessionRequester; private NamedEndpoint namedEndpoint; + // Shared scheduler to make mock timings deterministic across platforms/CI + private ScheduledExecutorService testScheduler; + @BeforeEach void setUp() { dnsResolver = Mockito.mock(DnsResolver.class); connectionInitiator = Mockito.mock(ConnectionInitiator.class); namedEndpoint = Mockito.mock(NamedEndpoint.class); - sessionRequester = new MultihomeIOSessionRequester(dnsResolver); + + testScheduler = Executors.newScheduledThreadPool(2, r -> { + final Thread t = new Thread(r, "mh-test-scheduler"); + t.setDaemon(true); + return t; + }); + } + + @AfterEach + void shutdownScheduler() { + if (testScheduler != null) { + testScheduler.shutdownNow(); + } } @Test - void testConnectWithMultipleAddresses() throws Exception { + void testConnectWithMultipleAddresses_allFail_surfaceLastFailure() throws Exception { + final MultihomeIOSessionRequester sessionRequester = + new MultihomeIOSessionRequester(dnsResolver, ConnectionConfig.custom() + .setStaggeredConnectEnabled(false) + .build()); + final InetAddress address1 = InetAddress.getByAddress(new byte[]{10, 0, 0, 1}); final InetAddress address2 = InetAddress.getByAddress(new byte[]{10, 0, 0, 2}); final List remoteAddresses = Arrays.asList( @@ -77,41 +106,37 @@ void testConnectWithMultipleAddresses() throws Exception { new InetSocketAddress(address2, 8080) ); - Mockito.when(namedEndpoint.getHostName()).thenReturn("somehost"); - Mockito.when(namedEndpoint.getPort()).thenReturn(8080); - Mockito.when(dnsResolver.resolve("somehost", 8080)).thenReturn(remoteAddresses); + when(namedEndpoint.getHostName()).thenReturn("somehost"); + when(namedEndpoint.getPort()).thenReturn(8080); + when(dnsResolver.resolve("somehost", 8080)).thenReturn(remoteAddresses); - Mockito.when(connectionInitiator.connect(any(), any(), any(), any(), any(), any())) + when(connectionInitiator.connect(any(), any(), any(), any(), any(), any())) .thenAnswer(invocation -> { - final FutureCallback callback = invocation.getArgument(5); - // Simulate a failure for the first connection attempt - final CompletableFuture future = new CompletableFuture<>(); - callback.failed(new IOException("Simulated connection failure")); - future.completeExceptionally(new IOException("Simulated connection failure")); - return future; + final FutureCallback cb = invocation.getArgument(5); + final CompletableFuture f = new CompletableFuture<>(); + final IOException io = new IOException("Simulated connection failure"); + cb.failed(io); + f.completeExceptionally(io); + return f; }); final Future future = sessionRequester.connect( - connectionInitiator, - namedEndpoint, - null, - Timeout.ofMilliseconds(500), - null, - null + connectionInitiator, namedEndpoint, null, Timeout.ofMilliseconds(500), null, null ); assertTrue(future.isDone()); - try { - future.get(); - fail("Expected ExecutionException"); - } catch (final ExecutionException ex) { - assertInstanceOf(IOException.class, ex.getCause()); - assertEquals("Simulated connection failure", ex.getCause().getMessage()); - } + final ExecutionException ex = assertThrows(ExecutionException.class, future::get); + assertInstanceOf(IOException.class, ex.getCause()); + assertEquals("Simulated connection failure", ex.getCause().getMessage()); } @Test void testConnectSuccessfulAfterRetries() throws Exception { + final MultihomeIOSessionRequester sessionRequester = + new MultihomeIOSessionRequester(dnsResolver, ConnectionConfig.custom() + .setStaggeredConnectEnabled(false) + .build()); + final InetAddress address1 = InetAddress.getByAddress(new byte[]{10, 0, 0, 1}); final InetAddress address2 = InetAddress.getByAddress(new byte[]{10, 0, 0, 2}); final List remoteAddresses = Arrays.asList( @@ -119,43 +144,136 @@ void testConnectSuccessfulAfterRetries() throws Exception { new InetSocketAddress(address2, 8080) ); - Mockito.when(namedEndpoint.getHostName()).thenReturn("somehost"); - Mockito.when(namedEndpoint.getPort()).thenReturn(8080); - Mockito.when(dnsResolver.resolve("somehost", 8080)).thenReturn(remoteAddresses); + when(namedEndpoint.getHostName()).thenReturn("somehost"); + when(namedEndpoint.getPort()).thenReturn(8080); + when(dnsResolver.resolve("somehost", 8080)).thenReturn(remoteAddresses); - Mockito.when(connectionInitiator.connect(any(), any(), any(), any(), any(), any())) + when(connectionInitiator.connect(any(), any(), any(), any(), any(), any())) .thenAnswer(invocation -> { - final FutureCallback callback = invocation.getArgument(5); + final FutureCallback cb = invocation.getArgument(5); final InetSocketAddress remoteAddress = invocation.getArgument(1); - final CompletableFuture future = new CompletableFuture<>(); + final CompletableFuture f = new CompletableFuture<>(); if (remoteAddress.getAddress().equals(address1)) { - // Fail the first address - callback.failed(new IOException("Simulated connection failure")); - future.completeExceptionally(new IOException("Simulated connection failure")); + final IOException io = new IOException("Simulated connection failure"); + cb.failed(io); + f.completeExceptionally(io); } else { - // Succeed for the second address - final IOSession mockSession = Mockito.mock(IOSession.class); - callback.completed(mockSession); - future.complete(mockSession); + final IOSession s = Mockito.mock(IOSession.class); + cb.completed(s); + f.complete(s); } - return future; + return f; }); final Future future = sessionRequester.connect( - connectionInitiator, - namedEndpoint, - null, - Timeout.ofMilliseconds(500), - null, - null + connectionInitiator, namedEndpoint, null, Timeout.ofMilliseconds(500), null, null ); assertTrue(future.isDone()); - try { - final IOSession session = future.get(); - assertNotNull(session); - } catch (final ExecutionException ex) { - fail("Did not expect an ExecutionException", ex); - } + assertNotNull(future.get()); + } + + @Test + void testHappyEyeballs_fastV4BeatsSlowerV6() throws Exception { + final MultihomeIOSessionRequester sessionRequester = + new MultihomeIOSessionRequester(dnsResolver, ConnectionConfig.custom() + .setStaggeredConnectEnabled(true) + .setHappyEyeballsAttemptDelay(TimeValue.ofMilliseconds(250)) + .setHappyEyeballsOtherFamilyDelay(TimeValue.ofMilliseconds(50)) + .setProtocolFamilyPreference(ProtocolFamilyPreference.INTERLEAVE) + .build()); + + final InetAddress v6 = InetAddress.getByName("2001:db8::10"); + final InetAddress v4 = InetAddress.getByName("203.0.113.10"); + final InetSocketAddress aV6 = new InetSocketAddress(v6, 8080); + final InetSocketAddress aV4 = new InetSocketAddress(v4, 8080); + + when(namedEndpoint.getHostName()).thenReturn("dual"); + when(namedEndpoint.getPort()).thenReturn(8080); + // v6 first from DNS so requester will start with v6 and stagger v4 shortly after + when(dnsResolver.resolve("dual", 8080)).thenReturn(Arrays.asList(aV6, aV4)); + + final IOSession v6Session = Mockito.mock(IOSession.class, "v6Session"); + final IOSession v4Session = Mockito.mock(IOSession.class, "v4Session"); + + when(connectionInitiator.connect(any(), any(), any(), any(), any(), any())) + .thenAnswer(invocation -> { + final InetSocketAddress remote = invocation.getArgument(1); + final FutureCallback cb = invocation.getArgument(5); + final CompletableFuture f = new CompletableFuture<>(); + + // Large margin so v4 always wins even with CI jitter. + if (remote.equals(aV6)) { + testScheduler.schedule(() -> { + cb.completed(v6Session); + f.complete(v6Session); + }, 1200, TimeUnit.MILLISECONDS); + } else { + testScheduler.schedule(() -> { + cb.completed(v4Session); + f.complete(v4Session); + }, 60, TimeUnit.MILLISECONDS); + } + return f; + }); + + final Future future = sessionRequester.connect( + connectionInitiator, namedEndpoint, null, Timeout.ofSeconds(3), null, null); + + final IOSession winner = future.get(3, TimeUnit.SECONDS); + assertSame(v4Session, winner, "IPv4 should win with faster completion"); + verify(connectionInitiator, atLeast(2)).connect(any(), any(), any(), any(), any(), any()); + } + + @Test + void testHappyEyeballs_v6Fails_v4Succeeds() throws Exception { + final MultihomeIOSessionRequester sessionRequester = + new MultihomeIOSessionRequester(dnsResolver, ConnectionConfig.custom() + .setStaggeredConnectEnabled(true) + .setHappyEyeballsAttemptDelay(TimeValue.ofMilliseconds(200)) + .setHappyEyeballsOtherFamilyDelay(TimeValue.ofMilliseconds(50)) + .setProtocolFamilyPreference(ProtocolFamilyPreference.INTERLEAVE) + .build()); + + final InetAddress v6 = InetAddress.getByName("2001:db8::10"); + final InetAddress v4 = InetAddress.getByName("203.0.113.10"); + final InetSocketAddress aV6 = new InetSocketAddress(v6, 8443); + final InetSocketAddress aV4 = new InetSocketAddress(v4, 8443); + + when(namedEndpoint.getHostName()).thenReturn("dual"); + when(namedEndpoint.getPort()).thenReturn(8443); + when(dnsResolver.resolve("dual", 8443)).thenReturn(Arrays.asList(aV6, aV4)); + + final IOSession v4Session = Mockito.mock(IOSession.class, "v4Session"); + + when(connectionInitiator.connect(any(), any(), any(), any(), any(), any())) + .thenAnswer(invocation -> { + final InetSocketAddress remote = invocation.getArgument(1); + final FutureCallback cb = invocation.getArgument(5); + final CompletableFuture f = new CompletableFuture<>(); + + if (remote.equals(aV6)) { + // Fail v6 quickly + testScheduler.schedule(() -> { + final IOException io = new IOException("v6 down"); + cb.failed(io); + f.completeExceptionally(io); + }, 30, TimeUnit.MILLISECONDS); + } else { + // Succeed v4 after a short delay + testScheduler.schedule(() -> { + cb.completed(v4Session); + f.complete(v4Session); + }, 60, TimeUnit.MILLISECONDS); + } + return f; + }); + + final Future future = sessionRequester.connect( + connectionInitiator, namedEndpoint, null, Timeout.ofSeconds(2), null, null + ); + + final IOSession session = future.get(2, TimeUnit.SECONDS); + assertSame(v4Session, session); } }