summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorOlli Virtanen <olli.virtanen@oath.com>2019-04-05 14:23:22 +0200
committerOlli Virtanen <olli.virtanen@oath.com>2019-04-05 14:23:22 +0200
commit9faa5a3bc2d198e619732936459e11fb14634800 (patch)
treed83fd3a9484f2b9886465abf19ab687175e71280
parent747114c0f7835bf9a3e47a19e924856227efde8d (diff)
Optimizations for search protocol over jrt/protobuf
-rw-r--r--configdefinitions/src/vespa/dispatch.def3
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java15
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java50
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java7
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java6
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java15
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java71
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java10
-rw-r--r--container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java3
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/rpc/FillTestCase.java15
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java113
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java35
12 files changed, 183 insertions, 160 deletions
diff --git a/configdefinitions/src/vespa/dispatch.def b/configdefinitions/src/vespa/dispatch.def
index 7d5979bcdf1..477a781ebbc 100644
--- a/configdefinitions/src/vespa/dispatch.def
+++ b/configdefinitions/src/vespa/dispatch.def
@@ -40,6 +40,9 @@ minWaitAfterCoverageFactor double default=0
# Maximum wait time for full coverage after minimum coverage is achieved, factored based on time left at minimum coverage
maxWaitAfterCoverageFactor double default=1
+# Number of JRT connection supervisors
+numJrtSupervisors int default=8
+
# The unique key of a search node
node[].key int
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
index cc37df04a62..e54e2187818 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
@@ -2,8 +2,6 @@
package com.yahoo.search.dispatch.rpc;
import com.yahoo.compress.CompressionType;
-import com.yahoo.compress.Compressor;
-import com.yahoo.prelude.Pong;
import com.yahoo.prelude.fastsearch.FastHit;
import java.util.List;
@@ -15,14 +13,6 @@ import java.util.Optional;
* @author bratseth
*/
interface Client {
-
- void getDocsums(List<FastHit> hits, NodeConnection node, CompressionType compression,
- int uncompressedLength, byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver,
- double timeoutSeconds);
-
- void request(String rpcMethod, NodeConnection node, CompressionType compression, int uncompressedLength,
- byte[] compressedPayload, ResponseReceiver responseReceiver, double timeoutSeconds);
-
/** Creates a connection to a particular node in this */
NodeConnection createConnection(String hostname, int port);
@@ -91,6 +81,11 @@ interface Client {
}
interface NodeConnection {
+ void getDocsums(List<FastHit> hits, CompressionType compression, int uncompressedLength, byte[] compressedSlime,
+ RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds);
+
+ void request(String rpcMethod, CompressionType compression, int uncompressedLength, byte[] compressedPayload,
+ ResponseReceiver responseReceiver, double timeoutSeconds);
/** Closes this connection */
void close();
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
index 2aa01b05955..7e48733106a 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
@@ -29,31 +29,6 @@ class RpcClient implements Client {
return new RpcNodeConnection(hostname, port, supervisor);
}
- @Override
- public void getDocsums(List<FastHit> hits, NodeConnection node, CompressionType compression, int uncompressedLength,
- byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) {
- Request request = new Request("proton.getDocsums");
- request.parameters().add(new Int8Value(compression.getCode()));
- request.parameters().add(new Int32Value(uncompressedLength));
- request.parameters().add(new DataValue(compressedSlime));
-
- request.setContext(hits);
- RpcNodeConnection rpcNode = ((RpcNodeConnection) node);
- rpcNode.invokeAsync(request, timeoutSeconds, new RpcDocsumResponseWaiter(rpcNode, responseReceiver));
- }
-
- @Override
- public void request(String rpcMethod, NodeConnection node, CompressionType compression, int uncompressedLength, byte[] compressedPayload,
- ResponseReceiver responseReceiver, double timeoutSeconds) {
- Request request = new Request(rpcMethod);
- request.parameters().add(new Int8Value(compression.getCode()));
- request.parameters().add(new Int32Value(uncompressedLength));
- request.parameters().add(new DataValue(compressedPayload));
-
- RpcNodeConnection rpcNode = ((RpcNodeConnection) node);
- rpcNode.invokeAsync(request, timeoutSeconds, new RpcProtobufResponseWaiter(rpcNode, responseReceiver));
- }
-
private static class RpcNodeConnection implements NodeConnection {
// Information about the connected node
@@ -73,7 +48,30 @@ class RpcClient implements Client {
description = "rpc node connection to " + hostname + ":" + port;
}
- public void invokeAsync(Request req, double timeout, RequestWaiter waiter) {
+ @Override
+ public void getDocsums(List<FastHit> hits, CompressionType compression, int uncompressedLength,
+ byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) {
+ Request request = new Request("proton.getDocsums");
+ request.parameters().add(new Int8Value(compression.getCode()));
+ request.parameters().add(new Int32Value(uncompressedLength));
+ request.parameters().add(new DataValue(compressedSlime));
+
+ request.setContext(hits);
+ invokeAsync(request, timeoutSeconds, new RpcDocsumResponseWaiter(this, responseReceiver));
+ }
+
+ @Override
+ public void request(String rpcMethod, CompressionType compression, int uncompressedLength, byte[] compressedPayload,
+ ResponseReceiver responseReceiver, double timeoutSeconds) {
+ Request request = new Request(rpcMethod);
+ request.parameters().add(new Int8Value(compression.getCode()));
+ request.parameters().add(new Int32Value(uncompressedLength));
+ request.parameters().add(new DataValue(compressedPayload));
+
+ invokeAsync(request, timeoutSeconds, new RpcProtobufResponseWaiter(this, responseReceiver));
+ }
+
+ private void invokeAsync(Request req, double timeout, RequestWaiter waiter) {
// TODO: Consider replacing this by a watcher on the target
synchronized(this) { // ensure we have exactly 1 valid connection across threads
if (target == null || ! target.isValid())
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java
index 760f7486923..aa72823c809 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java
@@ -100,7 +100,7 @@ public class RpcFillInvoker extends FillInvoker {
/** Send a getDocsums request to a node. Responses will be added to the given receiver. */
private void sendGetDocsumsRequest(int nodeId, List<FastHit> hits, String summaryClass, CompressionType compression,
Result result, GetDocsumsResponseReceiver responseReceiver) {
- Client.NodeConnection node = resourcePool.nodeConnections().get(nodeId);
+ Client.NodeConnection node = resourcePool.getConnection(nodeId);
if (node == null) {
String error = "Could not fill hits from unknown node " + nodeId;
responseReceiver.receive(Client.ResponseOrError.fromError(error));
@@ -114,9 +114,8 @@ public class RpcFillInvoker extends FillInvoker {
byte[] serializedSlime = BinaryFormat
.encode(toSlime(rankProfile, summaryClass, query.getModel().getDocumentDb(), query.getSessionId(), hits));
double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0;
- Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, serializedSlime);
- resourcePool.client().getDocsums(hits, node, compressionResult.type(), serializedSlime.length, compressionResult.data(),
- responseReceiver, timeoutSeconds);
+ Compressor.Compression compressionResult = resourcePool.compress(query, serializedSlime);
+ node.getDocsums(hits, compressionResult.type(), serializedSlime.length, compressionResult.data(), responseReceiver, timeoutSeconds);
}
static private Slime toSlime(String rankProfile, String summaryClass, String docType, SessionId sessionId, List<FastHit> hits) {
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
index f3479e2e4a9..c001b51ef11 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
@@ -52,12 +52,12 @@ public class RpcPing implements Callable<Pong> {
}
private void sendPing(LinkedBlockingQueue<ResponseOrError<ProtobufResponse>> queue) {
- var connection = resourcePool.nodeConnections().get(node.key());
+ var connection = resourcePool.getConnection(node.key());
var ping = SearchProtocol.MonitorRequest.newBuilder().build().toByteArray();
double timeoutSeconds = ((double) clusterMonitor.getConfiguration().getRequestTimeout()) / 1000.0;
Compressor.Compression compressionResult = resourcePool.compressor().compress(PING_COMPRESSION, ping);
- resourcePool.client().request(RPC_METHOD, connection, compressionResult.type(), ping.length, compressionResult.data(),
- rsp -> queue.add(rsp), timeoutSeconds);
+ connection.request(RPC_METHOD, compressionResult.type(), ping.length, compressionResult.data(), rsp -> queue.add(rsp),
+ timeoutSeconds);
}
private Pong decodeReply(ProtobufResponse response) throws InvalidProtocolBufferException {
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
index 3ec821beba8..cd4ba191a7d 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
@@ -66,9 +66,6 @@ public class RpcProtobufFillInvoker extends FillInvoker {
protected void sendFillRequest(Result result, String summaryClass) {
ListMap<Integer, FastHit> hitsByNode = hitsByNode(result);
- CompressionType compression = CompressionType
- .valueOf(result.getQuery().properties().getString(RpcResourcePool.dispatchCompression, "LZ4").toUpperCase());
-
result.getQuery().trace(false, 5, "Sending ", hitsByNode.size(), " summary fetch requests with jrt/protobuf");
outstandingResponses = hitsByNode.size();
@@ -77,7 +74,7 @@ public class RpcProtobufFillInvoker extends FillInvoker {
var builder = ProtobufSerialization.createDocsumRequestBuilder(result.getQuery(), serverId, summaryClass, summaryNeedsQuery);
for (Map.Entry<Integer, List<FastHit>> nodeHits : hitsByNode.entrySet()) {
var payload = ProtobufSerialization.serializeDocsumRequest(builder, nodeHits.getValue());
- sendDocsumsRequest(nodeHits.getKey(), nodeHits.getValue(), payload, compression, result);
+ sendDocsumsRequest(nodeHits.getKey(), nodeHits.getValue(), payload, result);
}
}
@@ -117,8 +114,8 @@ public class RpcProtobufFillInvoker extends FillInvoker {
}
/** Send a docsums request to a node. Responses will be added to the given receiver. */
- private void sendDocsumsRequest(int nodeId, List<FastHit> hits, byte[] payload, CompressionType compression, Result result) {
- Client.NodeConnection node = resourcePool.nodeConnections().get(nodeId);
+ private void sendDocsumsRequest(int nodeId, List<FastHit> hits, byte[] payload, Result result) {
+ Client.NodeConnection node = resourcePool.getConnection(nodeId);
if (node == null) {
String error = "Could not fill hits from unknown node " + nodeId;
receive(Client.ResponseOrError.fromError(error), hits);
@@ -129,9 +126,9 @@ public class RpcProtobufFillInvoker extends FillInvoker {
Query query = result.getQuery();
double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0;
- Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, payload);
- resourcePool.client().request(RPC_METHOD, node, compressionResult.type(), payload.length, compressionResult.data(),
- roe -> receive(roe, hits), timeoutSeconds);
+ Compressor.Compression compressionResult = resourcePool.compress(query, payload);
+ node.request(RPC_METHOD, compressionResult.type(), payload.length, compressionResult.data(), roe -> receive(roe, hits),
+ timeoutSeconds);
}
private void processResponses(Result result, String summaryClass) throws TimeoutException {
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
index 830ba45ef0f..cccf8dd3693 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
@@ -2,12 +2,20 @@
package com.yahoo.search.dispatch.rpc;
import com.google.common.collect.ImmutableMap;
+import com.yahoo.compress.CompressionType;
import com.yahoo.compress.Compressor;
+import com.yahoo.compress.Compressor.Compression;
import com.yahoo.processing.request.CompoundName;
+import com.yahoo.search.Query;
import com.yahoo.search.dispatch.FillInvoker;
+import com.yahoo.search.dispatch.rpc.Client.NodeConnection;
import com.yahoo.vespa.config.search.DispatchConfig;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.Random;
/**
* RpcResourcePool constructs {@link FillInvoker} objects that communicate with content nodes over RPC. It also contains
@@ -19,43 +27,70 @@ public class RpcResourcePool {
/** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */
public final static CompoundName dispatchCompression = new CompoundName("dispatch.compression");
- private final Compressor compressor = new Compressor();
- private final Client client;
+ private final Compressor compressor = new Compressor(CompressionType.LZ4, 5, 0.95, 32);
+ private final Random random = new Random();
/** Connections to the search nodes this talks to, indexed by node id ("partid") */
- private final ImmutableMap<Integer, Client.NodeConnection> nodeConnections;
+ private final ImmutableMap<Integer, NodeConnectionPool> nodeConnectionPools;
- public RpcResourcePool(Client client, Map<Integer, Client.NodeConnection> nodeConnections) {
- this.client = client;
- this.nodeConnections = ImmutableMap.copyOf(nodeConnections);
+ public RpcResourcePool(Map<Integer, Client.NodeConnection> nodeConnections) {
+ var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>();
+ nodeConnections.forEach((key, connection) -> builder.put(key, new NodeConnectionPool(Collections.singletonList(connection))));
+ this.nodeConnectionPools = builder.build();
}
public RpcResourcePool(DispatchConfig dispatchConfig) {
- this.client = new RpcClient();
+ var clients = new ArrayList<RpcClient>(dispatchConfig.numJrtSupervisors());
+ for (int i = 0; i < dispatchConfig.numJrtSupervisors(); i++) {
+ clients.add(new RpcClient());
+ }
- // Create node rpc connections, indexed by the node distribution key
- ImmutableMap.Builder<Integer, Client.NodeConnection> nodeConnectionsBuilder = new ImmutableMap.Builder<>();
- for (DispatchConfig.Node node : dispatchConfig.node()) {
- nodeConnectionsBuilder.put(node.key(), client.createConnection(node.host(), node.port()));
+ // Create node rpc connection pools, indexed by the node distribution key
+ var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>();
+ for (var node : dispatchConfig.node()) {
+ var connections = new ArrayList<Client.NodeConnection>(clients.size());
+ clients.forEach(client -> connections.add(client.createConnection(node.host(), node.port())));
+ builder.put(node.key(), new NodeConnectionPool(connections));
}
- this.nodeConnections = nodeConnectionsBuilder.build();
+ this.nodeConnectionPools = builder.build();
}
public Compressor compressor() {
return compressor;
}
- public Client client() {
- return client;
+ public Compression compress(Query query, byte[] payload) {
+ CompressionType compression = CompressionType.valueOf(query.properties().getString(dispatchCompression, "LZ4").toUpperCase());
+ return compressor.compress(compression, payload);
}
- public ImmutableMap<Integer, Client.NodeConnection> nodeConnections() {
- return nodeConnections;
+ public NodeConnection getConnection(int nodeId) {
+ var pool = nodeConnectionPools.get(nodeId);
+ if (pool == null) {
+ return null;
+ } else {
+ return pool.nextConnection();
+ }
}
public void release() {
- for (Client.NodeConnection nodeConnection : nodeConnections.values()) {
- nodeConnection.close();
+ nodeConnectionPools.values().forEach(NodeConnectionPool::release);
+ }
+
+ private class NodeConnectionPool {
+ private final List<Client.NodeConnection> connections;
+
+ NodeConnectionPool(List<Client.NodeConnection> connections) {
+ this.connections = connections;
+ }
+
+ Client.NodeConnection nextConnection() {
+ int slot = random.nextInt(connections.size());
+ return connections.get(slot);
+ }
+
+ void release() {
+ connections.forEach(Client.NodeConnection::close);
}
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
index d70a7d95b63..75e9b06f445 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
@@ -46,10 +46,7 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe
protected void sendSearchRequest(Query query) throws IOException {
this.query = query;
- CompressionType compression = CompressionType
- .valueOf(query.properties().getString(RpcResourcePool.dispatchCompression, "LZ4").toUpperCase());
-
- Client.NodeConnection nodeConnection = resourcePool.nodeConnections().get(node.key());
+ Client.NodeConnection nodeConnection = resourcePool.getConnection(node.key());
if (nodeConnection == null) {
responses.add(Client.ResponseOrError.fromError("Could not send search to unknown node " + node.key()));
responseAvailable();
@@ -59,9 +56,8 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe
var payload = ProtobufSerialization.serializeSearchRequest(query, searcher.getServerId());
double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0;
- Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, payload);
- resourcePool.client().request(RPC_METHOD, nodeConnection, compressionResult.type(), payload.length, compressionResult.data(), this,
- timeoutSeconds);
+ Compressor.Compression compressionResult = resourcePool.compress(query, payload);
+ nodeConnection.request(RPC_METHOD, compressionResult.type(), payload.length, compressionResult.data(), this, timeoutSeconds);
}
@Override
diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java
index f4be2943f5f..497f05e8341 100644
--- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java
+++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java
@@ -154,7 +154,8 @@ public class FastSearcherTestCase {
doFill(fastSearcher, result);
ErrorMessage error = result.hits().getError();
assertEquals("Since we don't actually run summary backends we get this error when the Dispatcher is used",
- "Error response from rpc node connection to host1:0: Connection error", error.getDetailedMessage());
+ "Error response from rpc node connection to hostX:0: Connection error",
+ error.getDetailedMessage().replaceAll("host[12]", "hostX"));
}
}
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/FillTestCase.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/FillTestCase.java
index e059008acac..6d1f19eeaf2 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/FillTestCase.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/FillTestCase.java
@@ -22,7 +22,6 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
-
/**
* Tests using a dispatcher to fill a result
*
@@ -38,7 +37,7 @@ public class FillTestCase {
nodes.put(0, client.createConnection("host0", 123));
nodes.put(1, client.createConnection("host1", 123));
nodes.put(2, client.createConnection("host2", 123));
- RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes);
+ RpcResourcePool rpcResourcePool = new RpcResourcePool(nodes);
RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null, true);
Query query = new Query();
@@ -75,7 +74,7 @@ public class FillTestCase {
nodes.put(0, client.createConnection("host0", 123));
nodes.put(1, client.createConnection("host1", 123));
nodes.put(2, client.createConnection("host2", 123));
- RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes);
+ RpcResourcePool rpcResourcePool = new RpcResourcePool(nodes);
RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null, true);
Query query = new Query();
@@ -90,7 +89,7 @@ public class FillTestCase {
client.setDocsumReponse("host2", 1, "summaryClass1", map("field1", "s.2.1", "field2", 1));
client.setDocsumReponse("host1", 2, "summaryClass1", new HashMap<>());
client.setDocsumReponse("host2", 3, "summaryClass1", map("field1", "s.2.3", "field2", 3));
- client.setDocsumReponse("host0", 4, "summaryClass1",new HashMap<>());
+ client.setDocsumReponse("host0", 4, "summaryClass1", new HashMap<>());
factory.createFillInvoker(db()).fill(result, "summaryClass1");
@@ -115,7 +114,7 @@ public class FillTestCase {
Map<Integer, Client.NodeConnection> nodes = new HashMap<>();
nodes.put(0, client.createConnection("host0", 123));
- RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes);
+ RpcResourcePool rpcResourcePool = new RpcResourcePool(nodes);
RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null, true);
Query query = new Query();
@@ -133,7 +132,7 @@ public class FillTestCase {
Map<Integer, Client.NodeConnection> nodes = new HashMap<>();
nodes.put(0, client.createConnection("host0", 123));
- RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes);
+ RpcResourcePool rpcResourcePool = new RpcResourcePool(nodes);
RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null, true);
Query query = new Query();
@@ -141,7 +140,6 @@ public class FillTestCase {
result.hits().add(createHit(0, 0));
result.hits().add(createHit(1, 1));
-
factory.createFillInvoker(db()).fill(result, "summaryClass1");
assertEquals("Could not fill hits from unknown node 1", result.hits().getError().getDetailedMessage());
@@ -151,8 +149,7 @@ public class FillTestCase {
List<DocsumField> fields = new ArrayList<>();
fields.add(DocsumField.create("field1", "string"));
fields.add(DocsumField.create("field2", "int64"));
- DocsumDefinitionSet docsums = new DocsumDefinitionSet(Collections.singleton(new DocsumDefinition("summaryClass1",
- fields)));
+ DocsumDefinitionSet docsums = new DocsumDefinitionSet(Collections.singleton(new DocsumDefinition("summaryClass1", fields)));
return new DocumentDatabase("default", docsums, Collections.emptySet());
}
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java
index 687d3e728c0..3cc3257194c 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java
@@ -36,62 +36,6 @@ public class MockClient implements Client {
return new MockNodeConnection(hostname, port);
}
- @Override
- public void getDocsums(List<FastHit> hitsContext, NodeConnection node, CompressionType compression,
- int uncompressedSize, byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver,
- double timeoutSeconds) {
- if (malfunctioning) {
- responseReceiver.receive(ResponseOrError.fromError("Malfunctioning"));
- return;
- }
-
- Inspector request = BinaryFormat.decode(compressor.decompress(compressedSlime, compression, uncompressedSize)).get();
- String docsumClass = request.field("class").asString();
- List<Map<String, Object>> docsumsToReturn = new ArrayList<>();
- request.field("gids").traverse((ArrayTraverser)(index, gid) -> {
- GlobalId docId = new GlobalId(gid.asData());
- docsumsToReturn.add(docsums.get(new DocsumKey(node.toString(), docId, docsumClass)));
- });
- Slime responseSlime = new Slime();
- Cursor root = responseSlime.setObject();
- Cursor docsums = root.setArray("docsums");
- for (Map<String, Object> docsumFields : docsumsToReturn) {
- Cursor docsumItem = docsums.addObject();
- Cursor docsum = docsumItem.setObject("docsum");
- for (Map.Entry<String, Object> field : docsumFields.entrySet()) {
- if (field.getValue() instanceof Integer)
- docsum.setLong(field.getKey(), (Integer)field.getValue());
- else if (field.getValue() instanceof String)
- docsum.setString(field.getKey(), (String)field.getValue());
- else
- throw new RuntimeException();
- }
- }
- byte[] slimeBytes = BinaryFormat.encode(responseSlime);
- Compressor.Compression compressionResult = compressor.compress(compression, slimeBytes);
- GetDocsumsResponse response = new GetDocsumsResponse(compressionResult.type().getCode(), slimeBytes.length,
- compressionResult.data(), hitsContext);
- responseReceiver.receive(ResponseOrError.fromResponse(response));
- }
-
- @Override
- public void request(String rpcMethod, NodeConnection node, CompressionType compression, int uncompressedLength, byte[] compressedPayload,
- ResponseReceiver responseReceiver, double timeoutSeconds) {
- if (malfunctioning) {
- responseReceiver.receive(ResponseOrError.fromError("Malfunctioning"));
- return;
- }
-
- if(searchResult == null) {
- responseReceiver.receive(ResponseOrError.fromError("No result defined"));
- return;
- }
- var payload = ProtobufSerialization.serializeResult(searchResult);
- var compressionResult = compressor.compress(compression, payload);
- var response = new ProtobufResponse(compressionResult.type().getCode(), payload.length, compressionResult.data());
- responseReceiver.receive(ResponseOrError.fromResponse(response));
- }
-
public void setDocsumReponse(String nodeId, int docId, String docsumClass, Map<String, Object> docsumValues) {
docsums.put(new DocsumKey(nodeId, globalIdFrom(docId), docsumClass), docsumValues);
}
@@ -100,7 +44,7 @@ public class MockClient implements Client {
return new GlobalId(new IdIdString("", "test", "", String.valueOf(hitId)));
}
- private static class MockNodeConnection implements Client.NodeConnection {
+ private class MockNodeConnection implements Client.NodeConnection {
private final String hostname;
@@ -109,6 +53,61 @@ public class MockClient implements Client {
}
@Override
+ public void getDocsums(List<FastHit> hitsContext, CompressionType compression, int uncompressedSize, byte[] compressedSlime,
+ RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) {
+ if (malfunctioning) {
+ responseReceiver.receive(ResponseOrError.fromError("Malfunctioning"));
+ return;
+ }
+
+ Inspector request = BinaryFormat.decode(compressor.decompress(compressedSlime, compression, uncompressedSize)).get();
+ String docsumClass = request.field("class").asString();
+ List<Map<String, Object>> docsumsToReturn = new ArrayList<>();
+ request.field("gids").traverse((ArrayTraverser) (index, gid) -> {
+ GlobalId docId = new GlobalId(gid.asData());
+ docsumsToReturn.add(docsums.get(new DocsumKey(toString(), docId, docsumClass)));
+ });
+ Slime responseSlime = new Slime();
+ Cursor root = responseSlime.setObject();
+ Cursor docsums = root.setArray("docsums");
+ for (Map<String, Object> docsumFields : docsumsToReturn) {
+ Cursor docsumItem = docsums.addObject();
+ Cursor docsum = docsumItem.setObject("docsum");
+ for (Map.Entry<String, Object> field : docsumFields.entrySet()) {
+ if (field.getValue() instanceof Integer)
+ docsum.setLong(field.getKey(), (Integer) field.getValue());
+ else if (field.getValue() instanceof String)
+ docsum.setString(field.getKey(), (String) field.getValue());
+ else
+ throw new RuntimeException();
+ }
+ }
+ byte[] slimeBytes = BinaryFormat.encode(responseSlime);
+ Compressor.Compression compressionResult = compressor.compress(compression, slimeBytes);
+ GetDocsumsResponse response = new GetDocsumsResponse(compressionResult.type().getCode(), slimeBytes.length,
+ compressionResult.data(), hitsContext);
+ responseReceiver.receive(ResponseOrError.fromResponse(response));
+ }
+
+ @Override
+ public void request(String rpcMethod, CompressionType compression, int uncompressedLength, byte[] compressedPayload,
+ ResponseReceiver responseReceiver, double timeoutSeconds) {
+ if (malfunctioning) {
+ responseReceiver.receive(ResponseOrError.fromError("Malfunctioning"));
+ return;
+ }
+
+ if(searchResult == null) {
+ responseReceiver.receive(ResponseOrError.fromError("No result defined"));
+ return;
+ }
+ var payload = ProtobufSerialization.serializeResult(searchResult);
+ var compressionResult = compressor.compress(compression, payload);
+ var response = new ProtobufResponse(compressionResult.type().getCode(), payload.length, compressionResult.data());
+ responseReceiver.receive(ResponseOrError.fromResponse(response));
+ }
+
+ @Override
public void close() { }
@Override
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java
index 64863b9a8a6..d629bd36bb1 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java
@@ -34,7 +34,7 @@ public class RpcSearchInvokerTest {
var payloadHolder = new AtomicReference<byte[]>();
var lengthHolder = new AtomicInteger();
var mockClient = parameterCollectorClient(compressionTypeHolder, payloadHolder, lengthHolder);
- var mockPool = new RpcResourcePool(mockClient, ImmutableMap.of(7, () -> {}));
+ var mockPool = new RpcResourcePool(ImmutableMap.of(7, mockClient.createConnection("foo", 123)));
@SuppressWarnings("resource")
var invoker = new RpcSearchInvoker(mockSearcher(), new Node(7, "seven", 77, 1), mockPool);
@@ -53,23 +53,26 @@ public class RpcSearchInvokerTest {
AtomicInteger lengthHolder) {
return new Client() {
@Override
- public void request(String rpcMethod, NodeConnection node, CompressionType compression, int uncompressedLength,
- byte[] compressedPayload, ResponseReceiver responseReceiver, double timeoutSeconds) {
- compressionTypeHolder.set(compression);
- payloadHolder.set(compressedPayload);
- lengthHolder.set(uncompressedLength);
- }
+ public NodeConnection createConnection(String hostname, int port) {
+ return new NodeConnection() {
+ @Override
+ public void getDocsums(List<FastHit> hits, CompressionType compression, int uncompressedLength, byte[] compressedSlime,
+ GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) {
+ fail("Unexpected call");
+ }
- @Override
- public void getDocsums(List<FastHit> hits, NodeConnection node, CompressionType compression, int uncompressedLength,
- byte[] compressedSlime, GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) {
- fail("Unexpected call");
- }
+ @Override
+ public void request(String rpcMethod, CompressionType compression, int uncompressedLength, byte[] compressedPayload,
+ ResponseReceiver responseReceiver, double timeoutSeconds) {
+ compressionTypeHolder.set(compression);
+ payloadHolder.set(compressedPayload);
+ lengthHolder.set(uncompressedLength);
+ }
- @Override
- public NodeConnection createConnection(String hostname, int port) {
- fail("Unexpected call");
- return null;
+ @Override
+ public void close() {
+ }
+ };
}
};
}