diff options
author | Jon Bratseth <bratseth@gmail.com> | 2020-04-15 22:19:34 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@gmail.com> | 2020-04-15 22:19:34 +0200 |
commit | 0c68f849c445c82e75ad439965e1b057cff5181e (patch) | |
tree | 467383d181ad413ef96cd4a1f7f25bfbe405c1a5 /container-search | |
parent | 1eecdac6160ea9f7ed6de5e3fc478bc211561dc2 (diff) |
Avoid a single group rejecting feed
Diffstat (limited to 'container-search')
10 files changed, 108 insertions, 50 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/query/parser/SpecialTokens.java b/container-search/src/main/java/com/yahoo/prelude/query/parser/SpecialTokens.java index c206ff7567e..8020088c2e3 100644 --- a/container-search/src/main/java/com/yahoo/prelude/query/parser/SpecialTokens.java +++ b/container-search/src/main/java/com/yahoo/prelude/query/parser/SpecialTokens.java @@ -159,7 +159,7 @@ public class SpecialTokens { @Override public int hashCode() { return token.hashCode(); } - public Token toToken(int start,String rawSource) { + public Token toToken(int start, String rawSource) { return new Token(Token.Kind.WORD, replace(), true, new Substring(start, start + token.length(), rawSource)); // XXX: Unsafe? } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 9c46d194fb3..869ea6436de 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Optional; import java.util.OptionalInt; import java.util.Set; +import java.util.stream.Collectors; /** * A dispatcher communicates with search nodes to perform queries and fill hits. @@ -215,7 +216,7 @@ public class Dispatcher extends AbstractComponent { int covered = searchCluster.groupsWithSufficientCoverage(); int groups = searchCluster.orderedGroups().size(); int max = Integer.min(Integer.min(covered + 1, groups), MAX_GROUP_SELECTION_ATTEMPTS); - Set<Integer> rejected = null; + Set<Integer> rejected = rejectGroupBlockingFeed(searchCluster.orderedGroups()); for (int i = 0; i < max; i++) { Optional<Group> groupInCluster = loadBalancer.takeGroup(rejected); if (groupInCluster.isEmpty()) break; // No groups available @@ -244,4 +245,20 @@ public class Dispatcher extends AbstractComponent { throw new IllegalStateException("No suitable groups to dispatch query. Rejected: " + rejected); } + /** + * We want to avoid groups blocking feed because their data may be out of date. + * If there is a single group blocking feed, we want to reject it. + * If multiple groups are blocking feed we should use them anyway as we may not have remaining + * capacity otherwise. + * + * @return a modifiable set containing the single group to reject, or null otherwise + */ + private Set<Integer> rejectGroupBlockingFeed(List<Group> groups) { + List<Group> groupsRejectingFeed = groups.stream().filter(Group::isBlockingWrites).collect(Collectors.toList()); + if (groupsRejectingFeed.size() != 1) return null; + Set<Integer> rejected = new HashSet<>(); + rejected.add(groupsRejectingFeed.get(0).id()); + return rejected; + } + } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java index 1c3a90ac6ab..03160e6c9c7 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java @@ -43,7 +43,7 @@ public abstract class InvokerFactory { * @param nodes pre-selected list of content nodes * @param acceptIncompleteCoverage if some of the nodes are unavailable and this parameter is * false, verify that the remaining set of nodes has sufficient coverage - * @return Optional containing the SearchInvoker or empty if some node in the + * @return the invoker or empty if some node in the * list is invalid and the remaining coverage is not sufficient */ public Optional<SearchInvoker> createSearchInvoker(VespaBackEndSearcher searcher, @@ -82,7 +82,7 @@ public abstract class InvokerFactory { if ( ! searchCluster.isPartialGroupCoverageSufficient(groupId, success) && !acceptIncompleteCoverage) { return Optional.empty(); } - if(invokers.size() == 0) { + if (invokers.size() == 0) { return Optional.of(createCoverageErrorInvoker(nodes, failed)); } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java index 210ab5777d2..05e1ea6e2f9 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java @@ -134,6 +134,7 @@ public class LoadBalancer { } private static class RoundRobinScheduler implements GroupScheduler { + private int needle = 0; private final List<GroupStatus> scoreboard; @@ -204,6 +205,7 @@ public class LoadBalancer { } static class AdaptiveScheduler implements GroupScheduler { + private final Random random; private final List<GroupStatus> scoreboard; @@ -251,4 +253,5 @@ public class LoadBalancer { return selectGroup(needle, false, rejectedGroups); } } + } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java index 0e4e87b9a6a..ad2581696a7 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java @@ -21,6 +21,7 @@ public class Group { private final AtomicBoolean hasSufficientCoverage = new AtomicBoolean(true); private final AtomicBoolean hasFullCoverage = new AtomicBoolean(true); private final AtomicLong activeDocuments = new AtomicLong(0); + private final AtomicBoolean isBlockingWrites = new AtomicBoolean(false); public Group(int id, List<Node> nodes) { this.id = id; @@ -61,21 +62,16 @@ public class Group { return nodesUp; } - void aggregateActiveDocuments() { - long activeDocumentsInGroup = 0; - for (Node node : nodes) { - if (node.isWorking() == Boolean.TRUE) { - activeDocumentsInGroup += node.getActiveDocuments(); - } - } - activeDocuments.set(activeDocumentsInGroup); - + void aggregateNodeValues() { + activeDocuments.set(nodes.stream().filter(Node::isWorking).mapToLong(Node::getActiveDocuments).sum()); + isBlockingWrites.set(nodes.stream().anyMatch(node -> node.isBlockingWrites())); } /** Returns the active documents on this node. If unknown, 0 is returned. */ - long getActiveDocuments() { - return this.activeDocuments.get(); - } + long getActiveDocuments() { return activeDocuments.get(); } + + /** Returns whether any node in this group is currently blocking write operations */ + public boolean isBlockingWrites() { return isBlockingWrites.get(); } public boolean isFullCoverageStatusChanged(boolean hasFullCoverageNow) { boolean previousState = hasFullCoverage.getAndSet(hasFullCoverageNow); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java index e93b633f09d..8f465070de4 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java @@ -23,6 +23,7 @@ public class Node { private final AtomicLong activeDocuments = new AtomicLong(0); private final AtomicLong pingSequence = new AtomicLong(0); private final AtomicLong lastPong = new AtomicLong(0); + private final AtomicBoolean isBlockingWrites = new AtomicBoolean(false); public Node(int key, String hostname, int group) { this.key = key; @@ -70,14 +71,14 @@ public class Node { } /** Updates the active documents on this node */ - void setActiveDocuments(long activeDocuments) { - this.activeDocuments.set(activeDocuments); - } + void setActiveDocuments(long activeDocuments) { this.activeDocuments.set(activeDocuments); } /** Returns the active documents on this node. If unknown, 0 is returned. */ - long getActiveDocuments() { - return activeDocuments.get(); - } + long getActiveDocuments() { return activeDocuments.get(); } + + public void setBlockingWrites(boolean isBlockingWrites) { this.isBlockingWrites.set(isBlockingWrites); } + + boolean isBlockingWrites() { return isBlockingWrites.get(); } @Override public int hashCode() { return Objects.hash(hostname, key, pathIndex, group); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index 7862648ba51..27b4472e324 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -91,7 +91,7 @@ public class SearchCluster implements NodeManager<Node> { } public void addMonitoring(ClusterMonitor clusterMonitor) { - for (var group : orderedGroups) { + for (var group : orderedGroups()) { for (var node : group.nodes()) clusterMonitor.add(node, true); } @@ -147,8 +147,8 @@ public class SearchCluster implements NodeManager<Node> { /** Returns the n'th (zero-indexed) group in the cluster if possible */ public Optional<Group> group(int n) { - if (orderedGroups.size() > n) { - return Optional.of(orderedGroups.get(n)); + if (orderedGroups().size() > n) { + return Optional.of(orderedGroups().get(n)); } else { return Optional.empty(); } @@ -156,13 +156,13 @@ public class SearchCluster implements NodeManager<Node> { /** Returns the number of nodes per group - size()/groups.size() */ public int groupSize() { - if (groups.size() == 0) return size(); - return size() / groups.size(); + if (groups().size() == 0) return size(); + return size() / groups().size(); } public int groupsWithSufficientCoverage() { int covered = 0; - for (Group g : orderedGroups) { + for (Group g : orderedGroups()) { if (g.hasSufficientCoverage()) { covered++; } @@ -178,7 +178,7 @@ public class SearchCluster implements NodeManager<Node> { if ( localCorpusDispatchTarget.isEmpty()) return Optional.empty(); // Only use direct dispatch if the local group has sufficient coverage - Group localSearchGroup = groups.get(localCorpusDispatchTarget.get().group()); + Group localSearchGroup = groups().get(localCorpusDispatchTarget.get().group()); if ( ! localSearchGroup.hasSufficientCoverage()) return Optional.empty(); // Only use direct dispatch if the local search node is not down @@ -257,12 +257,15 @@ public class SearchCluster implements NodeManager<Node> { } private static class PongCallback implements PongHandler { + private final ClusterMonitor<Node> clusterMonitor; private final Node node; + PongCallback(Node node, ClusterMonitor<Node> clusterMonitor) { this.node = node; this.clusterMonitor = clusterMonitor; } + @Override public void handle(Pong pong) { if (pong.badResponse()) { @@ -270,10 +273,12 @@ public class SearchCluster implements NodeManager<Node> { } else { if (pong.activeDocuments().isPresent()) { node.setActiveDocuments(pong.activeDocuments().get()); + node.setBlockingWrites(pong.isBlockingWrites()); } clusterMonitor.responded(node); } } + } /** Used by the cluster monitor to manage node status */ @@ -284,8 +289,8 @@ public class SearchCluster implements NodeManager<Node> { } private void pingIterationCompletedSingleGroup() { - Group group = groups.values().iterator().next(); - group.aggregateActiveDocuments(); + Group group = groups().values().iterator().next(); + group.aggregateNodeValues(); // With just one group sufficient coverage may not be the same as full coverage, as the // group will always be marked sufficient for use. updateSufficientCoverage(group, true); @@ -295,21 +300,20 @@ public class SearchCluster implements NodeManager<Node> { } private void pingIterationCompletedMultipleGroups() { - int numGroups = orderedGroups.size(); + int numGroups = orderedGroups().size(); // Update active documents per group and use it to decide if the group should be active - long[] activeDocumentsInGroup = new long[numGroups]; long sumOfActiveDocuments = 0; for(int i = 0; i < numGroups; i++) { - Group group = orderedGroups.get(i); - group.aggregateActiveDocuments(); + Group group = orderedGroups().get(i); + group.aggregateNodeValues(); activeDocumentsInGroup[i] = group.getActiveDocuments(); sumOfActiveDocuments += activeDocumentsInGroup[i]; } boolean anyGroupsSufficientCoverage = false; for (int i = 0; i < numGroups; i++) { - Group group = orderedGroups.get(i); + Group group = orderedGroups().get(i); long activeDocuments = activeDocumentsInGroup[i]; long averageDocumentsInOtherGroups = (sumOfActiveDocuments - activeDocuments) / (numGroups - 1); boolean sufficientCoverage = isGroupCoverageSufficient(group.workingNodes(), group.nodes().size(), activeDocuments, averageDocumentsInOtherGroups); @@ -326,7 +330,7 @@ public class SearchCluster implements NodeManager<Node> { */ @Override public void pingIterationCompleted() { - int numGroups = orderedGroups.size(); + int numGroups = orderedGroups().size(); if (numGroups == 1) { pingIterationCompletedSingleGroup(); } else { @@ -357,7 +361,7 @@ public class SearchCluster implements NodeManager<Node> { * Calculate whether a subset of nodes in a group has enough coverage */ public boolean isPartialGroupCoverageSufficient(OptionalInt knownGroupId, List<Node> nodes) { - if (orderedGroups.size() == 1) { + if (orderedGroups().size() == 1) { boolean sufficient = nodes.size() >= groupSize() - dispatchConfig.maxNodesDownPerGroup(); return sufficient; } @@ -366,14 +370,14 @@ public class SearchCluster implements NodeManager<Node> { return false; } int groupId = knownGroupId.getAsInt(); - Group group = groups.get(groupId); + Group group = groups().get(groupId); if (group == null) { return false; } int nodesInGroup = group.nodes().size(); long sumOfActiveDocuments = 0; int otherGroups = 0; - for (Group g : orderedGroups) { + for (Group g : orderedGroups()) { if (g.id() != groupId) { sumOfActiveDocuments += g.getActiveDocuments(); otherGroups++; diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java index 5433a28dd6e..aa7a0c6b4f4 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java @@ -35,7 +35,7 @@ public class DispatcherTest { q.getModel().setSearchPath("1/0"); // second node in first group MockInvokerFactory invokerFactory = new MockInvokerFactory(cl, (nodes, a) -> { assertEquals(1, nodes.size()); - assertEquals(2, nodes.get(0).key()); + assertEquals(1, nodes.get(0).key()); return true; }); Dispatcher disp = new Dispatcher(new ClusterMonitor(cl, false), cl, createDispatchConfig(), invokerFactory, new MockMetric()); @@ -92,6 +92,41 @@ public class DispatcherTest { } } + @Test + public void testGroup1IsSelected() { + SearchCluster cluster = new MockSearchCluster("1", 3, 1); + Dispatcher dispatcher = new Dispatcher(new ClusterMonitor(cluster, false), cluster, createDispatchConfig(), new MockInvokerFactory(cluster, (n, a) -> true), new MockMetric()); + cluster.pingIterationCompleted(); + assertEquals(0, + dispatcher.getSearchInvoker(new Query(), null).distributionKey().get().longValue()); + dispatcher.deconstruct(); + } + + @Test + public void testGroup1IsSkippedWhenItIsBlockingFeed() { + SearchCluster cluster = new MockSearchCluster("1", 3, 1); + Dispatcher dispatcher = new Dispatcher(new ClusterMonitor(cluster, false), cluster, createDispatchConfig(), new MockInvokerFactory(cluster, (n, a) -> true), new MockMetric()); + cluster.group(0).get().nodes().get(0).setBlockingWrites(true); + cluster.pingIterationCompleted(); + assertEquals("Blocking group is avoided", + 1, + (dispatcher.getSearchInvoker(new Query(), null).distributionKey().get()).longValue()); + dispatcher.deconstruct(); + } + + @Test + public void testGroup1IsSelectedWhenMoreAreBlockingFeed() { + SearchCluster cluster = new MockSearchCluster("1", 2, 1); + Dispatcher dispatcher = new Dispatcher(new ClusterMonitor(cluster, false), cluster, createDispatchConfig(), new MockInvokerFactory(cluster, (n, a) -> true), new MockMetric()); + cluster.group(0).get().nodes().get(0).setBlockingWrites(true); + cluster.group(1).get().nodes().get(0).setBlockingWrites(true); + cluster.pingIterationCompleted(); + assertEquals("Blocking group is used when multiple groups are blocking", + 0, + dispatcher.getSearchInvoker(new Query(), null).distributionKey().get().longValue()); + dispatcher.deconstruct(); + } + interface FactoryStep { boolean returnInvoker(List<Node> nodes, boolean acceptIncompleteCoverage); } @@ -119,7 +154,7 @@ public class DispatcherTest { boolean nonEmpty = events[step].returnInvoker(nodes, acceptIncompleteCoverage); step++; if (nonEmpty) { - return Optional.of(new MockInvoker(1)); + return Optional.of(new MockInvoker(nodes.get(0).key())); } else { return Optional.empty(); } @@ -150,4 +185,5 @@ public class DispatcherTest { return null; } } + } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java index 32c6738fc3b..cebe5c5e2ba 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java @@ -35,15 +35,15 @@ public class MockSearchCluster extends SearchCluster { ImmutableList.Builder<Group> orderedGroupBuilder = ImmutableList.builder(); ImmutableMap.Builder<Integer, Group> groupBuilder = ImmutableMap.builder(); ImmutableMultimap.Builder<String, Node> hostBuilder = ImmutableMultimap.builder(); - int dk = 1; + int distributionKey = 0; for (int group = 0; group < groups; group++) { List<Node> nodes = new ArrayList<>(); for (int node = 0; node < nodesPerGroup; node++) { - Node n = new Node(dk, "host" + dk, group); + Node n = new Node(distributionKey, "host" + distributionKey, group); n.setWorking(true); nodes.add(n); hostBuilder.put(n.hostname(), n); - dk++; + distributionKey++; } Group g = new Group(group, nodes); groupBuilder.put(group, g); diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/SearchPathTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/SearchPathTest.java index 5a4457780e2..58042dcf228 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/SearchPathTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/SearchPathTest.java @@ -18,6 +18,7 @@ import static org.junit.Assert.assertThat; * @author ollivir */ public class SearchPathTest { + @Test public void requreThatSearchPathsAreParsedCorrectly() { assertThat(SearchPath.fromString("0/0").get().toString(), equalTo("0/0")); @@ -71,11 +72,11 @@ public class SearchPathTest { public void searchPathMustFilterNodesBasedOnDefinition() { MockSearchCluster cluster = new MockSearchCluster("a",3, 3); - assertThat(distKeysAsString(SearchPath.selectNodes("1/1", cluster)), equalTo("5")); - assertThat(distKeysAsString(SearchPath.selectNodes("/1", cluster)), equalTo("4,5,6")); - assertThat(distKeysAsString(SearchPath.selectNodes("0,1/2", cluster)), equalTo("7,8")); - assertThat(distKeysAsString(SearchPath.selectNodes("[1,3>/1", cluster)), equalTo("5,6")); - assertThat(distKeysAsString(SearchPath.selectNodes("[1,88>/1", cluster)), equalTo("5,6")); + assertThat(distKeysAsString(SearchPath.selectNodes("1/1", cluster)), equalTo("4")); + assertThat(distKeysAsString(SearchPath.selectNodes("/1", cluster)), equalTo("3,4,5")); + assertThat(distKeysAsString(SearchPath.selectNodes("0,1/2", cluster)), equalTo("6,7")); + assertThat(distKeysAsString(SearchPath.selectNodes("[1,3>/1", cluster)), equalTo("4,5")); + assertThat(distKeysAsString(SearchPath.selectNodes("[1,88>/1", cluster)), equalTo("4,5")); } private static String distKeysAsString(Collection<Node> nodes) { |