summaryrefslogtreecommitdiffstats
path: root/container-search
diff options
context:
space:
mode:
authorOlli Virtanen <olli.virtanen@oath.com>2018-08-31 10:54:13 +0200
committerOlli Virtanen <olli.virtanen@oath.com>2018-08-31 10:54:13 +0200
commit9593b7025cfc053d2a57e584cfb639b579c99ceb (patch)
tree2f79ae96eec115fd71db952c1193138e1aec7f01 /container-search
parent6e56ca5b6f2064094933859f530df849f2e28716 (diff)
Replace chooseBackend(..) with getChannel(..) to abstract the protocol layer
Diffstat (limited to 'container-search')
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java54
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java44
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java5
3 files changed, 72 insertions, 31 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
index cc5c007522c..0f067f33b79 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
@@ -22,6 +22,7 @@ import com.yahoo.prelude.querytransform.QueryRewrite;
import com.yahoo.processing.request.CompoundName;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
+import com.yahoo.search.dispatch.CloseableChannel;
import com.yahoo.search.dispatch.Dispatcher;
import com.yahoo.search.dispatch.SearchCluster;
import com.yahoo.search.grouping.GroupingRequest;
@@ -172,11 +173,9 @@ public class FastSearcher extends VespaBackEndSearcher {
@Override
public Result doSearch2(Query query, QueryPacket queryPacket, CacheKey cacheKey, Execution execution) {
- FS4Channel channel = null;
- try {
- if (dispatcher.searchCluster().groupSize() == 1)
- forceSinglePassGrouping(query);
- channel = chooseBackend(query).openChannel();
+ if (dispatcher.searchCluster().groupSize() == 1)
+ forceSinglePassGrouping(query);
+ try(CloseableChannel channel = getChannel(query)) {
channel.setQuery(query);
Result result = searchTwoPhase(channel, query, queryPacket, cacheKey);
@@ -199,9 +198,6 @@ public class FastSearcher extends VespaBackEndSearcher {
query.trace(getName() + " error response: " + result, false, 1);
result.hits().addError(ErrorMessage.createBackendCommunicationError(getName() + " failed: "+ e.getMessage()));
return result;
- } finally {
- if (channel != null)
- channel.close();
}
}
@@ -218,29 +214,32 @@ public class FastSearcher extends VespaBackEndSearcher {
}
/**
- * Returns the backend object to issue a search request over.
- * Normally this is the backend field of this instance, which connects to the dispatch node this talks to
- * (which is why this instance was chosen by the cluster controller). However, under certain conditions
- * we will instead return a backend instance which connects directly to the relevant search nodes.
+ * Returns an interface object to issue a search request over.
+ * Normally this is built from the backend field of this instance, which connects to the dispatch node
+ * this component talks to (which is why this instance was chosen by the cluster controller). However,
+ * under certain conditions we will instead return an interface which connects directly to the relevant
+ * search nodes.
*/
- private Backend chooseBackend(Query query) {
+ private CloseableChannel getChannel(Query query) {
if (query.properties().getBoolean(dispatchInternal, false)) {
- Optional<Backend> directDispatchBackend = dispatcher.getDispatchBackend(query);
- if(directDispatchBackend.isPresent()) {
- return directDispatchBackend.get();
+ Optional<CloseableChannel> directDispatchChannel = dispatcher.getDispatchBackend(query);
+ if(directDispatchChannel.isPresent()) {
+ return directDispatchChannel.get();
}
}
- if ( ! query.properties().getBoolean(dispatchDirect, true)) return dispatchBackend;
- if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) return dispatchBackend;
+ if (!query.properties().getBoolean(dispatchDirect, true))
+ return new CloseableChannel(dispatchBackend);
+ if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE))
+ return new CloseableChannel(dispatchBackend);
Optional<SearchCluster.Node> directDispatchRecipient = dispatcher.searchCluster().directDispatchTarget();
- if ( ! directDispatchRecipient.isPresent()) return dispatchBackend;
+ if (!directDispatchRecipient.isPresent())
+ return new CloseableChannel(dispatchBackend);
// Dispatch directly to the single, local search node
query.trace(false, 2, "Dispatching directly to ", directDispatchRecipient.get());
- return fs4ResourcePool.getBackend(directDispatchRecipient.get().hostname(),
- directDispatchRecipient.get().fs4port(),
- Optional.of(directDispatchRecipient.get().key()));
+ return new CloseableChannel(fs4ResourcePool.getBackend(directDispatchRecipient.get().hostname(),
+ directDispatchRecipient.get().fs4port(), Optional.of(directDispatchRecipient.get().key())));
}
/**
@@ -279,10 +278,9 @@ public class FastSearcher extends VespaBackEndSearcher {
packetWrapper = cacheLookupTwoPhase(cacheKey, result, summaryClass);
}
- FS4Channel channel = chooseBackend(query).openChannel();
- channel.setQuery(query);
Packet[] receivedPackets;
- try {
+ try(CloseableChannel channel = getChannel(query)) {
+ channel.setQuery(query);
DocsumPacketKey[] packetKeys;
if (countFastHits(result) > 0) {
@@ -349,8 +347,6 @@ public class FastSearcher extends VespaBackEndSearcher {
query.trace(traceMsg, false, 3);
}
}
- } finally {
- channel.close();
}
}
@@ -382,7 +378,7 @@ public class FastSearcher extends VespaBackEndSearcher {
return null;
}
- private Result searchTwoPhase(FS4Channel channel, Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException {
+ private Result searchTwoPhase(CloseableChannel channel, Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException {
if (isLoggingFine())
getLogger().finest("sending query packet");
@@ -462,7 +458,7 @@ public class FastSearcher extends VespaBackEndSearcher {
return packets;
}
- private Packet[] fetchSummaries(FS4Channel channel, Result result, String summaryClass)
+ private Packet[] fetchSummaries(CloseableChannel channel, Result result, String summaryClass)
throws InvalidChannelException, ChannelTimeoutException, ClassCastException, IOException {
BasicPacket[] receivedPackets;
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java
new file mode 100644
index 00000000000..9a3e7e71031
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java
@@ -0,0 +1,44 @@
+package com.yahoo.search.dispatch;
+
+import com.yahoo.fs4.BasicPacket;
+import com.yahoo.fs4.ChannelTimeoutException;
+import com.yahoo.fs4.mplex.Backend;
+import com.yahoo.fs4.mplex.FS4Channel;
+import com.yahoo.fs4.mplex.InvalidChannelException;
+import com.yahoo.search.Query;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+public class CloseableChannel implements Closeable {
+ private FS4Channel channel;
+
+ public CloseableChannel(Backend backend) {
+ this.channel = backend.openChannel();
+ }
+
+ public void setQuery(Query query) {
+ channel.setQuery(query);
+ }
+
+ public boolean sendPacket(BasicPacket packet) throws InvalidChannelException, IOException {
+ return channel.sendPacket(packet);
+ }
+
+ public BasicPacket[] receivePackets(long timeout, int packetCount) throws InvalidChannelException, ChannelTimeoutException {
+ return channel.receivePackets(timeout, packetCount);
+ }
+
+ public Optional<Integer> distributionKey() {
+ return channel.distributionKey();
+ }
+
+ @Override
+ public void close() {
+ if (channel != null) {
+ channel.close();
+ channel = null;
+ }
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index 473f3932680..d0f03dde3dd 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -284,7 +284,7 @@ public class Dispatcher extends AbstractComponent {
}
- public Optional<Backend> getDispatchBackend(Query query) {
+ public Optional<CloseableChannel> getDispatchBackend(Query query) {
Optional<SearchCluster.Group> groupInCluster = loadBalancer.getGroupForQuery(query);
return groupInCluster.flatMap(group -> {
@@ -295,7 +295,8 @@ public class Dispatcher extends AbstractComponent {
}
}).map(node -> {
query.trace(false, 2, "Dispatching directly (anywhere) to ", node);
- return fs4ResourcePool.getBackend(node.hostname(), node.fs4port(), Optional.of(node.key()));
+ Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port(), Optional.of(node.key()));
+ return new CloseableChannel(backend);
});
}
}