aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-09-17 22:39:26 +0200
committerGitHub <noreply@github.com>2019-09-17 22:39:26 +0200
commita573985d1127835f0ecb5047694ffe23e8baefe7 (patch)
tree463ab5145edf4a26243cc4a92d04bfb82a3c1580 /container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
parentddb9cd0a539b57c41587ccdec1040b48169d3cec (diff)
Revert "Balder/no more fs4 dispatching from fastsearcher"
Diffstat (limited to 'container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java')
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java125
1 files changed, 118 insertions, 7 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
index b0b3a7800e9..6b0041a9e86 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
@@ -1,14 +1,23 @@
// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.prelude.fastsearch;
+import com.yahoo.fs4.BasicPacket;
+import com.yahoo.fs4.ChannelTimeoutException;
+import com.yahoo.fs4.PingPacket;
+import com.yahoo.fs4.PongPacket;
+import com.yahoo.fs4.mplex.Backend;
+import com.yahoo.fs4.mplex.FS4Channel;
+import com.yahoo.fs4.mplex.InvalidChannelException;
import com.yahoo.prelude.Ping;
import com.yahoo.prelude.Pong;
import com.yahoo.prelude.querytransform.QueryRewrite;
+import com.yahoo.processing.request.CompoundName;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.Dispatcher;
import com.yahoo.search.dispatch.FillInvoker;
import com.yahoo.search.dispatch.SearchInvoker;
+import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.grouping.GroupingRequest;
import com.yahoo.search.grouping.request.GroupingOperation;
import com.yahoo.search.query.Ranking;
@@ -37,13 +46,21 @@ import static com.yahoo.container.util.Util.quote;
// catch and unwrap into a results with an error in high level methods. -Jon
public class FastSearcher extends VespaBackEndSearcher {
+ /** If this is turned on this will make search queries directly to the local search node when possible */
+ private final static CompoundName dispatchDirect = new CompoundName("dispatch.direct");
+
/** Used to dispatch directly to search nodes over RPC, replacing the old fnet communication path */
private final Dispatcher dispatcher;
+ private final Backend dispatchBackend;
+ private final FS4ResourcePool fs4ResourcePool;
+
/**
* Creates a Fastsearcher.
*
- * @param serverId the resource pool used to create direct connections to the local search nodes when
+ * @param dispatchBackend The backend object containing the connection to the dispatch node this should talk to
+ * over the fs4 protocol
+ * @param fs4ResourcePool the resource pool used to create direct connections to the local search nodes when
* bypassing the dispatch node
* @param dispatcher the dispatcher used (when enabled) to send summary requests over the rpc protocol.
* Eventually we will move everything to this protocol and never use dispatch nodes.
@@ -53,11 +70,13 @@ public class FastSearcher extends VespaBackEndSearcher {
* @param clusterParams the cluster number, and other cluster backend parameters
* @param documentdbInfoConfig document database parameters
*/
- public FastSearcher(String serverId, Dispatcher dispatcher,
+ public FastSearcher(Backend dispatchBackend, FS4ResourcePool fs4ResourcePool, Dispatcher dispatcher,
SummaryParameters docSumParams, ClusterParams clusterParams,
DocumentdbInfoConfig documentdbInfoConfig) {
- init(serverId, docSumParams, clusterParams, documentdbInfoConfig);
+ init(fs4ResourcePool.getServerId(), docSumParams, clusterParams, documentdbInfoConfig);
+ this.dispatchBackend = dispatchBackend;
this.dispatcher = dispatcher;
+ this.fs4ResourcePool = fs4ResourcePool;
}
/**
@@ -65,7 +84,58 @@ public class FastSearcher extends VespaBackEndSearcher {
*/
@Override
public Pong ping(Ping ping, Execution execution) {
- throw new IllegalStateException("This ping should not have been called.");
+ return ping(ping, dispatchBackend, getName());
+ }
+
+ public static Pong ping(Ping ping, Backend backend, String name) {
+ FS4Channel channel = backend.openPingChannel();
+
+ // If you want to change this code, you need to understand
+ // com.yahoo.prelude.cluster.ClusterSearcher.ping(Searcher) and
+ // com.yahoo.prelude.cluster.TrafficNodeMonitor.failed(ErrorMessage)
+ try {
+ PingPacket pingPacket = new PingPacket();
+ try {
+ boolean couldSend = channel.sendPacket(pingPacket);
+ if ( ! couldSend) {
+ return new Pong(ErrorMessage.createBackendCommunicationError("Could not ping " + name));
+ }
+ } catch (InvalidChannelException e) {
+ return new Pong(ErrorMessage.createBackendCommunicationError("Invalid channel " + name));
+ } catch (IllegalStateException e) {
+ return new Pong(ErrorMessage.createBackendCommunicationError("Illegal state in FS4: " + e.getMessage()));
+ } catch (IOException e) {
+ return new Pong(ErrorMessage.createBackendCommunicationError("IO error while sending ping: " + e.getMessage()));
+ }
+
+ // We should only get a single packet
+ BasicPacket[] packets;
+
+ try {
+ packets = channel.receivePackets(ping.getTimeout(), 1);
+ } catch (ChannelTimeoutException e) {
+ return new Pong(ErrorMessage.createNoAnswerWhenPingingNode("timeout while waiting for fdispatch for " + name));
+ } catch (InvalidChannelException e) {
+ return new Pong(ErrorMessage.createBackendCommunicationError("Invalid channel for " + name));
+ }
+
+ if (packets.length == 0) {
+ return new Pong(ErrorMessage.createBackendCommunicationError(name + " got no packets back"));
+ }
+
+ try {
+ packets[0].ensureInstanceOf(PongPacket.class, name);
+ } catch (TimeoutException e) {
+ return new Pong(ErrorMessage.createTimeout(e.getMessage()));
+ } catch (IOException e) {
+ return new Pong(ErrorMessage.createBackendCommunicationError("Unexpected packet class returned after ping: " + e.getMessage()));
+ }
+ return new Pong((PongPacket)packets[0]);
+ } finally {
+ if (channel != null) {
+ channel.close();
+ }
+ }
}
@Override
@@ -147,7 +217,18 @@ public class FastSearcher extends VespaBackEndSearcher {
* on the same host.
*/
private SearchInvoker getSearchInvoker(Query query) {
- return dispatcher.getSearchInvoker(query, this).get();
+ Optional<SearchInvoker> invoker = dispatcher.getSearchInvoker(query, this);
+ if (invoker.isPresent()) {
+ return invoker.get();
+ }
+
+ Optional<Node> direct = getDirectNode(query);
+ if(direct.isPresent()) {
+ var node = direct.get();
+ Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port());
+ return new FS4SearchInvoker(this, query, backend.openChannel(), direct);
+ }
+ return new FS4SearchInvoker(this, query, dispatchBackend.openChannel(), Optional.empty());
}
/**
@@ -156,17 +237,47 @@ public class FastSearcher extends VespaBackEndSearcher {
* content nodes.
*/
private FillInvoker getFillInvoker(Result result) {
- return dispatcher.getFillInvoker(result, this).get();
+ Query query = result.getQuery();
+ Optional<FillInvoker> invoker = dispatcher.getFillInvoker(result, this);
+ if (invoker.isPresent()) {
+ return invoker.get();
+ }
+
+ Optional<Node> direct = getDirectNode(query);
+ if (direct.isPresent()) {
+ var node = direct.get();
+ Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port());
+ return new FS4FillInvoker(this, query, backend);
+ }
+ return new FS4FillInvoker(this, query, dispatchBackend);
}
+ /**
+ * If the query can be directed to a single local content node, returns that node. Otherwise,
+ * returns an empty value.
+ */
+ private Optional<Node> getDirectNode(Query query) {
+ if (!query.properties().getBoolean(dispatchDirect, true))
+ return Optional.empty();
+ if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE))
+ return Optional.empty();
+
+ Optional<Node> directDispatchRecipient = dispatcher.searchCluster().directDispatchTarget();
+ if (!directDispatchRecipient.isPresent())
+ return Optional.empty();
+ // Dispatch directly to the single, local search node
+ Node local = directDispatchRecipient.get();
+ query.trace(false, 2, "Dispatching directly to ", local);
+ return Optional.of(local);
+ }
private static Optional<String> quotedSummaryClass(String summaryClass) {
return Optional.of(summaryClass == null ? "[null]" : quote(summaryClass));
}
public String toString() {
- return "fast searcher (" + getName() + ") ";
+ return "fast searcher (" + getName() + ") " + dispatchBackend;
}
protected boolean isLoggingFine() {