summaryrefslogtreecommitdiffstats
path: root/container-search/src
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@verizonmedia.com>2020-01-08 23:01:46 +0100
committerJon Bratseth <bratseth@verizonmedia.com>2020-01-08 23:01:46 +0100
commitc49a1256ef4c431f7a59018e4d62d1942fdeaf1e (patch)
tree883f7411e58ffbbb09238fd95d7bf76e6076bf92 /container-search/src
parent93057cb2141dee8846c9304eb6a9aae2ba9c4dc0 (diff)
Support max-hits-per-node in Dispatcher
Diffstat (limited to 'container-search/src')
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java19
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java1
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java10
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/SearchErrorInvoker.java1
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java1
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java8
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java7
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java9
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java8
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java35
11 files changed, 75 insertions, 26 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
index c04553ae2f5..14604d61c0a 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
@@ -85,7 +85,7 @@ public class FastSearcher extends VespaBackEndSearcher {
public Result doSearch2(Query query, Execution execution) {
if (dispatcher.searchCluster().groupSize() == 1)
forceSinglePassGrouping(query);
- try(SearchInvoker invoker = getSearchInvoker(query)) {
+ try (SearchInvoker invoker = getSearchInvoker(query)) {
Result result = invoker.search(query, execution);
injectSource(result.hits());
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index 224facd0c5b..03b51fbaf70 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -66,6 +66,8 @@ public class Dispatcher extends AbstractComponent {
private final Metric metric;
private final Metric.Context metricContext;
+ private final int maxHitsPerNode;
+
private static final QueryProfileType argumentType;
static {
@@ -120,6 +122,7 @@ public class Dispatcher extends AbstractComponent {
this.invokerFactory = invokerFactory;
this.metric = metric;
this.metricContext = metric.createContext(null);
+ this.maxHitsPerNode = dispatchConfig.maxHitsPerNode();
searchCluster.startClusterMonitoring(pingFactory);
}
@@ -161,7 +164,11 @@ public class Dispatcher extends AbstractComponent {
if (nodes.isEmpty()) return Optional.empty();
query.trace(false, 2, "Dispatching with search path ", searchPath);
- return invokerFactory.createSearchInvoker(searcher, query, OptionalInt.empty(), nodes, true);
+ return invokerFactory.createSearchInvoker(searcher, query,
+ OptionalInt.empty(),
+ nodes,
+ true,
+ maxHitsPerNode);
} catch (InvalidSearchPathException e) {
return Optional.of(new SearchErrorInvoker(ErrorMessage.createIllegalQuery(e.getMessage())));
}
@@ -172,7 +179,12 @@ public class Dispatcher extends AbstractComponent {
if (directNode.isPresent()) {
Node node = directNode.get();
query.trace(false, 2, "Dispatching to ", node);
- return invokerFactory.createSearchInvoker(searcher, query, OptionalInt.empty(), Arrays.asList(node), true)
+ return invokerFactory.createSearchInvoker(searcher,
+ query,
+ OptionalInt.empty(),
+ Arrays.asList(node),
+ true,
+ maxHitsPerNode)
.orElseThrow(() -> new IllegalStateException("Could not dispatch directly to " + node));
}
@@ -190,7 +202,8 @@ public class Dispatcher extends AbstractComponent {
query,
OptionalInt.of(group.id()),
group.nodes(),
- acceptIncompleteCoverage);
+ acceptIncompleteCoverage,
+ maxHitsPerNode);
if (invoker.isPresent()) {
query.trace(false, 2, "Dispatching to group ", group.id());
query.getModel().setSearchPath("/" + group.id());
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
index 84e5e7d747f..cec3e94d551 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
@@ -36,6 +36,7 @@ import static com.yahoo.container.handler.Coverage.DEGRADED_BY_TIMEOUT;
* @author ollivir
*/
public class InterleavedSearchInvoker extends SearchInvoker implements ResponseMonitor<SearchInvoker> {
+
private static final Logger log = Logger.getLogger(InterleavedSearchInvoker.class.getName());
private final Set<SearchInvoker> invokers;
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
index 6030e989595..1c3a90ac6ab 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
@@ -27,7 +27,10 @@ public abstract class InvokerFactory {
this.searchCluster = searchCluster;
}
- protected abstract Optional<SearchInvoker> createNodeSearchInvoker(VespaBackEndSearcher searcher, Query query, Node node);
+ protected abstract Optional<SearchInvoker> createNodeSearchInvoker(VespaBackEndSearcher searcher,
+ Query query,
+ int maxHits,
+ Node node);
public abstract FillInvoker createFillInvoker(VespaBackEndSearcher searcher, Result result);
@@ -47,13 +50,14 @@ public abstract class InvokerFactory {
Query query,
OptionalInt groupId,
List<Node> nodes,
- boolean acceptIncompleteCoverage) {
+ boolean acceptIncompleteCoverage,
+ int maxHits) {
List<SearchInvoker> invokers = new ArrayList<>(nodes.size());
Set<Integer> failed = null;
for (Node node : nodes) {
boolean nodeAdded = false;
if (node.isWorking() != Boolean.FALSE) {
- Optional<SearchInvoker> invoker = createNodeSearchInvoker(searcher, query, node);
+ Optional<SearchInvoker> invoker = createNodeSearchInvoker(searcher, query, maxHits, node);
if (invoker.isPresent()) {
invokers.add(invoker.get());
nodeAdded = true;
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchErrorInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchErrorInvoker.java
index 52a45dc421c..a32931c43c8 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchErrorInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchErrorInvoker.java
@@ -18,6 +18,7 @@ import java.util.Optional;
* @author ollivir
*/
public class SearchErrorInvoker extends SearchInvoker {
+
private final ErrorMessage message;
private Query query;
private final Coverage coverage;
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java
index 77b3df7c83a..45ed1b87746 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java
@@ -18,6 +18,7 @@ import java.util.Optional;
* @author ollivir
*/
public abstract class SearchInvoker extends CloseableInvoker {
+
private final Optional<Node> node;
private ResponseMonitor<SearchInvoker> monitor;
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java
index 58d7035c5e8..ae2258c4546 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java
@@ -38,12 +38,12 @@ public class ProtobufSerialization {
private static final int INITIAL_SERIALIZATION_BUFFER_SIZE = 10 * 1024;
- static byte[] serializeSearchRequest(Query query, String serverId) {
- return convertFromQuery(query, serverId).toByteArray();
+ static byte[] serializeSearchRequest(Query query, int hits, String serverId) {
+ return convertFromQuery(query, hits, serverId).toByteArray();
}
- private static SearchProtocol.SearchRequest convertFromQuery(Query query, String serverId) {
- var builder = SearchProtocol.SearchRequest.newBuilder().setHits(query.getHits()).setOffset(query.getOffset())
+ private static SearchProtocol.SearchRequest convertFromQuery(Query query, int hits, String serverId) {
+ var builder = SearchProtocol.SearchRequest.newBuilder().setHits(hits).setOffset(query.getOffset())
.setTimeout((int) query.getTimeLeft());
var documentDb = query.getModel().getDocumentDb();
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
index 870f7aef9c5..5c9928de924 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
@@ -35,8 +35,11 @@ public class RpcInvokerFactory extends InvokerFactory implements PingFactory {
}
@Override
- protected Optional<SearchInvoker> createNodeSearchInvoker(VespaBackEndSearcher searcher, Query query, Node node) {
- return Optional.of(new RpcSearchInvoker(searcher, node, rpcResourcePool));
+ protected Optional<SearchInvoker> createNodeSearchInvoker(VespaBackEndSearcher searcher,
+ Query query,
+ int maxHits,
+ Node node) {
+ return Optional.of(new RpcSearchInvoker(searcher, node, rpcResourcePool, maxHits));
}
@Override
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
index 59f17501c32..07d8439ff46 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
@@ -25,25 +25,28 @@ import java.util.concurrent.TimeUnit;
* @author ollivir
*/
public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseReceiver {
+
private static final String RPC_METHOD = "vespa.searchprotocol.search";
private final VespaBackEndSearcher searcher;
private final Node node;
private final RpcResourcePool resourcePool;
private final BlockingQueue<Client.ResponseOrError<ProtobufResponse>> responses;
+ private final int maxHits;
private Query query;
- RpcSearchInvoker(VespaBackEndSearcher searcher, Node node, RpcResourcePool resourcePool) {
+ RpcSearchInvoker(VespaBackEndSearcher searcher, Node node, RpcResourcePool resourcePool, int maxHits) {
super(Optional.of(node));
this.searcher = searcher;
this.node = node;
this.resourcePool = resourcePool;
this.responses = new LinkedBlockingQueue<>(1);
+ this.maxHits = maxHits;
}
@Override
- protected void sendSearchRequest(Query query) throws IOException {
+ protected void sendSearchRequest(Query query) {
this.query = query;
Client.NodeConnection nodeConnection = resourcePool.getConnection(node.key());
@@ -54,7 +57,7 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe
}
query.trace(false, 5, "Sending search request with jrt/protobuf to node with dist key ", node.key());
- var payload = ProtobufSerialization.serializeSearchRequest(query, searcher.getServerId());
+ var payload = ProtobufSerialization.serializeSearchRequest(query, Math.min(query.getHits(), maxHits), searcher.getServerId());
double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0;
Compressor.Compression compressionResult = resourcePool.compress(query, payload);
nodeConnection.request(RPC_METHOD, compressionResult.type(), payload.length, compressionResult.data(), this, timeoutSeconds);
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java
index 291b0f4890a..de6bafa267a 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java
@@ -108,7 +108,8 @@ public class DispatcherTest {
Query query,
OptionalInt groupId,
List<Node> nodes,
- boolean acceptIncompleteCoverage) {
+ boolean acceptIncompleteCoverage,
+ int maxHitsPerNode) {
if (step >= events.length) {
throw new RuntimeException("Was not expecting more calls to getSearchInvoker");
}
@@ -126,7 +127,10 @@ public class DispatcherTest {
}
@Override
- protected Optional<SearchInvoker> createNodeSearchInvoker(VespaBackEndSearcher searcher, Query query, Node node) {
+ protected Optional<SearchInvoker> createNodeSearchInvoker(VespaBackEndSearcher searcher,
+ Query query,
+ int maxHitsPerNode,
+ Node node) {
fail("Unexpected call to createNodeSearchInvoker");
return null;
}
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java
index c07bf119782..ce19224b35f 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java
@@ -19,15 +19,15 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* @author ollivir
*/
public class RpcSearchInvokerTest {
+
@Test
public void testProtobufSerialization() throws IOException {
var compressionTypeHolder = new AtomicReference<CompressionType>();
@@ -35,8 +35,7 @@ public class RpcSearchInvokerTest {
var lengthHolder = new AtomicInteger();
var mockClient = parameterCollectorClient(compressionTypeHolder, payloadHolder, lengthHolder);
var mockPool = new RpcResourcePool(ImmutableMap.of(7, mockClient.createConnection("foo", 123)));
- @SuppressWarnings("resource")
- var invoker = new RpcSearchInvoker(mockSearcher(), new Node(7, "seven", 1), mockPool);
+ var invoker = new RpcSearchInvoker(mockSearcher(), new Node(7, "seven", 1), mockPool, 1000);
Query q = new Query("search/?query=test&hits=10&offset=3");
invoker.sendSearchRequest(q);
@@ -44,9 +43,28 @@ public class RpcSearchInvokerTest {
var bytes = mockPool.compressor().decompress(payloadHolder.get(), compressionTypeHolder.get(), lengthHolder.get());
var request = SearchProtocol.SearchRequest.newBuilder().mergeFrom(bytes).build();
- assertThat(request.getHits(), equalTo(10));
- assertThat(request.getOffset(), equalTo(3));
- assertThat(request.getQueryTreeBlob().size(), greaterThan(0));
+ assertEquals(10, request.getHits());
+ assertEquals(3, request.getOffset());
+ assertTrue(request.getQueryTreeBlob().size() > 0);
+ }
+
+ @Test
+ public void testProtobufSerializationWithMaxHitsSet() throws IOException {
+ int maxHits = 5;
+ var compressionTypeHolder = new AtomicReference<CompressionType>();
+ var payloadHolder = new AtomicReference<byte[]>();
+ var lengthHolder = new AtomicInteger();
+ var mockClient = parameterCollectorClient(compressionTypeHolder, payloadHolder, lengthHolder);
+ var mockPool = new RpcResourcePool(ImmutableMap.of(7, mockClient.createConnection("foo", 123)));
+ var invoker = new RpcSearchInvoker(mockSearcher(), new Node(7, "seven", 1), mockPool, maxHits);
+
+ Query q = new Query("search/?query=test&hits=10&offset=3");
+ invoker.sendSearchRequest(q);
+
+ var bytes = mockPool.compressor().decompress(payloadHolder.get(), compressionTypeHolder.get(), lengthHolder.get());
+ var request = SearchProtocol.SearchRequest.newBuilder().mergeFrom(bytes).build();
+
+ assertEquals(maxHits, request.getHits());
}
private Client parameterCollectorClient(AtomicReference<CompressionType> compressionTypeHolder, AtomicReference<byte[]> payloadHolder,
@@ -91,4 +109,5 @@ public class RpcSearchInvokerTest {
}
};
}
+
}