From 9593b7025cfc053d2a57e584cfb639b579c99ceb Mon Sep 17 00:00:00 2001 From: Olli Virtanen Date: Fri, 31 Aug 2018 10:54:13 +0200 Subject: Replace chooseBackend(..) with getChannel(..) to abstract the protocol layer --- .../com/yahoo/prelude/fastsearch/FastSearcher.java | 54 ++++++++++------------ .../yahoo/search/dispatch/CloseableChannel.java | 44 ++++++++++++++++++ .../java/com/yahoo/search/dispatch/Dispatcher.java | 5 +- 3 files changed, 72 insertions(+), 31 deletions(-) create mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java (limited to 'container-search') diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java index cc5c007522c..0f067f33b79 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java @@ -22,6 +22,7 @@ import com.yahoo.prelude.querytransform.QueryRewrite; import com.yahoo.processing.request.CompoundName; import com.yahoo.search.Query; import com.yahoo.search.Result; +import com.yahoo.search.dispatch.CloseableChannel; import com.yahoo.search.dispatch.Dispatcher; import com.yahoo.search.dispatch.SearchCluster; import com.yahoo.search.grouping.GroupingRequest; @@ -172,11 +173,9 @@ public class FastSearcher extends VespaBackEndSearcher { @Override public Result doSearch2(Query query, QueryPacket queryPacket, CacheKey cacheKey, Execution execution) { - FS4Channel channel = null; - try { - if (dispatcher.searchCluster().groupSize() == 1) - forceSinglePassGrouping(query); - channel = chooseBackend(query).openChannel(); + if (dispatcher.searchCluster().groupSize() == 1) + forceSinglePassGrouping(query); + try(CloseableChannel channel = getChannel(query)) { channel.setQuery(query); Result result = searchTwoPhase(channel, query, queryPacket, cacheKey); @@ -199,9 +198,6 @@ public class FastSearcher extends VespaBackEndSearcher { query.trace(getName() + " error response: " + result, false, 1); result.hits().addError(ErrorMessage.createBackendCommunicationError(getName() + " failed: "+ e.getMessage())); return result; - } finally { - if (channel != null) - channel.close(); } } @@ -218,29 +214,32 @@ public class FastSearcher extends VespaBackEndSearcher { } /** - * Returns the backend object to issue a search request over. - * Normally this is the backend field of this instance, which connects to the dispatch node this talks to - * (which is why this instance was chosen by the cluster controller). However, under certain conditions - * we will instead return a backend instance which connects directly to the relevant search nodes. + * Returns an interface object to issue a search request over. + * Normally this is built from the backend field of this instance, which connects to the dispatch node + * this component talks to (which is why this instance was chosen by the cluster controller). However, + * under certain conditions we will instead return an interface which connects directly to the relevant + * search nodes. */ - private Backend chooseBackend(Query query) { + private CloseableChannel getChannel(Query query) { if (query.properties().getBoolean(dispatchInternal, false)) { - Optional directDispatchBackend = dispatcher.getDispatchBackend(query); - if(directDispatchBackend.isPresent()) { - return directDispatchBackend.get(); + Optional directDispatchChannel = dispatcher.getDispatchBackend(query); + if(directDispatchChannel.isPresent()) { + return directDispatchChannel.get(); } } - if ( ! query.properties().getBoolean(dispatchDirect, true)) return dispatchBackend; - if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) return dispatchBackend; + if (!query.properties().getBoolean(dispatchDirect, true)) + return new CloseableChannel(dispatchBackend); + if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) + return new CloseableChannel(dispatchBackend); Optional directDispatchRecipient = dispatcher.searchCluster().directDispatchTarget(); - if ( ! directDispatchRecipient.isPresent()) return dispatchBackend; + if (!directDispatchRecipient.isPresent()) + return new CloseableChannel(dispatchBackend); // Dispatch directly to the single, local search node query.trace(false, 2, "Dispatching directly to ", directDispatchRecipient.get()); - return fs4ResourcePool.getBackend(directDispatchRecipient.get().hostname(), - directDispatchRecipient.get().fs4port(), - Optional.of(directDispatchRecipient.get().key())); + return new CloseableChannel(fs4ResourcePool.getBackend(directDispatchRecipient.get().hostname(), + directDispatchRecipient.get().fs4port(), Optional.of(directDispatchRecipient.get().key()))); } /** @@ -279,10 +278,9 @@ public class FastSearcher extends VespaBackEndSearcher { packetWrapper = cacheLookupTwoPhase(cacheKey, result, summaryClass); } - FS4Channel channel = chooseBackend(query).openChannel(); - channel.setQuery(query); Packet[] receivedPackets; - try { + try(CloseableChannel channel = getChannel(query)) { + channel.setQuery(query); DocsumPacketKey[] packetKeys; if (countFastHits(result) > 0) { @@ -349,8 +347,6 @@ public class FastSearcher extends VespaBackEndSearcher { query.trace(traceMsg, false, 3); } } - } finally { - channel.close(); } } @@ -382,7 +378,7 @@ public class FastSearcher extends VespaBackEndSearcher { return null; } - private Result searchTwoPhase(FS4Channel channel, Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException { + private Result searchTwoPhase(CloseableChannel channel, Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException { if (isLoggingFine()) getLogger().finest("sending query packet"); @@ -462,7 +458,7 @@ public class FastSearcher extends VespaBackEndSearcher { return packets; } - private Packet[] fetchSummaries(FS4Channel channel, Result result, String summaryClass) + private Packet[] fetchSummaries(CloseableChannel channel, Result result, String summaryClass) throws InvalidChannelException, ChannelTimeoutException, ClassCastException, IOException { BasicPacket[] receivedPackets; diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java new file mode 100644 index 00000000000..9a3e7e71031 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java @@ -0,0 +1,44 @@ +package com.yahoo.search.dispatch; + +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.ChannelTimeoutException; +import com.yahoo.fs4.mplex.Backend; +import com.yahoo.fs4.mplex.FS4Channel; +import com.yahoo.fs4.mplex.InvalidChannelException; +import com.yahoo.search.Query; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Optional; + +public class CloseableChannel implements Closeable { + private FS4Channel channel; + + public CloseableChannel(Backend backend) { + this.channel = backend.openChannel(); + } + + public void setQuery(Query query) { + channel.setQuery(query); + } + + public boolean sendPacket(BasicPacket packet) throws InvalidChannelException, IOException { + return channel.sendPacket(packet); + } + + public BasicPacket[] receivePackets(long timeout, int packetCount) throws InvalidChannelException, ChannelTimeoutException { + return channel.receivePackets(timeout, packetCount); + } + + public Optional distributionKey() { + return channel.distributionKey(); + } + + @Override + public void close() { + if (channel != null) { + channel.close(); + channel = null; + } + } +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 473f3932680..d0f03dde3dd 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -284,7 +284,7 @@ public class Dispatcher extends AbstractComponent { } - public Optional getDispatchBackend(Query query) { + public Optional getDispatchBackend(Query query) { Optional groupInCluster = loadBalancer.getGroupForQuery(query); return groupInCluster.flatMap(group -> { @@ -295,7 +295,8 @@ public class Dispatcher extends AbstractComponent { } }).map(node -> { query.trace(false, 2, "Dispatching directly (anywhere) to ", node); - return fs4ResourcePool.getBackend(node.hostname(), node.fs4port(), Optional.of(node.key())); + Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port(), Optional.of(node.key())); + return new CloseableChannel(backend); }); } } -- cgit v1.2.3