summaryrefslogtreecommitdiffstats
path: root/container-search
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@gmail.com>2020-04-15 22:19:34 +0200
committerJon Bratseth <bratseth@gmail.com>2020-04-15 22:19:34 +0200
commit0c68f849c445c82e75ad439965e1b057cff5181e (patch)
tree467383d181ad413ef96cd4a1f7f25bfbe405c1a5 /container-search
parent1eecdac6160ea9f7ed6de5e3fc478bc211561dc2 (diff)
Avoid a single group rejecting feed
Diffstat (limited to 'container-search')
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/query/parser/SpecialTokens.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java19
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java4
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java3
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java20
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java13
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java40
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java40
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java6
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/SearchPathTest.java11
10 files changed, 108 insertions, 50 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/query/parser/SpecialTokens.java b/container-search/src/main/java/com/yahoo/prelude/query/parser/SpecialTokens.java
index c206ff7567e..8020088c2e3 100644
--- a/container-search/src/main/java/com/yahoo/prelude/query/parser/SpecialTokens.java
+++ b/container-search/src/main/java/com/yahoo/prelude/query/parser/SpecialTokens.java
@@ -159,7 +159,7 @@ public class SpecialTokens {
@Override
public int hashCode() { return token.hashCode(); }
- public Token toToken(int start,String rawSource) {
+ public Token toToken(int start, String rawSource) {
return new Token(Token.Kind.WORD, replace(), true, new Substring(start, start + token.length(), rawSource)); // XXX: Unsafe?
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index 9c46d194fb3..869ea6436de 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
+import java.util.stream.Collectors;
/**
* A dispatcher communicates with search nodes to perform queries and fill hits.
@@ -215,7 +216,7 @@ public class Dispatcher extends AbstractComponent {
int covered = searchCluster.groupsWithSufficientCoverage();
int groups = searchCluster.orderedGroups().size();
int max = Integer.min(Integer.min(covered + 1, groups), MAX_GROUP_SELECTION_ATTEMPTS);
- Set<Integer> rejected = null;
+ Set<Integer> rejected = rejectGroupBlockingFeed(searchCluster.orderedGroups());
for (int i = 0; i < max; i++) {
Optional<Group> groupInCluster = loadBalancer.takeGroup(rejected);
if (groupInCluster.isEmpty()) break; // No groups available
@@ -244,4 +245,20 @@ public class Dispatcher extends AbstractComponent {
throw new IllegalStateException("No suitable groups to dispatch query. Rejected: " + rejected);
}
+ /**
+ * We want to avoid groups blocking feed because their data may be out of date.
+ * If there is a single group blocking feed, we want to reject it.
+ * If multiple groups are blocking feed we should use them anyway as we may not have remaining
+ * capacity otherwise.
+ *
+ * @return a modifiable set containing the single group to reject, or null otherwise
+ */
+ private Set<Integer> rejectGroupBlockingFeed(List<Group> groups) {
+ List<Group> groupsRejectingFeed = groups.stream().filter(Group::isBlockingWrites).collect(Collectors.toList());
+ if (groupsRejectingFeed.size() != 1) return null;
+ Set<Integer> rejected = new HashSet<>();
+ rejected.add(groupsRejectingFeed.get(0).id());
+ return rejected;
+ }
+
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
index 1c3a90ac6ab..03160e6c9c7 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
@@ -43,7 +43,7 @@ public abstract class InvokerFactory {
* @param nodes pre-selected list of content nodes
* @param acceptIncompleteCoverage if some of the nodes are unavailable and this parameter is
* false, verify that the remaining set of nodes has sufficient coverage
- * @return Optional containing the SearchInvoker or empty if some node in the
+ * @return the invoker or empty if some node in the
* list is invalid and the remaining coverage is not sufficient
*/
public Optional<SearchInvoker> createSearchInvoker(VespaBackEndSearcher searcher,
@@ -82,7 +82,7 @@ public abstract class InvokerFactory {
if ( ! searchCluster.isPartialGroupCoverageSufficient(groupId, success) && !acceptIncompleteCoverage) {
return Optional.empty();
}
- if(invokers.size() == 0) {
+ if (invokers.size() == 0) {
return Optional.of(createCoverageErrorInvoker(nodes, failed));
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
index 210ab5777d2..05e1ea6e2f9 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
@@ -134,6 +134,7 @@ public class LoadBalancer {
}
private static class RoundRobinScheduler implements GroupScheduler {
+
private int needle = 0;
private final List<GroupStatus> scoreboard;
@@ -204,6 +205,7 @@ public class LoadBalancer {
}
static class AdaptiveScheduler implements GroupScheduler {
+
private final Random random;
private final List<GroupStatus> scoreboard;
@@ -251,4 +253,5 @@ public class LoadBalancer {
return selectGroup(needle, false, rejectedGroups);
}
}
+
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
index 0e4e87b9a6a..ad2581696a7 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
@@ -21,6 +21,7 @@ public class Group {
private final AtomicBoolean hasSufficientCoverage = new AtomicBoolean(true);
private final AtomicBoolean hasFullCoverage = new AtomicBoolean(true);
private final AtomicLong activeDocuments = new AtomicLong(0);
+ private final AtomicBoolean isBlockingWrites = new AtomicBoolean(false);
public Group(int id, List<Node> nodes) {
this.id = id;
@@ -61,21 +62,16 @@ public class Group {
return nodesUp;
}
- void aggregateActiveDocuments() {
- long activeDocumentsInGroup = 0;
- for (Node node : nodes) {
- if (node.isWorking() == Boolean.TRUE) {
- activeDocumentsInGroup += node.getActiveDocuments();
- }
- }
- activeDocuments.set(activeDocumentsInGroup);
-
+ void aggregateNodeValues() {
+ activeDocuments.set(nodes.stream().filter(Node::isWorking).mapToLong(Node::getActiveDocuments).sum());
+ isBlockingWrites.set(nodes.stream().anyMatch(node -> node.isBlockingWrites()));
}
/** Returns the active documents on this node. If unknown, 0 is returned. */
- long getActiveDocuments() {
- return this.activeDocuments.get();
- }
+ long getActiveDocuments() { return activeDocuments.get(); }
+
+ /** Returns whether any node in this group is currently blocking write operations */
+ public boolean isBlockingWrites() { return isBlockingWrites.get(); }
public boolean isFullCoverageStatusChanged(boolean hasFullCoverageNow) {
boolean previousState = hasFullCoverage.getAndSet(hasFullCoverageNow);
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java
index e93b633f09d..8f465070de4 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java
@@ -23,6 +23,7 @@ public class Node {
private final AtomicLong activeDocuments = new AtomicLong(0);
private final AtomicLong pingSequence = new AtomicLong(0);
private final AtomicLong lastPong = new AtomicLong(0);
+ private final AtomicBoolean isBlockingWrites = new AtomicBoolean(false);
public Node(int key, String hostname, int group) {
this.key = key;
@@ -70,14 +71,14 @@ public class Node {
}
/** Updates the active documents on this node */
- void setActiveDocuments(long activeDocuments) {
- this.activeDocuments.set(activeDocuments);
- }
+ void setActiveDocuments(long activeDocuments) { this.activeDocuments.set(activeDocuments); }
/** Returns the active documents on this node. If unknown, 0 is returned. */
- long getActiveDocuments() {
- return activeDocuments.get();
- }
+ long getActiveDocuments() { return activeDocuments.get(); }
+
+ public void setBlockingWrites(boolean isBlockingWrites) { this.isBlockingWrites.set(isBlockingWrites); }
+
+ boolean isBlockingWrites() { return isBlockingWrites.get(); }
@Override
public int hashCode() { return Objects.hash(hostname, key, pathIndex, group); }
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
index 7862648ba51..27b4472e324 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
@@ -91,7 +91,7 @@ public class SearchCluster implements NodeManager<Node> {
}
public void addMonitoring(ClusterMonitor clusterMonitor) {
- for (var group : orderedGroups) {
+ for (var group : orderedGroups()) {
for (var node : group.nodes())
clusterMonitor.add(node, true);
}
@@ -147,8 +147,8 @@ public class SearchCluster implements NodeManager<Node> {
/** Returns the n'th (zero-indexed) group in the cluster if possible */
public Optional<Group> group(int n) {
- if (orderedGroups.size() > n) {
- return Optional.of(orderedGroups.get(n));
+ if (orderedGroups().size() > n) {
+ return Optional.of(orderedGroups().get(n));
} else {
return Optional.empty();
}
@@ -156,13 +156,13 @@ public class SearchCluster implements NodeManager<Node> {
/** Returns the number of nodes per group - size()/groups.size() */
public int groupSize() {
- if (groups.size() == 0) return size();
- return size() / groups.size();
+ if (groups().size() == 0) return size();
+ return size() / groups().size();
}
public int groupsWithSufficientCoverage() {
int covered = 0;
- for (Group g : orderedGroups) {
+ for (Group g : orderedGroups()) {
if (g.hasSufficientCoverage()) {
covered++;
}
@@ -178,7 +178,7 @@ public class SearchCluster implements NodeManager<Node> {
if ( localCorpusDispatchTarget.isEmpty()) return Optional.empty();
// Only use direct dispatch if the local group has sufficient coverage
- Group localSearchGroup = groups.get(localCorpusDispatchTarget.get().group());
+ Group localSearchGroup = groups().get(localCorpusDispatchTarget.get().group());
if ( ! localSearchGroup.hasSufficientCoverage()) return Optional.empty();
// Only use direct dispatch if the local search node is not down
@@ -257,12 +257,15 @@ public class SearchCluster implements NodeManager<Node> {
}
private static class PongCallback implements PongHandler {
+
private final ClusterMonitor<Node> clusterMonitor;
private final Node node;
+
PongCallback(Node node, ClusterMonitor<Node> clusterMonitor) {
this.node = node;
this.clusterMonitor = clusterMonitor;
}
+
@Override
public void handle(Pong pong) {
if (pong.badResponse()) {
@@ -270,10 +273,12 @@ public class SearchCluster implements NodeManager<Node> {
} else {
if (pong.activeDocuments().isPresent()) {
node.setActiveDocuments(pong.activeDocuments().get());
+ node.setBlockingWrites(pong.isBlockingWrites());
}
clusterMonitor.responded(node);
}
}
+
}
/** Used by the cluster monitor to manage node status */
@@ -284,8 +289,8 @@ public class SearchCluster implements NodeManager<Node> {
}
private void pingIterationCompletedSingleGroup() {
- Group group = groups.values().iterator().next();
- group.aggregateActiveDocuments();
+ Group group = groups().values().iterator().next();
+ group.aggregateNodeValues();
// With just one group sufficient coverage may not be the same as full coverage, as the
// group will always be marked sufficient for use.
updateSufficientCoverage(group, true);
@@ -295,21 +300,20 @@ public class SearchCluster implements NodeManager<Node> {
}
private void pingIterationCompletedMultipleGroups() {
- int numGroups = orderedGroups.size();
+ int numGroups = orderedGroups().size();
// Update active documents per group and use it to decide if the group should be active
-
long[] activeDocumentsInGroup = new long[numGroups];
long sumOfActiveDocuments = 0;
for(int i = 0; i < numGroups; i++) {
- Group group = orderedGroups.get(i);
- group.aggregateActiveDocuments();
+ Group group = orderedGroups().get(i);
+ group.aggregateNodeValues();
activeDocumentsInGroup[i] = group.getActiveDocuments();
sumOfActiveDocuments += activeDocumentsInGroup[i];
}
boolean anyGroupsSufficientCoverage = false;
for (int i = 0; i < numGroups; i++) {
- Group group = orderedGroups.get(i);
+ Group group = orderedGroups().get(i);
long activeDocuments = activeDocumentsInGroup[i];
long averageDocumentsInOtherGroups = (sumOfActiveDocuments - activeDocuments) / (numGroups - 1);
boolean sufficientCoverage = isGroupCoverageSufficient(group.workingNodes(), group.nodes().size(), activeDocuments, averageDocumentsInOtherGroups);
@@ -326,7 +330,7 @@ public class SearchCluster implements NodeManager<Node> {
*/
@Override
public void pingIterationCompleted() {
- int numGroups = orderedGroups.size();
+ int numGroups = orderedGroups().size();
if (numGroups == 1) {
pingIterationCompletedSingleGroup();
} else {
@@ -357,7 +361,7 @@ public class SearchCluster implements NodeManager<Node> {
* Calculate whether a subset of nodes in a group has enough coverage
*/
public boolean isPartialGroupCoverageSufficient(OptionalInt knownGroupId, List<Node> nodes) {
- if (orderedGroups.size() == 1) {
+ if (orderedGroups().size() == 1) {
boolean sufficient = nodes.size() >= groupSize() - dispatchConfig.maxNodesDownPerGroup();
return sufficient;
}
@@ -366,14 +370,14 @@ public class SearchCluster implements NodeManager<Node> {
return false;
}
int groupId = knownGroupId.getAsInt();
- Group group = groups.get(groupId);
+ Group group = groups().get(groupId);
if (group == null) {
return false;
}
int nodesInGroup = group.nodes().size();
long sumOfActiveDocuments = 0;
int otherGroups = 0;
- for (Group g : orderedGroups) {
+ for (Group g : orderedGroups()) {
if (g.id() != groupId) {
sumOfActiveDocuments += g.getActiveDocuments();
otherGroups++;
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java
index 5433a28dd6e..aa7a0c6b4f4 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java
@@ -35,7 +35,7 @@ public class DispatcherTest {
q.getModel().setSearchPath("1/0"); // second node in first group
MockInvokerFactory invokerFactory = new MockInvokerFactory(cl, (nodes, a) -> {
assertEquals(1, nodes.size());
- assertEquals(2, nodes.get(0).key());
+ assertEquals(1, nodes.get(0).key());
return true;
});
Dispatcher disp = new Dispatcher(new ClusterMonitor(cl, false), cl, createDispatchConfig(), invokerFactory, new MockMetric());
@@ -92,6 +92,41 @@ public class DispatcherTest {
}
}
+ @Test
+ public void testGroup1IsSelected() {
+ SearchCluster cluster = new MockSearchCluster("1", 3, 1);
+ Dispatcher dispatcher = new Dispatcher(new ClusterMonitor(cluster, false), cluster, createDispatchConfig(), new MockInvokerFactory(cluster, (n, a) -> true), new MockMetric());
+ cluster.pingIterationCompleted();
+ assertEquals(0,
+ dispatcher.getSearchInvoker(new Query(), null).distributionKey().get().longValue());
+ dispatcher.deconstruct();
+ }
+
+ @Test
+ public void testGroup1IsSkippedWhenItIsBlockingFeed() {
+ SearchCluster cluster = new MockSearchCluster("1", 3, 1);
+ Dispatcher dispatcher = new Dispatcher(new ClusterMonitor(cluster, false), cluster, createDispatchConfig(), new MockInvokerFactory(cluster, (n, a) -> true), new MockMetric());
+ cluster.group(0).get().nodes().get(0).setBlockingWrites(true);
+ cluster.pingIterationCompleted();
+ assertEquals("Blocking group is avoided",
+ 1,
+ (dispatcher.getSearchInvoker(new Query(), null).distributionKey().get()).longValue());
+ dispatcher.deconstruct();
+ }
+
+ @Test
+ public void testGroup1IsSelectedWhenMoreAreBlockingFeed() {
+ SearchCluster cluster = new MockSearchCluster("1", 2, 1);
+ Dispatcher dispatcher = new Dispatcher(new ClusterMonitor(cluster, false), cluster, createDispatchConfig(), new MockInvokerFactory(cluster, (n, a) -> true), new MockMetric());
+ cluster.group(0).get().nodes().get(0).setBlockingWrites(true);
+ cluster.group(1).get().nodes().get(0).setBlockingWrites(true);
+ cluster.pingIterationCompleted();
+ assertEquals("Blocking group is used when multiple groups are blocking",
+ 0,
+ dispatcher.getSearchInvoker(new Query(), null).distributionKey().get().longValue());
+ dispatcher.deconstruct();
+ }
+
interface FactoryStep {
boolean returnInvoker(List<Node> nodes, boolean acceptIncompleteCoverage);
}
@@ -119,7 +154,7 @@ public class DispatcherTest {
boolean nonEmpty = events[step].returnInvoker(nodes, acceptIncompleteCoverage);
step++;
if (nonEmpty) {
- return Optional.of(new MockInvoker(1));
+ return Optional.of(new MockInvoker(nodes.get(0).key()));
} else {
return Optional.empty();
}
@@ -150,4 +185,5 @@ public class DispatcherTest {
return null;
}
}
+
}
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java
index 32c6738fc3b..cebe5c5e2ba 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java
@@ -35,15 +35,15 @@ public class MockSearchCluster extends SearchCluster {
ImmutableList.Builder<Group> orderedGroupBuilder = ImmutableList.builder();
ImmutableMap.Builder<Integer, Group> groupBuilder = ImmutableMap.builder();
ImmutableMultimap.Builder<String, Node> hostBuilder = ImmutableMultimap.builder();
- int dk = 1;
+ int distributionKey = 0;
for (int group = 0; group < groups; group++) {
List<Node> nodes = new ArrayList<>();
for (int node = 0; node < nodesPerGroup; node++) {
- Node n = new Node(dk, "host" + dk, group);
+ Node n = new Node(distributionKey, "host" + distributionKey, group);
n.setWorking(true);
nodes.add(n);
hostBuilder.put(n.hostname(), n);
- dk++;
+ distributionKey++;
}
Group g = new Group(group, nodes);
groupBuilder.put(group, g);
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/SearchPathTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/SearchPathTest.java
index 5a4457780e2..58042dcf228 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/SearchPathTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/SearchPathTest.java
@@ -18,6 +18,7 @@ import static org.junit.Assert.assertThat;
* @author ollivir
*/
public class SearchPathTest {
+
@Test
public void requreThatSearchPathsAreParsedCorrectly() {
assertThat(SearchPath.fromString("0/0").get().toString(), equalTo("0/0"));
@@ -71,11 +72,11 @@ public class SearchPathTest {
public void searchPathMustFilterNodesBasedOnDefinition() {
MockSearchCluster cluster = new MockSearchCluster("a",3, 3);
- assertThat(distKeysAsString(SearchPath.selectNodes("1/1", cluster)), equalTo("5"));
- assertThat(distKeysAsString(SearchPath.selectNodes("/1", cluster)), equalTo("4,5,6"));
- assertThat(distKeysAsString(SearchPath.selectNodes("0,1/2", cluster)), equalTo("7,8"));
- assertThat(distKeysAsString(SearchPath.selectNodes("[1,3>/1", cluster)), equalTo("5,6"));
- assertThat(distKeysAsString(SearchPath.selectNodes("[1,88>/1", cluster)), equalTo("5,6"));
+ assertThat(distKeysAsString(SearchPath.selectNodes("1/1", cluster)), equalTo("4"));
+ assertThat(distKeysAsString(SearchPath.selectNodes("/1", cluster)), equalTo("3,4,5"));
+ assertThat(distKeysAsString(SearchPath.selectNodes("0,1/2", cluster)), equalTo("6,7"));
+ assertThat(distKeysAsString(SearchPath.selectNodes("[1,3>/1", cluster)), equalTo("4,5"));
+ assertThat(distKeysAsString(SearchPath.selectNodes("[1,88>/1", cluster)), equalTo("4,5"));
}
private static String distKeysAsString(Collection<Node> nodes) {