aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/prelude
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/prelude')
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/Pong.java31
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java161
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java122
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/DataField.java2
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumField.java1
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/DoubleField.java3
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java184
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4PingFactory.java29
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java38
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java220
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FastHit.java25
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java125
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/GroupingListHit.java12
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/Int64Field.java3
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/IntegerField.java3
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/LongdataField.java3
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/ShortField.java3
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/StringField.java4
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java15
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/XMLField.java7
20 files changed, 963 insertions, 28 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/Pong.java b/container-search/src/main/java/com/yahoo/prelude/Pong.java
index a60fba9a4f7..a6bc3e7975d 100644
--- a/container-search/src/main/java/com/yahoo/prelude/Pong.java
+++ b/container-search/src/main/java/com/yahoo/prelude/Pong.java
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.prelude;
+import com.yahoo.fs4.PongPacket;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.statistics.ElapsedTime;
@@ -18,19 +19,28 @@ public class Pong {
private String pingInfo="";
private final List<ErrorMessage> errors = new ArrayList<>(1);
+ private final Optional<PongPacket> pongPacket;
private ElapsedTime elapsed = new ElapsedTime();
private final Optional<Long> activeDocuments;
public Pong() {
+ this.pongPacket = Optional.empty();
this.activeDocuments = Optional.empty();
}
public Pong(ErrorMessage error) {
errors.add(error);
+ this.pongPacket = Optional.empty();
+ this.activeDocuments = Optional.empty();
+ }
+
+ public Pong(PongPacket pongPacket) {
+ this.pongPacket = Optional.of(pongPacket);
this.activeDocuments = Optional.empty();
}
public Pong(long activeDocuments) {
+ this.pongPacket = Optional.empty();
this.activeDocuments = Optional.of(activeDocuments);
}
@@ -42,14 +52,21 @@ public class Pong {
return errors.get(i);
}
+ public int getErrorSize() {
+ return errors.size();
+ }
+
/** Returns the number of active documents in the backend responding in this Pong, if available */
public Optional<Long> activeDocuments() {
- return activeDocuments;
+ if (activeDocuments.isPresent()) return activeDocuments;
+ if ( ! pongPacket.isPresent()) return Optional.empty();
+ return pongPacket.get().getActiveDocuments();
}
/** Returns the number of nodes which responded to this Pong, if available */
public Optional<Integer> activeNodes() {
- return Optional.empty();
+ if ( ! pongPacket.isPresent()) return Optional.empty();
+ return pongPacket.get().getActiveNodes();
}
public List<ErrorMessage> getErrors() {
@@ -61,6 +78,16 @@ public class Pong {
return ! errors.isEmpty();
}
+ /** Sets information about the ping used to produce this. This is included when returning the tostring of this. */
+ public void setPingInfo(String pingInfo) {
+ if (pingInfo==null)
+ pingInfo="";
+ this.pingInfo=pingInfo;
+ }
+
+ /** Returns information about the ping use, or "" (never null) if none */
+ public String getPingInfo() { return pingInfo; }
+
public ElapsedTime getElapsedTime() {
return elapsed;
}
diff --git a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java
new file mode 100644
index 00000000000..c075a0f842b
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java
@@ -0,0 +1,161 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.prelude.cluster;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.yahoo.component.provider.Freezable;
+import com.yahoo.container.handler.VipStatus;
+import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
+import com.yahoo.search.result.ErrorMessage;
+
+/**
+ * Monitors of a cluster of remote nodes. The monitor uses an internal thread
+ * for node monitoring.
+ *
+ * @author bratseth
+ * @author Steinar Knutsen
+ */
+public class ClusterMonitor implements Runnable, Freezable {
+
+ // The ping thread wil start using the system, but we cannot be guaranteed that all components
+ // in the system is up. As a workaround for not being able to find out when the system
+ // is ready to be used, we wait some time before starting the ping thread
+ private static final int pingThreadInitialDelayMs = 3000;
+
+ private final MonitorConfiguration configuration;
+
+ private final static Logger log = Logger.getLogger(ClusterMonitor.class.getName());
+
+ private final ClusterSearcher nodeManager;
+
+ private final Optional<VipStatus> vipStatus;
+
+ /** A map from Node to corresponding MonitoredNode */
+ private final Map<VespaBackEndSearcher, NodeMonitor> nodeMonitors = new java.util.IdentityHashMap<>();
+
+ private ScheduledFuture<?> future;
+
+ private boolean isFrozen = false;
+
+ ClusterMonitor(ClusterSearcher manager, QrMonitorConfig monitorConfig, Optional<VipStatus> vipStatus) {
+ configuration = new MonitorConfiguration(monitorConfig);
+ nodeManager = manager;
+ this.vipStatus = vipStatus;
+ log.fine("checkInterval is " + configuration.getCheckInterval() + " ms");
+ }
+
+ /** Returns the configuration of this cluster monitor */
+ MonitorConfiguration getConfiguration() {
+ return configuration;
+ }
+
+ void startPingThread() {
+ if ( ! isFrozen())
+ throw new IllegalStateException("Do not start the monitoring thread before the set of " +
+ "nodes to monitor is complete/the ClusterMonitor is frozen.");
+ future = nodeManager.getScheduledExecutor().scheduleAtFixedRate(this, pingThreadInitialDelayMs, configuration.getCheckInterval(), TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Adds a new node for monitoring.
+ */
+ void add(VespaBackEndSearcher node) {
+ if (isFrozen())
+ throw new IllegalStateException("Can not add new nodes after ClusterMonitor has been frozen.");
+ nodeMonitors.put(node, new NodeMonitor(node));
+ updateVipStatus();
+ }
+
+ /** Called from ClusterSearcher/NodeManager when a node failed */
+ void failed(VespaBackEndSearcher node, ErrorMessage error) {
+ NodeMonitor monitor = nodeMonitors.get(node);
+ boolean wasWorking = monitor.isWorking();
+ monitor.failed(error);
+ if (wasWorking && !monitor.isWorking()) {
+ log.info("Failed monitoring node '" + node + "' due to '" + error);
+ nodeManager.failed(node);
+ }
+ updateVipStatus();
+ }
+
+ /** Called when a node responded */
+ void responded(VespaBackEndSearcher node, boolean hasSearchNodesOnline) {
+ NodeMonitor monitor = nodeMonitors.get(node);
+ boolean wasFailing = !monitor.isWorking();
+ monitor.responded(hasSearchNodesOnline);
+ if (wasFailing && monitor.isWorking()) {
+ log.info("Failed node '" + node + "' started working again.");
+ nodeManager.working(node);
+ }
+ updateVipStatus();
+ }
+
+ private void updateVipStatus() {
+ if ( ! vipStatus.isPresent()) return;
+ if ( ! hasInformationAboutAllNodes()) return;
+
+ if (hasWorkingNodesWithDocumentsOnline()) {
+ vipStatus.get().addToRotation(nodeManager.getId().stringValue());
+ } else {
+ vipStatus.get().removeFromRotation(nodeManager.getId().stringValue());
+ }
+ }
+
+ private boolean hasInformationAboutAllNodes() {
+ for (NodeMonitor monitor : nodeMonitors.values()) {
+ if ( ! monitor.statusIsKnown())
+ return false;
+ }
+ return true;
+ }
+
+ private boolean hasWorkingNodesWithDocumentsOnline() {
+ for (NodeMonitor node : nodeMonitors.values()) {
+ if (node.isWorking() && node.searchNodesOnline())
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Ping all nodes which needs pinging to discover state changes
+ */
+ private void ping() throws InterruptedException {
+ for (NodeMonitor monitor : nodeMonitors.values()) {
+ nodeManager.ping(monitor.getNode());
+ }
+ }
+
+ @Override
+ public void run() {
+ log.finest("Activating ping");
+ try {
+ ping();
+ } catch (Exception e) {
+ log.log(Level.WARNING, "Error in monitor thread", e);
+ }
+ }
+
+ public void shutdown() {
+ if (future != null) {
+ future.cancel(true);
+ }
+ }
+
+ @Override
+ public void freeze() {
+ isFrozen = true;
+
+ }
+
+ @Override
+ public boolean isFrozen() {
+ return isFrozen;
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java
index 0780d5e9d65..4ffcc0a4330 100644
--- a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java
+++ b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java
@@ -2,13 +2,20 @@
package com.yahoo.prelude.cluster;
import com.yahoo.cloud.config.ClusterInfoConfig;
+import com.yahoo.collections.Tuple2;
import com.yahoo.component.ComponentId;
+import com.yahoo.component.chain.Chain;
import com.yahoo.component.chain.dependencies.After;
+import com.yahoo.concurrent.Receiver;
+import com.yahoo.concurrent.Receiver.MessageState;
import com.yahoo.container.QrSearchersConfig;
import com.yahoo.container.handler.VipStatus;
+import com.yahoo.fs4.mplex.Backend;
import com.yahoo.jdisc.Metric;
import com.yahoo.net.HostName;
import com.yahoo.prelude.IndexFacts;
+import com.yahoo.prelude.Ping;
+import com.yahoo.prelude.Pong;
import com.yahoo.prelude.fastsearch.ClusterParams;
import com.yahoo.prelude.fastsearch.DocumentdbInfoConfig;
import com.yahoo.prelude.fastsearch.FS4ResourcePool;
@@ -39,7 +46,11 @@ import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.logging.Logger;
import static com.yahoo.container.QrSearchersConfig.Searchcluster.Indexingmode.STREAMING;
@@ -53,6 +64,10 @@ import static com.yahoo.container.QrSearchersConfig.Searchcluster.Indexingmode.S
@After("*")
public class ClusterSearcher extends Searcher {
+ private final static Logger log = Logger.getLogger(ClusterSearcher.class.getName());
+
+ private final ClusterMonitor monitor;
+
private final Value cacheHitRatio;
private final String clusterModelName;
@@ -63,6 +78,8 @@ public class ClusterSearcher extends Searcher {
// Mapping from rank profile names to document types containing them
private final Map<String, Set<String>> rankProfiles = new HashMap<>();
+ private final FS4ResourcePool fs4ResourcePool;
+
private final long maxQueryTimeout; // in milliseconds
private final static long DEFAULT_MAX_QUERY_TIMEOUT = 600000L;
@@ -71,6 +88,7 @@ public class ClusterSearcher extends Searcher {
private VespaBackEndSearcher server = null;
+
/**
* Creates a new ClusterSearcher.
*/
@@ -78,6 +96,7 @@ public class ClusterSearcher extends Searcher {
QrSearchersConfig qrsConfig,
ClusterConfig clusterConfig,
DocumentdbInfoConfig documentDbConfig,
+ QrMonitorConfig monitorConfig,
DispatchConfig dispatchConfig,
ClusterInfoConfig clusterInfoConfig,
Statistics manager,
@@ -85,8 +104,13 @@ public class ClusterSearcher extends Searcher {
FS4ResourcePool fs4ResourcePool,
VipStatus vipStatus) {
super(id);
+ this.fs4ResourcePool = fs4ResourcePool;
+
+ Dispatcher dispatcher = Dispatcher.create(id.stringValue(), dispatchConfig, fs4ResourcePool, clusterInfoConfig.nodeCount(), vipStatus, metric);
- Dispatcher dispatcher = Dispatcher.create(id.stringValue(), dispatchConfig, clusterInfoConfig.nodeCount(), vipStatus, metric);
+ monitor = (dispatcher.searchCluster().directDispatchTarget().isPresent()) // dispatcher should decide vip status instead
+ ? new ClusterMonitor(this, monitorConfig, Optional.empty())
+ : new ClusterMonitor(this, monitorConfig, Optional.of(vipStatus));
int searchClusterIndex = clusterConfig.clusterId();
clusterModelName = clusterConfig.clusterName();
@@ -124,8 +148,9 @@ public class ClusterSearcher extends Searcher {
for (int dispatcherIndex = 0; dispatcherIndex < searchClusterConfig.dispatcher().size(); dispatcherIndex++) {
try {
if ( ! isRemote(searchClusterConfig.dispatcher(dispatcherIndex).host())) {
- FastSearcher searcher = searchDispatch(searchClusterIndex, fs4ResourcePool.getServerId(), docSumParams,
- documentDbConfig, dispatcher, dispatcherIndex);
+ Backend dispatchBackend = createBackend(searchClusterConfig.dispatcher(dispatcherIndex));
+ FastSearcher searcher = searchDispatch(searchClusterIndex, fs4ResourcePool, docSumParams,
+ documentDbConfig, dispatchBackend, dispatcher, dispatcherIndex);
addBackendSearcher(searcher);
}
} catch (UnknownHostException e) {
@@ -137,6 +162,8 @@ public class ClusterSearcher extends Searcher {
if ( server == null ) {
throw new IllegalStateException("ClusterSearcher should have a top level dispatch.");
}
+ monitor.freeze();
+ monitor.startPingThread();
}
private static QrSearchersConfig.Searchcluster getSearchClusterConfigFromClusterName(QrSearchersConfig config, String name) {
@@ -162,14 +189,15 @@ public class ClusterSearcher extends Searcher {
}
private static FastSearcher searchDispatch(int searchclusterIndex,
- String serverId,
+ FS4ResourcePool fs4ResourcePool,
SummaryParameters docSumParams,
DocumentdbInfoConfig documentdbInfoConfig,
+ Backend backend,
Dispatcher dispatcher,
int dispatcherIndex)
{
ClusterParams clusterParams = makeClusterParams(searchclusterIndex, dispatcherIndex);
- return new FastSearcher(serverId, dispatcher, docSumParams, clusterParams, documentdbInfoConfig);
+ return new FastSearcher(backend, fs4ResourcePool, dispatcher, docSumParams, clusterParams, documentdbInfoConfig);
}
private static VdsStreamingSearcher vdsCluster(String serverId,
@@ -194,14 +222,25 @@ public class ClusterSearcher extends Searcher {
/** Do not use, for internal testing purposes only. **/
ClusterSearcher(Set<String> documentTypes) {
this.documentTypes = documentTypes;
+ monitor = new ClusterMonitor(this, new QrMonitorConfig(new QrMonitorConfig.Builder()), Optional.of(new VipStatus()));
cacheHitRatio = new Value("com.yahoo.prelude.cluster.ClusterSearcher.ClusterSearcher().dummy",
Statistics.nullImplementation, new Value.Parameters());
clusterModelName = "testScenario";
+ fs4ResourcePool = null;
maxQueryTimeout = DEFAULT_MAX_QUERY_TIMEOUT;
maxQueryCacheTimeout = DEFAULT_MAX_QUERY_CACHE_TIMEOUT;
}
+ private Backend createBackend(QrSearchersConfig.Searchcluster.Dispatcher disp) {
+ return fs4ResourcePool.getBackend(disp.host(), disp.port());
+ }
+
+ ClusterMonitor getMonitor() {
+ return monitor;
+ }
+
void addBackendSearcher(VespaBackEndSearcher searcher) {
+ monitor.add(searcher);
server = searcher;
}
@@ -440,6 +479,77 @@ public class ClusterSearcher extends Searcher {
cacheHitRatio.put(0.0);
}
+ /** NodeManager method, called from ClusterMonitor. */
+ void working(VespaBackEndSearcher node) {
+ server = node;
+ }
+
+ /** Called from ClusterMonitor. */
+ void failed(VespaBackEndSearcher node) {
+ server = null;
+ }
+
+ /**
+ * Pinging a node, called from ClusterMonitor.
+ */
+ void ping(VespaBackEndSearcher node) throws InterruptedException {
+ log.fine("Sending ping to: " + node);
+ Pinger pinger = new Pinger(node);
+
+ getExecutor().execute(pinger);
+ Pong pong = pinger.getPong(); // handles timeout
+ if (pong == null) {
+ monitor.failed(node, ErrorMessage.createNoAnswerWhenPingingNode("Ping thread timed out."));
+ } else if (pong.badResponse()) {
+ monitor.failed(node, pong.getError(0));
+ } else {
+ monitor.responded(node, backendCanServeDocuments(pong));
+ }
+ }
+
+ private boolean backendCanServeDocuments(Pong pong) {
+ if ( ! pong.activeNodes().isPresent()) return true; // no information; assume true
+ return pong.activeNodes().get() > 0;
+ }
+
@Override
- public void deconstruct() { }
+ public void deconstruct() {
+ monitor.shutdown();
+ }
+
+ ExecutorService getExecutor() {
+ return fs4ResourcePool.getExecutor();
+ }
+
+ ScheduledExecutorService getScheduledExecutor() {
+ return fs4ResourcePool.getScheduledExecutor();
+ }
+
+ private class Pinger implements Runnable {
+
+ private final Searcher searcher;
+ private final Ping pingChallenge = new Ping(monitor.getConfiguration().getRequestTimeout());
+ private final Receiver<Pong> pong = new Receiver<>();
+
+ Pinger(final Searcher searcher) {
+ this.searcher = searcher;
+ }
+
+ @Override
+ public void run() {
+ pong.put(createExecution().ping(pingChallenge));
+ }
+
+ private Execution createExecution() {
+ return new Execution(new Chain<>(searcher),
+ new Execution.Context(null, null, null, null, null));
+ }
+
+ public Pong getPong() throws InterruptedException {
+ Tuple2<MessageState, Pong> reply = pong.get(pingChallenge.getTimeout() + 150);
+ return (reply.first != MessageState.VALID) ? null : reply.second;
+ }
+
+ }
+
}
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DataField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DataField.java
index de07839e3e3..524e842eacd 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DataField.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DataField.java
@@ -10,6 +10,8 @@
package com.yahoo.prelude.fastsearch;
+import java.nio.ByteBuffer;
+
import com.yahoo.prelude.hitfield.RawData;
import com.yahoo.data.access.simple.Value;
import com.yahoo.data.access.Inspector;
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumField.java
index 329a9caaf91..4f52ef91725 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumField.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumField.java
@@ -6,6 +6,7 @@ import com.yahoo.log.LogLevel;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DoubleField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DoubleField.java
index f6f8006d2d2..6b1445229ec 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DoubleField.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DoubleField.java
@@ -1,6 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.prelude.fastsearch;
+
+import java.nio.ByteBuffer;
+
import com.yahoo.search.result.NanNumber;
import com.yahoo.data.access.Inspector;
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java
new file mode 100644
index 00000000000..59bc781c8b2
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java
@@ -0,0 +1,184 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.prelude.fastsearch;
+
+import com.yahoo.fs4.BasicPacket;
+import com.yahoo.fs4.ChannelTimeoutException;
+import com.yahoo.fs4.GetDocSumsPacket;
+import com.yahoo.fs4.Packet;
+import com.yahoo.fs4.mplex.Backend;
+import com.yahoo.fs4.mplex.FS4Channel;
+import com.yahoo.fs4.mplex.InvalidChannelException;
+import com.yahoo.prelude.fastsearch.VespaBackEndSearcher.FillHitsResult;
+import com.yahoo.search.Query;
+import com.yahoo.search.Result;
+import com.yahoo.search.dispatch.FillInvoker;
+import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.search.result.Hit;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import static com.yahoo.prelude.fastsearch.VespaBackEndSearcher.hitIterator;
+
+/**
+ * {@link FillInvoker} implementation for FS4 nodes and fdispatch
+ *
+ * @author ollivir
+ */
+public class FS4FillInvoker extends FillInvoker {
+ private final VespaBackEndSearcher searcher;
+ private FS4Channel channel;
+
+ private int expectedFillResults = 0;
+
+ // fdispatch code path
+ FS4FillInvoker(VespaBackEndSearcher searcher, Query query, Backend backend) {
+ this.searcher = searcher;
+ this.channel = backend.openChannel();
+ channel.setQuery(query);
+ }
+
+ @Override
+ protected void sendFillRequest(Result result, String summaryClass) {
+
+ if (countUnfilledFastHits(result, summaryClass) > 0) {
+ try {
+ expectedFillResults = requestSummaries(result, summaryClass);
+ } catch (InvalidChannelException e) {
+ result.hits()
+ .addError(ErrorMessage.createBackendCommunicationError("Invalid channel " + getName() + " (summary fetch)"));
+ } catch (IOException e) {
+ result.hits().addError(ErrorMessage.createBackendCommunicationError(
+ "IO error while talking on channel " + getName() + " (summary fetch): " + e.getMessage()));
+ }
+ } else {
+ expectedFillResults = 0;
+ }
+ }
+
+
+ @Override
+ protected void getFillResults(Result result, String summaryClass) {
+ if (expectedFillResults == 0) {
+ return;
+ }
+
+ Packet[] receivedPackets;
+ try {
+ receivedPackets = getSummaryResponses(result);
+ } catch (InvalidChannelException e1) {
+ result.hits().addError(ErrorMessage.createBackendCommunicationError("Invalid channel " + getName() + " (summary fetch)"));
+ return;
+ } catch (ChannelTimeoutException e1) {
+ result.hits().addError(ErrorMessage.createTimeout("timeout waiting for summaries from " + getName()));
+ return;
+ }
+
+ if (receivedPackets.length == 0) {
+ result.hits().addError(ErrorMessage.createBackendCommunicationError(getName() + " got no packets back (summary fetch)"));
+ return;
+ }
+
+ int skippedHits;
+ try {
+ FillHitsResult fillHitsResult = searcher.fillHits(result, receivedPackets, summaryClass);
+ skippedHits = fillHitsResult.skippedHits;
+ if (fillHitsResult.error != null) {
+ result.hits().addError(ErrorMessage.createTimeout(fillHitsResult.error));
+ return;
+ }
+ } catch (TimeoutException e) {
+ result.hits().addError(ErrorMessage.createTimeout(e.getMessage()));
+ return;
+ } catch (IOException e) {
+ result.hits().addError(ErrorMessage.createBackendCommunicationError(
+ "Error filling hits with summary fields, source: " + getName() + " Exception thrown: " + e.getMessage()));
+ return;
+ }
+
+ if (skippedHits > 0)
+ result.hits().addError(
+ ErrorMessage.createEmptyDocsums("Missing hit data for summary '" + summaryClass + "' for " + skippedHits + " hits"));
+ result.analyzeHits();
+
+ if (channel.getQuery().getTraceLevel() >= 3) {
+ int hitNumber = 0;
+ for (Iterator<com.yahoo.search.result.Hit> i = hitIterator(result); i.hasNext();) {
+ com.yahoo.search.result.Hit hit = i.next();
+ if (!(hit instanceof FastHit))
+ continue;
+ FastHit fastHit = (FastHit) hit;
+
+ String traceMsg = "Hit: " + (hitNumber++) + " from " + (fastHit.isCached() ? "cache" : "backend");
+ if (!fastHit.isFilled(summaryClass))
+ traceMsg += ". Error, hit, not filled";
+ channel.getQuery().trace(traceMsg, false, 3);
+ }
+ }
+ }
+
+ @Override
+ public void release() {
+ if (channel != null) {
+ channel.close();
+ channel = null;
+ }
+ }
+
+ private int countUnfilledFastHits(Result result, String summaryClass) {
+ int count = 0;
+ for (Iterator<Hit> i = hitIterator(result); i.hasNext();) {
+ Hit hit = i.next();
+ if (hit instanceof FastHit && !hit.isFilled(summaryClass))
+ count++;
+ }
+ return count;
+ }
+
+ private int requestSummaries(Result result, String summaryClass) throws InvalidChannelException, ClassCastException, IOException {
+
+ boolean summaryNeedsQuery = searcher.summaryNeedsQuery(result.getQuery());
+ if (result.getQuery().getTraceLevel() >= 3)
+ result.getQuery().trace((summaryNeedsQuery ? "FS4: Resending " : "Not resending ") + "query during document summary fetching", 3);
+
+ GetDocSumsPacket docsumsPacket = GetDocSumsPacket.create(result, summaryClass, summaryNeedsQuery);
+ int compressionLimit = result.getQuery().properties().getInteger(FS4SearchInvoker.PACKET_COMPRESSION_LIMIT, 0);
+ docsumsPacket.setCompressionLimit(compressionLimit);
+ if (compressionLimit != 0) {
+ docsumsPacket.setCompressionType(result.getQuery().properties().getString(FS4SearchInvoker.PACKET_COMPRESSION_TYPE, "lz4"));
+ }
+
+ boolean couldSend = channel.sendPacket(docsumsPacket);
+ if (!couldSend)
+ throw new IOException("Could not successfully send GetDocSumsPacket.");
+
+ return docsumsPacket.getNumDocsums() + 1;
+ }
+
+ private Packet[] getSummaryResponses(Result result) throws InvalidChannelException, ChannelTimeoutException {
+ if(expectedFillResults == 0) {
+ return new Packet[0];
+ }
+ BasicPacket[] receivedPackets = channel.receivePackets(result.getQuery().getTimeLeft(), expectedFillResults);
+
+ return convertBasicPackets(receivedPackets);
+ }
+
+ private static Packet[] convertBasicPackets(BasicPacket[] basicPackets) throws ClassCastException {
+ // trying to cast a BasicPacket[] to Packet[] will compile,
+ // but lead to a runtime error. At least that's what I got
+ // from testing and reading the specification. I'm just happy
+ // if someone tells me what's the proper Java way of doing
+ // this. -SK
+ Packet[] packets = new Packet[basicPackets.length];
+
+ for (int i = 0; i < basicPackets.length; i++) {
+ packets[i] = (Packet) basicPackets[i];
+ }
+ return packets;
+ }
+
+ private String getName() {
+ return searcher.getName();
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4PingFactory.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4PingFactory.java
new file mode 100644
index 00000000000..2abaf341c58
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4PingFactory.java
@@ -0,0 +1,29 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.prelude.fastsearch;
+
+import com.yahoo.prelude.Pong;
+import com.yahoo.search.cluster.ClusterMonitor;
+import com.yahoo.search.dispatch.searchcluster.Node;
+import com.yahoo.search.dispatch.searchcluster.PingFactory;
+import com.yahoo.search.dispatch.searchcluster.Pinger;
+
+import java.util.concurrent.Callable;
+
+/**
+ * FS4PingFactory constructs {@link Pinger} objects that communicate with
+ * content nodes or dispatchers over the fnet/FS4 protocol
+ *
+ * @author ollivir
+ */
+public class FS4PingFactory implements PingFactory {
+ private final FS4ResourcePool fs4ResourcePool;
+
+ public FS4PingFactory(FS4ResourcePool fs4ResourcePool) {
+ this.fs4ResourcePool = fs4ResourcePool;
+ }
+
+ @Override
+ public Callable<Pong> createPinger(Node node, ClusterMonitor<Node> monitor) {
+ return new Pinger(node, monitor, fs4ResourcePool);
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java
index ed9eb72d7dd..f85a4019b78 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java
@@ -5,6 +5,14 @@ import com.google.inject.Inject;
import com.yahoo.component.AbstractComponent;
import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.container.QrConfig;
+import com.yahoo.container.search.Fs4Config;
+import com.yahoo.fs4.mplex.Backend;
+import com.yahoo.fs4.mplex.ConnectionPool;
+import com.yahoo.fs4.mplex.ListenerPool;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -14,7 +22,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
/**
- * All users will get the same pool instance.
+ * Provider for {@link com.yahoo.fs4.mplex.ListenerPool}. All users will get the same pool instance.
*
* @author baldersheim
*/
@@ -24,18 +32,22 @@ public class FS4ResourcePool extends AbstractComponent {
private static final AtomicInteger instanceCounter = new AtomicInteger(0);
private final String serverId;
private final int instanceId;
+ private final ListenerPool listeners;
+ private final Timer timer = new Timer(); // This is a timer for cleaning the closed connections
+ private final Map<String, Backend> connectionPoolMap = new HashMap<>();
private final ExecutorService executor;
private final ScheduledExecutorService scheduledExecutor;
@Inject
- public FS4ResourcePool(QrConfig config) {
- this(config.discriminator());
+ public FS4ResourcePool(Fs4Config fs4Config, QrConfig config) {
+ this(config.discriminator(), fs4Config.numlistenerthreads());
}
- public FS4ResourcePool(String serverId) {
+ public FS4ResourcePool(String serverId, int listenerThreads) {
this.serverId = serverId;
instanceId = instanceCounter.getAndIncrement();
String name = "FS4-" + instanceId;
+ listeners = new ListenerPool(name, listenerThreads);
executor = Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory(name));
scheduledExecutor = Executors.newScheduledThreadPool(1, ThreadFactoryFactory.getDaemonThreadFactory(name + ".scheduled"));
}
@@ -45,10 +57,28 @@ public class FS4ResourcePool extends AbstractComponent {
public ExecutorService getExecutor() { return executor; }
public ScheduledExecutorService getScheduledExecutor() { return scheduledExecutor; }
+ public Backend getBackend(String host, int port) {
+ String key = host + ":" + port;
+ synchronized (connectionPoolMap) {
+ Backend pool = connectionPoolMap.get(key);
+ if (pool == null) {
+ pool = new Backend(host, port, serverId, listeners, new ConnectionPool(timer));
+ connectionPoolMap.put(key, pool);
+ }
+ return pool;
+ }
+ }
+
@Override
public void deconstruct() {
logger.log(Level.INFO, "Deconstructing FS4ResourcePool with id '" + instanceId + "'.");
super.deconstruct();
+ listeners.close();
+ timer.cancel();
+ for (Backend backend : connectionPoolMap.values()) {
+ backend.shutdown();
+ backend.close();
+ }
executor.shutdown();
scheduledExecutor.shutdown();
try {
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java
new file mode 100644
index 00000000000..f3867288b29
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java
@@ -0,0 +1,220 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.prelude.fastsearch;
+
+import com.yahoo.data.access.simple.Value;
+import com.yahoo.data.access.slime.SlimeAdapter;
+import com.yahoo.fs4.BasicPacket;
+import com.yahoo.fs4.ChannelTimeoutException;
+import com.yahoo.fs4.DocumentInfo;
+import com.yahoo.fs4.FS4Properties;
+import com.yahoo.fs4.QueryPacket;
+import com.yahoo.fs4.QueryPacketData;
+import com.yahoo.fs4.QueryResultPacket;
+import com.yahoo.fs4.mplex.FS4Channel;
+import com.yahoo.fs4.mplex.InvalidChannelException;
+import com.yahoo.io.GrowableByteBuffer;
+import com.yahoo.log.LogLevel;
+import com.yahoo.prelude.ConfigurationException;
+import com.yahoo.processing.request.CompoundName;
+import com.yahoo.search.Query;
+import com.yahoo.search.Result;
+import com.yahoo.search.dispatch.InvokerResult;
+import com.yahoo.search.dispatch.LeanHit;
+import com.yahoo.search.dispatch.ResponseMonitor;
+import com.yahoo.search.dispatch.SearchInvoker;
+import com.yahoo.search.dispatch.searchcluster.Node;
+import com.yahoo.search.query.Sorting;
+import com.yahoo.search.result.Coverage;
+import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.search.result.Relevance;
+import com.yahoo.search.searchchain.Execution;
+import com.yahoo.searchlib.aggregation.Grouping;
+import com.yahoo.slime.BinaryFormat;
+import com.yahoo.vespa.objects.BufferSerializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.logging.Logger;
+
+/**
+ * {@link SearchInvoker} implementation for FS4 nodes and fdispatch
+ *
+ * @author ollivir
+ */
+public class FS4SearchInvoker extends SearchInvoker implements ResponseMonitor<FS4Channel> {
+ static final CompoundName PACKET_COMPRESSION_LIMIT = new CompoundName("packetcompressionlimit");
+ static final CompoundName PACKET_COMPRESSION_TYPE = new CompoundName("packetcompressiontype");
+
+ private static final Logger log = Logger.getLogger(FS4SearchInvoker.class.getName());
+
+ private final VespaBackEndSearcher searcher;
+ private FS4Channel channel;
+
+ private ErrorMessage pendingSearchError = null;
+ private Query query = null;
+ private QueryPacket queryPacket = null;
+
+ public FS4SearchInvoker(VespaBackEndSearcher searcher, Query query, FS4Channel channel, Optional<Node> node) {
+ super(node);
+ this.searcher = searcher;
+ this.channel = channel;
+
+ channel.setQuery(query);
+ channel.setResponseMonitor(this);
+ }
+
+ @Override
+ protected void sendSearchRequest(Query query) throws IOException {
+ log.finest("sending query packet");
+
+ this.query = query;
+ createQueryPacket(searcher.getServerId(), query);
+
+ try {
+ boolean couldSend = channel.sendPacket(queryPacket);
+ if (!couldSend) {
+ setPendingError("Could not reach '" + getName() + "'");
+ }
+ } catch (InvalidChannelException e) {
+ setPendingError("Invalid channel " + getName());
+ } catch (IllegalStateException e) {
+ setPendingError("Illegal state in FS4: " + e.getMessage());
+ }
+ }
+
+ private void setPendingError(String message) {
+ pendingSearchError = ErrorMessage.createBackendCommunicationError(message);
+ responseAvailable();
+ }
+
+ @Override
+ protected InvokerResult getSearchResult(Execution execution) throws IOException {
+ if (pendingSearchError != null) {
+ return errorResult(query, pendingSearchError);
+ }
+ BasicPacket[] basicPackets;
+
+ try {
+ basicPackets = channel.receivePackets(query.getTimeLeft(), 1);
+ } catch (ChannelTimeoutException e) {
+ return errorResult(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName()));
+ } catch (InvalidChannelException e) {
+ return errorResult(query, ErrorMessage.createBackendCommunicationError("Invalid channel for " + getName()));
+ }
+
+ if (basicPackets.length == 0) {
+ return errorResult(query, ErrorMessage.createBackendCommunicationError(getName() + " got no packets back"));
+ }
+
+ log.finest(() -> "got packets " + basicPackets.length + " packets");
+
+ basicPackets[0].ensureInstanceOf(QueryResultPacket.class, getName());
+ QueryResultPacket resultPacket = (QueryResultPacket) basicPackets[0];
+
+ log.finest(() -> "got query packet. " + "docsumClass=" + query.getPresentation().getSummary());
+
+ if (query.getPresentation().getSummary() == null)
+ query.getPresentation().setSummary(searcher.getDefaultDocsumClass());
+
+ InvokerResult result = new InvokerResult(query, resultPacket.getDocumentCount());
+
+ addMetaInfo(query, queryPacket.getQueryPacketData(), resultPacket, result.getResult());
+ addUnfilledHits(result.getLeanHits(), resultPacket.getDocuments(), queryPacket.getQueryPacketData());
+
+ return result;
+ }
+
+ private QueryPacket createQueryPacket(String serverId, Query query) {
+ this.queryPacket = QueryPacket.create(serverId, query);
+ int compressionLimit = query.properties().getInteger(PACKET_COMPRESSION_LIMIT, 0);
+ queryPacket.setCompressionLimit(compressionLimit);
+ if (compressionLimit != 0) {
+ queryPacket.setCompressionType(query.properties().getString(PACKET_COMPRESSION_TYPE, "lz4"));
+ }
+ log.fine(() -> "made QueryPacket: " + queryPacket);
+
+ return queryPacket;
+ }
+
+ private void addMetaInfo(Query query, QueryPacketData queryPacketData, QueryResultPacket resultPacket, Result result) {
+ result.setTotalHitCount(resultPacket.getTotalDocumentCount());
+
+ addBackendTrace(query, resultPacket);
+
+ // Grouping
+ if (resultPacket.getGroupData() != null) {
+ byte[] data = resultPacket.getGroupData();
+ ArrayList<Grouping> list = new ArrayList<>();
+ BufferSerializer buf = new BufferSerializer(new GrowableByteBuffer(ByteBuffer.wrap(data)));
+ int cnt = buf.getInt(null);
+ for (int i = 0; i < cnt; i++) {
+ Grouping g = new Grouping();
+ g.deserialize(buf);
+ list.add(g);
+ }
+ GroupingListHit hit = new GroupingListHit(list, searcher.getDocsumDefinitionSet(query));
+ hit.setQuery(result.getQuery());
+ hit.setSource(getName());
+ hit.setQueryPacketData(queryPacketData);
+ result.hits().add(hit);
+ }
+
+ if (resultPacket.getCoverageFeature()) {
+ result.setCoverage(new Coverage(resultPacket.getCoverageDocs(), resultPacket.getActiveDocs(), resultPacket.getNodesReplied())
+ .setSoonActive(resultPacket.getSoonActiveDocs())
+ .setDegradedReason(resultPacket.getDegradedReason())
+ .setNodesTried(resultPacket.getNodesQueried()));
+ }
+ }
+
+ private void addBackendTrace(Query query, QueryResultPacket resultPacket) {
+ if (resultPacket.propsArray == null) return;
+ Value.ArrayValue traces = new Value.ArrayValue();
+ for (FS4Properties properties : resultPacket.propsArray) {
+ if ( ! properties.getName().equals("trace")) continue;
+ for (FS4Properties.Entry entry : properties.getEntries()) {
+ traces.add(new SlimeAdapter(BinaryFormat.decode(entry.getValue()).get()));
+ }
+ }
+ query.trace(traces, query.getTraceLevel());
+ }
+
+ /**
+ * Creates unfilled hits from a List of DocumentInfo instances.
+ */
+ private void addUnfilledHits(List<LeanHit> result, List<DocumentInfo> documents, QueryPacketData queryPacketData) {
+ Optional<Integer> channelDistributionKey = distributionKey();
+
+ for (DocumentInfo document : documents) {
+ byte [] sortData = document.getSortData();
+ LeanHit hit = (sortData == null)
+ ? new LeanHit(document.getRawGlobalId(), document.getPartId(), channelDistributionKey.orElse(document.getDistributionKey()), document.getMetric())
+ : new LeanHit(document.getRawGlobalId(), document.getPartId(), channelDistributionKey.orElse(document.getDistributionKey()), document.getSortData());
+ if (queryPacketData != null) {
+ hit.setQueryPacketData(queryPacketData);
+ }
+ result.add(hit);
+ }
+ }
+
+ @Override
+ public void release() {
+ if (channel != null) {
+ channel.close();
+ channel = null;
+ }
+ }
+
+ private String getName() {
+ return searcher.getName();
+ }
+
+ @Override
+ public void responseAvailable(FS4Channel from) {
+ responseAvailable();
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastHit.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastHit.java
index 244fad4efde..f7f2d08d713 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastHit.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastHit.java
@@ -3,6 +3,7 @@ package com.yahoo.prelude.fastsearch;
import com.yahoo.data.access.ObjectTraverser;
import com.yahoo.document.GlobalId;
+import com.yahoo.fs4.QueryPacketData;
import com.yahoo.net.URI;
import com.yahoo.search.query.Sorting;
import com.yahoo.search.result.Hit;
@@ -39,6 +40,9 @@ public class FastHit extends Hit {
/** The global id of this document in the backend node which produced it */
private byte [] globalId;
+ //TODO Remove with fs4
+ private transient QueryPacketData queryPacketData = null;
+
private transient byte[] sortData = null;
// TODO I supect this one can be dropped.
private transient Sorting sortDataSorting = null;
@@ -143,6 +147,27 @@ public class FastHit extends Hit {
/** Sets the index of the node this hit originated at */
public void setDistributionKey(int distributionKey) { this.distributionKey = distributionKey; }
+ /**
+ * Add the binary data common for the query packet to a Vespa backend and a
+ * summary fetch packet to a Vespa backend. This method can only be called
+ * once for a single hit.
+ *
+ * @param queryPacketData binary data from a query packet resulting in this hit
+ * @throws IllegalStateException if the method is called more than once
+ * @throws NullPointerException if trying to set query packet data to null
+ */
+ public void setQueryPacketData(QueryPacketData queryPacketData) {
+ if (this.queryPacketData != null)
+ throw new IllegalStateException("Query packet data already set to "
+ + this.queryPacketData + ", tried to set it to " + queryPacketData);
+ if (queryPacketData == null)
+ throw new NullPointerException("Query packet data reference can not be set to null.");
+ this.queryPacketData = queryPacketData;
+ }
+
+ /** Returns a serial encoding of the query which produced this hit, ot null if not available. */
+ public QueryPacketData getQueryPacketData() { return queryPacketData; }
+
public void setSortData(byte[] data, Sorting sorting) {
this.sortData = data;
this.sortDataSorting = sorting;
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
index b0b3a7800e9..6b0041a9e86 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
@@ -1,14 +1,23 @@
// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.prelude.fastsearch;
+import com.yahoo.fs4.BasicPacket;
+import com.yahoo.fs4.ChannelTimeoutException;
+import com.yahoo.fs4.PingPacket;
+import com.yahoo.fs4.PongPacket;
+import com.yahoo.fs4.mplex.Backend;
+import com.yahoo.fs4.mplex.FS4Channel;
+import com.yahoo.fs4.mplex.InvalidChannelException;
import com.yahoo.prelude.Ping;
import com.yahoo.prelude.Pong;
import com.yahoo.prelude.querytransform.QueryRewrite;
+import com.yahoo.processing.request.CompoundName;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.Dispatcher;
import com.yahoo.search.dispatch.FillInvoker;
import com.yahoo.search.dispatch.SearchInvoker;
+import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.grouping.GroupingRequest;
import com.yahoo.search.grouping.request.GroupingOperation;
import com.yahoo.search.query.Ranking;
@@ -37,13 +46,21 @@ import static com.yahoo.container.util.Util.quote;
// catch and unwrap into a results with an error in high level methods. -Jon
public class FastSearcher extends VespaBackEndSearcher {
+ /** If this is turned on this will make search queries directly to the local search node when possible */
+ private final static CompoundName dispatchDirect = new CompoundName("dispatch.direct");
+
/** Used to dispatch directly to search nodes over RPC, replacing the old fnet communication path */
private final Dispatcher dispatcher;
+ private final Backend dispatchBackend;
+ private final FS4ResourcePool fs4ResourcePool;
+
/**
* Creates a Fastsearcher.
*
- * @param serverId the resource pool used to create direct connections to the local search nodes when
+ * @param dispatchBackend The backend object containing the connection to the dispatch node this should talk to
+ * over the fs4 protocol
+ * @param fs4ResourcePool the resource pool used to create direct connections to the local search nodes when
* bypassing the dispatch node
* @param dispatcher the dispatcher used (when enabled) to send summary requests over the rpc protocol.
* Eventually we will move everything to this protocol and never use dispatch nodes.
@@ -53,11 +70,13 @@ public class FastSearcher extends VespaBackEndSearcher {
* @param clusterParams the cluster number, and other cluster backend parameters
* @param documentdbInfoConfig document database parameters
*/
- public FastSearcher(String serverId, Dispatcher dispatcher,
+ public FastSearcher(Backend dispatchBackend, FS4ResourcePool fs4ResourcePool, Dispatcher dispatcher,
SummaryParameters docSumParams, ClusterParams clusterParams,
DocumentdbInfoConfig documentdbInfoConfig) {
- init(serverId, docSumParams, clusterParams, documentdbInfoConfig);
+ init(fs4ResourcePool.getServerId(), docSumParams, clusterParams, documentdbInfoConfig);
+ this.dispatchBackend = dispatchBackend;
this.dispatcher = dispatcher;
+ this.fs4ResourcePool = fs4ResourcePool;
}
/**
@@ -65,7 +84,58 @@ public class FastSearcher extends VespaBackEndSearcher {
*/
@Override
public Pong ping(Ping ping, Execution execution) {
- throw new IllegalStateException("This ping should not have been called.");
+ return ping(ping, dispatchBackend, getName());
+ }
+
+ public static Pong ping(Ping ping, Backend backend, String name) {
+ FS4Channel channel = backend.openPingChannel();
+
+ // If you want to change this code, you need to understand
+ // com.yahoo.prelude.cluster.ClusterSearcher.ping(Searcher) and
+ // com.yahoo.prelude.cluster.TrafficNodeMonitor.failed(ErrorMessage)
+ try {
+ PingPacket pingPacket = new PingPacket();
+ try {
+ boolean couldSend = channel.sendPacket(pingPacket);
+ if ( ! couldSend) {
+ return new Pong(ErrorMessage.createBackendCommunicationError("Could not ping " + name));
+ }
+ } catch (InvalidChannelException e) {
+ return new Pong(ErrorMessage.createBackendCommunicationError("Invalid channel " + name));
+ } catch (IllegalStateException e) {
+ return new Pong(ErrorMessage.createBackendCommunicationError("Illegal state in FS4: " + e.getMessage()));
+ } catch (IOException e) {
+ return new Pong(ErrorMessage.createBackendCommunicationError("IO error while sending ping: " + e.getMessage()));
+ }
+
+ // We should only get a single packet
+ BasicPacket[] packets;
+
+ try {
+ packets = channel.receivePackets(ping.getTimeout(), 1);
+ } catch (ChannelTimeoutException e) {
+ return new Pong(ErrorMessage.createNoAnswerWhenPingingNode("timeout while waiting for fdispatch for " + name));
+ } catch (InvalidChannelException e) {
+ return new Pong(ErrorMessage.createBackendCommunicationError("Invalid channel for " + name));
+ }
+
+ if (packets.length == 0) {
+ return new Pong(ErrorMessage.createBackendCommunicationError(name + " got no packets back"));
+ }
+
+ try {
+ packets[0].ensureInstanceOf(PongPacket.class, name);
+ } catch (TimeoutException e) {
+ return new Pong(ErrorMessage.createTimeout(e.getMessage()));
+ } catch (IOException e) {
+ return new Pong(ErrorMessage.createBackendCommunicationError("Unexpected packet class returned after ping: " + e.getMessage()));
+ }
+ return new Pong((PongPacket)packets[0]);
+ } finally {
+ if (channel != null) {
+ channel.close();
+ }
+ }
}
@Override
@@ -147,7 +217,18 @@ public class FastSearcher extends VespaBackEndSearcher {
* on the same host.
*/
private SearchInvoker getSearchInvoker(Query query) {
- return dispatcher.getSearchInvoker(query, this).get();
+ Optional<SearchInvoker> invoker = dispatcher.getSearchInvoker(query, this);
+ if (invoker.isPresent()) {
+ return invoker.get();
+ }
+
+ Optional<Node> direct = getDirectNode(query);
+ if(direct.isPresent()) {
+ var node = direct.get();
+ Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port());
+ return new FS4SearchInvoker(this, query, backend.openChannel(), direct);
+ }
+ return new FS4SearchInvoker(this, query, dispatchBackend.openChannel(), Optional.empty());
}
/**
@@ -156,17 +237,47 @@ public class FastSearcher extends VespaBackEndSearcher {
* content nodes.
*/
private FillInvoker getFillInvoker(Result result) {
- return dispatcher.getFillInvoker(result, this).get();
+ Query query = result.getQuery();
+ Optional<FillInvoker> invoker = dispatcher.getFillInvoker(result, this);
+ if (invoker.isPresent()) {
+ return invoker.get();
+ }
+
+ Optional<Node> direct = getDirectNode(query);
+ if (direct.isPresent()) {
+ var node = direct.get();
+ Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port());
+ return new FS4FillInvoker(this, query, backend);
+ }
+ return new FS4FillInvoker(this, query, dispatchBackend);
}
+ /**
+ * If the query can be directed to a single local content node, returns that node. Otherwise,
+ * returns an empty value.
+ */
+ private Optional<Node> getDirectNode(Query query) {
+ if (!query.properties().getBoolean(dispatchDirect, true))
+ return Optional.empty();
+ if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE))
+ return Optional.empty();
+
+ Optional<Node> directDispatchRecipient = dispatcher.searchCluster().directDispatchTarget();
+ if (!directDispatchRecipient.isPresent())
+ return Optional.empty();
+ // Dispatch directly to the single, local search node
+ Node local = directDispatchRecipient.get();
+ query.trace(false, 2, "Dispatching directly to ", local);
+ return Optional.of(local);
+ }
private static Optional<String> quotedSummaryClass(String summaryClass) {
return Optional.of(summaryClass == null ? "[null]" : quote(summaryClass));
}
public String toString() {
- return "fast searcher (" + getName() + ") ";
+ return "fast searcher (" + getName() + ") " + dispatchBackend;
}
protected boolean isLoggingFine() {
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/GroupingListHit.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/GroupingListHit.java
index 740b9592efc..e0d569c6ae1 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/GroupingListHit.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/GroupingListHit.java
@@ -3,6 +3,7 @@ package com.yahoo.prelude.fastsearch;
import java.util.List;
+import com.yahoo.fs4.QueryPacketData;
import com.yahoo.search.result.Hit;
import com.yahoo.searchlib.aggregation.Grouping;
@@ -26,4 +27,15 @@ public class GroupingListHit extends Hit {
private final List<Grouping> groupingList;
private final DocsumDefinitionSet defs;
+ private QueryPacketData queryPacketData;
+
+ public void setQueryPacketData(QueryPacketData queryPacketData) {
+ this.queryPacketData = queryPacketData;
+ }
+
+ /** Returns encoded query data from the query used to create this, or null if none present */
+ public QueryPacketData getQueryPacketData() {
+ return queryPacketData;
+ }
+
}
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/Int64Field.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/Int64Field.java
index 396a84a28bd..f690d9d4da4 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/Int64Field.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/Int64Field.java
@@ -5,6 +5,9 @@
*/
package com.yahoo.prelude.fastsearch;
+
+import java.nio.ByteBuffer;
+
import com.yahoo.search.result.NanNumber;
import com.yahoo.data.access.Inspector;
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/IntegerField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/IntegerField.java
index bec39393359..a02d9813793 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/IntegerField.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/IntegerField.java
@@ -5,6 +5,9 @@
*/
package com.yahoo.prelude.fastsearch;
+
+import java.nio.ByteBuffer;
+
import com.yahoo.search.result.NanNumber;
import com.yahoo.data.access.Inspector;
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/LongdataField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/LongdataField.java
index 388c96b453d..bf77c517d50 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/LongdataField.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/LongdataField.java
@@ -5,6 +5,9 @@
*/
package com.yahoo.prelude.fastsearch;
+import java.nio.ByteBuffer;
+
+import com.yahoo.io.SlowInflate;
import com.yahoo.prelude.hitfield.RawData;
import com.yahoo.data.access.simple.Value;
import com.yahoo.data.access.Inspector;
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/ShortField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/ShortField.java
index b94c902693a..5e3d0babe98 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/ShortField.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/ShortField.java
@@ -5,6 +5,9 @@
*/
package com.yahoo.prelude.fastsearch;
+
+import java.nio.ByteBuffer;
+
import com.yahoo.search.result.NanNumber;
import com.yahoo.data.access.Inspector;
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/StringField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/StringField.java
index 4df12bd82bd..408cbbbb62d 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/StringField.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/StringField.java
@@ -5,6 +5,10 @@
*/
package com.yahoo.prelude.fastsearch;
+
+import java.nio.ByteBuffer;
+
+import com.yahoo.text.Utf8;
import com.yahoo.data.access.Inspector;
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java
index c98cf23ec61..430ad015493 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java
@@ -3,6 +3,7 @@ package com.yahoo.prelude.fastsearch;
import com.yahoo.collections.TinyIdentitySet;
import com.yahoo.fs4.DocsumPacket;
+import com.yahoo.fs4.Packet;
import com.yahoo.prelude.query.Item;
import com.yahoo.prelude.query.NullItem;
import com.yahoo.prelude.query.textualrepresentation.TextualQueryRepresentation;
@@ -19,6 +20,7 @@ import com.yahoo.search.result.Hit;
import com.yahoo.search.searchchain.Execution;
import com.yahoo.searchlib.aggregation.Grouping;
+import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Iterator;
@@ -49,7 +51,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher {
private String defaultDocsumClass = null;
/** Returns an iterator which returns all hits below this result **/
- private static Iterator<Hit> hitIterator(Result result) {
+ static Iterator<Hit> hitIterator(Result result) {
return result.hits().unorderedDeepIterator();
}
@@ -228,7 +230,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher {
if ((query.getTraceLevel()<level) || query.properties().getBoolean(TRACE_DISABLE)) return;
StringBuilder s = new StringBuilder();
- s.append(sourceName).append(" ").append(type).append(" to dispatch: ")
+ s.append(sourceName).append(" " + type + " to dispatch: ")
.append("query=[")
.append(query.getModel().getQueryTree().getRoot().toString())
.append("]");
@@ -318,7 +320,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher {
}
}
- private FillHitResult fillHit(FastHit hit, DocsumPacket packet, String summaryClass) {
+ FillHitResult fillHit(FastHit hit, DocsumPacket packet, String summaryClass) {
if (packet != null) {
byte[] docsumdata = packet.getData();
if (docsumdata.length > 0) {
@@ -342,7 +344,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher {
* @return the number of hits that we did not return data for, and an optional error message.
* when things are working normally we return 0.
*/
- protected FillHitsResult fillHits(Result result, DocsumPacket[] packets, String summaryClass) {
+ public FillHitsResult fillHits(Result result, Packet[] packets, String summaryClass) throws IOException {
int skippedHits = 0;
String lastError = null;
int packetIndex = 0;
@@ -352,7 +354,8 @@ public abstract class VespaBackEndSearcher extends PingableSearcher {
if (hit instanceof FastHit && ! hit.isFilled(summaryClass)) {
FastHit fastHit = (FastHit) hit;
- DocsumPacket docsum = packets[packetIndex];
+ packets[packetIndex].ensureInstanceOf(DocsumPacket.class, getName());
+ DocsumPacket docsum = (DocsumPacket) packets[packetIndex];
packetIndex++;
FillHitResult fr = fillHit(fastHit, docsum, summaryClass);
@@ -381,7 +384,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher {
return decodeSummary(summaryClass, hit, docsumdata, db.getDocsumDefinitionSet());
}
- private static String decodeSummary(String summaryClass, FastHit hit, byte[] docsumdata, DocsumDefinitionSet docsumSet) {
+ private String decodeSummary(String summaryClass, FastHit hit, byte[] docsumdata, DocsumDefinitionSet docsumSet) {
String error = docsumSet.lazyDecode(summaryClass, docsumdata, hit);
if (error == null) {
hit.setFilled(summaryClass);
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/XMLField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/XMLField.java
index 00bdc474119..d768dda2657 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/XMLField.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/XMLField.java
@@ -1,5 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
+/**
+ * Class converting data (historically XML-encoded) from a document summary field.
+ * This has only been used to represent geographical positions.
+ */
package com.yahoo.prelude.fastsearch;
import com.yahoo.data.access.Inspector;
@@ -8,8 +11,6 @@ import com.yahoo.prelude.hitfield.XMLString;
import com.yahoo.search.result.PositionsData;
/**
- * Class converting data (historically XML-encoded) from a document summary field.
- * This has only been used to represent geographical positions.
* @author Steinar Knutsen
*/
public class XMLField extends DocsumField {