diff options
Diffstat (limited to 'container-search/src/main/java/com')
18 files changed, 294 insertions, 116 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java index fd8110e1173..d1377b8d373 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java @@ -82,7 +82,7 @@ public abstract class BaseNodeMonitor<T> { /** Thread-safely changes the state of this node if required */ protected abstract void setWorking(boolean working,String explanation); - /** Returns whether or not this is monitoring an internal node. Default is false. */ + /** Returns whether this is monitoring an internal node. Default is false. */ public boolean isInternal() { return internal; } } diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java index 0b627e91bc5..332bf4ea2c4 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java @@ -66,7 +66,7 @@ public class ClusterMonitor<T> { * </ul> * * @param node the object representing the node - * @param internal whether or not this node is internal to this cluster + * @param internal whether this node is internal to this cluster */ public void add(T node, boolean internal) { nodeMonitors.put(node, new TrafficNodeMonitor<>(node, configuration, internal)); @@ -96,11 +96,10 @@ public class ClusterMonitor<T> { * Ping all nodes which needs pinging to discover state changes */ public void ping(Executor executor) { - for (Iterator<BaseNodeMonitor<T>> i = nodeMonitorIterator(); i.hasNext() && !closed.get(); ) { - BaseNodeMonitor<T> monitor= i.next(); - nodeManager.ping(this, monitor.getNode(), executor); // Cause call to failed or responded + for (var monitor : nodeMonitors()) { + if (closed.get()) return; // Do nothing to change state if close has started. + nodeManager.ping(this, monitor.getNode(), executor); } - if (closed.get()) return; // Do nothing to change state if close has started. nodeManager.pingIterationCompleted(); } @@ -143,7 +142,7 @@ public class ClusterMonitor<T> { // for all pings when there are no problems (important because it ensures that // any thread local connections are reused) 2) a new thread will be started to execute // new pings when a ping is not responding - ExecutorService pingExecutor=Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("search.ping")); + ExecutorService pingExecutor = Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("search.ping")); while (!closed.get()) { try { log.finest("Activating ping"); @@ -165,7 +164,9 @@ public class ClusterMonitor<T> { } pingExecutor.shutdown(); try { - pingExecutor.awaitTermination(10, TimeUnit.SECONDS); + if ( ! pingExecutor.awaitTermination(10, TimeUnit.SECONDS)) { + log.warning("Timeout waiting for ping executor to terminate"); + } } catch (InterruptedException e) { } log.info("Stopped cluster monitor thread " + getName()); } diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java index 4af6757db8c..1cf36d75fc5 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java @@ -48,7 +48,7 @@ public abstract class ClusterSearcher<T> extends PingableSearcher implements Nod * * @param id the id of this searcher * @param connections the connections of the cluster - * @param internal whether or not this cluster is internal (part of the same installation) + * @param internal whether this cluster is internal (part of the same installation) */ public ClusterSearcher(ComponentId id, List<T> connections, boolean internal) { this(id, connections, new Hasher<>(), internal); diff --git a/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java b/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java index 1f6602053d9..f8f8c0d888d 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java @@ -22,7 +22,7 @@ public class MonitorConfiguration { /** * Returns the number of milliseconds to attempt to service a request - * (at different nodes) before giving up. Default is 5000 ms. + * (at different nodes) before giving up. See {@link #requestTimeout}. */ public long getRequestTimeout() { return requestTimeout; } diff --git a/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java index 11475b6a0ca..108e7e3e34b 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java @@ -23,7 +23,7 @@ public class TrafficNodeMonitor<T> extends BaseNodeMonitor<T> { this.configuration = configuration; } - /** Whether or not this has ever responded successfully */ + /** Whether this has ever responded successfully */ private boolean atStartUp = true; public T getNode() { return node; } @@ -55,7 +55,7 @@ public class TrafficNodeMonitor<T> extends BaseNodeMonitor<T> { respondedAt = now(); succeededAt = respondedAt; - setWorking(true,"Responds correctly"); + setWorking(true, "Responds correctly"); } /** diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java index 77496114df1..c6fef88fa2d 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java @@ -2,7 +2,6 @@ package com.yahoo.search.dispatch; import java.io.Closeable; -import java.time.Duration; import java.util.function.BiConsumer; /** @@ -21,8 +20,8 @@ public abstract class CloseableInvoker implements Closeable { private RequestDuration duration; public void teardown(BiConsumer<Boolean, RequestDuration> teardown) { - this.teardown = teardown; - this.duration = new RequestDuration(); + this.teardown = this.teardown == null ? teardown : this.teardown.andThen(teardown); + this.duration = this.duration == null ? new RequestDuration() : this.duration; } protected void setFinalStatus(boolean success) { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index db7e80a95e5..6f6b0fc2b79 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -1,9 +1,9 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch; -import com.yahoo.component.annotation.Inject; import com.yahoo.component.AbstractComponent; import com.yahoo.component.ComponentId; +import com.yahoo.component.annotation.Inject; import com.yahoo.compress.Compressor; import com.yahoo.container.handler.VipStatus; import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; @@ -12,13 +12,14 @@ import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException; +import com.yahoo.search.dispatch.rpc.RpcConnectionPool; import com.yahoo.search.dispatch.rpc.RpcInvokerFactory; import com.yahoo.search.dispatch.rpc.RpcPingFactory; import com.yahoo.search.dispatch.rpc.RpcResourcePool; import com.yahoo.search.dispatch.searchcluster.Group; -import com.yahoo.search.dispatch.searchcluster.SearchGroups; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.SearchCluster; +import com.yahoo.search.dispatch.searchcluster.SearchGroups; import com.yahoo.search.query.profile.types.FieldDescription; import com.yahoo.search.query.profile.types.FieldType; import com.yahoo.search.query.profile.types.QueryProfileType; @@ -32,6 +33,7 @@ import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; /** * A dispatcher communicates with search nodes to perform queries and fill hits. @@ -54,19 +56,43 @@ public class Dispatcher extends AbstractComponent { /** If set will control computation of how many hits will be fetched from each partition.*/ public static final CompoundName topKProbability = CompoundName.from(DISPATCH + "." + TOP_K_PROBABILITY); + private final InvokerFactoryFactory invokerFactories; private final DispatchConfig dispatchConfig; - private final RpcResourcePool rpcResourcePool; + private final RpcConnectionPool rpcResourcePool; private final SearchCluster searchCluster; - private final ClusterMonitor<Node> clusterMonitor; private volatile VolatileItems volatileItems; private static class VolatileItems { + final LoadBalancer loadBalancer; final InvokerFactory invokerFactory; - VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory) { + final ClusterMonitor<Node> clusterMonitor; + final AtomicInteger inflight = new AtomicInteger(1); // Initial reference. + Runnable cleanup = () -> { }; + + VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory, ClusterMonitor<Node> clusterMonitor) { this.loadBalancer = loadBalancer; this.invokerFactory = invokerFactory; + this.clusterMonitor = clusterMonitor; + } + + private void countDown() { + if (inflight.decrementAndGet() == 0) cleanup.run(); + } + + private class Ref implements AutoCloseable { + boolean handedOff = false; + { inflight.incrementAndGet(); } + VolatileItems get() { return VolatileItems.this; } + /** Hands off the reference to the given invoker, which will decrement the counter when closed. */ + <T extends CloseableInvoker> T register(T invoker) { + invoker.teardown((__, ___) -> countDown()); + handedOff = true; + return invoker; + } + @Override public void close() { if ( ! handedOff) countDown(); } } + } private static final QueryProfileType argumentType; @@ -81,34 +107,105 @@ public class Dispatcher extends AbstractComponent { public static QueryProfileType getArgumentType() { return argumentType; } + interface InvokerFactoryFactory { + InvokerFactory create(RpcConnectionPool rpcConnectionPool, SearchGroups searchGroups, DispatchConfig dispatchConfig); + } + @Inject - public Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, - DispatchNodesConfig nodesConfig, VipStatus vipStatus) { - this.dispatchConfig = dispatchConfig; - rpcResourcePool = new RpcResourcePool(dispatchConfig, nodesConfig); - searchCluster = new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(), - toNodes(nodesConfig), vipStatus, new RpcPingFactory(rpcResourcePool)); - clusterMonitor = new ClusterMonitor<>(searchCluster, true); - volatileItems = update(null); + public Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig, VipStatus vipStatus) { + this(clusterId, dispatchConfig, new RpcResourcePool(dispatchConfig, nodesConfig), nodesConfig, vipStatus, RpcInvokerFactory::new); initialWarmup(dispatchConfig.warmuptime()); } - /* For simple mocking in tests. Beware that searchCluster is shutdown on in deconstruct() */ - Dispatcher(ClusterMonitor<Node> clusterMonitor, SearchCluster searchCluster, - DispatchConfig dispatchConfig, InvokerFactory invokerFactory) { + Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool, + DispatchNodesConfig nodesConfig, VipStatus vipStatus, InvokerFactoryFactory invokerFactories) { + this(dispatchConfig, rpcConnectionPool, + new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(), + toNodes(nodesConfig), vipStatus, new RpcPingFactory(rpcConnectionPool)), + invokerFactories); + } + + Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool, + SearchCluster searchCluster, InvokerFactoryFactory invokerFactories) { + this(dispatchConfig, rpcConnectionPool, searchCluster, new ClusterMonitor<>(searchCluster, false), invokerFactories); + this.volatileItems.clusterMonitor.start(); // Populate nodes to monitor before starting it. + } + + Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool, + SearchCluster searchCluster, ClusterMonitor<Node> clusterMonitor, InvokerFactoryFactory invokerFactories) { this.dispatchConfig = dispatchConfig; - this.rpcResourcePool = null; + this.rpcResourcePool = rpcConnectionPool; this.searchCluster = searchCluster; - this.clusterMonitor = clusterMonitor; - this.volatileItems = update(invokerFactory); + this.invokerFactories = invokerFactories; + this.volatileItems = update(clusterMonitor); + searchCluster.addMonitoring(clusterMonitor); } - private VolatileItems update(InvokerFactory invokerFactory) { + /* For simple mocking in tests. Beware that searchCluster is shutdown in deconstruct() */ + Dispatcher(ClusterMonitor<Node> clusterMonitor, SearchCluster searchCluster, + DispatchConfig dispatchConfig, InvokerFactory invokerFactory) { + this(dispatchConfig, null, searchCluster, clusterMonitor, (__, ___, ____) -> invokerFactory); + } + + /** Returns the snapshot of volatile items that need to be kept together, incrementing its reference counter. */ + private VolatileItems.Ref volatileItems() { + return volatileItems.new Ref(); + } + + /** + * This is called whenever we have new config for backend nodes. + * Normally, we'd want to handle partial failure of the component graph, by reinstating the old state; + * however, in this case, such a failure would be local to this container, and we instead want to keep + * the newest config, as that is what most accurately represents the actual backend. + * + * The flow of reconfiguration is: + * 1. The volatile snapshot of disposable items is replaced with a new one that only references updated nodes. + * 2. Dependencies of the items in 1., which must be configured, are updated, yielding a list of resources to close. + * 3. When inflight operations against the old snapshot are done, all obsolete resources are cleaned up. + * + * Ownership details: + * 1. The RPC resource pool is owned by the dispatcher, and is updated on node set changes; + * it contains the means by which the container talks to backend nodes, so cleanup must be delayed until safe. + * 2. The invocation factory is owned by the volatile snapshot, and is swapped atomically with it; + * it is used by the dispatcher to create ephemeral invokers, which must complete before cleanup (above) can happen. + * 3. The load balancer is owned by the volatile snapshot, and is swapped atomically with it; + * it is used internally by the dispatcher to select search nodes for queries, and is discarded with its snapshot. + * 4. The cluster monitor is a subordinate to the search cluster, and does whatever that tells it to, at any time; + * it is technically owned by the volatile snapshot, but mostly to show it is swapped together with that. + * 5. The search cluster is owned by the dispatcher, and is updated on node set changes; + * its responsibility is to keep track of the state of the backend, and to provide a view of it to the dispatcher, + * as well as keep the container vip status updated accordingly; it should therefore preserve as much as possible + * of its state across reconfigurations: with new node config, it will immediately forget obsolete nodes, and set + * coverage information as if the new nodes have zero documents, before even checking their status; this is fine + * under the assumption that this is the common case, i.e., new nodes have no documents yet. + */ + void updateWithNewConfig(DispatchNodesConfig nodesConfig) { + try (var items = volatileItems()) { // Marking a reference to the old snapshot, which we want to have cleaned up. + items.get().countDown(); // Decrement for its initial creation reference, so it may reach 0. + + // Let the RPC pool know about the new nodes, and set up the delayed cleanup that we need to do. + Collection<? extends AutoCloseable> connectionPoolsToClose = rpcResourcePool.updateNodes(nodesConfig); + items.get().cleanup = () -> { + for (AutoCloseable pool : connectionPoolsToClose) { + try { pool.close(); } catch (Exception ignored) { } + } + }; + + // Update the nodes the search cluster keeps track of, and what nodes are monitored. + ClusterMonitor<Node> newMonitor = searchCluster.updateNodes(toNodes(nodesConfig), dispatchConfig.minActivedocsPercentage()); + + // Update the snapshot to use the new nodes set in the search cluster; the RPC pool is ready for this. + this.volatileItems = update(newMonitor); + + // Wait for the old cluster monitor to die; it may be pinging nodes we want to shut down RPC connections to. + items.get().clusterMonitor.shutdown(); + } // Close the old snapshot, which may trigger the RPC cleanup now, or when the last invoker is closed, by a search thread. + } + + private VolatileItems update(ClusterMonitor<Node> clusterMonitor) { var items = new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())), - (invokerFactory == null) - ? new RpcInvokerFactory(rpcResourcePool, searchCluster.groupList(), dispatchConfig) - : invokerFactory); - searchCluster.addMonitoring(clusterMonitor); + invokerFactories.create(rpcResourcePool, searchCluster.groupList(), dispatchConfig), + clusterMonitor); return items; } @@ -158,27 +255,30 @@ public class Dispatcher extends AbstractComponent { @Override public void deconstruct() { // The clustermonitor must be shutdown first as it uses the invokerfactory through the searchCluster. - clusterMonitor.shutdown(); + volatileItems.clusterMonitor.shutdown(); if (rpcResourcePool != null) { rpcResourcePool.close(); } } public FillInvoker getFillInvoker(Result result, VespaBackEndSearcher searcher) { - return volatileItems.invokerFactory.createFillInvoker(searcher, result); + try (var items = volatileItems()) { // Take a snapshot, and release it when we're done. + return items.register(items.get().invokerFactory.createFillInvoker(searcher, result)); + } } public SearchInvoker getSearchInvoker(Query query, VespaBackEndSearcher searcher) { - VolatileItems items = volatileItems; // Take a snapshot - int maxHitsPerNode = dispatchConfig.maxHitsPerNode(); - SearchInvoker invoker = getSearchPathInvoker(query, searcher, searchCluster.groupList(), items.invokerFactory, maxHitsPerNode) - .orElseGet(() -> getInternalInvoker(query, searcher, searchCluster, items.loadBalancer, items.invokerFactory, maxHitsPerNode)); - - if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) { - query.setHits(0); - query.setOffset(0); + try (var items = volatileItems()) { // Take a snapshot, and release it when we're done. + int maxHitsPerNode = dispatchConfig.maxHitsPerNode(); + SearchInvoker invoker = getSearchPathInvoker(query, searcher, searchCluster.groupList(), items.get().invokerFactory, maxHitsPerNode) + .orElseGet(() -> getInternalInvoker(query, searcher, searchCluster, items.get().loadBalancer, items.get().invokerFactory, maxHitsPerNode)); + + if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) { + query.setHits(0); + query.setOffset(0); + } + return items.register(invoker); } - return invoker; } /** Builds an invoker based on searchpath */ diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/ReconfigurableDispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/ReconfigurableDispatcher.java new file mode 100644 index 00000000000..625a8bcb6da --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/ReconfigurableDispatcher.java @@ -0,0 +1,37 @@ +package com.yahoo.search.dispatch; + +import com.yahoo.component.ComponentId; +import com.yahoo.config.subscription.ConfigSubscriber; +import com.yahoo.container.handler.VipStatus; +import com.yahoo.messagebus.network.rpc.SlobrokConfigSubscriber; +import com.yahoo.vespa.config.search.DispatchConfig; +import com.yahoo.vespa.config.search.DispatchNodesConfig; +import com.yahoo.yolean.UncheckedInterruptedException; + +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import static java.util.Objects.requireNonNull; + +/** + * @author jonmv + */ +public class ReconfigurableDispatcher extends Dispatcher { + + private final ConfigSubscriber subscriber; + + public ReconfigurableDispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, VipStatus vipStatus) { + super(clusterId, dispatchConfig, new DispatchNodesConfig.Builder().build(), vipStatus); + this.subscriber = new ConfigSubscriber(); + this.subscriber.subscribe(this::updateWithNewConfig, DispatchNodesConfig.class, clusterId.stringValue()); + } + + @Override + public void deconstruct() { + subscriber.close(); + super.deconstruct(); + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java b/container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java index 1206277a103..6b134dc23a6 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java @@ -5,7 +5,7 @@ import java.time.Duration; import java.time.Instant; /** - * Contains start and and time. Exposes a duration, and lets you measure the time difference between 2 requests. + * Contains start and end time. Exposes a duration, and lets you measure the time difference between 2 requests. * It does use System.nanoTime to get a steady clock. * * @author baldersheim diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java index 22ed8b6d9fa..6c1f666835c 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java @@ -12,7 +12,7 @@ import java.util.Optional; * * @author bratseth */ -interface Client { +public interface Client { /** Creates a connection to a particular node in this */ NodeConnection createConnection(String hostname, int port); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java index fd8e0e4f81a..a93ddb0b360 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java @@ -1,11 +1,27 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch.rpc; +import com.yahoo.vespa.config.search.DispatchNodesConfig; + +import java.util.Collection; +import java.util.List; + /** * Interface for getting a connection given a node id. * * @author balderersheim */ -public interface RpcConnectionPool { +public interface RpcConnectionPool extends AutoCloseable { + + /** Returns a connection to the given node id. */ Client.NodeConnection getConnection(int nodeId); + + + /** Will return a list of items that need a delayed close when updating node set. */ + default Collection<? extends AutoCloseable> updateNodes(DispatchNodesConfig nodesConfig) { return List.of(); } + + /** Shuts down all connections in the pool, and the underlying RPC client. */ + @Override + void close(); + } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java index 154002c4f77..b6228994ac8 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java @@ -33,7 +33,7 @@ public class RpcInvokerFactory extends InvokerFactory { super(cluster, dispatchConfig); this.rpcResourcePool = rpcResourcePool; this.compressor = new CompressService(); - decodeType = convert(dispatchConfig.summaryDecodePolicy()); + this.decodeType = convert(dispatchConfig.summaryDecodePolicy()); } @Override diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java index 53dc54f7bc5..a59097e5fff 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java @@ -37,7 +37,7 @@ public class RpcPing implements Pinger, Client.ResponseReceiver { this.clusterMonitor = clusterMonitor; this.pingSequenceId = node.createPingSequenceId(); this.pongHandler = pongHandler; - this. compressor = compressor; + this.compressor = compressor; } @Override diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java index 63530a7f650..d1f22514481 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java @@ -3,8 +3,10 @@ package com.yahoo.search.dispatch.rpc; import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.dispatch.rpc.Client.NodeConnection; +import com.yahoo.search.dispatch.rpc.RpcClient.RpcNodeConnection; import com.yahoo.vespa.config.search.DispatchConfig; import com.yahoo.vespa.config.search.DispatchNodesConfig; +import com.yahoo.vespa.config.search.DispatchNodesConfig.Node; import java.util.ArrayList; import java.util.Collection; @@ -19,7 +21,7 @@ import java.util.concurrent.ThreadLocalRandom; * * @author ollivir */ -public class RpcResourcePool implements RpcConnectionPool, AutoCloseable { +public class RpcResourcePool implements RpcConnectionPool { /** Connections to the search nodes this talks to, indexed by node id ("partid") */ private volatile Map<Integer, NodeConnectionPool> nodeConnectionPools = Map.of(); @@ -35,46 +37,35 @@ public class RpcResourcePool implements RpcConnectionPool, AutoCloseable { } public RpcResourcePool(DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig) { - super(); rpcClient = new RpcClient("dispatch-client", dispatchConfig.numJrtTransportThreads()); numConnections = dispatchConfig.numJrtConnectionsPerNode(); - updateNodes(nodesConfig).forEach(item -> { - try { - item.close(); - } catch (Exception e) {} + updateNodes(nodesConfig).forEach(pool -> { + try { pool.close(); } catch (Exception ignored) { } // Shouldn't throw. }); } - /** Will return a list of items that need a delayed close */ - public Collection<AutoCloseable> updateNodes(DispatchNodesConfig nodesConfig) { - List<AutoCloseable> toClose = new ArrayList<>(); - var builder = new HashMap<Integer, NodeConnectionPool>(); + @Override + public Collection<? extends AutoCloseable> updateNodes(DispatchNodesConfig nodesConfig) { + Map<Integer, NodeConnectionPool> currentPools = new HashMap<>(nodeConnectionPools); + Map<Integer, NodeConnectionPool> nextPools = new HashMap<>(); // Who can be reused - for (var node : nodesConfig.node()) { - var prev = nodeConnectionPools.get(node.key()); - NodeConnection nc = prev != null ? prev.nextConnection() : null; - if (nc instanceof RpcClient.RpcNodeConnection rpcNodeConnection - && rpcNodeConnection.getPort() == node.port() - && rpcNodeConnection.getHostname().equals(node.host())) + for (Node node : nodesConfig.node()) { + if ( currentPools.containsKey(node.key()) + && currentPools.get(node.key()).nextConnection() instanceof RpcNodeConnection rpcNodeConnection + && rpcNodeConnection.getPort() == node.port() + && rpcNodeConnection.getHostname().equals(node.host())) { - builder.put(node.key(), prev); + nextPools.put(node.key(), currentPools.remove(node.key())); } else { - var connections = new ArrayList<NodeConnection>(numConnections); + ArrayList<NodeConnection> connections = new ArrayList<>(numConnections); for (int i = 0; i < numConnections; i++) { connections.add(rpcClient.createConnection(node.host(), node.port())); } - builder.put(node.key(), new NodeConnectionPool(connections)); + nextPools.put(node.key(), new NodeConnectionPool(connections)); } } - // Who are not needed any more - nodeConnectionPools.forEach((key, pool) -> { - var survivor = builder.get(key); - if (survivor == null || pool != survivor) { - toClose.add(pool); - } - }); - this.nodeConnectionPools = Map.copyOf(builder); - return toClose; + this.nodeConnectionPools = Map.copyOf(nextPools); + return currentPools.values(); } @Override diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java index 121c12335f5..c8af5cea5aa 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch.searchcluster; +import java.util.ArrayList; import java.util.List; import java.util.logging.Logger; @@ -51,7 +52,7 @@ public class Group { /** * Returns whether this group has sufficient active documents - * (compared to other groups) that is should receive traffic + * (compared to other groups) that should receive traffic */ public boolean hasSufficientCoverage() { return hasSufficientCoverage; @@ -66,14 +67,16 @@ public class Group { } public void aggregateNodeValues() { - long activeDocs = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(Node::getActiveDocuments).sum(); + List<Node> workingNodes = new ArrayList<>(nodes); + workingNodes.removeIf(node -> node.isWorking() != Boolean.TRUE); + long activeDocs = workingNodes.stream().mapToLong(Node::getActiveDocuments).sum(); activeDocuments = activeDocs; - targetActiveDocuments = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(Node::getTargetActiveDocuments).sum(); + targetActiveDocuments = workingNodes.stream().mapToLong(Node::getTargetActiveDocuments).sum(); isBlockingWrites = nodes.stream().anyMatch(Node::isBlockingWrites); - int numWorkingNodes = workingNodes(); + int numWorkingNodes = workingNodes.size(); if (numWorkingNodes > 0) { long average = activeDocs / numWorkingNodes; - long skew = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(node -> Math.abs(node.getActiveDocuments() - average)).sum(); + long skew = workingNodes.stream().mapToLong(node -> Math.abs(node.getActiveDocuments() - average)).sum(); boolean balanced = skew <= activeDocs * maxContentSkew; if (balanced != isBalanced) { if (!isSparse()) diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index 9c65cb3d4c0..3c8950f1f7f 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -6,15 +6,18 @@ import com.yahoo.net.HostName; import com.yahoo.prelude.Pong; import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.cluster.NodeManager; +import com.yahoo.yolean.UncheckedInterruptedException; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.Executor; import java.util.logging.Logger; -import java.util.stream.Collectors; + +import static java.util.stream.Collectors.groupingBy; /** * A model of a search cluster we might want to dispatch queries to. @@ -28,7 +31,7 @@ public class SearchCluster implements NodeManager<Node> { private final String clusterId; private final VipStatus vipStatus; private final PingFactory pingFactory; - private final SearchGroupsImpl groups; + private volatile SearchGroupsImpl groups; private volatile long nextLogTime = 0; /** @@ -45,6 +48,7 @@ public class SearchCluster implements NodeManager<Node> { VipStatus vipStatus, PingFactory pingFactory) { this(clusterId, toGroups(nodes, minActivedocsPercentage), vipStatus, pingFactory); } + public SearchCluster(String clusterId, SearchGroupsImpl groups, VipStatus vipStatus, PingFactory pingFactory) { this.clusterId = clusterId; this.vipStatus = vipStatus; @@ -55,13 +59,28 @@ public class SearchCluster implements NodeManager<Node> { @Override public String name() { return clusterId; } - public VipStatus getVipStatus() { return vipStatus; } + + /** Sets the new nodes to monitor to be the new nodes, but keep any existing node instances which equal the new ones. */ + public ClusterMonitor<Node> updateNodes(Collection<Node> newNodes, double minActivedocsPercentage) { + Collection<Node> retainedNodes = groups.nodes(); + Collection<Node> currentNodes = new HashSet<>(newNodes); + retainedNodes.retainAll(currentNodes); // Throw away all old nodes which are not in the new set. + currentNodes.removeIf(retainedNodes::contains); // Throw away all new nodes for which we have more information in an old object. + Collection<Node> addedNodes = List.copyOf(currentNodes); + currentNodes.addAll(retainedNodes); // Keep the old nodes that were replaced in the new set. + SearchGroupsImpl groups = toGroups(currentNodes, minActivedocsPercentage); + ClusterMonitor<Node> monitor = new ClusterMonitor<>(this, false); + for (Node node : groups.nodes()) monitor.add(node, true); + monitor.start(); + try { while (addedNodes.stream().anyMatch(node -> node.isWorking() == null)) { Thread.sleep(1); } } + catch (InterruptedException e) { throw new UncheckedInterruptedException(e, true); } + pingIterationCompleted(groups); + this.groups = groups; + return monitor; + } public void addMonitoring(ClusterMonitor<Node> clusterMonitor) { - for (var group : groups()) { - for (var node : group.nodes()) - clusterMonitor.add(node, true); - } + for (Node node : groups.nodes()) clusterMonitor.add(node, true); } private static Node findLocalCorpusDispatchTarget(String selfHostname, SearchGroups groups) { @@ -86,14 +105,14 @@ public class SearchCluster implements NodeManager<Node> { private static SearchGroupsImpl toGroups(Collection<Node> nodes, double minActivedocsPercentage) { Map<Integer, Group> groups = new HashMap<>(); - for (Map.Entry<Integer, List<Node>> group : nodes.stream().collect(Collectors.groupingBy(Node::group)).entrySet()) { - Group g = new Group(group.getKey(), group.getValue()); - groups.put(group.getKey(), g); - } + nodes.stream().collect(groupingBy(Node::group)).forEach((groupId, groupNodes) -> { + groups.put(groupId, new Group(groupId, groupNodes)); + }); return new SearchGroupsImpl(Map.copyOf(groups), minActivedocsPercentage); } public SearchGroups groupList() { return groups; } + public Group group(int id) { return groups.get(id); } private Collection<Group> groups() { return groups.groups(); } @@ -107,14 +126,14 @@ public class SearchCluster implements NodeManager<Node> { * or empty if we should not dispatch directly. */ public Optional<Node> localCorpusDispatchTarget() { - if ( localCorpusDispatchTarget == null) return Optional.empty(); + if (localCorpusDispatchTarget == null) return Optional.empty(); // Only use direct dispatch if the local group has sufficient coverage Group localSearchGroup = groups.get(localCorpusDispatchTarget.group()); if ( ! localSearchGroup.hasSufficientCoverage()) return Optional.empty(); // Only use direct dispatch if the local search node is not down - if ( localCorpusDispatchTarget.isWorking() == Boolean.FALSE) return Optional.empty(); + if (localCorpusDispatchTarget.isWorking() == Boolean.FALSE) return Optional.empty(); return Optional.of(localCorpusDispatchTarget); } @@ -176,7 +195,7 @@ public class SearchCluster implements NodeManager<Node> { return groups().stream().allMatch(group -> group.nodes().stream().allMatch(node -> node.isWorking() != null)); } - public long nonWorkingNodeCount() { + long nonWorkingNodeCount() { return groups().stream().flatMap(group -> group.nodes().stream()).filter(node -> node.isWorking() == Boolean.FALSE).count(); } @@ -194,13 +213,13 @@ public class SearchCluster implements NodeManager<Node> { /** Used by the cluster monitor to manage node status */ @Override - public void ping(ClusterMonitor clusterMonitor, Node node, Executor executor) { + public void ping(ClusterMonitor<Node> clusterMonitor, Node node, Executor executor) { Pinger pinger = pingFactory.createPinger(node, clusterMonitor, new PongCallback(node, clusterMonitor)); pinger.ping(); } - private void pingIterationCompletedSingleGroup() { - Group group = groups().iterator().next(); + private void pingIterationCompletedSingleGroup(SearchGroupsImpl groups) { + Group group = groups.groups().iterator().next(); group.aggregateNodeValues(); // With just one group sufficient coverage may not be the same as full coverage, as the // group will always be marked sufficient for use. @@ -209,10 +228,10 @@ public class SearchCluster implements NodeManager<Node> { trackGroupCoverageChanges(group, sufficientCoverage, group.activeDocuments()); } - private void pingIterationCompletedMultipleGroups() { - groups().forEach(Group::aggregateNodeValues); + private void pingIterationCompletedMultipleGroups(SearchGroupsImpl groups) { + groups.groups().forEach(Group::aggregateNodeValues); long medianDocuments = groups.medianDocumentsPerGroup(); - for (Group group : groups()) { + for (Group group : groups.groups()) { boolean sufficientCoverage = groups.isGroupCoverageSufficient(group.activeDocuments(), medianDocuments); updateSufficientCoverage(group, sufficientCoverage); trackGroupCoverageChanges(group, sufficientCoverage, medianDocuments); @@ -226,20 +245,20 @@ public class SearchCluster implements NodeManager<Node> { */ @Override public void pingIterationCompleted() { + pingIterationCompleted(groups); + } + + private void pingIterationCompleted(SearchGroupsImpl groups) { if (groups.size() == 1) { - pingIterationCompletedSingleGroup(); + pingIterationCompletedSingleGroup(groups); } else { - pingIterationCompletedMultipleGroups(); + pingIterationCompletedMultipleGroups(groups); } } - - /** * Calculate whether a subset of nodes in a group has enough coverage */ - - private void trackGroupCoverageChanges(Group group, boolean fullCoverage, long medianDocuments) { if ( ! hasInformationAboutAllNodes()) return; // Be silent until we know what we are talking about. boolean changed = group.fullCoverageStatusChanged(fullCoverage); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java index b041ba28db9..5727931281a 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java @@ -1,8 +1,16 @@ package com.yahoo.search.dispatch.searchcluster; +import com.yahoo.stream.CustomCollectors; + import java.util.Collection; +import java.util.Comparator; +import java.util.LinkedHashSet; import java.util.Set; +import static java.util.Comparator.comparingInt; +import static java.util.stream.Collectors.toCollection; +import static java.util.stream.Collectors.toSet; + /** * Simple interface for groups and their nodes in the content cluster * @author baldersheim @@ -14,6 +22,11 @@ public interface SearchGroups { default boolean isEmpty() { return size() == 0; } + default Set<Node> nodes() { + return groups().stream().flatMap(group -> group.nodes().stream()) + .sorted(comparingInt(Node::key)) + .collect(toCollection(LinkedHashSet::new)); + } int size(); boolean isPartialGroupCoverageSufficient(Collection<Node> nodes); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java index 514f0de4fec..3c5dbe9927a 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java @@ -3,10 +3,8 @@ package com.yahoo.search.dispatch.searchcluster; import com.google.common.math.Quantiles; import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; public class SearchGroupsImpl implements SearchGroups { @@ -42,4 +40,5 @@ public class SearchGroupsImpl implements SearchGroups { double[] activeDocuments = groups().stream().mapToDouble(Group::activeDocuments).toArray(); return (long) Quantiles.median().computeInPlace(activeDocuments); } + } |