From 8b8d3b2de3f5f92af37ec68b0ff5768fbae341b3 Mon Sep 17 00:00:00 2001 From: Olli Virtanen Date: Fri, 2 Nov 2018 14:49:18 +0100 Subject: Java dispatcher refactoring and cluster monitoring improvements --- .../prelude/fastsearch/FS4InvokerFactory.java | 52 ++- .../yahoo/prelude/fastsearch/FS4SearchInvoker.java | 10 +- .../com/yahoo/prelude/fastsearch/FastSearcher.java | 12 +- .../com/yahoo/search/cluster/ClusterMonitor.java | 4 +- .../yahoo/search/cluster/TrafficNodeMonitor.java | 23 +- .../java/com/yahoo/search/dispatch/Dispatcher.java | 54 ++- .../com/yahoo/search/dispatch/LoadBalancer.java | 21 +- .../com/yahoo/search/dispatch/SearchCluster.java | 478 --------------------- .../java/com/yahoo/search/dispatch/SearchPath.java | 16 +- .../yahoo/search/dispatch/searchcluster/Group.java | 75 ++++ .../yahoo/search/dispatch/searchcluster/Node.java | 73 ++++ .../search/dispatch/searchcluster/Pinger.java | 42 ++ .../dispatch/searchcluster/SearchCluster.java | 343 +++++++++++++++ .../fastsearch/test/FastSearcherTestCase.java | 65 +-- .../fastsearch/test/FastSearcherTester.java | 18 +- .../prelude/fastsearch/test/MockDispatcher.java | 16 +- .../yahoo/search/dispatch/LoadBalancerTest.java | 43 +- .../yahoo/search/dispatch/MockSearchCluster.java | 3 + .../com/yahoo/search/dispatch/SearchPathTest.java | 5 +- 19 files changed, 726 insertions(+), 627 deletions(-) delete mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java create mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java create mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java create mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java create mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java (limited to 'container-search/src') diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java index f68bb718c8d..51048db3cb7 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java @@ -8,10 +8,12 @@ import com.yahoo.search.Result; import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.dispatch.InterleavedFillInvoker; import com.yahoo.search.dispatch.InterleavedSearchInvoker; -import com.yahoo.search.dispatch.SearchCluster; import com.yahoo.search.dispatch.SearchInvoker; +import com.yahoo.search.dispatch.searchcluster.Node; +import com.yahoo.search.dispatch.searchcluster.SearchCluster; import com.yahoo.search.result.Hit; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -30,18 +32,20 @@ import java.util.Set; public class FS4InvokerFactory { private final FS4ResourcePool fs4ResourcePool; private final VespaBackEndSearcher searcher; - private final ImmutableMap nodesByKey; + private final SearchCluster searchCluster; + private final ImmutableMap nodesByKey; public FS4InvokerFactory(FS4ResourcePool fs4ResourcePool, SearchCluster searchCluster, VespaBackEndSearcher searcher) { this.fs4ResourcePool = fs4ResourcePool; this.searcher = searcher; + this.searchCluster = searchCluster; - ImmutableMap.Builder builder = ImmutableMap.builder(); + ImmutableMap.Builder builder = ImmutableMap.builder(); searchCluster.groups().values().forEach(group -> group.nodes().forEach(node -> builder.put(node.key(), node))); this.nodesByKey = builder.build(); } - public SearchInvoker getSearchInvoker(Query query, SearchCluster.Node node) { + public SearchInvoker getSearchInvoker(Query query, Node node) { Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port()); return new FS4SearchInvoker(searcher, query, backend.openChannel(), node); } @@ -49,22 +53,46 @@ public class FS4InvokerFactory { /** * Create a {@link SearchInvoker} for a list of content nodes. * - * @param query the search query being processed - * @param nodes pre-selected list of content nodes - * @return Optional containing the SearchInvoker or empty if some node in the list is invalid + * @param query + * the search query being processed + * @param groupId + * the id of the node group to which the nodes belong + * @param nodes + * pre-selected list of content nodes + * @param acceptIncompleteCoverage + * if some of the nodes are unavailable and this parameter is + * false, verify that the remaining set of nodes has enough + * coverage + * @return Optional containing the SearchInvoker or empty if some node in the + * list is invalid and the remaining coverage is not sufficient */ - public Optional getSearchInvoker(Query query, List nodes) { + public Optional getSearchInvoker(Query query, int groupId, List nodes, boolean acceptIncompleteCoverage) { Map invokers = new HashMap<>(); - for (SearchCluster.Node node : nodes) { + Set failed = null; + for (Node node : nodes) { if (node.isWorking()) { Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port()); if (backend.probeConnection()) { invokers.put(node.key(), new FS4SearchInvoker(searcher, query, backend.openChannel(), node)); } else { - return Optional.empty(); + if(failed == null) { + failed = new HashSet<>(); + } + failed.add(node.key()); } } } + if (failed != null && ! acceptIncompleteCoverage) { + List success = new ArrayList<>(nodes.size() - failed.size()); + for (Node node : nodes) { + if (!failed.contains(node.key())) { + success.add(node); + } + } + if (!searchCluster.isPartialGroupCoverageSufficient(groupId, success)) { + return Optional.empty(); + } + } if (invokers.size() == 1) { return Optional.of(invokers.values().iterator().next()); } else { @@ -72,7 +100,7 @@ public class FS4InvokerFactory { } } - public FillInvoker getFillInvoker(Query query, SearchCluster.Node node) { + public FillInvoker getFillInvoker(Query query, Node node) { return new FS4FillInvoker(searcher, query, fs4ResourcePool, node.hostname(), node.fs4port(), node.key()); } @@ -88,7 +116,7 @@ public class FS4InvokerFactory { Query query = result.getQuery(); Map invokers = new HashMap<>(); for (Integer distKey : requiredNodes) { - SearchCluster.Node node = nodesByKey.get(distKey); + Node node = nodesByKey.get(distKey); if (node == null) { return Optional.empty(); } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java index dc8cd53e638..98676890bdf 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java @@ -11,8 +11,8 @@ import com.yahoo.fs4.mplex.FS4Channel; import com.yahoo.fs4.mplex.InvalidChannelException; import com.yahoo.search.Query; import com.yahoo.search.Result; -import com.yahoo.search.dispatch.SearchCluster; import com.yahoo.search.dispatch.SearchInvoker; +import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.result.Coverage; import com.yahoo.search.result.ErrorMessage; @@ -33,13 +33,13 @@ import static java.util.Arrays.asList; public class FS4SearchInvoker extends SearchInvoker { private final VespaBackEndSearcher searcher; private FS4Channel channel; - private final Optional node; + private final Optional node; private ErrorMessage pendingSearchError = null; private Query query = null; private QueryPacket queryPacket = null; - public FS4SearchInvoker(VespaBackEndSearcher searcher, Query query, FS4Channel channel, SearchCluster.Node node) { + public FS4SearchInvoker(VespaBackEndSearcher searcher, Query query, FS4Channel channel, Node node) { this.searcher = searcher; this.node = Optional.of(node); this.channel = channel; @@ -115,7 +115,7 @@ public class FS4SearchInvoker extends SearchInvoker { searcher.addMetaInfo(query, queryPacket.getQueryPacketData(), resultPacket, result); - searcher.addUnfilledHits(result, resultPacket.getDocuments(), false, queryPacket.getQueryPacketData(), cacheKey, node.map(SearchCluster.Node::key)); + searcher.addUnfilledHits(result, resultPacket.getDocuments(), false, queryPacket.getQueryPacketData(), cacheKey, node.map(Node::key)); Packet[] packets; CacheControl cacheControl = searcher.getCacheControl(); PacketWrapper packetWrapper = cacheControl.lookup(cacheKey, query); @@ -130,7 +130,7 @@ public class FS4SearchInvoker extends SearchInvoker { } else { packets = new Packet[1]; packets[0] = resultPacket; - cacheControl.cache(cacheKey, query, new DocsumPacketKey[0], packets, node.map(SearchCluster.Node::key)); + cacheControl.cache(cacheKey, query, new DocsumPacketKey[0], packets, node.map(Node::key)); } } return asList(result); diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java index 36d283040a2..11b71c2c159 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java @@ -17,8 +17,8 @@ import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.dispatch.Dispatcher; import com.yahoo.search.dispatch.FillInvoker; -import com.yahoo.search.dispatch.SearchCluster; import com.yahoo.search.dispatch.SearchInvoker; +import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.grouping.GroupingRequest; import com.yahoo.search.grouping.request.GroupingOperation; import com.yahoo.search.query.Ranking; @@ -218,7 +218,7 @@ public class FastSearcher extends VespaBackEndSearcher { return invoker.get(); } - Optional direct = getDirectNode(query); + Optional direct = getDirectNode(query); if(direct.isPresent()) { return fs4InvokerFactory.getSearchInvoker(query, direct.get()); } @@ -237,7 +237,7 @@ public class FastSearcher extends VespaBackEndSearcher { return invoker.get(); } - Optional direct = getDirectNode(query); + Optional direct = getDirectNode(query); if (direct.isPresent()) { return fs4InvokerFactory.getFillInvoker(query, direct.get()); } @@ -248,18 +248,18 @@ public class FastSearcher extends VespaBackEndSearcher { * If the query can be directed to a single local content node, returns that node. Otherwise, * returns an empty value. */ - private Optional getDirectNode(Query query) { + private Optional getDirectNode(Query query) { if (!query.properties().getBoolean(dispatchDirect, true)) return Optional.empty(); if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) return Optional.empty(); - Optional directDispatchRecipient = dispatcher.searchCluster().directDispatchTarget(); + Optional directDispatchRecipient = dispatcher.searchCluster().directDispatchTarget(); if (!directDispatchRecipient.isPresent()) return Optional.empty(); // Dispatch directly to the single, local search node - SearchCluster.Node local = directDispatchRecipient.get(); + Node local = directDispatchRecipient.get(); query.trace(false, 2, "Dispatching directly to ", local); return Optional.of(local); } diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java index 4878691742c..5de0c5eff74 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java @@ -1,8 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.cluster; - -import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.search.result.ErrorMessage; @@ -43,7 +41,7 @@ public class ClusterMonitor { public ClusterMonitor(NodeManager manager, String ignored) { this(manager); } - + public ClusterMonitor(NodeManager manager) { nodeManager = manager; monitorThread = new MonitorThread("search.clustermonitor"); diff --git a/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java index c1c955cb03a..ea881ad8b48 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.cluster; +import com.yahoo.container.protect.Error; import com.yahoo.search.result.ErrorMessage; /** @@ -36,20 +37,14 @@ public class TrafficNodeMonitor extends BaseNodeMonitor { public void failed(ErrorMessage error) { respondedAt = now(); - switch (error.getCode()) { - // TODO: Remove hard coded error messages. - // Refer to docs/errormessages - case 10: - case 11: - // Only count not being able to talk to backend at all - // as errors we care about - if ((respondedAt-succeededAt) > 10000) { - setWorking(false, "Not working for 10 s: " + error.toString()); - } - break; - default: - succeededAt = respondedAt; - break; + if (error.getCode() == Error.BACKEND_COMMUNICATION_ERROR.code) { + setWorking(false, "Connection failure: " + error.toString()); + } else if (error.getCode() == Error.NO_ANSWER_WHEN_PINGING_NODE.code) { + if ((respondedAt - succeededAt) > 10000) { + setWorking(false, "Not working for 10 s: " + error.toString()); + } + } else { + succeededAt = respondedAt; } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 286eee004c5..0dd682dee0e 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -11,6 +11,9 @@ import com.yahoo.processing.request.CompoundName; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException; +import com.yahoo.search.dispatch.searchcluster.Group; +import com.yahoo.search.dispatch.searchcluster.Node; +import com.yahoo.search.dispatch.searchcluster.SearchCluster; import com.yahoo.vespa.config.search.DispatchConfig; import java.util.Arrays; @@ -33,6 +36,8 @@ import java.util.Set; * @author ollvir */ public class Dispatcher extends AbstractComponent { + private static final int MAX_GROUP_SELECTION_ATTEMPTS = 3; + /** If enabled, this internal dispatcher will be preferred over fdispatch whenever possible */ private static final CompoundName dispatchInternal = new CompoundName("dispatch.internal"); @@ -66,11 +71,6 @@ public class Dispatcher extends AbstractComponent { rpcResourcePool.release(); } - @FunctionalInterface - private interface SearchInvokerSupplier { - Optional supply(Query query, List nodes); - } - public Optional getFillInvoker(Result result, VespaBackEndSearcher searcher, DocumentDatabase documentDb, FS4InvokerFactory fs4InvokerFactory) { Optional rpcInvoker = rpcResourcePool.getFillInvoker(result.getQuery(), searcher, documentDb); @@ -102,6 +102,11 @@ public class Dispatcher extends AbstractComponent { return Optional.empty(); } + @FunctionalInterface + private interface SearchInvokerSupplier { + Optional supply(Query query, int groupId, List nodes, boolean acceptIncompleteCoverage); + } + // build invoker based on searchpath private Optional getSearchPathInvoker(Query query, SearchInvokerSupplier invokerFactory) { String searchPath = query.getModel().getSearchPath(); @@ -109,11 +114,11 @@ public class Dispatcher extends AbstractComponent { return Optional.empty(); } try { - List nodes = SearchPath.selectNodes(searchPath, searchCluster); + List nodes = SearchPath.selectNodes(searchPath, searchCluster); if (nodes.isEmpty()) { return Optional.empty(); } else { - return invokerFactory.supply(query, nodes); + return invokerFactory.supply(query, -1, nodes, true); } } catch (InvalidSearchPathException e) { return Optional.of(new SearchErrorInvoker(e.getMessage())); @@ -121,41 +126,34 @@ public class Dispatcher extends AbstractComponent { } private Optional getInternalInvoker(Query query, SearchInvokerSupplier invokerFactory) { - Optional directNode = searchCluster.directDispatchTarget(); + Optional directNode = searchCluster.directDispatchTarget(); if (directNode.isPresent()) { - SearchCluster.Node node = directNode.get(); + Node node = directNode.get(); query.trace(false, 2, "Dispatching directly to ", node); - return invokerFactory.supply(query, Arrays.asList(node)); + return invokerFactory.supply(query, -1, Arrays.asList(node), true); } - Set tried = null; - int max = searchCluster.groups().size(); - for (int attempt = 0; attempt < max; attempt++) { - Optional groupInCluster = loadBalancer.takeGroupForQuery(query); - if (! groupInCluster.isPresent()) { + int max = Integer.min(searchCluster.orderedGroups().size(), MAX_GROUP_SELECTION_ATTEMPTS); + Set rejected = null; + for (int i = 0; i < max; i++) { + Optional groupInCluster = loadBalancer.takeGroupForQuery(query, rejected); + if (!groupInCluster.isPresent()) { // No groups available break; } - SearchCluster.Group group = groupInCluster.get(); - if (tried != null && tried.contains(group.id())) { - // bail out: LB is offering a previously discarded group - loadBalancer.releaseGroup(group); - break; - } - - Optional invoker = invokerFactory.supply(query, group.nodes()); + Group group = groupInCluster.get(); + boolean acceptIncompleteCoverage = (i == max - 1); + Optional invoker = invokerFactory.supply(query, group.id(), group.nodes(), acceptIncompleteCoverage); if (invoker.isPresent()) { query.trace(false, 2, "Dispatching internally to ", group); invoker.get().teardown(() -> loadBalancer.releaseGroup(group)); return invoker; } else { - // invoker could not be produced (likely connectivity issue) - searchCluster.groupConnectionFailure(group); loadBalancer.releaseGroup(group); - if (tried == null) { - tried = new HashSet<>(); + if (rejected == null) { + rejected = new HashSet<>(); } - tried.add(group.id()); + rejected.add(group.id()); } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java index 64e38a488ab..22573fac2d9 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java @@ -2,12 +2,14 @@ package com.yahoo.search.dispatch; import com.yahoo.search.Query; -import com.yahoo.search.dispatch.SearchCluster.Group; +import com.yahoo.search.dispatch.searchcluster.Group; +import com.yahoo.search.dispatch.searchcluster.SearchCluster; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; @@ -47,18 +49,19 @@ public class LoadBalancer { * {@link #releaseGroup} symmetrically for each taken allocation. * * @param query the query for which this allocation is made + * @param rejectedGroups if not null, the load balancer will only return groups with IDs not in the set * @return The node group to target, or empty if the internal dispatch logic cannot be used */ - public Optional takeGroupForQuery(Query query) { + public Optional takeGroupForQuery(Query query, Set rejectedGroups) { if (scoreboard == null) { return Optional.empty(); } - return allocateNextGroup(); + return allocateNextGroup(rejectedGroups); } /** - * Release an allocation given by {@link #takeGroupForQuery(Query)}. The release must be done exactly once for each allocation. + * Release an allocation given by {@link #takeGroupForQuery}. The release must be done exactly once for each allocation. * * @param group * previously allocated group @@ -74,7 +77,7 @@ public class LoadBalancer { } } - private Optional allocateNextGroup() { + private Optional allocateNextGroup(Set rejectedGroups) { synchronized (this) { GroupSchedule bestSchedule = null; int bestIndex = needle; @@ -82,9 +85,11 @@ public class LoadBalancer { int index = needle; for (int i = 0; i < scoreboard.size(); i++) { GroupSchedule sched = scoreboard.get(index); - if (sched.isPreferredOver(bestSchedule)) { - bestSchedule = sched; - bestIndex = index; + if (rejectedGroups == null || !rejectedGroups.contains(sched.group.id())) { + if (sched.isPreferredOver(bestSchedule)) { + bestSchedule = sched; + bestIndex = index; + } } index = nextScoreboardIndex(index); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java deleted file mode 100644 index e26dd5648eb..00000000000 --- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java +++ /dev/null @@ -1,478 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch; - -import com.google.common.collect.ImmutableCollection; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableMultimap; -import com.yahoo.container.handler.VipStatus; -import com.yahoo.net.HostName; -import com.yahoo.search.cluster.ClusterMonitor; -import com.yahoo.search.cluster.NodeManager; -import com.yahoo.search.result.ErrorMessage; -import com.yahoo.vespa.config.search.DispatchConfig; - -// Only needed until query requests are moved to rpc -import com.yahoo.prelude.Ping; -import com.yahoo.prelude.fastsearch.FastSearcher; -import com.yahoo.yolean.Exceptions; -import com.yahoo.prelude.Pong; -import com.yahoo.prelude.fastsearch.FS4ResourcePool; - -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.FutureTask; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Level; -import java.util.logging.Logger; -import java.util.stream.Collectors; - -/** - * A model of a search cluster we might want to dispatch queries to. - * - * @author bratseth - */ -public class SearchCluster implements NodeManager { - - private static final Logger log = Logger.getLogger(SearchCluster.class.getName()); - - /** The min active docs a group must have to be considered up, as a % of the average active docs of the other groups */ - private final double minActivedocsCoveragePercentage; - private final double minGroupCoverage; - private final int maxNodesDownPerGroup; - private final int size; - private final ImmutableMap groups; - private final ImmutableMultimap nodesByHost; - private final ImmutableList orderedGroups; - private final ClusterMonitor clusterMonitor; - private final VipStatus vipStatus; - - /** - * A search node on this local machine having the entire corpus, which we therefore - * should prefer to dispatch directly to, or empty if there is no such local search node. - * If there is one, we also maintain the VIP status of this container based on the availability - * of the corpus on this local node (up + has coverage), such that this node is taken out of rotation - * if it only queries this cluster when the local node cannot be used, to avoid unnecessary - * cross-node network traffic. - */ - private final Optional directDispatchTarget; - - // Only needed until query requests are moved to rpc - private final FS4ResourcePool fs4ResourcePool; - - public SearchCluster(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus) { - this(dispatchConfig.minActivedocsPercentage(), dispatchConfig.minGroupCoverage(), dispatchConfig.maxNodesDownPerGroup(), - toNodes(dispatchConfig), fs4ResourcePool, containerClusterSize, vipStatus); - } - - public SearchCluster(double minActivedocsCoverage, double minGroupCoverage, int maxNodesDownPerGroup, List nodes, FS4ResourcePool fs4ResourcePool, - int containerClusterSize, VipStatus vipStatus) { - this.minActivedocsCoveragePercentage = minActivedocsCoverage; - this.minGroupCoverage = minGroupCoverage; - this.maxNodesDownPerGroup = maxNodesDownPerGroup; - this.size = nodes.size(); - this.fs4ResourcePool = fs4ResourcePool; - this.vipStatus = vipStatus; - - // Create groups - ImmutableMap.Builder groupsBuilder = new ImmutableMap.Builder<>(); - for (Map.Entry> group : nodes.stream().collect(Collectors.groupingBy(Node::group)).entrySet()) { - Group g = new Group(group.getKey(), group.getValue()); - groupsBuilder.put(group.getKey(), g); - } - this.groups = groupsBuilder.build(); - LinkedHashMap groupIntroductionOrder = new LinkedHashMap<>(); - nodes.forEach(node -> groupIntroductionOrder.put(node.group(), groups.get(node.group))); - this.orderedGroups = ImmutableList.builder().addAll(groupIntroductionOrder.values()).build(); - - // Index nodes by host - ImmutableMultimap.Builder nodesByHostBuilder = new ImmutableMultimap.Builder<>(); - for (Node node : nodes) - nodesByHostBuilder.put(node.hostname(), node); - this.nodesByHost = nodesByHostBuilder.build(); - - this.directDispatchTarget = findDirectDispatchTarget(HostName.getLocalhost(), size, containerClusterSize, - nodesByHost, groups); - - // Set up monitoring of the fs4 interface of the nodes - // We can switch to monitoring the rpc interface instead when we move the query phase to rpc - this.clusterMonitor = new ClusterMonitor<>(this); - for (Node node : nodes) { - // cluster monitor will only call working() when the - // node transitions from down to up, so we need to - // register the initial (working) state here: - working(node); - clusterMonitor.add(node, true); - } - } - - private static Optional findDirectDispatchTarget(String selfHostname, - int searchClusterSize, - int containerClusterSize, - ImmutableMultimapnodesByHost, - ImmutableMap groups) { - // A search node in the search cluster in question is configured on the same host as the currently running container. - // It has all the data <==> No other nodes in the search cluster have the same group id as this node. - // That local search node responds. - // The search cluster to be searched has at least as many nodes as the container cluster we're running in. - ImmutableCollection localSearchNodes = nodesByHost.get(selfHostname); - // Only use direct dispatch if we have exactly 1 search node on the same machine: - if (localSearchNodes.size() != 1) return Optional.empty(); - - SearchCluster.Node localSearchNode = localSearchNodes.iterator().next(); - SearchCluster.Group localSearchGroup = groups.get(localSearchNode.group()); - - // Only use direct dispatch if the local search node has the entire corpus - if (localSearchGroup.nodes().size() != 1) return Optional.empty(); - - // Only use direct dispatch if this container cluster has at least as many nodes as the search cluster - // to avoid load skew/preserve fanout in the case where a subset of the search nodes are also containers. - // This disregards the case where the search and container clusters are partially overlapping. - // Such configurations produce skewed load in any case. - if (containerClusterSize < searchClusterSize) return Optional.empty(); - - return Optional.of(localSearchNode); - } - - private static ImmutableList toNodes(DispatchConfig dispatchConfig) { - ImmutableList.Builder nodesBuilder = new ImmutableList.Builder<>(); - for (DispatchConfig.Node node : dispatchConfig.node()) - nodesBuilder.add(new Node(node.key(), node.host(), node.fs4port(), node.group())); - return nodesBuilder.build(); - } - - /** Returns the number of nodes in this cluster (across all groups) */ - public int size() { return size; } - - /** Returns the groups of this cluster as an immutable map indexed by group id */ - public ImmutableMap groups() { return groups; } - - /** Returns the groups of this cluster as an immutable list in introduction order */ - public ImmutableList orderedGroups() { return orderedGroups; } - - /** Returns the n'th (zero-indexed) group in the cluster if possible */ - public Optional group(int n) { - if (orderedGroups.size() > n) { - return Optional.of(orderedGroups.get(n)); - } else { - return Optional.empty(); - } - } - - /** Returns the number of nodes per group - size()/groups.size() */ - public int groupSize() { - if (groups.size() == 0) return size(); - return size() / groups.size(); - } - - /** - * Returns the nodes of this cluster as an immutable map indexed by host. - * One host may contain multiple nodes (on different ports), so this is a multi-map. - */ - public ImmutableMultimap nodesByHost() { return nodesByHost; } - - /** - * Returns the recipient we should dispatch queries directly to (bypassing fdispatch), - * or empty if we should not dispatch directly. - */ - public Optional directDispatchTarget() { - if ( ! directDispatchTarget.isPresent()) return Optional.empty(); - - // Only use direct dispatch if the local group has sufficient coverage - SearchCluster.Group localSearchGroup = groups.get(directDispatchTarget.get().group()); - if ( ! localSearchGroup.hasSufficientCoverage()) return Optional.empty(); - - // Only use direct dispatch if the local search node is up - if ( ! directDispatchTarget.get().isWorking()) return Optional.empty(); - - return directDispatchTarget; - } - - /** Used by the cluster monitor to manage node status */ - @Override - public void working(Node node) { - node.setWorking(true); - - if (usesDirectDispatchTo(node)) - vipStatus.addToRotation(this); - } - - /** Used by the cluster monitor to manage node status */ - @Override - public void failed(Node node) { - node.setWorking(false); - - // Take ourselves out if we usually dispatch only to our own host - if (usesDirectDispatchTo(node)) - vipStatus.removeFromRotation(this); - } - - public void groupConnectionFailure(Group group) { - group.setHasSufficientCoverage(false); // will be reset after next ping iteration - } - - private void updateSufficientCoverage(Group group, boolean sufficientCoverage) { - // update VIP status if we direct dispatch to this group and coverage status changed - if (usesDirectDispatchTo(group) && sufficientCoverage != group.hasSufficientCoverage()) { - if (sufficientCoverage) { - vipStatus.addToRotation(this); - } else { - vipStatus.removeFromRotation(this); - } - } - group.setHasSufficientCoverage(sufficientCoverage); - } - - private boolean usesDirectDispatchTo(Node node) { - if ( ! directDispatchTarget.isPresent()) return false; - return directDispatchTarget.get().equals(node); - } - - private boolean usesDirectDispatchTo(Group group) { - if ( ! directDispatchTarget.isPresent()) return false; - return directDispatchTarget.get().group() == group.id(); - } - - /** Used by the cluster monitor to manage node status */ - @Override - public void ping(Node node, Executor executor) { - Pinger pinger = new Pinger(node); - FutureTask futurePong = new FutureTask<>(pinger); - executor.execute(futurePong); - Pong pong = getPong(futurePong, node); - futurePong.cancel(true); - - if (pong.badResponse()) - clusterMonitor.failed(node, pong.getError(0)); - else - clusterMonitor.responded(node); - } - - /** - * Update statistics after a round of issuing pings. - * Note that this doesn't wait for pings to return, so it will typically accumulate data from - * last rounds pinging, or potentially (although unlikely) some combination of new and old data. - */ - @Override - public void pingIterationCompleted() { - int numGroups = orderedGroups.size(); - if (numGroups == 1) { - Group group = groups.values().iterator().next(); - group.aggregateActiveDocuments(); - updateSufficientCoverage(group, true); // by definition - return; - } - - // Update active documents per group and use it to decide if the group should be active - - long[] activeDocumentsInGroup = new long[numGroups]; - long sumOfActiveDocuments = 0; - for(int i = 0; i < numGroups; i++) { - Group group = orderedGroups.get(i); - group.aggregateActiveDocuments(); - activeDocumentsInGroup[i] = group.getActiveDocuments(); - sumOfActiveDocuments += activeDocumentsInGroup[i]; - } - - for (int i = 0; i < numGroups; i++) { - Group group = orderedGroups.get(i); - long activeDocuments = activeDocumentsInGroup[i]; - long averageDocumentsInOtherGroups = (sumOfActiveDocuments - activeDocuments) / (numGroups - 1); - boolean sufficientCoverage = true; - - if (averageDocumentsInOtherGroups > 0) { - double coverage = 100.0 * (double) activeDocuments / averageDocumentsInOtherGroups; - sufficientCoverage = coverage >= minActivedocsCoveragePercentage; - } - if (sufficientCoverage) { - sufficientCoverage = isNodeCoverageSufficient(group); - } - updateSufficientCoverage(group, sufficientCoverage); - } - } - - private boolean isNodeCoverageSufficient(Group group) { - int nodesUp = 0; - for (Node node : group.nodes()) { - if (node.isWorking()) { - nodesUp++; - } - } - int nodes = group.nodes().size(); - int nodesAllowedDown = maxNodesDownPerGroup + (int) (((double) nodes * (100.0 - minGroupCoverage)) / 100.0); - return nodesUp + nodesAllowedDown >= nodes; - } - - private Pong getPong(FutureTask futurePong, Node node) { - try { - return futurePong.get(clusterMonitor.getConfiguration().getFailLimit(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - log.log(Level.WARNING, "Exception pinging " + node, e); - return new Pong(ErrorMessage.createUnspecifiedError("Ping was interrupted: " + node)); - } catch (ExecutionException e) { - log.log(Level.WARNING, "Exception pinging " + node, e); - return new Pong(ErrorMessage.createUnspecifiedError("Execution was interrupted: " + node)); - } catch (TimeoutException e) { - return new Pong(ErrorMessage.createNoAnswerWhenPingingNode("Ping thread timed out")); - } - } - - private class Pinger implements Callable { - - private final Node node; - - public Pinger(Node node) { - this.node = node; - } - - public Pong call() { - try { - Pong pong = FastSearcher.ping(new Ping(clusterMonitor.getConfiguration().getRequestTimeout()), - fs4ResourcePool.getBackend(node.hostname(), node.fs4port()), node.toString()); - if (pong.activeDocuments().isPresent()) - node.setActiveDocuments(pong.activeDocuments().get()); - return pong; - } catch (RuntimeException e) { - return new Pong(ErrorMessage.createBackendCommunicationError("Exception when pinging " + node + ": " - + Exceptions.toMessageString(e))); - } - } - - } - - /** A group in a search cluster. This class is multithread safe. */ - public static class Group { - - private final int id; - private final ImmutableList nodes; - - private final AtomicBoolean hasSufficientCoverage = new AtomicBoolean(true); - private final AtomicLong activeDocuments = new AtomicLong(0); - - public Group(int id, List nodes) { - this.id = id; - this.nodes = ImmutableList.copyOf(nodes); - } - - /** Returns the unique identity of this group */ - public int id() { return id; } - - /** Returns the nodes in this group as an immutable list */ - public ImmutableList nodes() { return nodes; } - - /** - * Returns whether this group has sufficient active documents - * (compared to other groups) that is should receive traffic - */ - public boolean hasSufficientCoverage() { - return hasSufficientCoverage.get(); - } - - void setHasSufficientCoverage(boolean sufficientCoverage) { - hasSufficientCoverage.lazySet(sufficientCoverage); - } - - void aggregateActiveDocuments() { - long activeDocumentsInGroup = 0; - for (Node node : nodes) { - if (node.isWorking()) { - activeDocumentsInGroup += node.getActiveDocuments(); - } - } - activeDocuments.set(activeDocumentsInGroup); - - } - - /** Returns the active documents on this node. If unknown, 0 is returned. */ - long getActiveDocuments() { - return this.activeDocuments.get(); - } - - @Override - public String toString() { return "search group " + id; } - - @Override - public int hashCode() { return id; } - - @Override - public boolean equals(Object other) { - if (other == this) return true; - if (!(other instanceof Group)) return false; - return ((Group) other).id == this.id; - } - - } - - /** A node in a search cluster. This class is multithread safe. */ - public static class Node { - - private final int key; - private final String hostname; - private final int fs4port; - private final int group; - - private final AtomicBoolean working = new AtomicBoolean(true); - private final AtomicLong activeDocuments = new AtomicLong(0); - - public Node(int key, String hostname, int fs4port, int group) { - this.key = key; - this.hostname = hostname; - this.fs4port = fs4port; - this.group = group; - } - - /** Returns the unique and stable distribution key of this node */ - public int key() { return key; } - - public String hostname() { return hostname; } - - public int fs4port() { return fs4port; } - - /** Returns the id of this group this node belongs to */ - public int group() { return group; } - - void setWorking(boolean working) { - this.working.lazySet(working); - } - - /** Returns whether this node is currently responding to requests */ - public boolean isWorking() { return working.get(); } - - /** Updates the active documents on this node */ - void setActiveDocuments(long activeDocuments) { - this.activeDocuments.set(activeDocuments); - } - - /** Returns the active documents on this node. If unknown, 0 is returned. */ - public long getActiveDocuments() { - return this.activeDocuments.get(); - } - - @Override - public int hashCode() { return Objects.hash(hostname, fs4port); } - - @Override - public boolean equals(Object o) { - if (o == this) return true; - if ( ! (o instanceof Node)) return false; - Node other = (Node)o; - if ( ! Objects.equals(this.hostname, other.hostname)) return false; - if ( ! Objects.equals(this.fs4port, other.fs4port)) return false; - return true; - } - - @Override - public String toString() { return "search node " + hostname + ":" + fs4port + " in group " + group; } - - } - -} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java index 57f06225d27..6800a80b78f 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java @@ -3,7 +3,9 @@ package com.yahoo.search.dispatch; import com.google.common.collect.ImmutableCollection; import com.yahoo.collections.Pair; -import com.yahoo.search.dispatch.SearchCluster.Group; +import com.yahoo.search.dispatch.searchcluster.Group; +import com.yahoo.search.dispatch.searchcluster.Node; +import com.yahoo.search.dispatch.searchcluster.SearchCluster; import java.util.ArrayList; import java.util.Collection; @@ -37,7 +39,7 @@ public class SearchPath { * @return list of nodes chosen with the search path, or an empty list in which * case some other node selection logic should be used */ - public static List selectNodes(String searchPath, SearchCluster cluster) { + public static List selectNodes(String searchPath, SearchCluster cluster) { Optional sp = SearchPath.fromString(searchPath); if (sp.isPresent()) { return sp.get().mapToNodes(cluster); @@ -46,7 +48,7 @@ public class SearchPath { } } - public static Optional fromString(String path) { + static Optional fromString(String path) { if (path == null || path.isEmpty()) { return Optional.empty(); } @@ -73,23 +75,23 @@ public class SearchPath { this.group = group; } - private List mapToNodes(SearchCluster cluster) { + private List mapToNodes(SearchCluster cluster) { if (cluster.groups().isEmpty()) { return Collections.emptyList(); } - SearchCluster.Group selectedGroup = selectGroup(cluster); + Group selectedGroup = selectGroup(cluster); if (nodes.isEmpty()) { return selectedGroup.nodes(); } - List groupNodes = selectedGroup.nodes(); + List groupNodes = selectedGroup.nodes(); Set wanted = new HashSet<>(); int max = groupNodes.size(); for (NodeSelection node : nodes) { wanted.addAll(node.matches(max)); } - List ret = new ArrayList<>(); + List ret = new ArrayList<>(); for (int idx : wanted) { ret.add(groupNodes.get(idx)); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java new file mode 100644 index 00000000000..01cbc5cd307 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java @@ -0,0 +1,75 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch.searchcluster; + +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A group in a search cluster. This class is multithread safe. + * + * @author bratseth + * @author ollivir + */ +public class Group { + + private final int id; + private final ImmutableList nodes; + + private final AtomicBoolean hasSufficientCoverage = new AtomicBoolean(true); + private final AtomicLong activeDocuments = new AtomicLong(0); + + public Group(int id, List nodes) { + this.id = id; + this.nodes = ImmutableList.copyOf(nodes); + } + + /** Returns the unique identity of this group */ + public int id() { return id; } + + /** Returns the nodes in this group as an immutable list */ + public ImmutableList nodes() { return nodes; } + + /** + * Returns whether this group has sufficient active documents + * (compared to other groups) that is should receive traffic + */ + public boolean hasSufficientCoverage() { + return hasSufficientCoverage.get(); + } + + void setHasSufficientCoverage(boolean sufficientCoverage) { + hasSufficientCoverage.lazySet(sufficientCoverage); + } + + void aggregateActiveDocuments() { + long activeDocumentsInGroup = 0; + for (Node node : nodes) { + if (node.isWorking()) { + activeDocumentsInGroup += node.getActiveDocuments(); + } + } + activeDocuments.set(activeDocumentsInGroup); + + } + + /** Returns the active documents on this node. If unknown, 0 is returned. */ + long getActiveDocuments() { + return this.activeDocuments.get(); + } + + @Override + public String toString() { return "search group " + id; } + + @Override + public int hashCode() { return id; } + + @Override + public boolean equals(Object other) { + if (other == this) return true; + if (!(other instanceof Group)) return false; + return ((Group) other).id == this.id; + } +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java new file mode 100644 index 00000000000..98deb9e3199 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java @@ -0,0 +1,73 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch.searchcluster; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A node in a search cluster. This class is multithread safe. + * + * @author bratseth + * @author ollivir + */ +public class Node { + + private final int key; + private final String hostname; + private final int fs4port; + final int group; + + private final AtomicBoolean working = new AtomicBoolean(true); + private final AtomicLong activeDocuments = new AtomicLong(0); + + public Node(int key, String hostname, int fs4port, int group) { + this.key = key; + this.hostname = hostname; + this.fs4port = fs4port; + this.group = group; + } + + /** Returns the unique and stable distribution key of this node */ + public int key() { return key; } + + public String hostname() { return hostname; } + + public int fs4port() { return fs4port; } + + /** Returns the id of this group this node belongs to */ + public int group() { return group; } + + public void setWorking(boolean working) { + this.working.lazySet(working); + } + + /** Returns whether this node is currently responding to requests */ + public boolean isWorking() { return working.get(); } + + /** Updates the active documents on this node */ + void setActiveDocuments(long activeDocuments) { + this.activeDocuments.set(activeDocuments); + } + + /** Returns the active documents on this node. If unknown, 0 is returned. */ + public long getActiveDocuments() { + return this.activeDocuments.get(); + } + + @Override + public int hashCode() { return Objects.hash(hostname, fs4port); } + + @Override + public boolean equals(Object o) { + if (o == this) return true; + if ( ! (o instanceof Node)) return false; + Node other = (Node)o; + if ( ! Objects.equals(this.hostname, other.hostname)) return false; + if ( ! Objects.equals(this.fs4port, other.fs4port)) return false; + return true; + } + + @Override + public String toString() { return "search node " + hostname + ":" + fs4port + " in group " + group; } +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java new file mode 100644 index 00000000000..7c7a9cb1d1c --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java @@ -0,0 +1,42 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch.searchcluster; + +import com.yahoo.prelude.Ping; +import com.yahoo.prelude.Pong; +import com.yahoo.prelude.fastsearch.FS4ResourcePool; +import com.yahoo.prelude.fastsearch.FastSearcher; +import com.yahoo.search.cluster.ClusterMonitor; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.yolean.Exceptions; + +import java.util.concurrent.Callable; + +/** + * @author bratseth + * @author ollivir + */ +class Pinger implements Callable { + private final Node node; + private final ClusterMonitor clusterMonitor; + private final FS4ResourcePool fs4ResourcePool; + + public Pinger(Node node, ClusterMonitor clusterMonitor, FS4ResourcePool fs4ResourcePool) { + this.node = node; + this.clusterMonitor = clusterMonitor; + this.fs4ResourcePool = fs4ResourcePool; + } + + public Pong call() { + try { + Pong pong = FastSearcher.ping(new Ping(clusterMonitor.getConfiguration().getRequestTimeout()), + fs4ResourcePool.getBackend(node.hostname(), node.fs4port()), node.toString()); + if (pong.activeDocuments().isPresent()) + node.setActiveDocuments(pong.activeDocuments().get()); + return pong; + } catch (RuntimeException e) { + return new Pong(ErrorMessage.createBackendCommunicationError("Exception when pinging " + node + ": " + + Exceptions.toMessageString(e))); + } + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java new file mode 100644 index 00000000000..b0e63d20931 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -0,0 +1,343 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch.searchcluster; + +import com.google.common.collect.ImmutableCollection; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMultimap; +import com.yahoo.container.handler.VipStatus; +import com.yahoo.net.HostName; +import com.yahoo.search.cluster.ClusterMonitor; +import com.yahoo.search.cluster.NodeManager; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.vespa.config.search.DispatchConfig; +import com.yahoo.prelude.Pong; +import com.yahoo.prelude.fastsearch.FS4ResourcePool; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +/** + * A model of a search cluster we might want to dispatch queries to. + * + * @author bratseth + */ +public class SearchCluster implements NodeManager { + + private static final Logger log = Logger.getLogger(SearchCluster.class.getName()); + + /** The min active docs a group must have to be considered up, as a % of the average active docs of the other groups */ + private final double minActivedocsCoveragePercentage; + private final double minGroupCoverage; + private final int maxNodesDownPerGroup; + private final int size; + private final ImmutableMap groups; + private final ImmutableMultimap nodesByHost; + private final ImmutableList orderedGroups; + private final ClusterMonitor clusterMonitor; + private final VipStatus vipStatus; + + /** + * A search node on this local machine having the entire corpus, which we therefore + * should prefer to dispatch directly to, or empty if there is no such local search node. + * If there is one, we also maintain the VIP status of this container based on the availability + * of the corpus on this local node (up + has coverage), such that this node is taken out of rotation + * if it only queries this cluster when the local node cannot be used, to avoid unnecessary + * cross-node network traffic. + */ + private final Optional directDispatchTarget; + + // Only needed until query requests are moved to rpc + private final FS4ResourcePool fs4ResourcePool; + + public SearchCluster(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus) { + this(dispatchConfig.minActivedocsPercentage(), dispatchConfig.minGroupCoverage(), dispatchConfig.maxNodesDownPerGroup(), + toNodes(dispatchConfig), fs4ResourcePool, containerClusterSize, vipStatus); + } + + public SearchCluster(double minActivedocsCoverage, double minGroupCoverage, int maxNodesDownPerGroup, List nodes, FS4ResourcePool fs4ResourcePool, + int containerClusterSize, VipStatus vipStatus) { + this.minActivedocsCoveragePercentage = minActivedocsCoverage; + this.minGroupCoverage = minGroupCoverage; + this.maxNodesDownPerGroup = maxNodesDownPerGroup; + this.size = nodes.size(); + this.fs4ResourcePool = fs4ResourcePool; + this.vipStatus = vipStatus; + + // Create groups + ImmutableMap.Builder groupsBuilder = new ImmutableMap.Builder<>(); + for (Map.Entry> group : nodes.stream().collect(Collectors.groupingBy(Node::group)).entrySet()) { + Group g = new Group(group.getKey(), group.getValue()); + groupsBuilder.put(group.getKey(), g); + } + this.groups = groupsBuilder.build(); + LinkedHashMap groupIntroductionOrder = new LinkedHashMap<>(); + nodes.forEach(node -> groupIntroductionOrder.put(node.group(), groups.get(node.group))); + this.orderedGroups = ImmutableList.builder().addAll(groupIntroductionOrder.values()).build(); + + // Index nodes by host + ImmutableMultimap.Builder nodesByHostBuilder = new ImmutableMultimap.Builder<>(); + for (Node node : nodes) + nodesByHostBuilder.put(node.hostname(), node); + this.nodesByHost = nodesByHostBuilder.build(); + + this.directDispatchTarget = findDirectDispatchTarget(HostName.getLocalhost(), size, containerClusterSize, + nodesByHost, groups); + + // Set up monitoring of the fs4 interface of the nodes + // We can switch to monitoring the rpc interface instead when we move the query phase to rpc + this.clusterMonitor = new ClusterMonitor<>(this); + for (Node node : nodes) { + // cluster monitor will only call working() when the + // node transitions from down to up, so we need to + // register the initial (working) state here: + working(node); + clusterMonitor.add(node, true); + } + } + + private static Optional findDirectDispatchTarget(String selfHostname, + int searchClusterSize, + int containerClusterSize, + ImmutableMultimapnodesByHost, + ImmutableMap groups) { + // A search node in the search cluster in question is configured on the same host as the currently running container. + // It has all the data <==> No other nodes in the search cluster have the same group id as this node. + // That local search node responds. + // The search cluster to be searched has at least as many nodes as the container cluster we're running in. + ImmutableCollection localSearchNodes = nodesByHost.get(selfHostname); + // Only use direct dispatch if we have exactly 1 search node on the same machine: + if (localSearchNodes.size() != 1) return Optional.empty(); + + Node localSearchNode = localSearchNodes.iterator().next(); + Group localSearchGroup = groups.get(localSearchNode.group()); + + // Only use direct dispatch if the local search node has the entire corpus + if (localSearchGroup.nodes().size() != 1) return Optional.empty(); + + // Only use direct dispatch if this container cluster has at least as many nodes as the search cluster + // to avoid load skew/preserve fanout in the case where a subset of the search nodes are also containers. + // This disregards the case where the search and container clusters are partially overlapping. + // Such configurations produce skewed load in any case. + if (containerClusterSize < searchClusterSize) return Optional.empty(); + + return Optional.of(localSearchNode); + } + + private static ImmutableList toNodes(DispatchConfig dispatchConfig) { + ImmutableList.Builder nodesBuilder = new ImmutableList.Builder<>(); + for (DispatchConfig.Node node : dispatchConfig.node()) + nodesBuilder.add(new Node(node.key(), node.host(), node.fs4port(), node.group())); + return nodesBuilder.build(); + } + + /** Returns the number of nodes in this cluster (across all groups) */ + public int size() { return size; } + + /** Returns the groups of this cluster as an immutable map indexed by group id */ + public ImmutableMap groups() { return groups; } + + /** Returns the groups of this cluster as an immutable list in introduction order */ + public ImmutableList orderedGroups() { return orderedGroups; } + + /** Returns the n'th (zero-indexed) group in the cluster if possible */ + public Optional group(int n) { + if (orderedGroups.size() > n) { + return Optional.of(orderedGroups.get(n)); + } else { + return Optional.empty(); + } + } + + /** Returns the number of nodes per group - size()/groups.size() */ + public int groupSize() { + if (groups.size() == 0) return size(); + return size() / groups.size(); + } + + /** + * Returns the nodes of this cluster as an immutable map indexed by host. + * One host may contain multiple nodes (on different ports), so this is a multi-map. + */ + public ImmutableMultimap nodesByHost() { return nodesByHost; } + + /** + * Returns the recipient we should dispatch queries directly to (bypassing fdispatch), + * or empty if we should not dispatch directly. + */ + public Optional directDispatchTarget() { + if ( ! directDispatchTarget.isPresent()) return Optional.empty(); + + // Only use direct dispatch if the local group has sufficient coverage + Group localSearchGroup = groups.get(directDispatchTarget.get().group()); + if ( ! localSearchGroup.hasSufficientCoverage()) return Optional.empty(); + + // Only use direct dispatch if the local search node is up + if ( ! directDispatchTarget.get().isWorking()) return Optional.empty(); + + return directDispatchTarget; + } + + /** Used by the cluster monitor to manage node status */ + @Override + public void working(Node node) { + node.setWorking(true); + + if (usesDirectDispatchTo(node)) + vipStatus.addToRotation(this); + } + + /** Used by the cluster monitor to manage node status */ + @Override + public void failed(Node node) { + node.setWorking(false); + + // Take ourselves out if we usually dispatch only to our own host + if (usesDirectDispatchTo(node)) + vipStatus.removeFromRotation(this); + } + + private void updateSufficientCoverage(Group group, boolean sufficientCoverage) { + // update VIP status if we direct dispatch to this group and coverage status changed + if (usesDirectDispatchTo(group) && sufficientCoverage != group.hasSufficientCoverage()) { + if (sufficientCoverage) { + vipStatus.addToRotation(this); + } else { + vipStatus.removeFromRotation(this); + } + } + group.setHasSufficientCoverage(sufficientCoverage); + } + + private boolean usesDirectDispatchTo(Node node) { + if ( ! directDispatchTarget.isPresent()) return false; + return directDispatchTarget.get().equals(node); + } + + private boolean usesDirectDispatchTo(Group group) { + if ( ! directDispatchTarget.isPresent()) return false; + return directDispatchTarget.get().group() == group.id(); + } + + /** Used by the cluster monitor to manage node status */ + @Override + public void ping(Node node, Executor executor) { + Pinger pinger = new Pinger(node, clusterMonitor, fs4ResourcePool); + FutureTask futurePong = new FutureTask<>(pinger); + executor.execute(futurePong); + Pong pong = getPong(futurePong, node); + futurePong.cancel(true); + + if (pong.badResponse()) + clusterMonitor.failed(node, pong.getError(0)); + else + clusterMonitor.responded(node); + } + + /** + * Update statistics after a round of issuing pings. + * Note that this doesn't wait for pings to return, so it will typically accumulate data from + * last rounds pinging, or potentially (although unlikely) some combination of new and old data. + */ + @Override + public void pingIterationCompleted() { + int numGroups = orderedGroups.size(); + if (numGroups == 1) { + Group group = groups.values().iterator().next(); + group.aggregateActiveDocuments(); + updateSufficientCoverage(group, true); // by definition + return; + } + + // Update active documents per group and use it to decide if the group should be active + + long[] activeDocumentsInGroup = new long[numGroups]; + long sumOfActiveDocuments = 0; + for(int i = 0; i < numGroups; i++) { + Group group = orderedGroups.get(i); + group.aggregateActiveDocuments(); + activeDocumentsInGroup[i] = group.getActiveDocuments(); + sumOfActiveDocuments += activeDocumentsInGroup[i]; + } + + for (int i = 0; i < numGroups; i++) { + Group group = orderedGroups.get(i); + long activeDocuments = activeDocumentsInGroup[i]; + long averageDocumentsInOtherGroups = (sumOfActiveDocuments - activeDocuments) / (numGroups - 1); + boolean sufficientCoverage = isGroupCoverageSufficient(group.nodes(), activeDocuments, averageDocumentsInOtherGroups); + updateSufficientCoverage(group, sufficientCoverage); + } + } + + private boolean isGroupCoverageSufficient(List nodes, long activeDocuments, long averageDocumentsInOtherGroups) { + boolean sufficientCoverage = true; + + if (averageDocumentsInOtherGroups > 0) { + double coverage = 100.0 * (double) activeDocuments / averageDocumentsInOtherGroups; + sufficientCoverage = coverage >= minActivedocsCoveragePercentage; + } + if (sufficientCoverage) { + sufficientCoverage = isGroupNodeCoverageSufficient(nodes); + } + return sufficientCoverage; + } + + private boolean isGroupNodeCoverageSufficient(List nodes) { + int nodesUp = 0; + for (Node node : nodes) { + if (node.isWorking()) { + nodesUp++; + } + } + int numNodes = nodes.size(); + int nodesAllowedDown = maxNodesDownPerGroup + (int) (((double) numNodes * (100.0 - minGroupCoverage)) / 100.0); + return nodesUp + nodesAllowedDown >= numNodes; + } + + private Pong getPong(FutureTask futurePong, Node node) { + try { + return futurePong.get(clusterMonitor.getConfiguration().getFailLimit(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + log.log(Level.WARNING, "Exception pinging " + node, e); + return new Pong(ErrorMessage.createUnspecifiedError("Ping was interrupted: " + node)); + } catch (ExecutionException e) { + log.log(Level.WARNING, "Exception pinging " + node, e); + return new Pong(ErrorMessage.createUnspecifiedError("Execution was interrupted: " + node)); + } catch (TimeoutException e) { + return new Pong(ErrorMessage.createNoAnswerWhenPingingNode("Ping thread timed out")); + } + } + + /** + * Calculate whether a subset of nodes in a group has enough coverage + */ + public boolean isPartialGroupCoverageSufficient(int groupId, List nodes) { + if (orderedGroups.size() == 1) { + return true; + } + long sumOfActiveDocuments = 0; + int otherGroups = 0; + for (Group g : orderedGroups) { + if (g.id() != groupId) { + sumOfActiveDocuments += g.getActiveDocuments(); + otherGroups++; + } + } + long activeDocuments = 0; + for (Node n : nodes) { + activeDocuments += n.getActiveDocuments(); + } + long averageDocumentsInOtherGroups = sumOfActiveDocuments / otherGroups; + return isGroupCoverageSufficient(nodes, activeDocuments, averageDocumentsInOtherGroups); + } +} diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java index b08aef6ecb1..79b43563c6a 100644 --- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java +++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java @@ -5,26 +5,37 @@ import com.google.common.collect.ImmutableList; import com.yahoo.component.chain.Chain; import com.yahoo.config.subscription.ConfigGetter; import com.yahoo.container.handler.VipStatus; +import com.yahoo.container.protect.Error; import com.yahoo.container.search.Fs4Config; import com.yahoo.document.GlobalId; -import com.yahoo.fs4.mplex.*; +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.Packet; +import com.yahoo.fs4.QueryPacket; +import com.yahoo.fs4.mplex.Backend; +import com.yahoo.fs4.mplex.BackendTestCase; import com.yahoo.fs4.test.QueryTestCase; import com.yahoo.language.simple.SimpleLinguistics; import com.yahoo.prelude.Ping; import com.yahoo.prelude.Pong; +import com.yahoo.prelude.fastsearch.CacheControl; +import com.yahoo.prelude.fastsearch.CacheKey; +import com.yahoo.prelude.fastsearch.CacheParams; +import com.yahoo.prelude.fastsearch.ClusterParams; import com.yahoo.prelude.fastsearch.DocumentdbInfoConfig; -import com.yahoo.container.protect.Error; -import com.yahoo.fs4.*; +import com.yahoo.prelude.fastsearch.FS4ResourcePool; +import com.yahoo.prelude.fastsearch.FastHit; +import com.yahoo.prelude.fastsearch.FastSearcher; +import com.yahoo.prelude.fastsearch.PacketWrapper; +import com.yahoo.prelude.fastsearch.SummaryParameters; import com.yahoo.prelude.fastsearch.test.fs4mock.MockBackend; import com.yahoo.prelude.fastsearch.test.fs4mock.MockFS4ResourcePool; import com.yahoo.prelude.fastsearch.test.fs4mock.MockFSChannel; +import com.yahoo.prelude.query.WordItem; import com.yahoo.processing.execution.Execution.Trace; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.Searcher; -import com.yahoo.prelude.fastsearch.*; -import com.yahoo.prelude.query.WordItem; -import com.yahoo.search.dispatch.SearchCluster; +import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.grouping.GroupingRequest; import com.yahoo.search.grouping.request.AllOperation; import com.yahoo.search.grouping.request.EachOperation; @@ -48,9 +59,13 @@ import java.util.logging.Level; import java.util.logging.Logger; import static org.hamcrest.CoreMatchers.containsString; -import static org.junit.Assert.*; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; /** * Tests the Fast searcher @@ -87,7 +102,7 @@ public class FastSearcherTestCase { @Test public void testNullQuery() { Logger.getLogger(FastSearcher.class.getName()).setLevel(Level.ALL); - FastSearcher fastSearcher = new FastSearcher(new MockBackend(), + FastSearcher fastSearcher = new FastSearcher(new MockBackend(), new FS4ResourcePool(1), new MockDispatcher(Collections.emptyList()), new SummaryParameters(null), @@ -115,9 +130,9 @@ public class FastSearcherTestCase { .rankprofile(new DocumentdbInfoConfig.Documentdb.Rankprofile.Builder() .name("simpler").hasRankFeatures(false).hasSummaryFeatures(false)))); - List nodes = new ArrayList<>(); - nodes.add(new SearchCluster.Node(0, "host1", 5000, 0)); - nodes.add(new SearchCluster.Node(2, "host2", 5000, 0)); + List nodes = new ArrayList<>(); + nodes.add(new Node(0, "host1", 5000, 0)); + nodes.add(new Node(2, "host2", 5000, 0)); MockFS4ResourcePool mockFs4ResourcePool = new MockFS4ResourcePool(); FastSearcher fastSearcher = new FastSearcher(new MockBackend(), @@ -162,15 +177,15 @@ public class FastSearcherTestCase { new DocumentdbInfoConfig(new DocumentdbInfoConfig.Builder().documentdb(new DocumentdbInfoConfig.Documentdb.Builder().name("testDb"))); FastSearcher fastSearcher = new FastSearcher(mockBackend, new FS4ResourcePool(1), - new MockDispatcher(Collections.emptyList()), + new MockDispatcher(Collections.emptyList()), new SummaryParameters(null), new ClusterParams("testhittype"), - new CacheParams(100, 1e64), + new CacheParams(100, 1e64), documentdbConfigWithOneDb); Query query = new Query("?query=foo&model.restrict=testDb"); query.prepare(); - Result result = doSearch(fastSearcher, query, 0, 10); + doSearch(fastSearcher, query, 0, 10); Packet receivedPacket = mockBackend.getChannel().getLastQueryPacket(); byte[] encoded = QueryTestCase.packetToBytes(receivedPacket); @@ -354,10 +369,10 @@ public class FastSearcherTestCase { Logger.getLogger(FastSearcher.class.getName()).setLevel(Level.ALL); return new FastSearcher(mockBackend, new FS4ResourcePool(1), - new MockDispatcher(Collections.emptyList()), + new MockDispatcher(Collections.emptyList()), new SummaryParameters(null), - new ClusterParams("testhittype"), - new CacheParams(100, 1e64), + new ClusterParams("testhittype"), + new CacheParams(100, 1e64), config); } @@ -436,12 +451,12 @@ public class FastSearcherTestCase { } } - + @Test public void testSinglePassGroupingIsForcedWithSingleNodeGroups() { FastSearcher fastSearcher = new FastSearcher(new MockBackend(), new FS4ResourcePool(1), - new MockDispatcher(new SearchCluster.Node(0, "host0", 123, 0)), + new MockDispatcher(new Node(0, "host0", 123, 0)), new SummaryParameters(null), new ClusterParams("testhittype"), new CacheParams(100, 1e64), @@ -455,17 +470,17 @@ public class FastSearcherTestCase { all.addChild(new EachOperation()); all.addChild(new EachOperation()); request2.setRootOperation(all); - + assertForceSinglePassIs(false, q); fastSearcher.search(q, new Execution(Execution.Context.createContextStub())); - assertForceSinglePassIs(true, q); + assertForceSinglePassIs(true, q); } @Test public void testSinglePassGroupingIsNotForcedWithSingleNodeGroups() { - MockDispatcher dispatcher = - new MockDispatcher(ImmutableList.of(new SearchCluster.Node(0, "host0", 123, 0), - new SearchCluster.Node(2, "host1", 123, 0))); + MockDispatcher dispatcher = + new MockDispatcher(ImmutableList.of(new Node(0, "host0", 123, 0), + new Node(2, "host1", 123, 0))); FastSearcher fastSearcher = new FastSearcher(new MockBackend(), new FS4ResourcePool(1), @@ -495,7 +510,7 @@ public class FastSearcherTestCase { } private void assertForceSinglePassIs(boolean expected, GroupingOperation operation) { - assertEquals("Force single pass is " + expected + " in " + operation, + assertEquals("Force single pass is " + expected + " in " + operation, expected, operation.getForceSinglePass()); for (GroupingOperation child : operation.getChildren()) assertForceSinglePassIs(expected, child); diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTester.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTester.java index 4f6d2d88917..12c313dbfe3 100644 --- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTester.java +++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTester.java @@ -12,16 +12,14 @@ import com.yahoo.prelude.fastsearch.FastSearcher; import com.yahoo.prelude.fastsearch.SummaryParameters; import com.yahoo.prelude.fastsearch.test.fs4mock.MockBackend; import com.yahoo.prelude.fastsearch.test.fs4mock.MockFS4ResourcePool; -import com.yahoo.prelude.fastsearch.test.fs4mock.MockFSChannel; import com.yahoo.search.Query; import com.yahoo.search.Result; -import com.yahoo.search.dispatch.SearchCluster; +import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.searchchain.Execution; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Optional; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -38,7 +36,7 @@ class FastSearcherTester { private final MockDispatcher mockDispatcher; private final VipStatus vipStatus; - public FastSearcherTester(int containerClusterSize, SearchCluster.Node searchNode) { + public FastSearcherTester(int containerClusterSize, Node searchNode) { this(containerClusterSize, Collections.singletonList(searchNode)); } @@ -46,7 +44,7 @@ class FastSearcherTester { this(containerClusterSize, toNodes(hostAndPortAndGroupStrings)); } - public FastSearcherTester(int containerClusterSize, List searchNodes) { + public FastSearcherTester(int containerClusterSize, List searchNodes) { ClustersStatus clustersStatus = new ClustersStatus(); clustersStatus.setContainerHasClusters(true); vipStatus = new VipStatus(clustersStatus); @@ -61,12 +59,12 @@ class FastSearcherTester { new DocumentdbInfoConfig(new DocumentdbInfoConfig.Builder())); } - private static List toNodes(String... hostAndPortAndGroupStrings) { - List nodes = new ArrayList<>(); + private static List toNodes(String... hostAndPortAndGroupStrings) { + List nodes = new ArrayList<>(); int key = 0; for (String s : hostAndPortAndGroupStrings) { String[] parts = s.split(":"); - nodes.add(new SearchCluster.Node(key++, parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2]))); + nodes.add(new Node(key++, parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2]))); } return nodes; } @@ -90,7 +88,7 @@ class FastSearcherTester { mockFS4ResourcePool.setResponding(hostname, responding); // Make the search cluster monitor notice right now in this thread - SearchCluster.Node node = mockDispatcher.searchCluster().nodesByHost().get(hostname).iterator().next(); + Node node = mockDispatcher.searchCluster().nodesByHost().get(hostname).iterator().next(); mockDispatcher.searchCluster().ping(node, MoreExecutors.directExecutor()); } @@ -99,7 +97,7 @@ class FastSearcherTester { mockFS4ResourcePool.setActiveDocuments(hostname, activeDocuments); // Make the search cluster monitor notice right now in this thread - SearchCluster.Node node = mockDispatcher.searchCluster().nodesByHost().get(hostname).iterator().next(); + Node node = mockDispatcher.searchCluster().nodesByHost().get(hostname).iterator().next(); mockDispatcher.searchCluster().ping(node, MoreExecutors.directExecutor()); mockDispatcher.searchCluster().pingIterationCompleted(); } diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java index 2b25b2a3796..800b1bc21f0 100644 --- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java +++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java @@ -5,7 +5,7 @@ import com.yahoo.container.handler.VipStatus; import com.yahoo.prelude.fastsearch.FS4ResourcePool; import com.yahoo.search.Result; import com.yahoo.search.dispatch.Dispatcher; -import com.yahoo.search.dispatch.SearchCluster; +import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.vespa.config.search.DispatchConfig; import java.util.Collections; @@ -13,27 +13,27 @@ import java.util.List; class MockDispatcher extends Dispatcher { - public MockDispatcher(SearchCluster.Node node) { + public MockDispatcher(Node node) { this(Collections.singletonList(node)); } - public MockDispatcher(List nodes) { + public MockDispatcher(List nodes) { super(toDispatchConfig(nodes), new FS4ResourcePool(1), 1, new VipStatus()); } - public MockDispatcher(List nodes, VipStatus vipStatus) { + public MockDispatcher(List nodes, VipStatus vipStatus) { super(toDispatchConfig(nodes), new FS4ResourcePool(1), 1, vipStatus); } - public MockDispatcher(List nodes, FS4ResourcePool fs4ResourcePool, + public MockDispatcher(List nodes, FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus) { super(toDispatchConfig(nodes), fs4ResourcePool, containerClusterSize, vipStatus); } - private static DispatchConfig toDispatchConfig(List nodes) { + private static DispatchConfig toDispatchConfig(List nodes) { DispatchConfig.Builder dispatchConfigBuilder = new DispatchConfig.Builder(); int key = 0; - for (SearchCluster.Node node : nodes) { + for (Node node : nodes) { DispatchConfig.Node.Builder dispatchConfigNodeBuilder = new DispatchConfig.Node.Builder(); dispatchConfigNodeBuilder.host(node.hostname()); dispatchConfigNodeBuilder.fs4port(node.fs4port()); @@ -44,7 +44,7 @@ class MockDispatcher extends Dispatcher { } return new DispatchConfig(dispatchConfigBuilder); } - + public void fill(Result result, String summaryClass) { } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java index 9311ddab3c6..698cee743e4 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java @@ -2,8 +2,9 @@ package com.yahoo.search.dispatch; import com.yahoo.search.Query; -import com.yahoo.search.dispatch.SearchCluster.Group; -import com.yahoo.search.dispatch.SearchCluster.Node; +import com.yahoo.search.dispatch.searchcluster.Group; +import com.yahoo.search.dispatch.searchcluster.Node; +import com.yahoo.search.dispatch.searchcluster.SearchCluster; import junit.framework.AssertionFailedError; import org.junit.Test; @@ -21,11 +22,11 @@ import static org.junit.Assert.assertThat; public class LoadBalancerTest { @Test public void requreThatLoadBalancerServesSingleNodeSetups() { - Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0); + Node n1 = new Node(0, "test-node1", 0, 0); SearchCluster cluster = new SearchCluster(88.0, 99.0, 0, Arrays.asList(n1), null, 1, null); LoadBalancer lb = new LoadBalancer(cluster, true); - Optional grp = lb.takeGroupForQuery(new Query()); + Optional grp = lb.takeGroupForQuery(new Query(), null); Group group = grp.orElseGet(() -> { throw new AssertionFailedError("Expected a SearchCluster.Group"); }); @@ -34,12 +35,12 @@ public class LoadBalancerTest { @Test public void requreThatLoadBalancerServesMultiGroupSetups() { - Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0); - Node n2 = new SearchCluster.Node(1, "test-node2", 1, 1); + Node n1 = new Node(0, "test-node1", 0, 0); + Node n2 = new Node(1, "test-node2", 1, 1); SearchCluster cluster = new SearchCluster(88.0, 99.0, 0, Arrays.asList(n1, n2), null, 1, null); LoadBalancer lb = new LoadBalancer(cluster, true); - Optional grp = lb.takeGroupForQuery(new Query()); + Optional grp = lb.takeGroupForQuery(new Query(), null); Group group = grp.orElseGet(() -> { throw new AssertionFailedError("Expected a SearchCluster.Group"); }); @@ -48,51 +49,51 @@ public class LoadBalancerTest { @Test public void requreThatLoadBalancerServesClusteredGroups() { - Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0); - Node n2 = new SearchCluster.Node(1, "test-node2", 1, 0); - Node n3 = new SearchCluster.Node(0, "test-node3", 0, 1); - Node n4 = new SearchCluster.Node(1, "test-node4", 1, 1); + Node n1 = new Node(0, "test-node1", 0, 0); + Node n2 = new Node(1, "test-node2", 1, 0); + Node n3 = new Node(0, "test-node3", 0, 1); + Node n4 = new Node(1, "test-node4", 1, 1); SearchCluster cluster = new SearchCluster(88.0, 99.0, 0, Arrays.asList(n1, n2, n3, n4), null, 2, null); LoadBalancer lb = new LoadBalancer(cluster, true); - Optional grp = lb.takeGroupForQuery(new Query()); + Optional grp = lb.takeGroupForQuery(new Query(), null); assertThat(grp.isPresent(), is(true)); } @Test public void requreThatLoadBalancerReturnsDifferentGroups() { - Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0); - Node n2 = new SearchCluster.Node(1, "test-node2", 1, 1); + Node n1 = new Node(0, "test-node1", 0, 0); + Node n2 = new Node(1, "test-node2", 1, 1); SearchCluster cluster = new SearchCluster(88.0, 99.0, 0, Arrays.asList(n1, n2), null, 1, null); LoadBalancer lb = new LoadBalancer(cluster, true); // get first group - Optional grp = lb.takeGroupForQuery(new Query()); + Optional grp = lb.takeGroupForQuery(new Query(), null); Group group = grp.get(); int id1 = group.id(); // release allocation lb.releaseGroup(group); // get second group - grp = lb.takeGroupForQuery(new Query()); + grp = lb.takeGroupForQuery(new Query(), null); group = grp.get(); assertThat(group.id(), not(equalTo(id1))); } @Test public void requreThatLoadBalancerReturnsGroupWithShortestQueue() { - Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0); - Node n2 = new SearchCluster.Node(1, "test-node2", 1, 1); + Node n1 = new Node(0, "test-node1", 0, 0); + Node n2 = new Node(1, "test-node2", 1, 1); SearchCluster cluster = new SearchCluster(88.0, 99.0, 0, Arrays.asList(n1, n2), null, 1, null); LoadBalancer lb = new LoadBalancer(cluster, true); // get first group - Optional grp = lb.takeGroupForQuery(new Query()); + Optional grp = lb.takeGroupForQuery(new Query(), null); Group group = grp.get(); int id1 = group.id(); // get second group - grp = lb.takeGroupForQuery(new Query()); + grp = lb.takeGroupForQuery(new Query(), null); group = grp.get(); int id2 = group.id(); assertThat(id2, not(equalTo(id1))); @@ -100,7 +101,7 @@ public class LoadBalancerTest { lb.releaseGroup(group); // get third group - grp = lb.takeGroupForQuery(new Query()); + grp = lb.takeGroupForQuery(new Query(), null); group = grp.get(); assertThat(group.id(), equalTo(id2)); } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java index ee903fd3fa0..0c0a65ded17 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java @@ -3,6 +3,9 @@ package com.yahoo.search.dispatch; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; +import com.yahoo.search.dispatch.searchcluster.Group; +import com.yahoo.search.dispatch.searchcluster.Node; +import com.yahoo.search.dispatch.searchcluster.SearchCluster; import java.util.ArrayList; import java.util.Collections; diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/SearchPathTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/SearchPathTest.java index 77cb8d5c353..a1f926d3201 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/SearchPathTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/SearchPathTest.java @@ -2,6 +2,7 @@ package com.yahoo.search.dispatch; import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException; +import com.yahoo.search.dispatch.searchcluster.Node; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -77,7 +78,7 @@ public class SearchPathTest { assertThat(distKeysAsString(SearchPath.selectNodes("[1,88>/1", cluster)), equalTo("5,6")); } - private static String distKeysAsString(Collection nodes) { - return nodes.stream().map(SearchCluster.Node::key).map(Object::toString).collect(Collectors.joining(",")); + private static String distKeysAsString(Collection nodes) { + return nodes.stream().map(Node::key).map(Object::toString).collect(Collectors.joining(",")); } } -- cgit v1.2.3