From 1f554f8bb745cb3b5ea60e9dd2aafd10ceb1f22e Mon Sep 17 00:00:00 2001 From: Olli Virtanen Date: Thu, 30 Aug 2018 10:08:02 +0200 Subject: Clean issues/warnings reported in Eclipse IDE --- .../src/main/java/com/yahoo/search/dispatch/RpcClient.java | 1 - .../src/main/java/com/yahoo/search/dispatch/SearchCluster.java | 1 - container-search/src/main/java/com/yahoo/search/yql/TypeCheckers.java | 3 ++- .../test/java/com/yahoo/prelude/hitfield/test/JSONStringTestCase.java | 4 ++-- .../src/test/java/com/yahoo/search/dispatch/FillTestCase.java | 1 - 5 files changed, 4 insertions(+), 6 deletions(-) (limited to 'container-search') diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java index 8107a50e8c0..67e032eca37 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java @@ -15,7 +15,6 @@ import com.yahoo.jrt.Values; import com.yahoo.prelude.fastsearch.FastHit; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; /** * A client which uses rpc request to search nodes to implement the Client API. diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java index efce2fdac9c..48ddba6c301 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch; -import com.google.common.annotations.Beta; import com.google.common.collect.ImmutableCollection; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; diff --git a/container-search/src/main/java/com/yahoo/search/yql/TypeCheckers.java b/container-search/src/main/java/com/yahoo/search/yql/TypeCheckers.java index cb693d6801d..af54d28c2ac 100644 --- a/container-search/src/main/java/com/yahoo/search/yql/TypeCheckers.java +++ b/container-search/src/main/java/com/yahoo/search/yql/TypeCheckers.java @@ -64,7 +64,8 @@ final class TypeCheckers { TypeLiteral arg = TypeLiteral.get(type.getActualTypeArguments()[0]); if (OperatorNode.class.isAssignableFrom(arg.getRawType())) { Preconditions.checkArgument(arg.getType() instanceof ParameterizedType, "Type spec must be List>"); - Class optype = (Class) TypeLiteral.get(((ParameterizedType) arg.getType()).getActualTypeArguments()[0]).getRawType(); + Class rawType = (Class) TypeLiteral.get(((ParameterizedType) arg.getType()).getActualTypeArguments()[0]).getRawType(); + Class optype = (Class) rawType; return new OperatorNodeListTypeChecker(parent, idx, optype, ImmutableSet.of()); } else { return new JavaListTypeChecker(parent, idx, arg.getRawType()); diff --git a/container-search/src/test/java/com/yahoo/prelude/hitfield/test/JSONStringTestCase.java b/container-search/src/test/java/com/yahoo/prelude/hitfield/test/JSONStringTestCase.java index 09a439c7bc9..18231785a26 100644 --- a/container-search/src/test/java/com/yahoo/prelude/hitfield/test/JSONStringTestCase.java +++ b/container-search/src/test/java/com/yahoo/prelude/hitfield/test/JSONStringTestCase.java @@ -329,8 +329,8 @@ public class JSONStringTestCase { String rendered = js.renderFromInspector(); assertTrue(-1 < rendered.indexOf(f1)); - int offsetF2; - assertTrue(-1 < (offsetF2 = rendered.indexOf(f2))); + int offsetF2 = rendered.indexOf(f2); + assertTrue(-1 < offsetF2); offsetF2 += f2.length(); assertTrue(-1 < rendered.indexOf(f2_1, offsetF2)); assertTrue(-1 < rendered.indexOf(f2_2, offsetF2)); diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/FillTestCase.java b/container-search/src/test/java/com/yahoo/search/dispatch/FillTestCase.java index 0191b1a799b..5e3e0dc301e 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/FillTestCase.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/FillTestCase.java @@ -2,7 +2,6 @@ package com.yahoo.search.dispatch; import com.yahoo.compress.CompressionType; -import com.yahoo.log.event.Collection; import com.yahoo.prelude.fastsearch.DocsumDefinition; import com.yahoo.prelude.fastsearch.DocsumDefinitionSet; import com.yahoo.prelude.fastsearch.DocsumField; -- cgit v1.2.3 From 6e56ca5b6f2064094933859f530df849f2e28716 Mon Sep 17 00:00:00 2001 From: Olli Virtanen Date: Thu, 30 Aug 2018 10:09:42 +0200 Subject: Create code path for extending java side dispatcher --- .../com/yahoo/prelude/fastsearch/FastSearcher.java | 17 ++++-- .../java/com/yahoo/search/dispatch/Dispatcher.java | 23 ++++++++ .../com/yahoo/search/dispatch/LoadBalancer.java | 27 +++++++++ .../yahoo/search/dispatch/LoadBalancerTest.java | 64 ++++++++++++++++++++++ 4 files changed, 127 insertions(+), 4 deletions(-) create mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java create mode 100644 container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java (limited to 'container-search') diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java index c93220f0a85..cc5c007522c 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java @@ -62,6 +62,10 @@ public class FastSearcher extends VespaBackEndSearcher { /** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */ private final static CompoundName dispatchCompression = new CompoundName("dispatch.compression"); + /** If enabled, the dispatcher internal to the search container will be preferred over fdispatch + * whenever possible */ + private static final CompoundName dispatchInternal = new CompoundName("dispatch.internal"); + /** Used to dispatch directly to search nodes over RPC, replacing the old fnet communication path */ private final Dispatcher dispatcher; @@ -215,12 +219,17 @@ public class FastSearcher extends VespaBackEndSearcher { /** * Returns the backend object to issue a search request over. - * Normally this is the backend field of this instance, which connects to the dispatch node this talk to - * (which is why this instance was chosen by the cluster controller). However, when certain conditions obtain - * (see below), we will instead return a backend instance which connects directly to the local search node - * for efficiency. + * Normally this is the backend field of this instance, which connects to the dispatch node this talks to + * (which is why this instance was chosen by the cluster controller). However, under certain conditions + * we will instead return a backend instance which connects directly to the relevant search nodes. */ private Backend chooseBackend(Query query) { + if (query.properties().getBoolean(dispatchInternal, false)) { + Optional directDispatchBackend = dispatcher.getDispatchBackend(query); + if(directDispatchBackend.isPresent()) { + return directDispatchBackend.get(); + } + } if ( ! query.properties().getBoolean(dispatchDirect, true)) return dispatchBackend; if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) return dispatchBackend; diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 5ef81403f26..473f3932680 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -11,6 +11,7 @@ import com.yahoo.container.protect.Error; import com.yahoo.prelude.fastsearch.DocumentDatabase; import com.yahoo.slime.ArrayTraverser; import com.yahoo.data.access.slime.SlimeAdapter; +import com.yahoo.fs4.mplex.Backend; import com.yahoo.prelude.fastsearch.FS4ResourcePool; import com.yahoo.prelude.fastsearch.FastHit; import com.yahoo.prelude.fastsearch.TimeoutException; @@ -28,6 +29,7 @@ import com.yahoo.vespa.config.search.DispatchConfig; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -57,10 +59,15 @@ public class Dispatcher extends AbstractComponent { private final Compressor compressor = new Compressor(); + private final LoadBalancer loadBalancer; + private final FS4ResourcePool fs4ResourcePool; + public Dispatcher(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus) { this.client = new RpcClient(); this.searchCluster = new SearchCluster(dispatchConfig, fs4ResourcePool, containerClusterSize, vipStatus); + this.fs4ResourcePool = fs4ResourcePool; + this.loadBalancer = new LoadBalancer(searchCluster); // Create node rpc connections, indexed by the node distribution key ImmutableMap.Builder nodeConnectionsBuilder = new ImmutableMap.Builder<>(); @@ -75,6 +82,8 @@ public class Dispatcher extends AbstractComponent { this.searchCluster = null; this.nodeConnections = ImmutableMap.copyOf(nodeConnections); this.client = client; + this.fs4ResourcePool = null; + this.loadBalancer = new LoadBalancer(searchCluster); } /** Returns the search cluster this dispatches to */ @@ -275,4 +284,18 @@ public class Dispatcher extends AbstractComponent { } + public Optional getDispatchBackend(Query query) { + Optional groupInCluster = loadBalancer.getGroupForQuery(query); + + return groupInCluster.flatMap(group -> { + if(group.nodes().size() == 1) { + return Optional.of(group.nodes().get(0)); + } else { + return Optional.empty(); + } + }).map(node -> { + query.trace(false, 2, "Dispatching directly (anywhere) to ", node); + return fs4ResourcePool.getBackend(node.hostname(), node.fs4port(), Optional.of(node.key())); + }); + } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java new file mode 100644 index 00000000000..8e90eae0eb3 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java @@ -0,0 +1,27 @@ +package com.yahoo.search.dispatch; + +import com.yahoo.search.Query; +import com.yahoo.search.dispatch.SearchCluster.Group; + +import java.util.Optional; + +public class LoadBalancer { + + private final SearchCluster searchCluster; + + public LoadBalancer(SearchCluster searchCluster) { + this.searchCluster = searchCluster; + } + + public Optional getGroupForQuery(Query query) { + if (searchCluster.groups().size() == 1) { + for(Group group: searchCluster.groups().values()) { + // since the number of groups is 1, this will run only once + if(group.nodes().size() == 1) { + return Optional.of(group); + } + } + } + return Optional.empty(); + } +} diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java new file mode 100644 index 00000000000..448a8d0e894 --- /dev/null +++ b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java @@ -0,0 +1,64 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch; + +import com.yahoo.search.dispatch.SearchCluster.Group; +import com.yahoo.search.dispatch.SearchCluster.Node; +import junit.framework.AssertionFailedError; +import org.hamcrest.Matchers; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Optional; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +public class LoadBalancerTest { + @Test + public void requreThatLoadBalancerServesSingleNodeSetups() { + Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0); + SearchCluster cluster = new SearchCluster(88.0, Arrays.asList(n1), null, 1, null); + LoadBalancer lb = new LoadBalancer(cluster); + + Optional grp = lb.getGroupForQuery(null); + Group group = grp.orElseGet(() -> { + throw new AssertionFailedError("Expected a SearchCluster.Group"); + }); + assertThat(group.nodes().size(), Matchers.equalTo(1)); + } + + @Test + public void requreThatLoadBalancerIgnoresMultiGroupSetups() { + Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0); + Node n2 = new SearchCluster.Node(1, "test-node2", 1, 1); + SearchCluster cluster = new SearchCluster(88.0, Arrays.asList(n1, n2), null, 1, null); + LoadBalancer lb = new LoadBalancer(cluster); + + Optional grp = lb.getGroupForQuery(null); + assertThat(grp.isPresent(), is(false)); + } + + @Test + public void requreThatLoadBalancerIgnoresClusteredSingleGroup() { + Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0); + Node n2 = new SearchCluster.Node(1, "test-node2", 1, 0); + SearchCluster cluster = new SearchCluster(88.0, Arrays.asList(n1, n2), null, 2, null); + LoadBalancer lb = new LoadBalancer(cluster); + + Optional grp = lb.getGroupForQuery(null); + assertThat(grp.isPresent(), is(false)); + } + + @Test + public void requreThatLoadBalancerIgnoresClusteredGroups() { + Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0); + Node n2 = new SearchCluster.Node(1, "test-node2", 1, 0); + Node n3 = new SearchCluster.Node(0, "test-node3", 0, 1); + Node n4 = new SearchCluster.Node(1, "test-node4", 1, 1); + SearchCluster cluster = new SearchCluster(88.0, Arrays.asList(n1, n2, n3, n4), null, 2, null); + LoadBalancer lb = new LoadBalancer(cluster); + + Optional grp = lb.getGroupForQuery(null); + assertThat(grp.isPresent(), is(false)); + } +} -- cgit v1.2.3 From 9593b7025cfc053d2a57e584cfb639b579c99ceb Mon Sep 17 00:00:00 2001 From: Olli Virtanen Date: Fri, 31 Aug 2018 10:54:13 +0200 Subject: Replace chooseBackend(..) with getChannel(..) to abstract the protocol layer --- .../com/yahoo/prelude/fastsearch/FastSearcher.java | 54 ++++++++++------------ .../yahoo/search/dispatch/CloseableChannel.java | 44 ++++++++++++++++++ .../java/com/yahoo/search/dispatch/Dispatcher.java | 5 +- 3 files changed, 72 insertions(+), 31 deletions(-) create mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java (limited to 'container-search') diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java index cc5c007522c..0f067f33b79 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java @@ -22,6 +22,7 @@ import com.yahoo.prelude.querytransform.QueryRewrite; import com.yahoo.processing.request.CompoundName; import com.yahoo.search.Query; import com.yahoo.search.Result; +import com.yahoo.search.dispatch.CloseableChannel; import com.yahoo.search.dispatch.Dispatcher; import com.yahoo.search.dispatch.SearchCluster; import com.yahoo.search.grouping.GroupingRequest; @@ -172,11 +173,9 @@ public class FastSearcher extends VespaBackEndSearcher { @Override public Result doSearch2(Query query, QueryPacket queryPacket, CacheKey cacheKey, Execution execution) { - FS4Channel channel = null; - try { - if (dispatcher.searchCluster().groupSize() == 1) - forceSinglePassGrouping(query); - channel = chooseBackend(query).openChannel(); + if (dispatcher.searchCluster().groupSize() == 1) + forceSinglePassGrouping(query); + try(CloseableChannel channel = getChannel(query)) { channel.setQuery(query); Result result = searchTwoPhase(channel, query, queryPacket, cacheKey); @@ -199,9 +198,6 @@ public class FastSearcher extends VespaBackEndSearcher { query.trace(getName() + " error response: " + result, false, 1); result.hits().addError(ErrorMessage.createBackendCommunicationError(getName() + " failed: "+ e.getMessage())); return result; - } finally { - if (channel != null) - channel.close(); } } @@ -218,29 +214,32 @@ public class FastSearcher extends VespaBackEndSearcher { } /** - * Returns the backend object to issue a search request over. - * Normally this is the backend field of this instance, which connects to the dispatch node this talks to - * (which is why this instance was chosen by the cluster controller). However, under certain conditions - * we will instead return a backend instance which connects directly to the relevant search nodes. + * Returns an interface object to issue a search request over. + * Normally this is built from the backend field of this instance, which connects to the dispatch node + * this component talks to (which is why this instance was chosen by the cluster controller). However, + * under certain conditions we will instead return an interface which connects directly to the relevant + * search nodes. */ - private Backend chooseBackend(Query query) { + private CloseableChannel getChannel(Query query) { if (query.properties().getBoolean(dispatchInternal, false)) { - Optional directDispatchBackend = dispatcher.getDispatchBackend(query); - if(directDispatchBackend.isPresent()) { - return directDispatchBackend.get(); + Optional directDispatchChannel = dispatcher.getDispatchBackend(query); + if(directDispatchChannel.isPresent()) { + return directDispatchChannel.get(); } } - if ( ! query.properties().getBoolean(dispatchDirect, true)) return dispatchBackend; - if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) return dispatchBackend; + if (!query.properties().getBoolean(dispatchDirect, true)) + return new CloseableChannel(dispatchBackend); + if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) + return new CloseableChannel(dispatchBackend); Optional directDispatchRecipient = dispatcher.searchCluster().directDispatchTarget(); - if ( ! directDispatchRecipient.isPresent()) return dispatchBackend; + if (!directDispatchRecipient.isPresent()) + return new CloseableChannel(dispatchBackend); // Dispatch directly to the single, local search node query.trace(false, 2, "Dispatching directly to ", directDispatchRecipient.get()); - return fs4ResourcePool.getBackend(directDispatchRecipient.get().hostname(), - directDispatchRecipient.get().fs4port(), - Optional.of(directDispatchRecipient.get().key())); + return new CloseableChannel(fs4ResourcePool.getBackend(directDispatchRecipient.get().hostname(), + directDispatchRecipient.get().fs4port(), Optional.of(directDispatchRecipient.get().key()))); } /** @@ -279,10 +278,9 @@ public class FastSearcher extends VespaBackEndSearcher { packetWrapper = cacheLookupTwoPhase(cacheKey, result, summaryClass); } - FS4Channel channel = chooseBackend(query).openChannel(); - channel.setQuery(query); Packet[] receivedPackets; - try { + try(CloseableChannel channel = getChannel(query)) { + channel.setQuery(query); DocsumPacketKey[] packetKeys; if (countFastHits(result) > 0) { @@ -349,8 +347,6 @@ public class FastSearcher extends VespaBackEndSearcher { query.trace(traceMsg, false, 3); } } - } finally { - channel.close(); } } @@ -382,7 +378,7 @@ public class FastSearcher extends VespaBackEndSearcher { return null; } - private Result searchTwoPhase(FS4Channel channel, Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException { + private Result searchTwoPhase(CloseableChannel channel, Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException { if (isLoggingFine()) getLogger().finest("sending query packet"); @@ -462,7 +458,7 @@ public class FastSearcher extends VespaBackEndSearcher { return packets; } - private Packet[] fetchSummaries(FS4Channel channel, Result result, String summaryClass) + private Packet[] fetchSummaries(CloseableChannel channel, Result result, String summaryClass) throws InvalidChannelException, ChannelTimeoutException, ClassCastException, IOException { BasicPacket[] receivedPackets; diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java new file mode 100644 index 00000000000..9a3e7e71031 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java @@ -0,0 +1,44 @@ +package com.yahoo.search.dispatch; + +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.ChannelTimeoutException; +import com.yahoo.fs4.mplex.Backend; +import com.yahoo.fs4.mplex.FS4Channel; +import com.yahoo.fs4.mplex.InvalidChannelException; +import com.yahoo.search.Query; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Optional; + +public class CloseableChannel implements Closeable { + private FS4Channel channel; + + public CloseableChannel(Backend backend) { + this.channel = backend.openChannel(); + } + + public void setQuery(Query query) { + channel.setQuery(query); + } + + public boolean sendPacket(BasicPacket packet) throws InvalidChannelException, IOException { + return channel.sendPacket(packet); + } + + public BasicPacket[] receivePackets(long timeout, int packetCount) throws InvalidChannelException, ChannelTimeoutException { + return channel.receivePackets(timeout, packetCount); + } + + public Optional distributionKey() { + return channel.distributionKey(); + } + + @Override + public void close() { + if (channel != null) { + channel.close(); + channel = null; + } + } +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 473f3932680..d0f03dde3dd 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -284,7 +284,7 @@ public class Dispatcher extends AbstractComponent { } - public Optional getDispatchBackend(Query query) { + public Optional getDispatchBackend(Query query) { Optional groupInCluster = loadBalancer.getGroupForQuery(query); return groupInCluster.flatMap(group -> { @@ -295,7 +295,8 @@ public class Dispatcher extends AbstractComponent { } }).map(node -> { query.trace(false, 2, "Dispatching directly (anywhere) to ", node); - return fs4ResourcePool.getBackend(node.hostname(), node.fs4port(), Optional.of(node.key())); + Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port(), Optional.of(node.key())); + return new CloseableChannel(backend); }); } } -- cgit v1.2.3