summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorOlli Virtanen <olli.virtanen@oath.com>2018-10-22 14:39:59 +0200
committerOlli Virtanen <olli.virtanen@oath.com>2018-10-22 14:39:59 +0200
commitedf46c3e106da961c522add0691dfa090d8637a1 (patch)
tree01d8dfe9bf6b3c50b024877d144181092af6192f
parented5b6938cb9c5005beb100cac2ce492fc2435b05 (diff)
Test connection to content node before committing to using it
-rw-r--r--container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java25
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java71
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java5
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java45
4 files changed, 86 insertions, 60 deletions
diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java b/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java
index 2a90e746378..202ee94383f 100644
--- a/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java
+++ b/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java
@@ -354,6 +354,31 @@ public class Backend implements ConnectionFactory {
}
/**
+ * Attempt to establish a connection without sending messages and then
+ * return it to the pool. The assumption is that if the probing is
+ * successful, the connection will be used soon after. There should be
+ * minimal overhead since the connection is cached.
+ */
+ public boolean probeConnection() {
+ if (shutdownInitiated) {
+ return false;
+ }
+
+ FS4Connection connection = null;
+ try {
+ connection = getConnection();
+ } catch (IOException ignored) {
+ // connection is null
+ } finally {
+ if (connection != null) {
+ returnConnection(connection);
+ }
+ }
+
+ return connection != null;
+ }
+
+ /**
* This method should be used to ensure graceful shutdown of the backend.
*/
public void shutdown() {
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java
index 08dcbe17db2..012a0b3a7a9 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java
@@ -2,9 +2,9 @@
package com.yahoo.prelude.fastsearch;
import com.google.common.collect.ImmutableMap;
+import com.yahoo.fs4.mplex.Backend;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
-import com.yahoo.search.dispatch.CloseableInvoker;
import com.yahoo.search.dispatch.FillInvoker;
import com.yahoo.search.dispatch.InterleavedFillInvoker;
import com.yahoo.search.dispatch.InterleavedSearchInvoker;
@@ -12,7 +12,6 @@ import com.yahoo.search.dispatch.SearchCluster;
import com.yahoo.search.dispatch.SearchInvoker;
import com.yahoo.search.result.Hit;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -43,11 +42,27 @@ public class FS4InvokerFactory {
}
public SearchInvoker getSearchInvoker(Query query, SearchCluster.Node node) {
- return new FS4SearchInvoker(searcher, query, fs4ResourcePool, node);
+ Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port());
+ return new FS4SearchInvoker(searcher, query, backend.openChannel(), node);
}
public Optional<SearchInvoker> getSearchInvoker(Query query, List<SearchCluster.Node> nodes) {
- return getInvoker(nodes, node -> getSearchInvoker(query, node), InterleavedSearchInvoker::new);
+ Map<Integer, SearchInvoker> invokers = new HashMap<>();
+ for (SearchCluster.Node node : nodes) {
+ if (node.isWorking()) {
+ Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port());
+ if (backend.probeConnection()) {
+ invokers.put(node.key(), new FS4SearchInvoker(searcher, query, backend.openChannel(), node));
+ } else {
+ return Optional.empty();
+ }
+ }
+ }
+ if (invokers.size() == 1) {
+ return Optional.of(invokers.values().iterator().next());
+ } else {
+ return Optional.of(new InterleavedSearchInvoker(invokers));
+ }
}
public FillInvoker getFillInvoker(Query query, SearchCluster.Node node) {
@@ -56,18 +71,22 @@ public class FS4InvokerFactory {
public Optional<FillInvoker> getFillInvoker(Result result) {
Collection<Integer> requiredNodes = requiredFillNodes(result);
- List<SearchCluster.Node> nodes = new ArrayList<>(requiredNodes.size());
+ Query query = result.getQuery();
+ Map<Integer, FillInvoker> invokers = new HashMap<>();
for (Integer distKey : requiredNodes) {
SearchCluster.Node node = nodesByKey.get(distKey);
if (node == null) {
return Optional.empty();
}
- nodes.add(node);
+ invokers.put(distKey, getFillInvoker(query, node));
}
- Query query = result.getQuery();
- return getInvoker(nodes, node -> getFillInvoker(query, node), InterleavedFillInvoker::new);
+ if (invokers.size() == 1) {
+ return Optional.of(invokers.values().iterator().next());
+ } else {
+ return Optional.of(new InterleavedFillInvoker(invokers));
+ }
}
private static Collection<Integer> requiredFillNodes(Result result) {
@@ -81,40 +100,4 @@ public class FS4InvokerFactory {
}
return requiredNodes;
}
-
- @FunctionalInterface
- private interface InvokerConstructor<INVOKER> {
- INVOKER construct(SearchCluster.Node node);
- }
-
- @FunctionalInterface
- private interface ClusterInvokerConstructor<CLUSTERINVOKER extends INVOKER, INVOKER> {
- CLUSTERINVOKER construct(Map<Integer, INVOKER> subinvokers);
- }
-
- /* Get an invocation object for the provided collection of nodes. If only one
- node is used, only the single-node invoker is used. For multiple nodes, each
- gets a single-node invoker and they are all wrapped into a cluster invoker.
- The functional interfaces are used to allow code reuse with SearchInvokers
- and FillInvokers even though they don't share much class hierarchy. */
- private <INVOKER extends CloseableInvoker, CLUSTERINVOKER extends INVOKER> Optional<INVOKER> getInvoker(
- Collection<SearchCluster.Node> nodes, InvokerConstructor<INVOKER> singleNodeCtor,
- ClusterInvokerConstructor<CLUSTERINVOKER, INVOKER> clusterCtor) {
- if (nodes.size() == 1) {
- SearchCluster.Node node = nodes.iterator().next();
- return Optional.of(singleNodeCtor.construct(node));
- } else {
- Map<Integer, INVOKER> nodeInvokers = new HashMap<>();
- for (SearchCluster.Node node : nodes) {
- if (node.isWorking()) {
- nodeInvokers.put(node.key(), singleNodeCtor.construct(node));
- }
- }
- if (nodeInvokers.size() == 1) {
- return Optional.of(nodeInvokers.values().iterator().next());
- } else {
- return Optional.of(clusterCtor.construct(nodeInvokers));
- }
- }
- }
}
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java
index ac48aef7063..dc8cd53e638 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java
@@ -39,12 +39,11 @@ public class FS4SearchInvoker extends SearchInvoker {
private Query query = null;
private QueryPacket queryPacket = null;
- public FS4SearchInvoker(VespaBackEndSearcher searcher, Query query, FS4ResourcePool fs4ResourcePool, SearchCluster.Node node) {
+ public FS4SearchInvoker(VespaBackEndSearcher searcher, Query query, FS4Channel channel, SearchCluster.Node node) {
this.searcher = searcher;
this.node = Optional.of(node);
+ this.channel = channel;
- Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port());
- this.channel = backend.openChannel();
channel.setQuery(query);
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index 2fdb10067ff..2f67600522f 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -14,9 +14,11 @@ import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException;
import com.yahoo.vespa.config.search.DispatchConfig;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
/**
* A dispatcher communicates with search nodes to perform queries and fill hits.
@@ -120,19 +122,36 @@ public class Dispatcher extends AbstractComponent {
return invokerFactory.supply(query, Arrays.asList(node));
}
- Optional<SearchCluster.Group> groupInCluster = loadBalancer.takeGroupForQuery(query);
- if (!groupInCluster.isPresent()) {
- return Optional.empty();
- }
- SearchCluster.Group group = groupInCluster.get();
- query.trace(false, 2, "Dispatching internally to ", group);
-
- Optional<SearchInvoker> invoker = invokerFactory.supply(query, group.nodes());
- if (invoker.isPresent()) {
- invoker.get().teardown(() -> loadBalancer.releaseGroup(group));
- } else {
- loadBalancer.releaseGroup(group);
+ Set<Integer> tried = null;
+ int max = searchCluster.groups().size();
+ for (int attempt = 0; attempt < max; attempt++) {
+ Optional<SearchCluster.Group> groupInCluster = loadBalancer.takeGroupForQuery(query);
+ if (! groupInCluster.isPresent()) {
+ // No groups available
+ break;
+ }
+ SearchCluster.Group group = groupInCluster.get();
+ if (tried != null && tried.contains(group.id())) {
+ // bail out: LB is offering a previously discarded group
+ loadBalancer.releaseGroup(group);
+ break;
+ }
+
+ Optional<SearchInvoker> invoker = invokerFactory.supply(query, group.nodes());
+ if (invoker.isPresent()) {
+ query.trace(false, 2, "Dispatching internally to ", group);
+ invoker.get().teardown(() -> loadBalancer.releaseGroup(group));
+ return invoker;
+ } else {
+ // invoker could not be produced (likely connectivity issue)
+ loadBalancer.releaseGroup(group);
+ if (tried == null) {
+ tried = new HashSet<>();
+ }
+ tried.add(group.id());
+ }
}
- return invoker;
+
+ return Optional.empty();
}
}