diff options
author | Olli Virtanen <olli.virtanen@oath.com> | 2018-10-22 14:39:59 +0200 |
---|---|---|
committer | Olli Virtanen <olli.virtanen@oath.com> | 2018-10-22 14:39:59 +0200 |
commit | edf46c3e106da961c522add0691dfa090d8637a1 (patch) | |
tree | 01d8dfe9bf6b3c50b024877d144181092af6192f | |
parent | ed5b6938cb9c5005beb100cac2ce492fc2435b05 (diff) |
Test connection to content node before committing to using it
4 files changed, 86 insertions, 60 deletions
diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java b/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java index 2a90e746378..202ee94383f 100644 --- a/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java +++ b/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java @@ -354,6 +354,31 @@ public class Backend implements ConnectionFactory { } /** + * Attempt to establish a connection without sending messages and then + * return it to the pool. The assumption is that if the probing is + * successful, the connection will be used soon after. There should be + * minimal overhead since the connection is cached. + */ + public boolean probeConnection() { + if (shutdownInitiated) { + return false; + } + + FS4Connection connection = null; + try { + connection = getConnection(); + } catch (IOException ignored) { + // connection is null + } finally { + if (connection != null) { + returnConnection(connection); + } + } + + return connection != null; + } + + /** * This method should be used to ensure graceful shutdown of the backend. */ public void shutdown() { diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java index 08dcbe17db2..012a0b3a7a9 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java @@ -2,9 +2,9 @@ package com.yahoo.prelude.fastsearch; import com.google.common.collect.ImmutableMap; +import com.yahoo.fs4.mplex.Backend; import com.yahoo.search.Query; import com.yahoo.search.Result; -import com.yahoo.search.dispatch.CloseableInvoker; import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.dispatch.InterleavedFillInvoker; import com.yahoo.search.dispatch.InterleavedSearchInvoker; @@ -12,7 +12,6 @@ import com.yahoo.search.dispatch.SearchCluster; import com.yahoo.search.dispatch.SearchInvoker; import com.yahoo.search.result.Hit; -import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -43,11 +42,27 @@ public class FS4InvokerFactory { } public SearchInvoker getSearchInvoker(Query query, SearchCluster.Node node) { - return new FS4SearchInvoker(searcher, query, fs4ResourcePool, node); + Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port()); + return new FS4SearchInvoker(searcher, query, backend.openChannel(), node); } public Optional<SearchInvoker> getSearchInvoker(Query query, List<SearchCluster.Node> nodes) { - return getInvoker(nodes, node -> getSearchInvoker(query, node), InterleavedSearchInvoker::new); + Map<Integer, SearchInvoker> invokers = new HashMap<>(); + for (SearchCluster.Node node : nodes) { + if (node.isWorking()) { + Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port()); + if (backend.probeConnection()) { + invokers.put(node.key(), new FS4SearchInvoker(searcher, query, backend.openChannel(), node)); + } else { + return Optional.empty(); + } + } + } + if (invokers.size() == 1) { + return Optional.of(invokers.values().iterator().next()); + } else { + return Optional.of(new InterleavedSearchInvoker(invokers)); + } } public FillInvoker getFillInvoker(Query query, SearchCluster.Node node) { @@ -56,18 +71,22 @@ public class FS4InvokerFactory { public Optional<FillInvoker> getFillInvoker(Result result) { Collection<Integer> requiredNodes = requiredFillNodes(result); - List<SearchCluster.Node> nodes = new ArrayList<>(requiredNodes.size()); + Query query = result.getQuery(); + Map<Integer, FillInvoker> invokers = new HashMap<>(); for (Integer distKey : requiredNodes) { SearchCluster.Node node = nodesByKey.get(distKey); if (node == null) { return Optional.empty(); } - nodes.add(node); + invokers.put(distKey, getFillInvoker(query, node)); } - Query query = result.getQuery(); - return getInvoker(nodes, node -> getFillInvoker(query, node), InterleavedFillInvoker::new); + if (invokers.size() == 1) { + return Optional.of(invokers.values().iterator().next()); + } else { + return Optional.of(new InterleavedFillInvoker(invokers)); + } } private static Collection<Integer> requiredFillNodes(Result result) { @@ -81,40 +100,4 @@ public class FS4InvokerFactory { } return requiredNodes; } - - @FunctionalInterface - private interface InvokerConstructor<INVOKER> { - INVOKER construct(SearchCluster.Node node); - } - - @FunctionalInterface - private interface ClusterInvokerConstructor<CLUSTERINVOKER extends INVOKER, INVOKER> { - CLUSTERINVOKER construct(Map<Integer, INVOKER> subinvokers); - } - - /* Get an invocation object for the provided collection of nodes. If only one - node is used, only the single-node invoker is used. For multiple nodes, each - gets a single-node invoker and they are all wrapped into a cluster invoker. - The functional interfaces are used to allow code reuse with SearchInvokers - and FillInvokers even though they don't share much class hierarchy. */ - private <INVOKER extends CloseableInvoker, CLUSTERINVOKER extends INVOKER> Optional<INVOKER> getInvoker( - Collection<SearchCluster.Node> nodes, InvokerConstructor<INVOKER> singleNodeCtor, - ClusterInvokerConstructor<CLUSTERINVOKER, INVOKER> clusterCtor) { - if (nodes.size() == 1) { - SearchCluster.Node node = nodes.iterator().next(); - return Optional.of(singleNodeCtor.construct(node)); - } else { - Map<Integer, INVOKER> nodeInvokers = new HashMap<>(); - for (SearchCluster.Node node : nodes) { - if (node.isWorking()) { - nodeInvokers.put(node.key(), singleNodeCtor.construct(node)); - } - } - if (nodeInvokers.size() == 1) { - return Optional.of(nodeInvokers.values().iterator().next()); - } else { - return Optional.of(clusterCtor.construct(nodeInvokers)); - } - } - } } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java index ac48aef7063..dc8cd53e638 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java @@ -39,12 +39,11 @@ public class FS4SearchInvoker extends SearchInvoker { private Query query = null; private QueryPacket queryPacket = null; - public FS4SearchInvoker(VespaBackEndSearcher searcher, Query query, FS4ResourcePool fs4ResourcePool, SearchCluster.Node node) { + public FS4SearchInvoker(VespaBackEndSearcher searcher, Query query, FS4Channel channel, SearchCluster.Node node) { this.searcher = searcher; this.node = Optional.of(node); + this.channel = channel; - Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port()); - this.channel = backend.openChannel(); channel.setQuery(query); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 2fdb10067ff..2f67600522f 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -14,9 +14,11 @@ import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException; import com.yahoo.vespa.config.search.DispatchConfig; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; /** * A dispatcher communicates with search nodes to perform queries and fill hits. @@ -120,19 +122,36 @@ public class Dispatcher extends AbstractComponent { return invokerFactory.supply(query, Arrays.asList(node)); } - Optional<SearchCluster.Group> groupInCluster = loadBalancer.takeGroupForQuery(query); - if (!groupInCluster.isPresent()) { - return Optional.empty(); - } - SearchCluster.Group group = groupInCluster.get(); - query.trace(false, 2, "Dispatching internally to ", group); - - Optional<SearchInvoker> invoker = invokerFactory.supply(query, group.nodes()); - if (invoker.isPresent()) { - invoker.get().teardown(() -> loadBalancer.releaseGroup(group)); - } else { - loadBalancer.releaseGroup(group); + Set<Integer> tried = null; + int max = searchCluster.groups().size(); + for (int attempt = 0; attempt < max; attempt++) { + Optional<SearchCluster.Group> groupInCluster = loadBalancer.takeGroupForQuery(query); + if (! groupInCluster.isPresent()) { + // No groups available + break; + } + SearchCluster.Group group = groupInCluster.get(); + if (tried != null && tried.contains(group.id())) { + // bail out: LB is offering a previously discarded group + loadBalancer.releaseGroup(group); + break; + } + + Optional<SearchInvoker> invoker = invokerFactory.supply(query, group.nodes()); + if (invoker.isPresent()) { + query.trace(false, 2, "Dispatching internally to ", group); + invoker.get().teardown(() -> loadBalancer.releaseGroup(group)); + return invoker; + } else { + // invoker could not be produced (likely connectivity issue) + loadBalancer.releaseGroup(group); + if (tried == null) { + tried = new HashSet<>(); + } + tried.add(group.id()); + } } - return invoker; + + return Optional.empty(); } } |