diff options
author | Jon Bratseth <bratseth@verizonmedia.com> | 2020-01-08 23:01:46 +0100 |
---|---|---|
committer | Jon Bratseth <bratseth@verizonmedia.com> | 2020-01-08 23:01:46 +0100 |
commit | c49a1256ef4c431f7a59018e4d62d1942fdeaf1e (patch) | |
tree | 883f7411e58ffbbb09238fd95d7bf76e6076bf92 | |
parent | 93057cb2141dee8846c9304eb6a9aae2ba9c4dc0 (diff) |
Support max-hits-per-node in Dispatcher
11 files changed, 75 insertions, 26 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java index c04553ae2f5..14604d61c0a 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java @@ -85,7 +85,7 @@ public class FastSearcher extends VespaBackEndSearcher { public Result doSearch2(Query query, Execution execution) { if (dispatcher.searchCluster().groupSize() == 1) forceSinglePassGrouping(query); - try(SearchInvoker invoker = getSearchInvoker(query)) { + try (SearchInvoker invoker = getSearchInvoker(query)) { Result result = invoker.search(query, execution); injectSource(result.hits()); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 224facd0c5b..03b51fbaf70 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -66,6 +66,8 @@ public class Dispatcher extends AbstractComponent { private final Metric metric; private final Metric.Context metricContext; + private final int maxHitsPerNode; + private static final QueryProfileType argumentType; static { @@ -120,6 +122,7 @@ public class Dispatcher extends AbstractComponent { this.invokerFactory = invokerFactory; this.metric = metric; this.metricContext = metric.createContext(null); + this.maxHitsPerNode = dispatchConfig.maxHitsPerNode(); searchCluster.startClusterMonitoring(pingFactory); } @@ -161,7 +164,11 @@ public class Dispatcher extends AbstractComponent { if (nodes.isEmpty()) return Optional.empty(); query.trace(false, 2, "Dispatching with search path ", searchPath); - return invokerFactory.createSearchInvoker(searcher, query, OptionalInt.empty(), nodes, true); + return invokerFactory.createSearchInvoker(searcher, query, + OptionalInt.empty(), + nodes, + true, + maxHitsPerNode); } catch (InvalidSearchPathException e) { return Optional.of(new SearchErrorInvoker(ErrorMessage.createIllegalQuery(e.getMessage()))); } @@ -172,7 +179,12 @@ public class Dispatcher extends AbstractComponent { if (directNode.isPresent()) { Node node = directNode.get(); query.trace(false, 2, "Dispatching to ", node); - return invokerFactory.createSearchInvoker(searcher, query, OptionalInt.empty(), Arrays.asList(node), true) + return invokerFactory.createSearchInvoker(searcher, + query, + OptionalInt.empty(), + Arrays.asList(node), + true, + maxHitsPerNode) .orElseThrow(() -> new IllegalStateException("Could not dispatch directly to " + node)); } @@ -190,7 +202,8 @@ public class Dispatcher extends AbstractComponent { query, OptionalInt.of(group.id()), group.nodes(), - acceptIncompleteCoverage); + acceptIncompleteCoverage, + maxHitsPerNode); if (invoker.isPresent()) { query.trace(false, 2, "Dispatching to group ", group.id()); query.getModel().setSearchPath("/" + group.id()); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java index 84e5e7d747f..cec3e94d551 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java @@ -36,6 +36,7 @@ import static com.yahoo.container.handler.Coverage.DEGRADED_BY_TIMEOUT; * @author ollivir */ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseMonitor<SearchInvoker> { + private static final Logger log = Logger.getLogger(InterleavedSearchInvoker.class.getName()); private final Set<SearchInvoker> invokers; diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java index 6030e989595..1c3a90ac6ab 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java @@ -27,7 +27,10 @@ public abstract class InvokerFactory { this.searchCluster = searchCluster; } - protected abstract Optional<SearchInvoker> createNodeSearchInvoker(VespaBackEndSearcher searcher, Query query, Node node); + protected abstract Optional<SearchInvoker> createNodeSearchInvoker(VespaBackEndSearcher searcher, + Query query, + int maxHits, + Node node); public abstract FillInvoker createFillInvoker(VespaBackEndSearcher searcher, Result result); @@ -47,13 +50,14 @@ public abstract class InvokerFactory { Query query, OptionalInt groupId, List<Node> nodes, - boolean acceptIncompleteCoverage) { + boolean acceptIncompleteCoverage, + int maxHits) { List<SearchInvoker> invokers = new ArrayList<>(nodes.size()); Set<Integer> failed = null; for (Node node : nodes) { boolean nodeAdded = false; if (node.isWorking() != Boolean.FALSE) { - Optional<SearchInvoker> invoker = createNodeSearchInvoker(searcher, query, node); + Optional<SearchInvoker> invoker = createNodeSearchInvoker(searcher, query, maxHits, node); if (invoker.isPresent()) { invokers.add(invoker.get()); nodeAdded = true; diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchErrorInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchErrorInvoker.java index 52a45dc421c..a32931c43c8 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchErrorInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchErrorInvoker.java @@ -18,6 +18,7 @@ import java.util.Optional; * @author ollivir */ public class SearchErrorInvoker extends SearchInvoker { + private final ErrorMessage message; private Query query; private final Coverage coverage; diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java index 77b3df7c83a..45ed1b87746 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java @@ -18,6 +18,7 @@ import java.util.Optional; * @author ollivir */ public abstract class SearchInvoker extends CloseableInvoker { + private final Optional<Node> node; private ResponseMonitor<SearchInvoker> monitor; diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java index 58d7035c5e8..ae2258c4546 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java @@ -38,12 +38,12 @@ public class ProtobufSerialization { private static final int INITIAL_SERIALIZATION_BUFFER_SIZE = 10 * 1024; - static byte[] serializeSearchRequest(Query query, String serverId) { - return convertFromQuery(query, serverId).toByteArray(); + static byte[] serializeSearchRequest(Query query, int hits, String serverId) { + return convertFromQuery(query, hits, serverId).toByteArray(); } - private static SearchProtocol.SearchRequest convertFromQuery(Query query, String serverId) { - var builder = SearchProtocol.SearchRequest.newBuilder().setHits(query.getHits()).setOffset(query.getOffset()) + private static SearchProtocol.SearchRequest convertFromQuery(Query query, int hits, String serverId) { + var builder = SearchProtocol.SearchRequest.newBuilder().setHits(hits).setOffset(query.getOffset()) .setTimeout((int) query.getTimeLeft()); var documentDb = query.getModel().getDocumentDb(); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java index 870f7aef9c5..5c9928de924 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java @@ -35,8 +35,11 @@ public class RpcInvokerFactory extends InvokerFactory implements PingFactory { } @Override - protected Optional<SearchInvoker> createNodeSearchInvoker(VespaBackEndSearcher searcher, Query query, Node node) { - return Optional.of(new RpcSearchInvoker(searcher, node, rpcResourcePool)); + protected Optional<SearchInvoker> createNodeSearchInvoker(VespaBackEndSearcher searcher, + Query query, + int maxHits, + Node node) { + return Optional.of(new RpcSearchInvoker(searcher, node, rpcResourcePool, maxHits)); } @Override diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java index 59f17501c32..07d8439ff46 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java @@ -25,25 +25,28 @@ import java.util.concurrent.TimeUnit; * @author ollivir */ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseReceiver { + private static final String RPC_METHOD = "vespa.searchprotocol.search"; private final VespaBackEndSearcher searcher; private final Node node; private final RpcResourcePool resourcePool; private final BlockingQueue<Client.ResponseOrError<ProtobufResponse>> responses; + private final int maxHits; private Query query; - RpcSearchInvoker(VespaBackEndSearcher searcher, Node node, RpcResourcePool resourcePool) { + RpcSearchInvoker(VespaBackEndSearcher searcher, Node node, RpcResourcePool resourcePool, int maxHits) { super(Optional.of(node)); this.searcher = searcher; this.node = node; this.resourcePool = resourcePool; this.responses = new LinkedBlockingQueue<>(1); + this.maxHits = maxHits; } @Override - protected void sendSearchRequest(Query query) throws IOException { + protected void sendSearchRequest(Query query) { this.query = query; Client.NodeConnection nodeConnection = resourcePool.getConnection(node.key()); @@ -54,7 +57,7 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe } query.trace(false, 5, "Sending search request with jrt/protobuf to node with dist key ", node.key()); - var payload = ProtobufSerialization.serializeSearchRequest(query, searcher.getServerId()); + var payload = ProtobufSerialization.serializeSearchRequest(query, Math.min(query.getHits(), maxHits), searcher.getServerId()); double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0; Compressor.Compression compressionResult = resourcePool.compress(query, payload); nodeConnection.request(RPC_METHOD, compressionResult.type(), payload.length, compressionResult.data(), this, timeoutSeconds); diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java index 291b0f4890a..de6bafa267a 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java @@ -108,7 +108,8 @@ public class DispatcherTest { Query query, OptionalInt groupId, List<Node> nodes, - boolean acceptIncompleteCoverage) { + boolean acceptIncompleteCoverage, + int maxHitsPerNode) { if (step >= events.length) { throw new RuntimeException("Was not expecting more calls to getSearchInvoker"); } @@ -126,7 +127,10 @@ public class DispatcherTest { } @Override - protected Optional<SearchInvoker> createNodeSearchInvoker(VespaBackEndSearcher searcher, Query query, Node node) { + protected Optional<SearchInvoker> createNodeSearchInvoker(VespaBackEndSearcher searcher, + Query query, + int maxHitsPerNode, + Node node) { fail("Unexpected call to createNodeSearchInvoker"); return null; } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java index c07bf119782..ce19224b35f 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java @@ -19,15 +19,15 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** * @author ollivir */ public class RpcSearchInvokerTest { + @Test public void testProtobufSerialization() throws IOException { var compressionTypeHolder = new AtomicReference<CompressionType>(); @@ -35,8 +35,7 @@ public class RpcSearchInvokerTest { var lengthHolder = new AtomicInteger(); var mockClient = parameterCollectorClient(compressionTypeHolder, payloadHolder, lengthHolder); var mockPool = new RpcResourcePool(ImmutableMap.of(7, mockClient.createConnection("foo", 123))); - @SuppressWarnings("resource") - var invoker = new RpcSearchInvoker(mockSearcher(), new Node(7, "seven", 1), mockPool); + var invoker = new RpcSearchInvoker(mockSearcher(), new Node(7, "seven", 1), mockPool, 1000); Query q = new Query("search/?query=test&hits=10&offset=3"); invoker.sendSearchRequest(q); @@ -44,9 +43,28 @@ public class RpcSearchInvokerTest { var bytes = mockPool.compressor().decompress(payloadHolder.get(), compressionTypeHolder.get(), lengthHolder.get()); var request = SearchProtocol.SearchRequest.newBuilder().mergeFrom(bytes).build(); - assertThat(request.getHits(), equalTo(10)); - assertThat(request.getOffset(), equalTo(3)); - assertThat(request.getQueryTreeBlob().size(), greaterThan(0)); + assertEquals(10, request.getHits()); + assertEquals(3, request.getOffset()); + assertTrue(request.getQueryTreeBlob().size() > 0); + } + + @Test + public void testProtobufSerializationWithMaxHitsSet() throws IOException { + int maxHits = 5; + var compressionTypeHolder = new AtomicReference<CompressionType>(); + var payloadHolder = new AtomicReference<byte[]>(); + var lengthHolder = new AtomicInteger(); + var mockClient = parameterCollectorClient(compressionTypeHolder, payloadHolder, lengthHolder); + var mockPool = new RpcResourcePool(ImmutableMap.of(7, mockClient.createConnection("foo", 123))); + var invoker = new RpcSearchInvoker(mockSearcher(), new Node(7, "seven", 1), mockPool, maxHits); + + Query q = new Query("search/?query=test&hits=10&offset=3"); + invoker.sendSearchRequest(q); + + var bytes = mockPool.compressor().decompress(payloadHolder.get(), compressionTypeHolder.get(), lengthHolder.get()); + var request = SearchProtocol.SearchRequest.newBuilder().mergeFrom(bytes).build(); + + assertEquals(maxHits, request.getHits()); } private Client parameterCollectorClient(AtomicReference<CompressionType> compressionTypeHolder, AtomicReference<byte[]> payloadHolder, @@ -91,4 +109,5 @@ public class RpcSearchInvokerTest { } }; } + } |