diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-11-24 12:10:45 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2022-11-24 12:10:45 +0100 |
commit | 8e2abfc58e936bd6be3194f9e02a21a9b70b7692 (patch) | |
tree | 9918c66fc57647c0b1c831223f6b0f9c5d6b680d /container-search/src | |
parent | 90b260e83678edc36eb877ec235e0e6ce5892a48 (diff) |
Put loadbalancer and invokerfactory in a volatile object to ensure atomic switch when reconfiguring.
Diffstat (limited to 'container-search/src')
14 files changed, 182 insertions, 189 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 2c5a9372d06..92a522e9970 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -16,6 +16,7 @@ import com.yahoo.search.dispatch.rpc.RpcInvokerFactory; import com.yahoo.search.dispatch.rpc.RpcPingFactory; import com.yahoo.search.dispatch.rpc.RpcResourcePool; import com.yahoo.search.dispatch.searchcluster.Group; +import com.yahoo.search.dispatch.searchcluster.SearchGroups; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.SearchCluster; import com.yahoo.search.query.profile.types.FieldDescription; @@ -53,12 +54,20 @@ public class Dispatcher extends AbstractComponent { /** If set will control computation of how many hits will be fetched from each partition.*/ public static final CompoundName topKProbability = CompoundName.fromComponents(DISPATCH, TOP_K_PROBABILITY); - private final ClusterMonitor<Node> clusterMonitor; - private final LoadBalancer loadBalancer; - private final SearchCluster searchCluster; - private final InvokerFactory invokerFactory; - private final int maxHitsPerNode; + private final DispatchConfig dispatchConfig; private final RpcResourcePool rpcResourcePool; + private final SearchCluster searchCluster; + private final ClusterMonitor<Node> clusterMonitor; + private volatile VolatileItems volatileItems; + + private static class VolatileItems { + final LoadBalancer loadBalancer; + final InvokerFactory invokerFactory; + VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory) { + this.loadBalancer = loadBalancer; + this.invokerFactory = invokerFactory; + } + } private static final QueryProfileType argumentType; @@ -73,53 +82,36 @@ public class Dispatcher extends AbstractComponent { public static QueryProfileType getArgumentType() { return argumentType; } @Inject - public Dispatcher(ComponentId clusterId, - DispatchConfig dispatchConfig, - DispatchNodesConfig nodesConfig, - VipStatus vipStatus) { - this(clusterId, dispatchConfig, nodesConfig, vipStatus, - new RpcResourcePool(dispatchConfig, nodesConfig)); - - } - private Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, - DispatchNodesConfig nodesConfig, VipStatus vipStatus, - RpcResourcePool resourcePool) { - this(new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(), nodesConfig, - vipStatus, new RpcPingFactory(resourcePool)), - dispatchConfig, resourcePool); - - } - - private Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, - RpcResourcePool rpcResourcePool) { - this(new ClusterMonitor<>(searchCluster, true), - searchCluster, dispatchConfig, - new RpcInvokerFactory(rpcResourcePool, searchCluster, dispatchConfig), - rpcResourcePool); - } - - /* Protected for simple mocking in tests. Beware that searchCluster is shutdown on in deconstruct() */ - protected Dispatcher(ClusterMonitor<Node> clusterMonitor, - SearchCluster searchCluster, - DispatchConfig dispatchConfig, - InvokerFactory invokerFactory) { - this(clusterMonitor, searchCluster, dispatchConfig, invokerFactory, null); + public Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, + DispatchNodesConfig nodesConfig, VipStatus vipStatus) { + this.dispatchConfig = dispatchConfig; + rpcResourcePool = new RpcResourcePool(dispatchConfig, nodesConfig); + searchCluster = new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(), + nodesConfig, vipStatus, new RpcPingFactory(rpcResourcePool)); + clusterMonitor = new ClusterMonitor<>(searchCluster, true); + volatileItems = update(null); + initialWarmup(dispatchConfig.warmuptime()); } - private Dispatcher(ClusterMonitor<Node> clusterMonitor, - SearchCluster searchCluster, - DispatchConfig dispatchConfig, - InvokerFactory invokerFactory, - RpcResourcePool rpcResourcePool) { - + /* For simple mocking in tests. Beware that searchCluster is shutdown on in deconstruct() */ + Dispatcher(ClusterMonitor<Node> clusterMonitor, SearchCluster searchCluster, + DispatchConfig dispatchConfig, InvokerFactory invokerFactory) { + this.dispatchConfig = dispatchConfig; + this.rpcResourcePool = null; this.searchCluster = searchCluster; this.clusterMonitor = clusterMonitor; - this.loadBalancer = new LoadBalancer(searchCluster.groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())); - this.invokerFactory = invokerFactory; - this.rpcResourcePool = rpcResourcePool; - this.maxHitsPerNode = dispatchConfig.maxHitsPerNode(); + this.volatileItems = update(invokerFactory); + } + private VolatileItems update(InvokerFactory invokerFactory) { + var items = new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())), + (invokerFactory == null) + ? new RpcInvokerFactory(rpcResourcePool, searchCluster.groupList(), dispatchConfig) + : invokerFactory); searchCluster.addMonitoring(clusterMonitor); - Thread warmup = new Thread(() -> warmup(dispatchConfig.warmuptime())); + return items; + } + private void initialWarmup(double warmupTime) { + Thread warmup = new Thread(() -> warmup(warmupTime)); warmup.start(); try { while ( ! searchCluster.hasInformationAboutAllNodes()) { @@ -153,7 +145,7 @@ public class Dispatcher extends AbstractComponent { } public boolean allGroupsHaveSize1() { - return searchCluster.allGroupsHaveSize1(); + return searchCluster.groupList().groups().stream().allMatch(g -> g.nodes().size() == 1); } @Override @@ -166,13 +158,14 @@ public class Dispatcher extends AbstractComponent { } public FillInvoker getFillInvoker(Result result, VespaBackEndSearcher searcher) { - return invokerFactory.createFillInvoker(searcher, result); + return volatileItems.invokerFactory.createFillInvoker(searcher, result); } public SearchInvoker getSearchInvoker(Query query, VespaBackEndSearcher searcher) { - SearchCluster cluster = searchCluster; // Take a snapshot - InvokerFactory factory = invokerFactory; - SearchInvoker invoker = getSearchPathInvoker(query, searcher, cluster, factory, maxHitsPerNode).orElseGet(() -> getInternalInvoker(query, searcher, cluster, loadBalancer, factory, maxHitsPerNode)); + VolatileItems items = volatileItems; // Take a snapshot + int maxHitsPerNode = dispatchConfig.maxHitsPerNode(); + SearchInvoker invoker = getSearchPathInvoker(query, searcher, searchCluster.groupList(), items.invokerFactory, maxHitsPerNode) + .orElseGet(() -> getInternalInvoker(query, searcher, searchCluster, items.loadBalancer, items.invokerFactory, maxHitsPerNode)); if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) { query.setHits(0); @@ -182,7 +175,7 @@ public class Dispatcher extends AbstractComponent { } /** Builds an invoker based on searchpath */ - private static Optional<SearchInvoker> getSearchPathInvoker(Query query, VespaBackEndSearcher searcher, SearchCluster cluster, + private static Optional<SearchInvoker> getSearchPathInvoker(Query query, VespaBackEndSearcher searcher, SearchGroups cluster, InvokerFactory invokerFactory, int maxHitsPerNode) { String searchPath = query.getModel().getSearchPath(); if (searchPath == null) return Optional.empty(); @@ -217,9 +210,9 @@ public class Dispatcher extends AbstractComponent { } int covered = cluster.groupsWithSufficientCoverage(); - int groups = cluster.numGroups(); + int groups = cluster.groupList().size(); int max = Integer.min(Integer.min(covered + 1, groups), MAX_GROUP_SELECTION_ATTEMPTS); - Set<Integer> rejected = rejectGroupBlockingFeed(cluster.groups()); + Set<Integer> rejected = rejectGroupBlockingFeed(cluster.groupList().groups()); for (int i = 0; i < max; i++) { Optional<Group> groupInCluster = loadBalancer.takeGroup(rejected); if (groupInCluster.isEmpty()) break; // No groups available diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java index f3d87e04810..d6fb6de6354 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java @@ -6,8 +6,8 @@ import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.dispatch.searchcluster.Group; +import com.yahoo.search.dispatch.searchcluster.SearchGroups; import com.yahoo.search.dispatch.searchcluster.Node; -import com.yahoo.search.dispatch.searchcluster.SearchCluster; import com.yahoo.search.result.Coverage; import com.yahoo.search.result.ErrorMessage; import com.yahoo.vespa.config.search.DispatchConfig; @@ -24,12 +24,12 @@ import java.util.Set; public abstract class InvokerFactory { private static final double SKEW_FACTOR = 0.05; - private final SearchCluster searchCluster; + private final SearchGroups cluster; private final DispatchConfig dispatchConfig; private final TopKEstimator hitEstimator; - public InvokerFactory(SearchCluster searchCluster, DispatchConfig dispatchConfig) { - this.searchCluster = searchCluster; + public InvokerFactory(SearchGroups searchCluster, DispatchConfig dispatchConfig) { + this.cluster = searchCluster; this.dispatchConfig = dispatchConfig; this.hitEstimator = new TopKEstimator(30.0, dispatchConfig.topKProbability(), SKEW_FACTOR); } @@ -57,7 +57,7 @@ public abstract class InvokerFactory { List<Node> nodes, boolean acceptIncompleteCoverage, int maxHits) { - Group group = searchCluster.group(nodes.get(0).group()); // Nodes must be of the same group + Group group = cluster.get(nodes.get(0).group()); // Nodes must be of the same group List<SearchInvoker> invokers = new ArrayList<>(nodes.size()); Set<Integer> failed = null; for (Node node : nodes) { @@ -85,7 +85,7 @@ public abstract class InvokerFactory { success.add(node); } } - if ( ! searchCluster.isPartialGroupCoverageSufficient(success) && !acceptIncompleteCoverage) { + if ( ! cluster.isPartialGroupCoverageSufficient(success) && !acceptIncompleteCoverage) { return Optional.empty(); } if (invokers.isEmpty()) { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java index 10bf5e36988..568c9f117d7 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java @@ -3,7 +3,7 @@ package com.yahoo.search.dispatch; import com.yahoo.collections.Pair; import com.yahoo.search.dispatch.searchcluster.Group; -import com.yahoo.search.dispatch.searchcluster.GroupList; +import com.yahoo.search.dispatch.searchcluster.SearchGroups; import com.yahoo.search.dispatch.searchcluster.Node; import java.util.ArrayList; @@ -36,7 +36,7 @@ public class SearchPath { * @return list of nodes chosen with the search path, or an empty list in which * case some other node selection logic should be used */ - public static List<Node> selectNodes(String searchPath, GroupList cluster) { + public static List<Node> selectNodes(String searchPath, SearchGroups cluster) { Optional<SearchPath> sp = SearchPath.fromString(searchPath); if (sp.isPresent()) { return sp.get().mapToNodes(cluster); @@ -73,7 +73,7 @@ public class SearchPath { this.groups = groups; } - private List<Node> mapToNodes(GroupList cluster) { + private List<Node> mapToNodes(SearchGroups cluster) { if (cluster.isEmpty()) { return List.of(); } @@ -100,11 +100,11 @@ public class SearchPath { return nodes.isEmpty() && groups.isEmpty(); } - private Group selectRandomGroupWithSufficientCoverage(GroupList cluster, List<Integer> groupIds) { + private Group selectRandomGroupWithSufficientCoverage(SearchGroups cluster, List<Integer> groupIds) { while ( groupIds.size() > 1 ) { int index = random.nextInt(groupIds.size()); int groupId = groupIds.get(index); - Group group = cluster.group(groupId); + Group group = cluster.get(groupId); if (group != null) { if (group.hasSufficientCoverage()) { return group; @@ -115,10 +115,10 @@ public class SearchPath { throw new InvalidSearchPathException("Invalid searchPath, cluster does not have " + (groupId + 1) + " groups"); } } - return cluster.group(groupIds.get(0)); + return cluster.get(groupIds.get(0)); } - private Group selectGroup(GroupList cluster) { + private Group selectGroup(SearchGroups cluster) { if ( ! groups.isEmpty()) { List<Integer> potentialGroups = new ArrayList<>(); for (Selection groupSelection : groups) { @@ -130,7 +130,7 @@ public class SearchPath { } // pick any working group - return selectRandomGroupWithSufficientCoverage(cluster, new ArrayList<>(cluster.groupKeys())); + return selectRandomGroupWithSufficientCoverage(cluster, new ArrayList<>(cluster.keys())); } private static SearchPath parseElement(String element) { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java index d5b1b876540..4466b03a713 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java @@ -7,8 +7,8 @@ import com.yahoo.search.Result; import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.dispatch.InvokerFactory; import com.yahoo.search.dispatch.SearchInvoker; +import com.yahoo.search.dispatch.searchcluster.SearchGroups; import com.yahoo.search.dispatch.searchcluster.Node; -import com.yahoo.search.dispatch.searchcluster.SearchCluster; import com.yahoo.vespa.config.search.DispatchConfig; import java.util.Optional; @@ -21,8 +21,8 @@ public class RpcInvokerFactory extends InvokerFactory { private final RpcConnectionPool rpcResourcePool; private final CompressPayload compressor; - public RpcInvokerFactory(RpcConnectionPool rpcResourcePool, SearchCluster searchCluster, DispatchConfig dispatchConfig) { - super(searchCluster, dispatchConfig); + public RpcInvokerFactory(RpcConnectionPool rpcResourcePool, SearchGroups cluster, DispatchConfig dispatchConfig) { + super(cluster, dispatchConfig); this.rpcResourcePool = rpcResourcePool; this.compressor = new CompressService(); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java index cf161638104..fbea58054da 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java @@ -1,8 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch.searchcluster; -import com.google.common.collect.ImmutableList; - import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -22,7 +20,7 @@ public class Group { private final static int minDocsPerNodeToRequireLowSkew = 100; private final int id; - private final ImmutableList<Node> nodes; + private final List<Node> nodes; private final AtomicBoolean hasSufficientCoverage = new AtomicBoolean(true); private final AtomicBoolean hasFullCoverage = new AtomicBoolean(true); private final AtomicLong activeDocuments = new AtomicLong(0); @@ -32,7 +30,7 @@ public class Group { public Group(int id, List<Node> nodes) { this.id = id; - this.nodes = ImmutableList.copyOf(nodes); + this.nodes = List.copyOf(nodes); int idx = 0; for(var node: nodes) { @@ -48,7 +46,7 @@ public class Group { public int id() { return id; } /** Returns the nodes in this group as an immutable list */ - public ImmutableList<Node> nodes() { return nodes; } + public List<Node> nodes() { return nodes; } /** * Returns whether this group has sufficient active documents diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/GroupListImpl.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/GroupListImpl.java deleted file mode 100644 index b0cbf7d1d1d..00000000000 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/GroupListImpl.java +++ /dev/null @@ -1,38 +0,0 @@ -package com.yahoo.search.dispatch.searchcluster; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -public class GroupListImpl implements GroupList { - private final Map<Integer, Group> groups; - public GroupListImpl(Map<Integer, Group> groups) { - this.groups = Map.copyOf(groups); - } - @Override public Group group(int id) { return groups.get(id); } - @Override public Set<Integer> groupKeys() { return groups.keySet();} - @Override public Collection<Group> groups() { return groups.values(); } - @Override public int numGroups() { return groups.size(); } - public static GroupList buildGroupListForTest(int numGroups, int nodesPerGroup) { - return new GroupListImpl(buildGroupMapForTest(numGroups, nodesPerGroup)); - } - public static Map<Integer, Group> buildGroupMapForTest(int numGroups, int nodesPerGroup) { - Map<Integer, Group> groups = new HashMap<>(); - int distributionKey = 0; - for (int group = 0; group < numGroups; group++) { - List<Node> groupNodes = new ArrayList<>(); - for (int i = 0; i < nodesPerGroup; i++) { - Node node = new Node(distributionKey, "host" + distributionKey, group); - node.setWorking(true); - groupNodes.add(node); - distributionKey++; - } - Group g = new Group(group, groupNodes); - groups.put(group, g); - } - return Map.copyOf(groups); - } -} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index bedfa965229..aa3797cb627 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -2,7 +2,6 @@ package com.yahoo.search.dispatch.searchcluster; import com.google.common.collect.ImmutableMap; -import com.google.common.math.Quantiles; import com.yahoo.container.handler.VipStatus; import com.yahoo.net.HostName; import com.yahoo.prelude.Pong; @@ -14,7 +13,6 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.Executor; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -24,15 +22,14 @@ import java.util.stream.Collectors; * * @author bratseth */ -public class SearchCluster implements NodeManager<Node>, GroupList { +public class SearchCluster implements NodeManager<Node> { private static final Logger log = Logger.getLogger(SearchCluster.class.getName()); - private final double minActivedocsPercentage; private final String clusterId; private final VipStatus vipStatus; private final PingFactory pingFactory; - private final GroupList groups; + private final SearchGroupsImpl groups; private long nextLogTime = 0; /** @@ -52,12 +49,10 @@ public class SearchCluster implements NodeManager<Node>, GroupList { } public SearchCluster(String clusterId, double minActivedocsPercentage, List<Node> nodes, VipStatus vipStatus, PingFactory pingFactory) { - this(clusterId, minActivedocsPercentage, toGroups(nodes), vipStatus, pingFactory); + this(clusterId, toGroups(nodes, minActivedocsPercentage), vipStatus, pingFactory); } - public SearchCluster(String clusterId, double minActivedocsPercentage, GroupList groups, - VipStatus vipStatus, PingFactory pingFactory) { + public SearchCluster(String clusterId, SearchGroupsImpl groups, VipStatus vipStatus, PingFactory pingFactory) { this.clusterId = clusterId; - this.minActivedocsPercentage = minActivedocsPercentage; this.vipStatus = vipStatus; this.pingFactory = pingFactory; this.groups = groups; @@ -74,7 +69,7 @@ public class SearchCluster implements NodeManager<Node>, GroupList { } } - private static Node findLocalCorpusDispatchTarget(String selfHostname, GroupList groups) { + private static Node findLocalCorpusDispatchTarget(String selfHostname, SearchGroups groups) { // A search node in the search cluster in question is configured on the same host as the currently running container. // It has all the data <==> No other nodes in the search cluster have the same group id as this node. // That local search node responds. @@ -86,7 +81,7 @@ public class SearchCluster implements NodeManager<Node>, GroupList { if (localSearchNodes.size() != 1) return null; Node localSearchNode = localSearchNodes.iterator().next(); - Group localSearchGroup = groups.group(localSearchNode.group()); + Group localSearchGroup = groups.get(localSearchNode.group()); // Only use direct dispatch if the local search node has the entire corpus if (localSearchGroup.nodes().size() != 1) return null; @@ -100,27 +95,19 @@ public class SearchCluster implements NodeManager<Node>, GroupList { .toList(); } - private static GroupList toGroups(Collection<Node> nodes) { + private static SearchGroupsImpl toGroups(Collection<Node> nodes, double minActivedocsPercentage) { ImmutableMap.Builder<Integer, Group> groupsBuilder = new ImmutableMap.Builder<>(); for (Map.Entry<Integer, List<Node>> group : nodes.stream().collect(Collectors.groupingBy(Node::group)).entrySet()) { Group g = new Group(group.getKey(), group.getValue()); groupsBuilder.put(group.getKey(), g); } - return new GroupListImpl(groupsBuilder.build()); + return new SearchGroupsImpl(groupsBuilder.build(), minActivedocsPercentage); } - @Override - public Group group(int id) { - return groups.group(id); - } - @Override - public Set<Integer> groupKeys() { return groups.groupKeys(); } - @Override - public Collection<Group> groups() { return groups.groups(); } - @Override - public int numGroups() { return groups.numGroups(); } + public SearchGroups groupList() { return groups; } + public Group group(int id) { return groups.get(id); } - public boolean allGroupsHaveSize1() { return groups().stream().allMatch(g -> g.nodes().size() == 1); } + private Collection<Group> groups() { return groups.groups(); } public int groupsWithSufficientCoverage() { return (int)groups().stream().filter(Group::hasSufficientCoverage).count(); @@ -134,7 +121,7 @@ public class SearchCluster implements NodeManager<Node>, GroupList { if ( localCorpusDispatchTarget == null) return Optional.empty(); // Only use direct dispatch if the local group has sufficient coverage - Group localSearchGroup = group(localCorpusDispatchTarget.group()); + Group localSearchGroup = groups.get(localCorpusDispatchTarget.group()); if ( ! localSearchGroup.hasSufficientCoverage()) return Optional.empty(); // Only use direct dispatch if the local search node is not down @@ -225,26 +212,20 @@ public class SearchCluster implements NodeManager<Node>, GroupList { // With just one group sufficient coverage may not be the same as full coverage, as the // group will always be marked sufficient for use. updateSufficientCoverage(group, true); - boolean sufficientCoverage = isGroupCoverageSufficient(group.activeDocuments(), group.activeDocuments()); + boolean sufficientCoverage = groups.isGroupCoverageSufficient(group.activeDocuments(), group.activeDocuments()); trackGroupCoverageChanges(group, sufficientCoverage, group.activeDocuments()); } private void pingIterationCompletedMultipleGroups() { groups().forEach(Group::aggregateNodeValues); - long medianDocuments = medianDocumentsPerGroup(); + long medianDocuments = groups.medianDocumentsPerGroup(); for (Group group : groups()) { - boolean sufficientCoverage = isGroupCoverageSufficient(group.activeDocuments(), medianDocuments); + boolean sufficientCoverage = groups.isGroupCoverageSufficient(group.activeDocuments(), medianDocuments); updateSufficientCoverage(group, sufficientCoverage); trackGroupCoverageChanges(group, sufficientCoverage, medianDocuments); } } - private long medianDocumentsPerGroup() { - if (isEmpty()) return 0; - var activeDocuments = groups().stream().map(Group::activeDocuments).collect(Collectors.toList()); - return (long)Quantiles.median().compute(activeDocuments); - } - /** * Update statistics after a round of issuing pings. * Note that this doesn't wait for pings to return, so it will typically accumulate data from @@ -252,27 +233,19 @@ public class SearchCluster implements NodeManager<Node>, GroupList { */ @Override public void pingIterationCompleted() { - if (numGroups() == 1) { + if (groups.size() == 1) { pingIterationCompletedSingleGroup(); } else { pingIterationCompletedMultipleGroups(); } } - private boolean isGroupCoverageSufficient(long activeDocuments, long medianDocuments) { - double documentCoverage = 100.0 * (double) activeDocuments / medianDocuments; - return ! (medianDocuments > 0 && documentCoverage < minActivedocsPercentage); - } + /** * Calculate whether a subset of nodes in a group has enough coverage */ - public boolean isPartialGroupCoverageSufficient(List<Node> nodes) { - if (numGroups() == 1) - return true; - long activeDocuments = nodes.stream().mapToLong(Node::getActiveDocuments).sum(); - return isGroupCoverageSufficient(activeDocuments, medianDocumentsPerGroup()); - } + private void trackGroupCoverageChanges(Group group, boolean fullCoverage, long medianDocuments) { if ( ! hasInformationAboutAllNodes()) return; // Be silent until we know what we are talking about. diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/GroupList.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java index 7d625f55cb6..b041ba28db9 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/GroupList.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java @@ -7,12 +7,13 @@ import java.util.Set; * Simple interface for groups and their nodes in the content cluster * @author baldersheim */ -public interface GroupList { - Group group(int id); - Set<Integer> groupKeys(); +public interface SearchGroups { + Group get(int id); + Set<Integer> keys(); Collection<Group> groups(); default boolean isEmpty() { - return numGroups() == 0; + return size() == 0; } - int numGroups(); + int size(); + boolean isPartialGroupCoverageSufficient(Collection<Node> nodes); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java new file mode 100644 index 00000000000..a8f0d07b494 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java @@ -0,0 +1,63 @@ +package com.yahoo.search.dispatch.searchcluster; + +import com.google.common.math.Quantiles; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public class SearchGroupsImpl implements SearchGroups { + private final Map<Integer, Group> groups; + private final double minActivedocsPercentage; + public SearchGroupsImpl(Map<Integer, Group> groups, double minActivedocsPercentage) { + this.groups = Map.copyOf(groups); + this.minActivedocsPercentage = minActivedocsPercentage; + } + @Override public Group get(int id) { return groups.get(id); } + @Override public Set<Integer> keys() { return groups.keySet();} + @Override public Collection<Group> groups() { return groups.values(); } + @Override public int size() { return groups.size(); } + @Override + public boolean isPartialGroupCoverageSufficient(Collection<Node> nodes) { + if (size() == 1) + return true; + long activeDocuments = nodes.stream().mapToLong(Node::getActiveDocuments).sum(); + return isGroupCoverageSufficient(activeDocuments, medianDocumentsPerGroup()); + } + + public boolean isGroupCoverageSufficient(long activeDocuments, long medianDocuments) { + double documentCoverage = 100.0 * (double) activeDocuments / medianDocuments; + return ! (medianDocuments > 0 && documentCoverage < minActivedocsPercentage); + } + + public long medianDocumentsPerGroup() { + if (isEmpty()) return 0; + var activeDocuments = groups().stream().map(Group::activeDocuments).collect(Collectors.toList()); + return (long) Quantiles.median().compute(activeDocuments); + } + + + public static SearchGroupsImpl buildGroupListForTest(int numGroups, int nodesPerGroup, double minActivedocsPercentage) { + return new SearchGroupsImpl(buildGroupMapForTest(numGroups, nodesPerGroup), minActivedocsPercentage); + } + public static Map<Integer, Group> buildGroupMapForTest(int numGroups, int nodesPerGroup) { + Map<Integer, Group> groups = new HashMap<>(); + int distributionKey = 0; + for (int group = 0; group < numGroups; group++) { + List<Node> groupNodes = new ArrayList<>(); + for (int i = 0; i < nodesPerGroup; i++) { + Node node = new Node(distributionKey, "host" + distributionKey, group); + node.setWorking(true); + groupNodes.add(node); + distributionKey++; + } + Group g = new Group(group, groupNodes); + groups.put(group, g); + } + return Map.copyOf(groups); + } +} diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java index 2f960add4a8..e9b1cbabc79 100644 --- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java +++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java @@ -13,6 +13,7 @@ import com.yahoo.prelude.fastsearch.SummaryParameters; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.Searcher; +import com.yahoo.search.dispatch.MockDispatcher; import com.yahoo.search.dispatch.rpc.RpcResourcePool; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.grouping.GroupingRequest; diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java index 3c5b026b95e..89ca400362c 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java @@ -5,6 +5,7 @@ import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.cluster.ClusterMonitor; +import com.yahoo.search.dispatch.searchcluster.SearchGroups; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.PingFactory; import com.yahoo.search.dispatch.searchcluster.Pinger; @@ -34,7 +35,7 @@ public class DispatcherTest { SearchCluster cl = new MockSearchCluster("1", 2, 2); Query q = new Query(); q.getModel().setSearchPath("1/0"); // second node in first group - MockInvokerFactory invokerFactory = new MockInvokerFactory(cl, dispatchConfig, (nodes, a) -> { + MockInvokerFactory invokerFactory = new MockInvokerFactory(cl.groupList(), dispatchConfig, (nodes, a) -> { assertEquals(1, nodes.size()); assertEquals(1, nodes.get(0).key()); return true; @@ -54,7 +55,7 @@ public class DispatcherTest { return Optional.of(new Node(1, "test", 1)); } }; - MockInvokerFactory invokerFactory = new MockInvokerFactory(cl, dispatchConfig, (n, a) -> true); + MockInvokerFactory invokerFactory = new MockInvokerFactory(cl.groupList(), dispatchConfig, (n, a) -> true); Dispatcher disp = new Dispatcher(new ClusterMonitor<>(cl, false), cl, dispatchConfig, invokerFactory); SearchInvoker invoker = disp.getSearchInvoker(new Query(), null); assertNotNull(invoker); @@ -66,7 +67,7 @@ public class DispatcherTest { void requireThatInvokerConstructionIsRetriedAndLastAcceptsAnyCoverage() { SearchCluster cl = new MockSearchCluster("1", 2, 1); - MockInvokerFactory invokerFactory = new MockInvokerFactory(cl, dispatchConfig, (n, acceptIncompleteCoverage) -> { + MockInvokerFactory invokerFactory = new MockInvokerFactory(cl.groupList(), dispatchConfig, (n, acceptIncompleteCoverage) -> { assertFalse(acceptIncompleteCoverage); return false; }, (n, acceptIncompleteCoverage) -> { @@ -85,7 +86,7 @@ public class DispatcherTest { try { SearchCluster cl = new MockSearchCluster("1", 2, 1); - MockInvokerFactory invokerFactory = new MockInvokerFactory(cl, dispatchConfig, (n, a) -> false, (n, a) -> false); + MockInvokerFactory invokerFactory = new MockInvokerFactory(cl.groupList(), dispatchConfig, (n, a) -> false, (n, a) -> false); Dispatcher disp = new Dispatcher(new ClusterMonitor<>(cl, false), cl, dispatchConfig, invokerFactory); disp.getSearchInvoker(new Query(), null); disp.deconstruct(); @@ -99,7 +100,8 @@ public class DispatcherTest { @Test void testGroup0IsSelected() { SearchCluster cluster = new MockSearchCluster("1", 3, 1); - Dispatcher dispatcher = new Dispatcher(new ClusterMonitor<>(cluster, false), cluster, dispatchConfig, new MockInvokerFactory(cluster, dispatchConfig, (n, a) -> true)); + Dispatcher dispatcher = new Dispatcher(new ClusterMonitor<>(cluster, false), cluster, dispatchConfig, + new MockInvokerFactory(cluster.groupList(), dispatchConfig, (n, a) -> true)); cluster.pingIterationCompleted(); assertEquals(0, dispatcher.getSearchInvoker(new Query(), null).distributionKey().get().longValue()); @@ -109,7 +111,8 @@ public class DispatcherTest { @Test void testGroup0IsSkippedWhenItIsBlockingFeed() { SearchCluster cluster = new MockSearchCluster("1", 3, 1); - Dispatcher dispatcher = new Dispatcher(new ClusterMonitor<>(cluster, false), cluster, dispatchConfig, new MockInvokerFactory(cluster, dispatchConfig, (n, a) -> true)); + Dispatcher dispatcher = new Dispatcher(new ClusterMonitor<>(cluster, false), cluster, dispatchConfig, + new MockInvokerFactory(cluster.groupList(), dispatchConfig, (n, a) -> true)); cluster.group(0).nodes().get(0).setBlockingWrites(true); cluster.pingIterationCompleted(); assertEquals(1, @@ -121,7 +124,8 @@ public class DispatcherTest { @Test void testGroup0IsSelectedWhenMoreAreBlockingFeed() { SearchCluster cluster = new MockSearchCluster("1", 3, 1); - Dispatcher dispatcher = new Dispatcher(new ClusterMonitor<>(cluster, false), cluster, dispatchConfig, new MockInvokerFactory(cluster, dispatchConfig, (n, a) -> true)); + Dispatcher dispatcher = new Dispatcher(new ClusterMonitor<>(cluster, false), cluster, dispatchConfig, + new MockInvokerFactory(cluster.groupList(), dispatchConfig, (n, a) -> true)); cluster.group(0).nodes().get(0).setBlockingWrites(true); cluster.group(1).nodes().get(0).setBlockingWrites(true); cluster.pingIterationCompleted(); @@ -134,7 +138,8 @@ public class DispatcherTest { @Test void testGroup0IsSelectedWhenItIsBlockingFeedWhenNoOthers() { SearchCluster cluster = new MockSearchCluster("1", 1, 1); - Dispatcher dispatcher = new Dispatcher(new ClusterMonitor<>(cluster, false), cluster, dispatchConfig, new MockInvokerFactory(cluster, dispatchConfig, (n, a) -> true)); + Dispatcher dispatcher = new Dispatcher(new ClusterMonitor<>(cluster, false), cluster, dispatchConfig, + new MockInvokerFactory(cluster.groupList(), dispatchConfig, (n, a) -> true)); cluster.group(0).nodes().get(0).setBlockingWrites(true); cluster.pingIterationCompleted(); assertEquals(0, @@ -152,7 +157,7 @@ public class DispatcherTest { private final FactoryStep[] events; private int step = 0; - public MockInvokerFactory(SearchCluster cl, DispatchConfig disptachConfig, FactoryStep... events) { + public MockInvokerFactory(SearchGroups cl, DispatchConfig disptachConfig, FactoryStep... events) { super(cl, disptachConfig); this.events = events; } diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java b/container-search/src/test/java/com/yahoo/search/dispatch/MockDispatcher.java index 886c9818842..86b3d90f5ca 100644 --- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/MockDispatcher.java @@ -1,9 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.prelude.fastsearch.test; +package com.yahoo.search.dispatch; import com.yahoo.container.handler.VipStatus; import com.yahoo.search.cluster.ClusterMonitor; -import com.yahoo.search.dispatch.Dispatcher; import com.yahoo.search.dispatch.rpc.RpcInvokerFactory; import com.yahoo.search.dispatch.rpc.RpcPingFactory; import com.yahoo.search.dispatch.rpc.RpcResourcePool; @@ -14,7 +13,7 @@ import com.yahoo.vespa.config.search.DispatchNodesConfig; import java.util.List; -class MockDispatcher extends Dispatcher { +public class MockDispatcher extends Dispatcher { public final ClusterMonitor clusterMonitor; @@ -31,7 +30,7 @@ class MockDispatcher extends Dispatcher { } private MockDispatcher(ClusterMonitor clusterMonitor, SearchCluster searchCluster, DispatchConfig dispatchConfig, RpcResourcePool rpcResourcePool) { - this(clusterMonitor, searchCluster, dispatchConfig, new RpcInvokerFactory(rpcResourcePool, searchCluster, dispatchConfig)); + this(clusterMonitor, searchCluster, dispatchConfig, new RpcInvokerFactory(rpcResourcePool, searchCluster.groupList(), dispatchConfig)); } private MockDispatcher(ClusterMonitor clusterMonitor, SearchCluster searchCluster, DispatchConfig dispatchConfig, RpcInvokerFactory invokerFactory) { @@ -39,10 +38,10 @@ class MockDispatcher extends Dispatcher { this.clusterMonitor = clusterMonitor; } - static DispatchConfig toDispatchConfig() { + public static DispatchConfig toDispatchConfig() { return new DispatchConfig.Builder().build(); } - static DispatchNodesConfig toNodesConfig(List<Node> nodes) { + public static DispatchNodesConfig toNodesConfig(List<Node> nodes) { DispatchNodesConfig.Builder dispatchConfigBuilder = new DispatchNodesConfig.Builder(); int key = 0; for (Node node : nodes) { diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java index 9a4931a8fa7..727e00ed681 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java @@ -1,25 +1,23 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch; -import com.yahoo.search.dispatch.searchcluster.GroupListImpl; +import com.yahoo.search.dispatch.searchcluster.SearchGroupsImpl; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.SearchCluster; import com.yahoo.vespa.config.search.DispatchConfig; -import java.util.List; - /** * @author ollivir */ public class MockSearchCluster extends SearchCluster { public MockSearchCluster(String clusterId, int groups, int nodesPerGroup) { - super(clusterId, 88.0, GroupListImpl.buildGroupListForTest(groups, nodesPerGroup), null, null); + super(clusterId, SearchGroupsImpl.buildGroupListForTest(groups, nodesPerGroup, 88.0), null, null); } @Override public int groupsWithSufficientCoverage() { - return numGroups(); + return groupList().size(); } @Override diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/SearchPathTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/SearchPathTest.java index fcd587e19f7..f0083e13eac 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/SearchPathTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/SearchPathTest.java @@ -2,8 +2,8 @@ package com.yahoo.search.dispatch; import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException; -import com.yahoo.search.dispatch.searchcluster.GroupList; -import com.yahoo.search.dispatch.searchcluster.GroupListImpl; +import com.yahoo.search.dispatch.searchcluster.SearchGroups; +import com.yahoo.search.dispatch.searchcluster.SearchGroupsImpl; import com.yahoo.search.dispatch.searchcluster.Node; import org.junit.jupiter.api.Test; @@ -83,7 +83,7 @@ public class SearchPathTest { } } - private void verifyRandomGroup(GroupList cluster, String searchPath, Set<?> possibleSolutions) { + private void verifyRandomGroup(SearchGroups cluster, String searchPath, Set<?> possibleSolutions) { for (int i=0; i < 100; i++) { String nodes = distKeysAsString(SearchPath.selectNodes(searchPath, cluster)); assertTrue(possibleSolutions.contains(nodes)); @@ -92,7 +92,7 @@ public class SearchPathTest { @Test void searchPathMustFilterNodesBasedOnDefinition() { - GroupList cluster = GroupListImpl.buildGroupListForTest(3, 3); + SearchGroups cluster = SearchGroupsImpl.buildGroupListForTest(3, 3, 100); assertEquals(distKeysAsString(SearchPath.selectNodes("1/1", cluster)), "4"); assertEquals(distKeysAsString(SearchPath.selectNodes("/1", cluster)), "3,4,5"); |