diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-09-17 22:39:26 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-09-17 22:39:26 +0200 |
commit | a573985d1127835f0ecb5047694ffe23e8baefe7 (patch) | |
tree | 463ab5145edf4a26243cc4a92d04bfb82a3c1580 /container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java | |
parent | ddb9cd0a539b57c41587ccdec1040b48169d3cec (diff) |
Revert "Balder/no more fs4 dispatching from fastsearcher"
Diffstat (limited to 'container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java | 125 |
1 files changed, 118 insertions, 7 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java index b0b3a7800e9..6b0041a9e86 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java @@ -1,14 +1,23 @@ // Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.prelude.fastsearch; +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.ChannelTimeoutException; +import com.yahoo.fs4.PingPacket; +import com.yahoo.fs4.PongPacket; +import com.yahoo.fs4.mplex.Backend; +import com.yahoo.fs4.mplex.FS4Channel; +import com.yahoo.fs4.mplex.InvalidChannelException; import com.yahoo.prelude.Ping; import com.yahoo.prelude.Pong; import com.yahoo.prelude.querytransform.QueryRewrite; +import com.yahoo.processing.request.CompoundName; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.dispatch.Dispatcher; import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.dispatch.SearchInvoker; +import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.grouping.GroupingRequest; import com.yahoo.search.grouping.request.GroupingOperation; import com.yahoo.search.query.Ranking; @@ -37,13 +46,21 @@ import static com.yahoo.container.util.Util.quote; // catch and unwrap into a results with an error in high level methods. -Jon public class FastSearcher extends VespaBackEndSearcher { + /** If this is turned on this will make search queries directly to the local search node when possible */ + private final static CompoundName dispatchDirect = new CompoundName("dispatch.direct"); + /** Used to dispatch directly to search nodes over RPC, replacing the old fnet communication path */ private final Dispatcher dispatcher; + private final Backend dispatchBackend; + private final FS4ResourcePool fs4ResourcePool; + /** * Creates a Fastsearcher. * - * @param serverId the resource pool used to create direct connections to the local search nodes when + * @param dispatchBackend The backend object containing the connection to the dispatch node this should talk to + * over the fs4 protocol + * @param fs4ResourcePool the resource pool used to create direct connections to the local search nodes when * bypassing the dispatch node * @param dispatcher the dispatcher used (when enabled) to send summary requests over the rpc protocol. * Eventually we will move everything to this protocol and never use dispatch nodes. @@ -53,11 +70,13 @@ public class FastSearcher extends VespaBackEndSearcher { * @param clusterParams the cluster number, and other cluster backend parameters * @param documentdbInfoConfig document database parameters */ - public FastSearcher(String serverId, Dispatcher dispatcher, + public FastSearcher(Backend dispatchBackend, FS4ResourcePool fs4ResourcePool, Dispatcher dispatcher, SummaryParameters docSumParams, ClusterParams clusterParams, DocumentdbInfoConfig documentdbInfoConfig) { - init(serverId, docSumParams, clusterParams, documentdbInfoConfig); + init(fs4ResourcePool.getServerId(), docSumParams, clusterParams, documentdbInfoConfig); + this.dispatchBackend = dispatchBackend; this.dispatcher = dispatcher; + this.fs4ResourcePool = fs4ResourcePool; } /** @@ -65,7 +84,58 @@ public class FastSearcher extends VespaBackEndSearcher { */ @Override public Pong ping(Ping ping, Execution execution) { - throw new IllegalStateException("This ping should not have been called."); + return ping(ping, dispatchBackend, getName()); + } + + public static Pong ping(Ping ping, Backend backend, String name) { + FS4Channel channel = backend.openPingChannel(); + + // If you want to change this code, you need to understand + // com.yahoo.prelude.cluster.ClusterSearcher.ping(Searcher) and + // com.yahoo.prelude.cluster.TrafficNodeMonitor.failed(ErrorMessage) + try { + PingPacket pingPacket = new PingPacket(); + try { + boolean couldSend = channel.sendPacket(pingPacket); + if ( ! couldSend) { + return new Pong(ErrorMessage.createBackendCommunicationError("Could not ping " + name)); + } + } catch (InvalidChannelException e) { + return new Pong(ErrorMessage.createBackendCommunicationError("Invalid channel " + name)); + } catch (IllegalStateException e) { + return new Pong(ErrorMessage.createBackendCommunicationError("Illegal state in FS4: " + e.getMessage())); + } catch (IOException e) { + return new Pong(ErrorMessage.createBackendCommunicationError("IO error while sending ping: " + e.getMessage())); + } + + // We should only get a single packet + BasicPacket[] packets; + + try { + packets = channel.receivePackets(ping.getTimeout(), 1); + } catch (ChannelTimeoutException e) { + return new Pong(ErrorMessage.createNoAnswerWhenPingingNode("timeout while waiting for fdispatch for " + name)); + } catch (InvalidChannelException e) { + return new Pong(ErrorMessage.createBackendCommunicationError("Invalid channel for " + name)); + } + + if (packets.length == 0) { + return new Pong(ErrorMessage.createBackendCommunicationError(name + " got no packets back")); + } + + try { + packets[0].ensureInstanceOf(PongPacket.class, name); + } catch (TimeoutException e) { + return new Pong(ErrorMessage.createTimeout(e.getMessage())); + } catch (IOException e) { + return new Pong(ErrorMessage.createBackendCommunicationError("Unexpected packet class returned after ping: " + e.getMessage())); + } + return new Pong((PongPacket)packets[0]); + } finally { + if (channel != null) { + channel.close(); + } + } } @Override @@ -147,7 +217,18 @@ public class FastSearcher extends VespaBackEndSearcher { * on the same host. */ private SearchInvoker getSearchInvoker(Query query) { - return dispatcher.getSearchInvoker(query, this).get(); + Optional<SearchInvoker> invoker = dispatcher.getSearchInvoker(query, this); + if (invoker.isPresent()) { + return invoker.get(); + } + + Optional<Node> direct = getDirectNode(query); + if(direct.isPresent()) { + var node = direct.get(); + Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port()); + return new FS4SearchInvoker(this, query, backend.openChannel(), direct); + } + return new FS4SearchInvoker(this, query, dispatchBackend.openChannel(), Optional.empty()); } /** @@ -156,17 +237,47 @@ public class FastSearcher extends VespaBackEndSearcher { * content nodes. */ private FillInvoker getFillInvoker(Result result) { - return dispatcher.getFillInvoker(result, this).get(); + Query query = result.getQuery(); + Optional<FillInvoker> invoker = dispatcher.getFillInvoker(result, this); + if (invoker.isPresent()) { + return invoker.get(); + } + + Optional<Node> direct = getDirectNode(query); + if (direct.isPresent()) { + var node = direct.get(); + Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port()); + return new FS4FillInvoker(this, query, backend); + } + return new FS4FillInvoker(this, query, dispatchBackend); } + /** + * If the query can be directed to a single local content node, returns that node. Otherwise, + * returns an empty value. + */ + private Optional<Node> getDirectNode(Query query) { + if (!query.properties().getBoolean(dispatchDirect, true)) + return Optional.empty(); + if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) + return Optional.empty(); + + Optional<Node> directDispatchRecipient = dispatcher.searchCluster().directDispatchTarget(); + if (!directDispatchRecipient.isPresent()) + return Optional.empty(); + // Dispatch directly to the single, local search node + Node local = directDispatchRecipient.get(); + query.trace(false, 2, "Dispatching directly to ", local); + return Optional.of(local); + } private static Optional<String> quotedSummaryClass(String summaryClass) { return Optional.of(summaryClass == null ? "[null]" : quote(summaryClass)); } public String toString() { - return "fast searcher (" + getName() + ") "; + return "fast searcher (" + getName() + ") " + dispatchBackend; } protected boolean isLoggingFine() { |