From 3f2f274b40d19354b78c3d3bec6b8c04c461d09c Mon Sep 17 00:00:00 2001 From: jonmv Date: Tue, 11 Jul 2023 17:20:50 +0200 Subject: Set up skeleton for replacing monitored groups and nodes --- .../com/yahoo/search/cluster/BaseNodeMonitor.java | 2 +- .../com/yahoo/search/cluster/ClusterMonitor.java | 2 +- .../yahoo/search/cluster/MonitorConfiguration.java | 2 +- .../yahoo/search/cluster/TrafficNodeMonitor.java | 4 +- .../yahoo/search/dispatch/CloseableInvoker.java | 6 +-- .../java/com/yahoo/search/dispatch/Dispatcher.java | 60 +++++++++++++++------- .../yahoo/search/dispatch/searchcluster/Group.java | 13 +++-- .../dispatch/searchcluster/SearchCluster.java | 42 ++++++++------- .../dispatch/searchcluster/SearchGroups.java | 3 ++ .../dispatch/searchcluster/SearchGroupsImpl.java | 3 +- .../dispatch/searchcluster/SearchClusterTest.java | 4 +- 11 files changed, 84 insertions(+), 57 deletions(-) (limited to 'container-search') diff --git a/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java index fd8110e1173..d1377b8d373 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java @@ -82,7 +82,7 @@ public abstract class BaseNodeMonitor { /** Thread-safely changes the state of this node if required */ protected abstract void setWorking(boolean working,String explanation); - /** Returns whether or not this is monitoring an internal node. Default is false. */ + /** Returns whether this is monitoring an internal node. Default is false. 
*/ public boolean isInternal() { return internal; } } diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java index d1c212168a6..332bf4ea2c4 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java @@ -66,7 +66,7 @@ public class ClusterMonitor { * * * @param node the object representing the node - * @param internal whether or not this node is internal to this cluster + * @param internal whether this node is internal to this cluster */ public void add(T node, boolean internal) { nodeMonitors.put(node, new TrafficNodeMonitor<>(node, configuration, internal)); diff --git a/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java b/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java index 1f6602053d9..f8f8c0d888d 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java @@ -22,7 +22,7 @@ public class MonitorConfiguration { /** * Returns the number of milliseconds to attempt to service a request - * (at different nodes) before giving up. Default is 5000 ms. + * (at different nodes) before giving up. See {@link #requestTimeout}. 
*/ public long getRequestTimeout() { return requestTimeout; } diff --git a/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java index 11475b6a0ca..108e7e3e34b 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java @@ -23,7 +23,7 @@ public class TrafficNodeMonitor extends BaseNodeMonitor { this.configuration = configuration; } - /** Whether or not this has ever responded successfully */ + /** Whether this has ever responded successfully */ private boolean atStartUp = true; public T getNode() { return node; } @@ -55,7 +55,7 @@ public class TrafficNodeMonitor extends BaseNodeMonitor { respondedAt = now(); succeededAt = respondedAt; - setWorking(true,"Responds correctly"); + setWorking(true, "Responds correctly"); } /** diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java index 558b734be51..c6fef88fa2d 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java @@ -2,7 +2,6 @@ package com.yahoo.search.dispatch; import java.io.Closeable; -import java.time.Duration; import java.util.function.BiConsumer; /** @@ -21,10 +20,7 @@ public abstract class CloseableInvoker implements Closeable { private RequestDuration duration; public void teardown(BiConsumer teardown) { - this.teardown = this.teardown == null ? teardown : (success, duration) -> { - this.teardown.accept(success, duration); - teardown.accept(success, duration); - }; + this.teardown = this.teardown == null ? teardown : this.teardown.andThen(teardown); this.duration = this.duration == null ? 
new RequestDuration() : this.duration; } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 2e8fbe34781..9b0c79dc2bc 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -1,9 +1,9 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch; -import com.yahoo.component.annotation.Inject; import com.yahoo.component.AbstractComponent; import com.yahoo.component.ComponentId; +import com.yahoo.component.annotation.Inject; import com.yahoo.compress.Compressor; import com.yahoo.container.handler.VipStatus; import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; @@ -16,9 +16,9 @@ import com.yahoo.search.dispatch.rpc.RpcInvokerFactory; import com.yahoo.search.dispatch.rpc.RpcPingFactory; import com.yahoo.search.dispatch.rpc.RpcResourcePool; import com.yahoo.search.dispatch.searchcluster.Group; -import com.yahoo.search.dispatch.searchcluster.SearchGroups; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.SearchCluster; +import com.yahoo.search.dispatch.searchcluster.SearchGroups; import com.yahoo.search.query.profile.types.FieldDescription; import com.yahoo.search.query.profile.types.FieldType; import com.yahoo.search.query.profile.types.QueryProfileType; @@ -33,7 +33,6 @@ import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; /** * A dispatcher communicates with search nodes to perform queries and fill hits. 
@@ -59,19 +58,20 @@ public class Dispatcher extends AbstractComponent { private final DispatchConfig dispatchConfig; private final RpcResourcePool rpcResourcePool; private final SearchCluster searchCluster; - private final ClusterMonitor clusterMonitor; private volatile VolatileItems volatileItems; private static class VolatileItems { final LoadBalancer loadBalancer; final InvokerFactory invokerFactory; + final ClusterMonitor clusterMonitor; final AtomicInteger inflight = new AtomicInteger(1); // Initial reference. Runnable cleanup = () -> { }; - VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory) { + VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory, ClusterMonitor clusterMonitor) { this.loadBalancer = loadBalancer; this.invokerFactory = invokerFactory; + this.clusterMonitor = clusterMonitor; } private void countDown() { @@ -112,19 +112,17 @@ public class Dispatcher extends AbstractComponent { rpcResourcePool = new RpcResourcePool(dispatchConfig, nodesConfig); searchCluster = new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(), toNodes(nodesConfig), vipStatus, new RpcPingFactory(rpcResourcePool)); - clusterMonitor = new ClusterMonitor<>(searchCluster, true); - volatileItems = update(null); + volatileItems = update(null, new ClusterMonitor<>(searchCluster, true)); initialWarmup(dispatchConfig.warmuptime()); } - /* For simple mocking in tests. Beware that searchCluster is shutdown on in deconstruct() */ + /* For simple mocking in tests. 
Beware that searchCluster is shutdown in deconstruct() */ Dispatcher(ClusterMonitor clusterMonitor, SearchCluster searchCluster, DispatchConfig dispatchConfig, InvokerFactory invokerFactory) { this.dispatchConfig = dispatchConfig; this.rpcResourcePool = null; this.searchCluster = searchCluster; - this.clusterMonitor = clusterMonitor; - this.volatileItems = update(invokerFactory); + this.volatileItems = update(invokerFactory, clusterMonitor); } /** Returns the snapshot of volatile items that need to be kept together, incrementing its reference counter. */ @@ -142,13 +140,28 @@ public class Dispatcher extends AbstractComponent { * 1. The volatile snapshot of disposable items is replaced with a new one that only references updated nodes. * 2. Dependencies of the items in 1., which must be configured, are updated, yielding a list of resources to close. * 3. When inflight operations against the old snapshot are done, all obsolete resources are cleaned up. + * + * Ownership details: + * 1. The RPC resource pool is owned by the dispatcher, and is updated on node set changes; + * it contains the means by which the container talks to backend nodes, so cleanup must be delayed until safe. + * 2. The invocation factory is owned by the volatile snapshot, and is swapped atomically with it; + * it is used by the dispatcher to create ephemeral invokers, which must complete before cleanup (above) can happen. + * 3. The load balancer is owned by the volatile snapshot, and is swapped atomically with it; + * it is used internally by the dispatcher to select search nodes for queries, and is discarded with its snapshot. + * 4. The cluster monitor is a subordinate to the search cluster, and does whatever that tells it to, at any time; + * it is technically owned by the volatile snapshot, but mostly to show it is swapped together with that. + * 5. 
The search cluster is owned by the dispatcher, and is updated on node set changes; + * its responsibility is to keep track of the state of the backend, and to provide a view of it to the dispatcher, + * as well as keep the container vip status updated accordingly; it should therefore preserve as much as possible + * of its state across reconfigurations: with new node config, it will immediately forget obsolete nodes, and set + * coverage information as if the new nodes have zero documents, before even checking their status; this is fine + * under the assumption that this is the common case, i.e., new nodes have no documents yet. */ void updateWithNewConfig(DispatchNodesConfig nodesConfig) { - try (var items = volatileItems()) { // Replace the volatile items snapshot, marking a reference to the old one. - items.get().countDown(); // Decrement for the initial creation reference. - this.volatileItems = update(null); + try (var items = volatileItems()) { // Marking a reference to the old snapshot, which we want to have cleaned up. + items.get().countDown(); // Decrement for its initial creation reference, so it may reach 0. - // Set up the cleanup that we need to do. + // Let the RPC pool know about the new nodes, and set up the delayed cleanup that we need to do. Collection connectionPoolsToClose = rpcResourcePool.updateNodes(nodesConfig); items.get().cleanup = () -> { for (AutoCloseable pool : connectionPoolsToClose) { @@ -156,15 +169,24 @@ public class Dispatcher extends AbstractComponent { } }; - } // Close the old snapshot, which may trigger the cleanup right now, or when the last invoker is closed. + // Update the nodes the search cluster keeps track of, and what nodes are monitored. + searchCluster.updateNodes(toNodes(nodesConfig), dispatchConfig.minActivedocsPercentage()); + + // Update the snapshot to use the new nodes set in the search cluster; the RPC pool is ready for this. 
+ this.volatileItems = update(null, new ClusterMonitor<>(searchCluster, true)); + + // Wait for the old cluster monitor to die; it may be pinging nodes we want to shut down RPC connections to. + items.get().clusterMonitor.shutdown(); + } // Close the old snapshot, which may trigger the RPC cleanup now, or when the last invoker is closed, by a search thread. } - private VolatileItems update(InvokerFactory invokerFactory) { + private VolatileItems update(InvokerFactory invokerFactory, ClusterMonitor clusterMonitor) { var items = new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())), (invokerFactory == null) ? new RpcInvokerFactory(rpcResourcePool, searchCluster.groupList(), dispatchConfig) - : invokerFactory); - searchCluster.addMonitoring(clusterMonitor); // TODO: Update, rather than add ... as this creates a pinger for each node + : invokerFactory, + clusterMonitor); + searchCluster.addMonitoring(clusterMonitor); return items; } @@ -214,7 +236,7 @@ public class Dispatcher extends AbstractComponent { @Override public void deconstruct() { // The clustermonitor must be shutdown first as it uses the invokerfactory through the searchCluster. - clusterMonitor.shutdown(); + volatileItems.clusterMonitor.shutdown(); if (rpcResourcePool != null) { rpcResourcePool.close(); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java index 121c12335f5..c8af5cea5aa 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
package com.yahoo.search.dispatch.searchcluster; +import java.util.ArrayList; import java.util.List; import java.util.logging.Logger; @@ -51,7 +52,7 @@ public class Group { /** * Returns whether this group has sufficient active documents - * (compared to other groups) that is should receive traffic + * (compared to other groups) that it should receive traffic */ public boolean hasSufficientCoverage() { return hasSufficientCoverage; @@ -66,14 +67,16 @@ public class Group { } public void aggregateNodeValues() { - long activeDocs = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(Node::getActiveDocuments).sum(); + List workingNodes = new ArrayList<>(nodes); + workingNodes.removeIf(node -> node.isWorking() != Boolean.TRUE); + long activeDocs = workingNodes.stream().mapToLong(Node::getActiveDocuments).sum(); activeDocuments = activeDocs; - targetActiveDocuments = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(Node::getTargetActiveDocuments).sum(); + targetActiveDocuments = workingNodes.stream().mapToLong(Node::getTargetActiveDocuments).sum(); isBlockingWrites = nodes.stream().anyMatch(Node::isBlockingWrites); - int numWorkingNodes = workingNodes(); + int numWorkingNodes = workingNodes.size(); if (numWorkingNodes > 0) { long average = activeDocs / numWorkingNodes; - long skew = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(node -> Math.abs(node.getActiveDocuments() - average)).sum(); + long skew = workingNodes.stream().mapToLong(node -> Math.abs(node.getActiveDocuments() - average)).sum(); boolean balanced = skew <= activeDocs * maxContentSkew; if (balanced != isBalanced) { if (!isSparse()) diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index 9c65cb3d4c0..1334a6c43f1 100644 --- 
a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -9,12 +9,14 @@ import com.yahoo.search.cluster.NodeManager; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.Executor; import java.util.logging.Logger; -import java.util.stream.Collectors; + +import static java.util.stream.Collectors.groupingBy; /** * A model of a search cluster we might want to dispatch queries to. @@ -28,7 +30,7 @@ public class SearchCluster implements NodeManager { private final String clusterId; private final VipStatus vipStatus; private final PingFactory pingFactory; - private final SearchGroupsImpl groups; + private volatile SearchGroupsImpl groups; private volatile long nextLogTime = 0; /** @@ -45,6 +47,7 @@ public class SearchCluster implements NodeManager { VipStatus vipStatus, PingFactory pingFactory) { this(clusterId, toGroups(nodes, minActivedocsPercentage), vipStatus, pingFactory); } + public SearchCluster(String clusterId, SearchGroupsImpl groups, VipStatus vipStatus, PingFactory pingFactory) { this.clusterId = clusterId; this.vipStatus = vipStatus; @@ -55,13 +58,18 @@ public class SearchCluster implements NodeManager { @Override public String name() { return clusterId; } - public VipStatus getVipStatus() { return vipStatus; } + + public void updateNodes(Collection newNodes, double minActivedocsPercentage) { + Collection retainedNodes = groups.nodes(); + Collection currentNodes = new HashSet<>(newNodes); + retainedNodes.retainAll(currentNodes); + currentNodes.removeIf(retainedNodes::contains); + currentNodes.addAll(retainedNodes); + groups = toGroups(currentNodes, minActivedocsPercentage); + } public void addMonitoring(ClusterMonitor clusterMonitor) { - for (var group : groups()) { - for (var node : group.nodes()) - 
clusterMonitor.add(node, true); - } + for (Node node : groups.nodes()) clusterMonitor.add(node, true); } private static Node findLocalCorpusDispatchTarget(String selfHostname, SearchGroups groups) { @@ -86,14 +94,14 @@ public class SearchCluster implements NodeManager { private static SearchGroupsImpl toGroups(Collection nodes, double minActivedocsPercentage) { Map groups = new HashMap<>(); - for (Map.Entry> group : nodes.stream().collect(Collectors.groupingBy(Node::group)).entrySet()) { - Group g = new Group(group.getKey(), group.getValue()); - groups.put(group.getKey(), g); - } + nodes.stream().collect(groupingBy(Node::group)).forEach((groupId, groupNodes) -> { + groups.put(groupId, new Group(groupId, groupNodes)); + }); return new SearchGroupsImpl(Map.copyOf(groups), minActivedocsPercentage); } public SearchGroups groupList() { return groups; } + public Group group(int id) { return groups.get(id); } private Collection groups() { return groups.groups(); } @@ -107,14 +115,14 @@ public class SearchCluster implements NodeManager { * or empty if we should not dispatch directly. */ public Optional localCorpusDispatchTarget() { - if ( localCorpusDispatchTarget == null) return Optional.empty(); + if (localCorpusDispatchTarget == null) return Optional.empty(); // Only use direct dispatch if the local group has sufficient coverage Group localSearchGroup = groups.get(localCorpusDispatchTarget.group()); if ( ! 
localSearchGroup.hasSufficientCoverage()) return Optional.empty(); // Only use direct dispatch if the local search node is not down - if ( localCorpusDispatchTarget.isWorking() == Boolean.FALSE) return Optional.empty(); + if (localCorpusDispatchTarget.isWorking() == Boolean.FALSE) return Optional.empty(); return Optional.of(localCorpusDispatchTarget); } @@ -176,7 +184,7 @@ public class SearchCluster implements NodeManager { return groups().stream().allMatch(group -> group.nodes().stream().allMatch(node -> node.isWorking() != null)); } - public long nonWorkingNodeCount() { + long nonWorkingNodeCount() { return groups().stream().flatMap(group -> group.nodes().stream()).filter(node -> node.isWorking() == Boolean.FALSE).count(); } @@ -194,7 +202,7 @@ public class SearchCluster implements NodeManager { /** Used by the cluster monitor to manage node status */ @Override - public void ping(ClusterMonitor clusterMonitor, Node node, Executor executor) { + public void ping(ClusterMonitor clusterMonitor, Node node, Executor executor) { Pinger pinger = pingFactory.createPinger(node, clusterMonitor, new PongCallback(node, clusterMonitor)); pinger.ping(); } @@ -233,13 +241,9 @@ public class SearchCluster implements NodeManager { } } - - /** * Calculate whether a subset of nodes in a group has enough coverage */ - - private void trackGroupCoverageChanges(Group group, boolean fullCoverage, long medianDocuments) { if ( ! hasInformationAboutAllNodes()) return; // Be silent until we know what we are talking about. 
boolean changed = group.fullCoverageStatusChanged(fullCoverage); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java index b041ba28db9..cdbc3365409 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java @@ -3,6 +3,8 @@ package com.yahoo.search.dispatch.searchcluster; import java.util.Collection; import java.util.Set; +import static java.util.stream.Collectors.toSet; + /** * Simple interface for groups and their nodes in the content cluster * @author baldersheim @@ -14,6 +16,7 @@ public interface SearchGroups { default boolean isEmpty() { return size() == 0; } + default Set nodes() { return groups().stream().flatMap(group -> group.nodes().stream()).collect(toSet());} int size(); boolean isPartialGroupCoverageSufficient(Collection nodes); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java index 514f0de4fec..3c5dbe9927a 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java @@ -3,10 +3,8 @@ package com.yahoo.search.dispatch.searchcluster; import com.google.common.math.Quantiles; import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; public class SearchGroupsImpl implements SearchGroups { @@ -42,4 +40,5 @@ public class SearchGroupsImpl implements SearchGroups { double[] activeDocuments = groups().stream().mapToDouble(Group::activeDocuments).toArray(); return (long) Quantiles.median().computeInPlace(activeDocuments); } 
+ } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java index 51256ec496e..f0231e44bef 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java @@ -31,7 +31,7 @@ public class SearchClusterTest { final int nodesPerGroup; final VipStatus vipStatus; final SearchCluster searchCluster; - final ClusterMonitor clusterMonitor; + final ClusterMonitor clusterMonitor; final List numDocsPerNode; List pingCounts; @@ -57,7 +57,7 @@ public class SearchClusterTest { } searchCluster = new SearchCluster(clusterId, 100.0, nodes, vipStatus, new Factory(nodesPerGroup, numDocsPerNode, pingCounts)); - clusterMonitor = new ClusterMonitor(searchCluster, false); + clusterMonitor = new ClusterMonitor<>(searchCluster, false); searchCluster.addMonitoring(clusterMonitor); } -- cgit v1.2.3