diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/prelude')
20 files changed, 963 insertions, 28 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/Pong.java b/container-search/src/main/java/com/yahoo/prelude/Pong.java index a60fba9a4f7..a6bc3e7975d 100644 --- a/container-search/src/main/java/com/yahoo/prelude/Pong.java +++ b/container-search/src/main/java/com/yahoo/prelude/Pong.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.prelude; +import com.yahoo.fs4.PongPacket; import com.yahoo.search.result.ErrorMessage; import com.yahoo.search.statistics.ElapsedTime; @@ -18,19 +19,28 @@ public class Pong { private String pingInfo=""; private final List<ErrorMessage> errors = new ArrayList<>(1); + private final Optional<PongPacket> pongPacket; private ElapsedTime elapsed = new ElapsedTime(); private final Optional<Long> activeDocuments; public Pong() { + this.pongPacket = Optional.empty(); this.activeDocuments = Optional.empty(); } public Pong(ErrorMessage error) { errors.add(error); + this.pongPacket = Optional.empty(); + this.activeDocuments = Optional.empty(); + } + + public Pong(PongPacket pongPacket) { + this.pongPacket = Optional.of(pongPacket); this.activeDocuments = Optional.empty(); } public Pong(long activeDocuments) { + this.pongPacket = Optional.empty(); this.activeDocuments = Optional.of(activeDocuments); } @@ -42,14 +52,21 @@ public class Pong { return errors.get(i); } + public int getErrorSize() { + return errors.size(); + } + /** Returns the number of active documents in the backend responding in this Pong, if available */ public Optional<Long> activeDocuments() { - return activeDocuments; + if (activeDocuments.isPresent()) return activeDocuments; + if ( ! pongPacket.isPresent()) return Optional.empty(); + return pongPacket.get().getActiveDocuments(); } /** Returns the number of nodes which responded to this Pong, if available */ public Optional<Integer> activeNodes() { - return Optional.empty(); + if ( ! pongPacket.isPresent()) return Optional.empty(); + return pongPacket.get().getActiveNodes(); } public List<ErrorMessage> getErrors() { @@ -61,6 +78,16 @@ public class Pong { return ! errors.isEmpty(); } + /** Sets information about the ping used to produce this. This is included when returning the tostring of this. */ + public void setPingInfo(String pingInfo) { + if (pingInfo==null) + pingInfo=""; + this.pingInfo=pingInfo; + } + + /** Returns information about the ping use, or "" (never null) if none */ + public String getPingInfo() { return pingInfo; } + public ElapsedTime getElapsedTime() { return elapsed; } diff --git a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java new file mode 100644 index 00000000000..c075a0f842b --- /dev/null +++ b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java @@ -0,0 +1,161 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.prelude.cluster; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.yahoo.component.provider.Freezable; +import com.yahoo.container.handler.VipStatus; +import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; +import com.yahoo.search.result.ErrorMessage; + +/** + * Monitors of a cluster of remote nodes. The monitor uses an internal thread + * for node monitoring. + * + * @author bratseth + * @author Steinar Knutsen + */ +public class ClusterMonitor implements Runnable, Freezable { + + // The ping thread wil start using the system, but we cannot be guaranteed that all components + // in the system is up. As a workaround for not being able to find out when the system + // is ready to be used, we wait some time before starting the ping thread + private static final int pingThreadInitialDelayMs = 3000; + + private final MonitorConfiguration configuration; + + private final static Logger log = Logger.getLogger(ClusterMonitor.class.getName()); + + private final ClusterSearcher nodeManager; + + private final Optional<VipStatus> vipStatus; + + /** A map from Node to corresponding MonitoredNode */ + private final Map<VespaBackEndSearcher, NodeMonitor> nodeMonitors = new java.util.IdentityHashMap<>(); + + private ScheduledFuture<?> future; + + private boolean isFrozen = false; + + ClusterMonitor(ClusterSearcher manager, QrMonitorConfig monitorConfig, Optional<VipStatus> vipStatus) { + configuration = new MonitorConfiguration(monitorConfig); + nodeManager = manager; + this.vipStatus = vipStatus; + log.fine("checkInterval is " + configuration.getCheckInterval() + " ms"); + } + + /** Returns the configuration of this cluster monitor */ + MonitorConfiguration getConfiguration() { + return configuration; + } + + void startPingThread() { + if ( ! isFrozen()) + throw new IllegalStateException("Do not start the monitoring thread before the set of " + + "nodes to monitor is complete/the ClusterMonitor is frozen."); + future = nodeManager.getScheduledExecutor().scheduleAtFixedRate(this, pingThreadInitialDelayMs, configuration.getCheckInterval(), TimeUnit.MILLISECONDS); + } + + /** + * Adds a new node for monitoring. + */ + void add(VespaBackEndSearcher node) { + if (isFrozen()) + throw new IllegalStateException("Can not add new nodes after ClusterMonitor has been frozen."); + nodeMonitors.put(node, new NodeMonitor(node)); + updateVipStatus(); + } + + /** Called from ClusterSearcher/NodeManager when a node failed */ + void failed(VespaBackEndSearcher node, ErrorMessage error) { + NodeMonitor monitor = nodeMonitors.get(node); + boolean wasWorking = monitor.isWorking(); + monitor.failed(error); + if (wasWorking && !monitor.isWorking()) { + log.info("Failed monitoring node '" + node + "' due to '" + error); + nodeManager.failed(node); + } + updateVipStatus(); + } + + /** Called when a node responded */ + void responded(VespaBackEndSearcher node, boolean hasSearchNodesOnline) { + NodeMonitor monitor = nodeMonitors.get(node); + boolean wasFailing = !monitor.isWorking(); + monitor.responded(hasSearchNodesOnline); + if (wasFailing && monitor.isWorking()) { + log.info("Failed node '" + node + "' started working again."); + nodeManager.working(node); + } + updateVipStatus(); + } + + private void updateVipStatus() { + if ( ! vipStatus.isPresent()) return; + if ( ! hasInformationAboutAllNodes()) return; + + if (hasWorkingNodesWithDocumentsOnline()) { + vipStatus.get().addToRotation(nodeManager.getId().stringValue()); + } else { + vipStatus.get().removeFromRotation(nodeManager.getId().stringValue()); + } + } + + private boolean hasInformationAboutAllNodes() { + for (NodeMonitor monitor : nodeMonitors.values()) { + if ( ! monitor.statusIsKnown()) + return false; + } + return true; + } + + private boolean hasWorkingNodesWithDocumentsOnline() { + for (NodeMonitor node : nodeMonitors.values()) { + if (node.isWorking() && node.searchNodesOnline()) + return true; + } + return false; + } + + /** + * Ping all nodes which needs pinging to discover state changes + */ + private void ping() throws InterruptedException { + for (NodeMonitor monitor : nodeMonitors.values()) { + nodeManager.ping(monitor.getNode()); + } + } + + @Override + public void run() { + log.finest("Activating ping"); + try { + ping(); + } catch (Exception e) { + log.log(Level.WARNING, "Error in monitor thread", e); + } + } + + public void shutdown() { + if (future != null) { + future.cancel(true); + } + } + + @Override + public void freeze() { + isFrozen = true; + + } + + @Override + public boolean isFrozen() { + return isFrozen; + } + +} diff --git a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java index 0780d5e9d65..4ffcc0a4330 100644 --- a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java @@ -2,13 +2,20 @@ package com.yahoo.prelude.cluster; import com.yahoo.cloud.config.ClusterInfoConfig; +import com.yahoo.collections.Tuple2; import com.yahoo.component.ComponentId; +import com.yahoo.component.chain.Chain; import com.yahoo.component.chain.dependencies.After; +import com.yahoo.concurrent.Receiver; +import com.yahoo.concurrent.Receiver.MessageState; import com.yahoo.container.QrSearchersConfig; import com.yahoo.container.handler.VipStatus; +import com.yahoo.fs4.mplex.Backend; import com.yahoo.jdisc.Metric; import com.yahoo.net.HostName; import com.yahoo.prelude.IndexFacts; +import com.yahoo.prelude.Ping; +import com.yahoo.prelude.Pong; import com.yahoo.prelude.fastsearch.ClusterParams; import com.yahoo.prelude.fastsearch.DocumentdbInfoConfig; import com.yahoo.prelude.fastsearch.FS4ResourcePool; @@ -39,7 +46,11 @@ import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.logging.Logger; import static com.yahoo.container.QrSearchersConfig.Searchcluster.Indexingmode.STREAMING; @@ -53,6 +64,10 @@ import static com.yahoo.container.QrSearchersConfig.Searchcluster.Indexingmode.S @After("*") public class ClusterSearcher extends Searcher { + private final static Logger log = Logger.getLogger(ClusterSearcher.class.getName()); + + private final ClusterMonitor monitor; + private final Value cacheHitRatio; private final String clusterModelName; @@ -63,6 +78,8 @@ public class ClusterSearcher extends Searcher { // Mapping from rank profile names to document types containing them private final Map<String, Set<String>> rankProfiles = new HashMap<>(); + private final FS4ResourcePool fs4ResourcePool; + private final long maxQueryTimeout; // in milliseconds private final static long DEFAULT_MAX_QUERY_TIMEOUT = 600000L; @@ -71,6 +88,7 @@ public class ClusterSearcher extends Searcher { private VespaBackEndSearcher server = null; + /** * Creates a new ClusterSearcher. */ @@ -78,6 +96,7 @@ public class ClusterSearcher extends Searcher { QrSearchersConfig qrsConfig, ClusterConfig clusterConfig, DocumentdbInfoConfig documentDbConfig, + QrMonitorConfig monitorConfig, DispatchConfig dispatchConfig, ClusterInfoConfig clusterInfoConfig, Statistics manager, @@ -85,8 +104,13 @@ public class ClusterSearcher extends Searcher { FS4ResourcePool fs4ResourcePool, VipStatus vipStatus) { super(id); + this.fs4ResourcePool = fs4ResourcePool; + + Dispatcher dispatcher = Dispatcher.create(id.stringValue(), dispatchConfig, fs4ResourcePool, clusterInfoConfig.nodeCount(), vipStatus, metric); - Dispatcher dispatcher = Dispatcher.create(id.stringValue(), dispatchConfig, clusterInfoConfig.nodeCount(), vipStatus, metric); + monitor = (dispatcher.searchCluster().directDispatchTarget().isPresent()) // dispatcher should decide vip status instead + ? new ClusterMonitor(this, monitorConfig, Optional.empty()) + : new ClusterMonitor(this, monitorConfig, Optional.of(vipStatus)); int searchClusterIndex = clusterConfig.clusterId(); clusterModelName = clusterConfig.clusterName(); @@ -124,8 +148,9 @@ public class ClusterSearcher extends Searcher { for (int dispatcherIndex = 0; dispatcherIndex < searchClusterConfig.dispatcher().size(); dispatcherIndex++) { try { if ( ! isRemote(searchClusterConfig.dispatcher(dispatcherIndex).host())) { - FastSearcher searcher = searchDispatch(searchClusterIndex, fs4ResourcePool.getServerId(), docSumParams, - documentDbConfig, dispatcher, dispatcherIndex); + Backend dispatchBackend = createBackend(searchClusterConfig.dispatcher(dispatcherIndex)); + FastSearcher searcher = searchDispatch(searchClusterIndex, fs4ResourcePool, docSumParams, + documentDbConfig, dispatchBackend, dispatcher, dispatcherIndex); addBackendSearcher(searcher); } } catch (UnknownHostException e) { @@ -137,6 +162,8 @@ public class ClusterSearcher extends Searcher { if ( server == null ) { throw new IllegalStateException("ClusterSearcher should have a top level dispatch."); } + monitor.freeze(); + monitor.startPingThread(); } private static QrSearchersConfig.Searchcluster getSearchClusterConfigFromClusterName(QrSearchersConfig config, String name) { @@ -162,14 +189,15 @@ public class ClusterSearcher extends Searcher { } private static FastSearcher searchDispatch(int searchclusterIndex, - String serverId, + FS4ResourcePool fs4ResourcePool, SummaryParameters docSumParams, DocumentdbInfoConfig documentdbInfoConfig, + Backend backend, Dispatcher dispatcher, int dispatcherIndex) { ClusterParams clusterParams = makeClusterParams(searchclusterIndex, dispatcherIndex); - return new FastSearcher(serverId, dispatcher, docSumParams, clusterParams, documentdbInfoConfig); + return new FastSearcher(backend, fs4ResourcePool, dispatcher, docSumParams, clusterParams, documentdbInfoConfig); } private static VdsStreamingSearcher vdsCluster(String serverId, @@ -194,14 +222,25 @@ public class ClusterSearcher extends Searcher { /** Do not use, for internal testing purposes only. **/ ClusterSearcher(Set<String> documentTypes) { this.documentTypes = documentTypes; + monitor = new ClusterMonitor(this, new QrMonitorConfig(new QrMonitorConfig.Builder()), Optional.of(new VipStatus())); cacheHitRatio = new Value("com.yahoo.prelude.cluster.ClusterSearcher.ClusterSearcher().dummy", Statistics.nullImplementation, new Value.Parameters()); clusterModelName = "testScenario"; + fs4ResourcePool = null; maxQueryTimeout = DEFAULT_MAX_QUERY_TIMEOUT; maxQueryCacheTimeout = DEFAULT_MAX_QUERY_CACHE_TIMEOUT; } + private Backend createBackend(QrSearchersConfig.Searchcluster.Dispatcher disp) { + return fs4ResourcePool.getBackend(disp.host(), disp.port()); + } + + ClusterMonitor getMonitor() { + return monitor; + } + void addBackendSearcher(VespaBackEndSearcher searcher) { + monitor.add(searcher); server = searcher; } @@ -440,6 +479,77 @@ public class ClusterSearcher extends Searcher { cacheHitRatio.put(0.0); } + /** NodeManager method, called from ClusterMonitor. */ + void working(VespaBackEndSearcher node) { + server = node; + } + + /** Called from ClusterMonitor. */ + void failed(VespaBackEndSearcher node) { + server = null; + } + + /** + * Pinging a node, called from ClusterMonitor. + */ + void ping(VespaBackEndSearcher node) throws InterruptedException { + log.fine("Sending ping to: " + node); + Pinger pinger = new Pinger(node); + + getExecutor().execute(pinger); + Pong pong = pinger.getPong(); // handles timeout + if (pong == null) { + monitor.failed(node, ErrorMessage.createNoAnswerWhenPingingNode("Ping thread timed out.")); + } else if (pong.badResponse()) { + monitor.failed(node, pong.getError(0)); + } else { + monitor.responded(node, backendCanServeDocuments(pong)); + } + } + + private boolean backendCanServeDocuments(Pong pong) { + if ( ! pong.activeNodes().isPresent()) return true; // no information; assume true + return pong.activeNodes().get() > 0; + } + @Override - public void deconstruct() { } + public void deconstruct() { + monitor.shutdown(); + } + + ExecutorService getExecutor() { + return fs4ResourcePool.getExecutor(); + } + + ScheduledExecutorService getScheduledExecutor() { + return fs4ResourcePool.getScheduledExecutor(); + } + + private class Pinger implements Runnable { + + private final Searcher searcher; + private final Ping pingChallenge = new Ping(monitor.getConfiguration().getRequestTimeout()); + private final Receiver<Pong> pong = new Receiver<>(); + + Pinger(final Searcher searcher) { + this.searcher = searcher; + } + + @Override + public void run() { + pong.put(createExecution().ping(pingChallenge)); + } + + private Execution createExecution() { + return new Execution(new Chain<>(searcher), + new Execution.Context(null, null, null, null, null)); + } + + public Pong getPong() throws InterruptedException { + Tuple2<MessageState, Pong> reply = pong.get(pingChallenge.getTimeout() + 150); + return (reply.first != MessageState.VALID) ? null : reply.second; + } + + } + } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DataField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DataField.java index de07839e3e3..524e842eacd 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DataField.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DataField.java @@ -10,6 +10,8 @@ package com.yahoo.prelude.fastsearch; +import java.nio.ByteBuffer; + import com.yahoo.prelude.hitfield.RawData; import com.yahoo.data.access.simple.Value; import com.yahoo.data.access.Inspector; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumField.java index 329a9caaf91..4f52ef91725 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumField.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumField.java @@ -6,6 +6,7 @@ import com.yahoo.log.LogLevel; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import java.util.logging.Logger; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DoubleField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DoubleField.java index f6f8006d2d2..6b1445229ec 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DoubleField.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DoubleField.java @@ -1,6 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.prelude.fastsearch; + +import java.nio.ByteBuffer; + import com.yahoo.search.result.NanNumber; import com.yahoo.data.access.Inspector; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java new file mode 100644 index 00000000000..59bc781c8b2 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java @@ -0,0 +1,184 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.prelude.fastsearch; + +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.ChannelTimeoutException; +import com.yahoo.fs4.GetDocSumsPacket; +import com.yahoo.fs4.Packet; +import com.yahoo.fs4.mplex.Backend; +import com.yahoo.fs4.mplex.FS4Channel; +import com.yahoo.fs4.mplex.InvalidChannelException; +import com.yahoo.prelude.fastsearch.VespaBackEndSearcher.FillHitsResult; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.dispatch.FillInvoker; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.search.result.Hit; + +import java.io.IOException; +import java.util.Iterator; + +import static com.yahoo.prelude.fastsearch.VespaBackEndSearcher.hitIterator; + +/** + * {@link FillInvoker} implementation for FS4 nodes and fdispatch + * + * @author ollivir + */ +public class FS4FillInvoker extends FillInvoker { + private final VespaBackEndSearcher searcher; + private FS4Channel channel; + + private int expectedFillResults = 0; + + // fdispatch code path + FS4FillInvoker(VespaBackEndSearcher searcher, Query query, Backend backend) { + this.searcher = searcher; + this.channel = backend.openChannel(); + channel.setQuery(query); + } + + @Override + protected void sendFillRequest(Result result, String summaryClass) { + + if (countUnfilledFastHits(result, summaryClass) > 0) { + try { + expectedFillResults = requestSummaries(result, summaryClass); + } catch (InvalidChannelException e) { + result.hits() + .addError(ErrorMessage.createBackendCommunicationError("Invalid channel " + getName() + " (summary fetch)")); + } catch (IOException e) { + result.hits().addError(ErrorMessage.createBackendCommunicationError( + "IO error while talking on channel " + getName() + " (summary fetch): " + e.getMessage())); + } + } else { + expectedFillResults = 0; + } + } + + + @Override + protected void getFillResults(Result result, String summaryClass) { + if (expectedFillResults == 0) { + return; + } + + Packet[] receivedPackets; + try { + receivedPackets = getSummaryResponses(result); + } catch (InvalidChannelException e1) { + result.hits().addError(ErrorMessage.createBackendCommunicationError("Invalid channel " + getName() + " (summary fetch)")); + return; + } catch (ChannelTimeoutException e1) { + result.hits().addError(ErrorMessage.createTimeout("timeout waiting for summaries from " + getName())); + return; + } + + if (receivedPackets.length == 0) { + result.hits().addError(ErrorMessage.createBackendCommunicationError(getName() + " got no packets back (summary fetch)")); + return; + } + + int skippedHits; + try { + FillHitsResult fillHitsResult = searcher.fillHits(result, receivedPackets, summaryClass); + skippedHits = fillHitsResult.skippedHits; + if (fillHitsResult.error != null) { + result.hits().addError(ErrorMessage.createTimeout(fillHitsResult.error)); + return; + } + } catch (TimeoutException e) { + result.hits().addError(ErrorMessage.createTimeout(e.getMessage())); + return; + } catch (IOException e) { + result.hits().addError(ErrorMessage.createBackendCommunicationError( + "Error filling hits with summary fields, source: " + getName() + " Exception thrown: " + e.getMessage())); + return; + } + + if (skippedHits > 0) + result.hits().addError( + ErrorMessage.createEmptyDocsums("Missing hit data for summary '" + summaryClass + "' for " + skippedHits + " hits")); + result.analyzeHits(); + + if (channel.getQuery().getTraceLevel() >= 3) { + int hitNumber = 0; + for (Iterator<com.yahoo.search.result.Hit> i = hitIterator(result); i.hasNext();) { + com.yahoo.search.result.Hit hit = i.next(); + if (!(hit instanceof FastHit)) + continue; + FastHit fastHit = (FastHit) hit; + + String traceMsg = "Hit: " + (hitNumber++) + " from " + (fastHit.isCached() ? "cache" : "backend"); + if (!fastHit.isFilled(summaryClass)) + traceMsg += ". Error, hit, not filled"; + channel.getQuery().trace(traceMsg, false, 3); + } + } + } + + @Override + public void release() { + if (channel != null) { + channel.close(); + channel = null; + } + } + + private int countUnfilledFastHits(Result result, String summaryClass) { + int count = 0; + for (Iterator<Hit> i = hitIterator(result); i.hasNext();) { + Hit hit = i.next(); + if (hit instanceof FastHit && !hit.isFilled(summaryClass)) + count++; + } + return count; + } + + private int requestSummaries(Result result, String summaryClass) throws InvalidChannelException, ClassCastException, IOException { + + boolean summaryNeedsQuery = searcher.summaryNeedsQuery(result.getQuery()); + if (result.getQuery().getTraceLevel() >= 3) + result.getQuery().trace((summaryNeedsQuery ? "FS4: Resending " : "Not resending ") + "query during document summary fetching", 3); + + GetDocSumsPacket docsumsPacket = GetDocSumsPacket.create(result, summaryClass, summaryNeedsQuery); + int compressionLimit = result.getQuery().properties().getInteger(FS4SearchInvoker.PACKET_COMPRESSION_LIMIT, 0); + docsumsPacket.setCompressionLimit(compressionLimit); + if (compressionLimit != 0) { + docsumsPacket.setCompressionType(result.getQuery().properties().getString(FS4SearchInvoker.PACKET_COMPRESSION_TYPE, "lz4")); + } + + boolean couldSend = channel.sendPacket(docsumsPacket); + if (!couldSend) + throw new IOException("Could not successfully send GetDocSumsPacket."); + + return docsumsPacket.getNumDocsums() + 1; + } + + private Packet[] getSummaryResponses(Result result) throws InvalidChannelException, ChannelTimeoutException { + if(expectedFillResults == 0) { + return new Packet[0]; + } + BasicPacket[] receivedPackets = channel.receivePackets(result.getQuery().getTimeLeft(), expectedFillResults); + + return convertBasicPackets(receivedPackets); + } + + private static Packet[] convertBasicPackets(BasicPacket[] basicPackets) throws ClassCastException { + // trying to cast a BasicPacket[] to Packet[] will compile, + // but lead to a runtime error. At least that's what I got + // from testing and reading the specification. I'm just happy + // if someone tells me what's the proper Java way of doing + // this. -SK + Packet[] packets = new Packet[basicPackets.length]; + + for (int i = 0; i < basicPackets.length; i++) { + packets[i] = (Packet) basicPackets[i]; + } + return packets; + } + + private String getName() { + return searcher.getName(); + } +} diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4PingFactory.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4PingFactory.java new file mode 100644 index 00000000000..2abaf341c58 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4PingFactory.java @@ -0,0 +1,29 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.prelude.fastsearch; + +import com.yahoo.prelude.Pong; +import com.yahoo.search.cluster.ClusterMonitor; +import com.yahoo.search.dispatch.searchcluster.Node; +import com.yahoo.search.dispatch.searchcluster.PingFactory; +import com.yahoo.search.dispatch.searchcluster.Pinger; + +import java.util.concurrent.Callable; + +/** + * FS4PingFactory constructs {@link Pinger} objects that communicate with + * content nodes or dispatchers over the fnet/FS4 protocol + * + * @author ollivir + */ +public class FS4PingFactory implements PingFactory { + private final FS4ResourcePool fs4ResourcePool; + + public FS4PingFactory(FS4ResourcePool fs4ResourcePool) { + this.fs4ResourcePool = fs4ResourcePool; + } + + @Override + public Callable<Pong> createPinger(Node node, ClusterMonitor<Node> monitor) { + return new Pinger(node, monitor, fs4ResourcePool); + } +} diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java index ed9eb72d7dd..f85a4019b78 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4ResourcePool.java @@ -5,6 +5,14 @@ import com.google.inject.Inject; import com.yahoo.component.AbstractComponent; import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.container.QrConfig; +import com.yahoo.container.search.Fs4Config; +import com.yahoo.fs4.mplex.Backend; +import com.yahoo.fs4.mplex.ConnectionPool; +import com.yahoo.fs4.mplex.ListenerPool; + +import java.util.HashMap; +import java.util.Map; +import java.util.Timer; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -14,7 +22,7 @@ import java.util.logging.Level; import java.util.logging.Logger; /** - * All users will get the same pool instance. + * Provider for {@link com.yahoo.fs4.mplex.ListenerPool}. All users will get the same pool instance. * * @author baldersheim */ @@ -24,18 +32,22 @@ public class FS4ResourcePool extends AbstractComponent { private static final AtomicInteger instanceCounter = new AtomicInteger(0); private final String serverId; private final int instanceId; + private final ListenerPool listeners; + private final Timer timer = new Timer(); // This is a timer for cleaning the closed connections + private final Map<String, Backend> connectionPoolMap = new HashMap<>(); private final ExecutorService executor; private final ScheduledExecutorService scheduledExecutor; @Inject - public FS4ResourcePool(QrConfig config) { - this(config.discriminator()); + public FS4ResourcePool(Fs4Config fs4Config, QrConfig config) { + this(config.discriminator(), fs4Config.numlistenerthreads()); } - public FS4ResourcePool(String serverId) { + public FS4ResourcePool(String serverId, int listenerThreads) { this.serverId = serverId; instanceId = instanceCounter.getAndIncrement(); String name = "FS4-" + instanceId; + listeners = new ListenerPool(name, listenerThreads); executor = Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory(name)); scheduledExecutor = Executors.newScheduledThreadPool(1, ThreadFactoryFactory.getDaemonThreadFactory(name + ".scheduled")); } @@ -45,10 +57,28 @@ public class FS4ResourcePool extends AbstractComponent { public ExecutorService getExecutor() { return executor; } public ScheduledExecutorService getScheduledExecutor() { return scheduledExecutor; } + public Backend getBackend(String host, int port) { + String key = host + ":" + port; + synchronized (connectionPoolMap) { + Backend pool = connectionPoolMap.get(key); + if (pool == null) { + pool = new Backend(host, port, serverId, listeners, new ConnectionPool(timer)); + connectionPoolMap.put(key, pool); + } + return pool; + } + } + @Override public void deconstruct() { logger.log(Level.INFO, "Deconstructing FS4ResourcePool with id '" + instanceId + "'."); super.deconstruct(); + listeners.close(); + timer.cancel(); + for (Backend backend : connectionPoolMap.values()) { + backend.shutdown(); + backend.close(); + } executor.shutdown(); scheduledExecutor.shutdown(); try { diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java new file mode 100644 index 00000000000..f3867288b29 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java @@ -0,0 +1,220 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.prelude.fastsearch; + +import com.yahoo.data.access.simple.Value; +import com.yahoo.data.access.slime.SlimeAdapter; +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.ChannelTimeoutException; +import com.yahoo.fs4.DocumentInfo; +import com.yahoo.fs4.FS4Properties; +import com.yahoo.fs4.QueryPacket; +import com.yahoo.fs4.QueryPacketData; +import com.yahoo.fs4.QueryResultPacket; +import com.yahoo.fs4.mplex.FS4Channel; +import com.yahoo.fs4.mplex.InvalidChannelException; +import com.yahoo.io.GrowableByteBuffer; +import com.yahoo.log.LogLevel; +import com.yahoo.prelude.ConfigurationException; +import com.yahoo.processing.request.CompoundName; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.dispatch.InvokerResult; +import com.yahoo.search.dispatch.LeanHit; +import com.yahoo.search.dispatch.ResponseMonitor; +import com.yahoo.search.dispatch.SearchInvoker; +import com.yahoo.search.dispatch.searchcluster.Node; +import com.yahoo.search.query.Sorting; +import com.yahoo.search.result.Coverage; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.search.result.Relevance; +import com.yahoo.search.searchchain.Execution; +import com.yahoo.searchlib.aggregation.Grouping; +import com.yahoo.slime.BinaryFormat; +import com.yahoo.vespa.objects.BufferSerializer; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.logging.Logger; + +/** + * {@link SearchInvoker} implementation for FS4 nodes and fdispatch + * + * @author ollivir + */ +public class FS4SearchInvoker extends SearchInvoker implements ResponseMonitor<FS4Channel> { + static final CompoundName PACKET_COMPRESSION_LIMIT = new CompoundName("packetcompressionlimit"); + static final CompoundName PACKET_COMPRESSION_TYPE = new CompoundName("packetcompressiontype"); + + private static final Logger log = Logger.getLogger(FS4SearchInvoker.class.getName()); + + private final VespaBackEndSearcher searcher; + private FS4Channel channel; + + private ErrorMessage pendingSearchError = null; + private Query query = null; + private QueryPacket queryPacket = null; + + public FS4SearchInvoker(VespaBackEndSearcher searcher, Query query, FS4Channel channel, Optional<Node> node) { + super(node); + this.searcher = searcher; + this.channel = channel; + + channel.setQuery(query); + channel.setResponseMonitor(this); + } + + @Override + protected void sendSearchRequest(Query query) throws IOException { + log.finest("sending query packet"); + + this.query = query; + createQueryPacket(searcher.getServerId(), query); + + try { + boolean couldSend = channel.sendPacket(queryPacket); + if (!couldSend) { + setPendingError("Could not reach '" + getName() + "'"); + } + } catch (InvalidChannelException e) { + setPendingError("Invalid channel " + getName()); + } catch (IllegalStateException e) { + setPendingError("Illegal state in FS4: " + e.getMessage()); + } + } + + private void setPendingError(String message) { + pendingSearchError = ErrorMessage.createBackendCommunicationError(message); + responseAvailable(); + } + + @Override + protected InvokerResult getSearchResult(Execution execution) throws IOException { + if (pendingSearchError != null) { + return errorResult(query, pendingSearchError); + } + BasicPacket[] basicPackets; + + try { + basicPackets = channel.receivePackets(query.getTimeLeft(), 1); + } catch (ChannelTimeoutException e) { + return errorResult(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName())); + } catch (InvalidChannelException e) { + return errorResult(query, ErrorMessage.createBackendCommunicationError("Invalid channel for " + getName())); + } + + if (basicPackets.length == 0) { + return errorResult(query, ErrorMessage.createBackendCommunicationError(getName() + " got no packets back")); + } + + log.finest(() -> "got packets " + basicPackets.length + " packets"); + + basicPackets[0].ensureInstanceOf(QueryResultPacket.class, getName()); + QueryResultPacket resultPacket = (QueryResultPacket) basicPackets[0]; + + log.finest(() -> "got query packet. " + "docsumClass=" + query.getPresentation().getSummary()); + + if (query.getPresentation().getSummary() == null) + query.getPresentation().setSummary(searcher.getDefaultDocsumClass()); + + InvokerResult result = new InvokerResult(query, resultPacket.getDocumentCount()); + + addMetaInfo(query, queryPacket.getQueryPacketData(), resultPacket, result.getResult()); + addUnfilledHits(result.getLeanHits(), resultPacket.getDocuments(), queryPacket.getQueryPacketData()); + + return result; + } + + private QueryPacket createQueryPacket(String serverId, Query query) { + this.queryPacket = QueryPacket.create(serverId, query); + int compressionLimit = query.properties().getInteger(PACKET_COMPRESSION_LIMIT, 0); + queryPacket.setCompressionLimit(compressionLimit); + if (compressionLimit != 0) { + queryPacket.setCompressionType(query.properties().getString(PACKET_COMPRESSION_TYPE, "lz4")); + } + log.fine(() -> "made QueryPacket: " + queryPacket); + + return queryPacket; + } + + private void addMetaInfo(Query query, QueryPacketData queryPacketData, QueryResultPacket resultPacket, Result result) { + result.setTotalHitCount(resultPacket.getTotalDocumentCount()); + + addBackendTrace(query, resultPacket); + + // Grouping + if (resultPacket.getGroupData() != null) { + byte[] data = resultPacket.getGroupData(); + ArrayList<Grouping> list = new ArrayList<>(); + BufferSerializer buf = new BufferSerializer(new GrowableByteBuffer(ByteBuffer.wrap(data))); + int cnt = buf.getInt(null); + for (int i = 0; i < cnt; i++) { + Grouping g = new Grouping(); + g.deserialize(buf); + list.add(g); + } + GroupingListHit hit = new GroupingListHit(list, searcher.getDocsumDefinitionSet(query)); + hit.setQuery(result.getQuery()); + hit.setSource(getName()); + hit.setQueryPacketData(queryPacketData); + result.hits().add(hit); + } + + if (resultPacket.getCoverageFeature()) { + result.setCoverage(new Coverage(resultPacket.getCoverageDocs(), resultPacket.getActiveDocs(), resultPacket.getNodesReplied()) + .setSoonActive(resultPacket.getSoonActiveDocs()) + .setDegradedReason(resultPacket.getDegradedReason()) + .setNodesTried(resultPacket.getNodesQueried())); + } + } + + private void addBackendTrace(Query query, QueryResultPacket resultPacket) { + if (resultPacket.propsArray == null) return; + Value.ArrayValue traces = new Value.ArrayValue(); + for (FS4Properties properties : resultPacket.propsArray) { + if ( ! properties.getName().equals("trace")) continue; + for (FS4Properties.Entry entry : properties.getEntries()) { + traces.add(new SlimeAdapter(BinaryFormat.decode(entry.getValue()).get())); + } + } + query.trace(traces, query.getTraceLevel()); + } + + /** + * Creates unfilled hits from a List of DocumentInfo instances. + */ + private void addUnfilledHits(List<LeanHit> result, List<DocumentInfo> documents, QueryPacketData queryPacketData) { + Optional<Integer> channelDistributionKey = distributionKey(); + + for (DocumentInfo document : documents) { + byte [] sortData = document.getSortData(); + LeanHit hit = (sortData == null) + ? new LeanHit(document.getRawGlobalId(), document.getPartId(), channelDistributionKey.orElse(document.getDistributionKey()), document.getMetric()) + : new LeanHit(document.getRawGlobalId(), document.getPartId(), channelDistributionKey.orElse(document.getDistributionKey()), document.getSortData()); + if (queryPacketData != null) { + hit.setQueryPacketData(queryPacketData); + } + result.add(hit); + } + } + + @Override + public void release() { + if (channel != null) { + channel.close(); + channel = null; + } + } + + private String getName() { + return searcher.getName(); + } + + @Override + public void responseAvailable(FS4Channel from) { + responseAvailable(); + } + +} diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastHit.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastHit.java index 244fad4efde..f7f2d08d713 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastHit.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastHit.java @@ -3,6 +3,7 @@ package com.yahoo.prelude.fastsearch; import com.yahoo.data.access.ObjectTraverser; import com.yahoo.document.GlobalId; +import com.yahoo.fs4.QueryPacketData; import com.yahoo.net.URI; import com.yahoo.search.query.Sorting; import com.yahoo.search.result.Hit; @@ -39,6 +40,9 @@ public class FastHit extends Hit { /** The global id of this document in the backend node which produced it */ private byte [] globalId; + //TODO Remove with fs4 + private transient QueryPacketData queryPacketData = null; + private transient byte[] sortData = null; // TODO I supect this one can be dropped. private transient Sorting sortDataSorting = null; @@ -143,6 +147,27 @@ public class FastHit extends Hit { /** Sets the index of the node this hit originated at */ public void setDistributionKey(int distributionKey) { this.distributionKey = distributionKey; } + /** + * Add the binary data common for the query packet to a Vespa backend and a + * summary fetch packet to a Vespa backend. This method can only be called + * once for a single hit. + * + * @param queryPacketData binary data from a query packet resulting in this hit + * @throws IllegalStateException if the method is called more than once + * @throws NullPointerException if trying to set query packet data to null + */ + public void setQueryPacketData(QueryPacketData queryPacketData) { + if (this.queryPacketData != null) + throw new IllegalStateException("Query packet data already set to " + + this.queryPacketData + ", tried to set it to " + queryPacketData); + if (queryPacketData == null) + throw new NullPointerException("Query packet data reference can not be set to null."); + this.queryPacketData = queryPacketData; + } + + /** Returns a serial encoding of the query which produced this hit, ot null if not available. */ + public QueryPacketData getQueryPacketData() { return queryPacketData; } + public void setSortData(byte[] data, Sorting sorting) { this.sortData = data; this.sortDataSorting = sorting; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java index b0b3a7800e9..6b0041a9e86 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java @@ -1,14 +1,23 @@ // Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.prelude.fastsearch; +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.ChannelTimeoutException; +import com.yahoo.fs4.PingPacket; +import com.yahoo.fs4.PongPacket; +import com.yahoo.fs4.mplex.Backend; +import com.yahoo.fs4.mplex.FS4Channel; +import com.yahoo.fs4.mplex.InvalidChannelException; import com.yahoo.prelude.Ping; import com.yahoo.prelude.Pong; import com.yahoo.prelude.querytransform.QueryRewrite; +import com.yahoo.processing.request.CompoundName; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.dispatch.Dispatcher; import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.dispatch.SearchInvoker; +import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.grouping.GroupingRequest; import com.yahoo.search.grouping.request.GroupingOperation; import com.yahoo.search.query.Ranking; @@ -37,13 +46,21 @@ import static com.yahoo.container.util.Util.quote; // catch and unwrap into a results with an error in high level methods. -Jon public class FastSearcher extends VespaBackEndSearcher { + /** If this is turned on this will make search queries directly to the local search node when possible */ + private final static CompoundName dispatchDirect = new CompoundName("dispatch.direct"); + /** Used to dispatch directly to search nodes over RPC, replacing the old fnet communication path */ private final Dispatcher dispatcher; + private final Backend dispatchBackend; + private final FS4ResourcePool fs4ResourcePool; + /** * Creates a Fastsearcher. * - * @param serverId the resource pool used to create direct connections to the local search nodes when + * @param dispatchBackend The backend object containing the connection to the dispatch node this should talk to + * over the fs4 protocol + * @param fs4ResourcePool the resource pool used to create direct connections to the local search nodes when * bypassing the dispatch node * @param dispatcher the dispatcher used (when enabled) to send summary requests over the rpc protocol. * Eventually we will move everything to this protocol and never use dispatch nodes. @@ -53,11 +70,13 @@ public class FastSearcher extends VespaBackEndSearcher { * @param clusterParams the cluster number, and other cluster backend parameters * @param documentdbInfoConfig document database parameters */ - public FastSearcher(String serverId, Dispatcher dispatcher, + public FastSearcher(Backend dispatchBackend, FS4ResourcePool fs4ResourcePool, Dispatcher dispatcher, SummaryParameters docSumParams, ClusterParams clusterParams, DocumentdbInfoConfig documentdbInfoConfig) { - init(serverId, docSumParams, clusterParams, documentdbInfoConfig); + init(fs4ResourcePool.getServerId(), docSumParams, clusterParams, documentdbInfoConfig); + this.dispatchBackend = dispatchBackend; this.dispatcher = dispatcher; + this.fs4ResourcePool = fs4ResourcePool; } /** @@ -65,7 +84,58 @@ public class FastSearcher extends VespaBackEndSearcher { */ @Override public Pong ping(Ping ping, Execution execution) { - throw new IllegalStateException("This ping should not have been called."); + return ping(ping, dispatchBackend, getName()); + } + + public static Pong ping(Ping ping, Backend backend, String name) { + FS4Channel channel = backend.openPingChannel(); + + // If you want to change this code, you need to understand + // com.yahoo.prelude.cluster.ClusterSearcher.ping(Searcher) and + // com.yahoo.prelude.cluster.TrafficNodeMonitor.failed(ErrorMessage) + try { + PingPacket pingPacket = new PingPacket(); + try { + boolean couldSend = channel.sendPacket(pingPacket); + if ( ! couldSend) { + return new Pong(ErrorMessage.createBackendCommunicationError("Could not ping " + name)); + } + } catch (InvalidChannelException e) { + return new Pong(ErrorMessage.createBackendCommunicationError("Invalid channel " + name)); + } catch (IllegalStateException e) { + return new Pong(ErrorMessage.createBackendCommunicationError("Illegal state in FS4: " + e.getMessage())); + } catch (IOException e) { + return new Pong(ErrorMessage.createBackendCommunicationError("IO error while sending ping: " + e.getMessage())); + } + + // We should only get a single packet + BasicPacket[] packets; + + try { + packets = channel.receivePackets(ping.getTimeout(), 1); + } catch (ChannelTimeoutException e) { + return new Pong(ErrorMessage.createNoAnswerWhenPingingNode("timeout while waiting for fdispatch for " + name)); + } catch (InvalidChannelException e) { + return new Pong(ErrorMessage.createBackendCommunicationError("Invalid channel for " + name)); + } + + if (packets.length == 0) { + return new Pong(ErrorMessage.createBackendCommunicationError(name + " got no packets back")); + } + + try { + packets[0].ensureInstanceOf(PongPacket.class, name); + } catch (TimeoutException e) { + return new Pong(ErrorMessage.createTimeout(e.getMessage())); + } catch (IOException e) { + return new Pong(ErrorMessage.createBackendCommunicationError("Unexpected packet class returned after ping: " + e.getMessage())); + } + return new Pong((PongPacket)packets[0]); + } finally { + if (channel != null) { + channel.close(); + } + } } @Override @@ -147,7 +217,18 @@ public class FastSearcher extends VespaBackEndSearcher { * on the same host. */ private SearchInvoker getSearchInvoker(Query query) { - return dispatcher.getSearchInvoker(query, this).get(); + Optional<SearchInvoker> invoker = dispatcher.getSearchInvoker(query, this); + if (invoker.isPresent()) { + return invoker.get(); + } + + Optional<Node> direct = getDirectNode(query); + if(direct.isPresent()) { + var node = direct.get(); + Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port()); + return new FS4SearchInvoker(this, query, backend.openChannel(), direct); + } + return new FS4SearchInvoker(this, query, dispatchBackend.openChannel(), Optional.empty()); } /** @@ -156,17 +237,47 @@ public class FastSearcher extends VespaBackEndSearcher { * content nodes. */ private FillInvoker getFillInvoker(Result result) { - return dispatcher.getFillInvoker(result, this).get(); + Query query = result.getQuery(); + Optional<FillInvoker> invoker = dispatcher.getFillInvoker(result, this); + if (invoker.isPresent()) { + return invoker.get(); + } + + Optional<Node> direct = getDirectNode(query); + if (direct.isPresent()) { + var node = direct.get(); + Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port()); + return new FS4FillInvoker(this, query, backend); + } + return new FS4FillInvoker(this, query, dispatchBackend); } + /** + * If the query can be directed to a single local content node, returns that node. Otherwise, + * returns an empty value. + */ + private Optional<Node> getDirectNode(Query query) { + if (!query.properties().getBoolean(dispatchDirect, true)) + return Optional.empty(); + if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) + return Optional.empty(); + + Optional<Node> directDispatchRecipient = dispatcher.searchCluster().directDispatchTarget(); + if (!directDispatchRecipient.isPresent()) + return Optional.empty(); + // Dispatch directly to the single, local search node + Node local = directDispatchRecipient.get(); + query.trace(false, 2, "Dispatching directly to ", local); + return Optional.of(local); + } private static Optional<String> quotedSummaryClass(String summaryClass) { return Optional.of(summaryClass == null ? "[null]" : quote(summaryClass)); } public String toString() { - return "fast searcher (" + getName() + ") "; + return "fast searcher (" + getName() + ") " + dispatchBackend; } protected boolean isLoggingFine() { diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/GroupingListHit.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/GroupingListHit.java index 740b9592efc..e0d569c6ae1 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/GroupingListHit.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/GroupingListHit.java @@ -3,6 +3,7 @@ package com.yahoo.prelude.fastsearch; import java.util.List; +import com.yahoo.fs4.QueryPacketData; import com.yahoo.search.result.Hit; import com.yahoo.searchlib.aggregation.Grouping; @@ -26,4 +27,15 @@ public class GroupingListHit extends Hit { private final List<Grouping> groupingList; private final DocsumDefinitionSet defs; + private QueryPacketData queryPacketData; + + public void setQueryPacketData(QueryPacketData queryPacketData) { + this.queryPacketData = queryPacketData; + } + + /** Returns encoded query data from the query used to create this, or null if none present */ + public QueryPacketData getQueryPacketData() { + return queryPacketData; + } + } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/Int64Field.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/Int64Field.java index 396a84a28bd..f690d9d4da4 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/Int64Field.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/Int64Field.java @@ -5,6 +5,9 @@ */ package com.yahoo.prelude.fastsearch; + +import java.nio.ByteBuffer; + import com.yahoo.search.result.NanNumber; import com.yahoo.data.access.Inspector; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/IntegerField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/IntegerField.java index bec39393359..a02d9813793 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/IntegerField.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/IntegerField.java @@ -5,6 +5,9 @@ */ package com.yahoo.prelude.fastsearch; + +import java.nio.ByteBuffer; + import com.yahoo.search.result.NanNumber; import com.yahoo.data.access.Inspector; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/LongdataField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/LongdataField.java index 388c96b453d..bf77c517d50 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/LongdataField.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/LongdataField.java @@ -5,6 +5,9 @@ */ package com.yahoo.prelude.fastsearch; +import java.nio.ByteBuffer; + +import com.yahoo.io.SlowInflate; import com.yahoo.prelude.hitfield.RawData; import com.yahoo.data.access.simple.Value; import com.yahoo.data.access.Inspector; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/ShortField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/ShortField.java index b94c902693a..5e3d0babe98 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/ShortField.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/ShortField.java @@ -5,6 +5,9 @@ */ package com.yahoo.prelude.fastsearch; + +import java.nio.ByteBuffer; + import com.yahoo.search.result.NanNumber; import com.yahoo.data.access.Inspector; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/StringField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/StringField.java index 4df12bd82bd..408cbbbb62d 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/StringField.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/StringField.java @@ -5,6 +5,10 @@ */ package com.yahoo.prelude.fastsearch; + +import java.nio.ByteBuffer; + +import com.yahoo.text.Utf8; import com.yahoo.data.access.Inspector; diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java index c98cf23ec61..430ad015493 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java @@ -3,6 +3,7 @@ package com.yahoo.prelude.fastsearch; import com.yahoo.collections.TinyIdentitySet; import com.yahoo.fs4.DocsumPacket; +import com.yahoo.fs4.Packet; import com.yahoo.prelude.query.Item; import com.yahoo.prelude.query.NullItem; import com.yahoo.prelude.query.textualrepresentation.TextualQueryRepresentation; @@ -19,6 +20,7 @@ import com.yahoo.search.result.Hit; import com.yahoo.search.searchchain.Execution; import com.yahoo.searchlib.aggregation.Grouping; +import java.io.IOException; import java.lang.reflect.Constructor; import java.util.ArrayList; import java.util.Iterator; @@ -49,7 +51,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { private String defaultDocsumClass = null; /** Returns an iterator which returns all hits below this result **/ - private static Iterator<Hit> hitIterator(Result result) { + static Iterator<Hit> hitIterator(Result result) { return result.hits().unorderedDeepIterator(); } @@ -228,7 +230,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { if ((query.getTraceLevel()<level) || query.properties().getBoolean(TRACE_DISABLE)) return; StringBuilder s = new StringBuilder(); - s.append(sourceName).append(" ").append(type).append(" to dispatch: ") + s.append(sourceName).append(" " + type + " to dispatch: ") .append("query=[") .append(query.getModel().getQueryTree().getRoot().toString()) .append("]"); @@ -318,7 +320,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { } } - private FillHitResult fillHit(FastHit hit, DocsumPacket packet, String summaryClass) { + FillHitResult fillHit(FastHit hit, DocsumPacket packet, String summaryClass) { if (packet != null) { byte[] docsumdata = packet.getData(); if (docsumdata.length > 0) { @@ -342,7 +344,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { * @return the number of hits that we did not return data for, and an optional error message. * when things are working normally we return 0. */ - protected FillHitsResult fillHits(Result result, DocsumPacket[] packets, String summaryClass) { + public FillHitsResult fillHits(Result result, Packet[] packets, String summaryClass) throws IOException { int skippedHits = 0; String lastError = null; int packetIndex = 0; @@ -352,7 +354,8 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { if (hit instanceof FastHit && ! hit.isFilled(summaryClass)) { FastHit fastHit = (FastHit) hit; - DocsumPacket docsum = packets[packetIndex]; + packets[packetIndex].ensureInstanceOf(DocsumPacket.class, getName()); + DocsumPacket docsum = (DocsumPacket) packets[packetIndex]; packetIndex++; FillHitResult fr = fillHit(fastHit, docsum, summaryClass); @@ -381,7 +384,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { return decodeSummary(summaryClass, hit, docsumdata, db.getDocsumDefinitionSet()); } - private static String decodeSummary(String summaryClass, FastHit hit, byte[] docsumdata, DocsumDefinitionSet docsumSet) { + private String decodeSummary(String summaryClass, FastHit hit, byte[] docsumdata, DocsumDefinitionSet docsumSet) { String error = docsumSet.lazyDecode(summaryClass, docsumdata, hit); if (error == null) { hit.setFilled(summaryClass); diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/XMLField.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/XMLField.java index 00bdc474119..d768dda2657 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/XMLField.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/XMLField.java @@ -1,5 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - +/** + * Class converting data (historically XML-encoded) from a document summary field. + * This has only been used to represent geographical positions. + */ package com.yahoo.prelude.fastsearch; import com.yahoo.data.access.Inspector; @@ -8,8 +11,6 @@ import com.yahoo.prelude.hitfield.XMLString; import com.yahoo.search.result.PositionsData; /** - * Class converting data (historically XML-encoded) from a document summary field. - * This has only been used to represent geographical positions. * @author Steinar Knutsen */ public class XMLField extends DocsumField { |