From 9faa5a3bc2d198e619732936459e11fb14634800 Mon Sep 17 00:00:00 2001 From: Olli Virtanen Date: Fri, 5 Apr 2019 14:23:22 +0200 Subject: Optimizations for search protocol over jrt/protobuf --- configdefinitions/src/vespa/dispatch.def | 3 + .../java/com/yahoo/search/dispatch/rpc/Client.java | 15 +-- .../com/yahoo/search/dispatch/rpc/RpcClient.java | 50 +++++---- .../yahoo/search/dispatch/rpc/RpcFillInvoker.java | 7 +- .../com/yahoo/search/dispatch/rpc/RpcPing.java | 6 +- .../dispatch/rpc/RpcProtobufFillInvoker.java | 15 ++- .../yahoo/search/dispatch/rpc/RpcResourcePool.java | 71 +++++++++---- .../search/dispatch/rpc/RpcSearchInvoker.java | 10 +- .../fastsearch/test/FastSearcherTestCase.java | 3 +- .../yahoo/search/dispatch/rpc/FillTestCase.java | 15 ++- .../com/yahoo/search/dispatch/rpc/MockClient.java | 113 ++++++++++----------- .../search/dispatch/rpc/RpcSearchInvokerTest.java | 35 ++++--- 12 files changed, 183 insertions(+), 160 deletions(-) diff --git a/configdefinitions/src/vespa/dispatch.def b/configdefinitions/src/vespa/dispatch.def index 7d5979bcdf1..477a781ebbc 100644 --- a/configdefinitions/src/vespa/dispatch.def +++ b/configdefinitions/src/vespa/dispatch.def @@ -40,6 +40,9 @@ minWaitAfterCoverageFactor double default=0 # Maximum wait time for full coverage after minimum coverage is achieved, factored based on time left at minimum coverage maxWaitAfterCoverageFactor double default=1 +# Number of JRT connection supervisors +numJrtSupervisors int default=8 + # The unique key of a search node node[].key int diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java index cc37df04a62..e54e2187818 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java @@ -2,8 +2,6 @@ package com.yahoo.search.dispatch.rpc; import com.yahoo.compress.CompressionType; -import com.yahoo.compress.Compressor; -import com.yahoo.prelude.Pong; import com.yahoo.prelude.fastsearch.FastHit; import java.util.List; @@ -15,14 +13,6 @@ import java.util.Optional; * @author bratseth */ interface Client { - - void getDocsums(List hits, NodeConnection node, CompressionType compression, - int uncompressedLength, byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, - double timeoutSeconds); - - void request(String rpcMethod, NodeConnection node, CompressionType compression, int uncompressedLength, - byte[] compressedPayload, ResponseReceiver responseReceiver, double timeoutSeconds); - /** Creates a connection to a particular node in this */ NodeConnection createConnection(String hostname, int port); @@ -91,6 +81,11 @@ interface Client { } interface NodeConnection { + void getDocsums(List hits, CompressionType compression, int uncompressedLength, byte[] compressedSlime, + RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds); + + void request(String rpcMethod, CompressionType compression, int uncompressedLength, byte[] compressedPayload, + ResponseReceiver responseReceiver, double timeoutSeconds); /** Closes this connection */ void close(); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java index 2aa01b05955..7e48733106a 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java @@ -29,31 +29,6 @@ class RpcClient implements Client { return new RpcNodeConnection(hostname, port, supervisor); } - @Override - public void getDocsums(List hits, NodeConnection node, CompressionType compression, int uncompressedLength, - byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) { - Request request = new Request("proton.getDocsums"); - request.parameters().add(new Int8Value(compression.getCode())); - request.parameters().add(new Int32Value(uncompressedLength)); - request.parameters().add(new DataValue(compressedSlime)); - - request.setContext(hits); - RpcNodeConnection rpcNode = ((RpcNodeConnection) node); - rpcNode.invokeAsync(request, timeoutSeconds, new RpcDocsumResponseWaiter(rpcNode, responseReceiver)); - } - - @Override - public void request(String rpcMethod, NodeConnection node, CompressionType compression, int uncompressedLength, byte[] compressedPayload, - ResponseReceiver responseReceiver, double timeoutSeconds) { - Request request = new Request(rpcMethod); - request.parameters().add(new Int8Value(compression.getCode())); - request.parameters().add(new Int32Value(uncompressedLength)); - request.parameters().add(new DataValue(compressedPayload)); - - RpcNodeConnection rpcNode = ((RpcNodeConnection) node); - rpcNode.invokeAsync(request, timeoutSeconds, new RpcProtobufResponseWaiter(rpcNode, responseReceiver)); - } - private static class RpcNodeConnection implements NodeConnection { // Information about the connected node @@ -73,7 +48,30 @@ class RpcClient implements Client { description = "rpc node connection to " + hostname + ":" + port; } - public void invokeAsync(Request req, double timeout, RequestWaiter waiter) { + @Override + public void getDocsums(List hits, CompressionType compression, int uncompressedLength, + byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) { + Request request = new Request("proton.getDocsums"); + request.parameters().add(new Int8Value(compression.getCode())); + request.parameters().add(new Int32Value(uncompressedLength)); + request.parameters().add(new DataValue(compressedSlime)); + + request.setContext(hits); + invokeAsync(request, timeoutSeconds, new RpcDocsumResponseWaiter(this, responseReceiver)); + } + + @Override + public void request(String rpcMethod, CompressionType compression, int uncompressedLength, byte[] compressedPayload, + ResponseReceiver responseReceiver, double timeoutSeconds) { + Request request = new Request(rpcMethod); + request.parameters().add(new Int8Value(compression.getCode())); + request.parameters().add(new Int32Value(uncompressedLength)); + request.parameters().add(new DataValue(compressedPayload)); + + invokeAsync(request, timeoutSeconds, new RpcProtobufResponseWaiter(this, responseReceiver)); + } + + private void invokeAsync(Request req, double timeout, RequestWaiter waiter) { // TODO: Consider replacing this by a watcher on the target synchronized(this) { // ensure we have exactly 1 valid connection across threads if (target == null || ! target.isValid()) diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java index 760f7486923..aa72823c809 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java @@ -100,7 +100,7 @@ public class RpcFillInvoker extends FillInvoker { /** Send a getDocsums request to a node. Responses will be added to the given receiver. */ private void sendGetDocsumsRequest(int nodeId, List hits, String summaryClass, CompressionType compression, Result result, GetDocsumsResponseReceiver responseReceiver) { - Client.NodeConnection node = resourcePool.nodeConnections().get(nodeId); + Client.NodeConnection node = resourcePool.getConnection(nodeId); if (node == null) { String error = "Could not fill hits from unknown node " + nodeId; responseReceiver.receive(Client.ResponseOrError.fromError(error)); @@ -114,9 +114,8 @@ public class RpcFillInvoker extends FillInvoker { byte[] serializedSlime = BinaryFormat .encode(toSlime(rankProfile, summaryClass, query.getModel().getDocumentDb(), query.getSessionId(), hits)); double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0; - Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, serializedSlime); - resourcePool.client().getDocsums(hits, node, compressionResult.type(), serializedSlime.length, compressionResult.data(), - responseReceiver, timeoutSeconds); + Compressor.Compression compressionResult = resourcePool.compress(query, serializedSlime); + node.getDocsums(hits, compressionResult.type(), serializedSlime.length, compressionResult.data(), responseReceiver, timeoutSeconds); } static private Slime toSlime(String rankProfile, String summaryClass, String docType, SessionId sessionId, List hits) { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java index f3479e2e4a9..c001b51ef11 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java @@ -52,12 +52,12 @@ public class RpcPing implements Callable { } private void sendPing(LinkedBlockingQueue> queue) { - var connection = resourcePool.nodeConnections().get(node.key()); + var connection = resourcePool.getConnection(node.key()); var ping = SearchProtocol.MonitorRequest.newBuilder().build().toByteArray(); double timeoutSeconds = ((double) clusterMonitor.getConfiguration().getRequestTimeout()) / 1000.0; Compressor.Compression compressionResult = resourcePool.compressor().compress(PING_COMPRESSION, ping); - resourcePool.client().request(RPC_METHOD, connection, compressionResult.type(), ping.length, compressionResult.data(), - rsp -> queue.add(rsp), timeoutSeconds); + connection.request(RPC_METHOD, compressionResult.type(), ping.length, compressionResult.data(), rsp -> queue.add(rsp), + timeoutSeconds); } private Pong decodeReply(ProtobufResponse response) throws InvalidProtocolBufferException { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java index 3ec821beba8..cd4ba191a7d 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java @@ -66,9 +66,6 @@ public class RpcProtobufFillInvoker extends FillInvoker { protected void sendFillRequest(Result result, String summaryClass) { ListMap hitsByNode = hitsByNode(result); - CompressionType compression = CompressionType - .valueOf(result.getQuery().properties().getString(RpcResourcePool.dispatchCompression, "LZ4").toUpperCase()); - result.getQuery().trace(false, 5, "Sending ", hitsByNode.size(), " summary fetch requests with jrt/protobuf"); outstandingResponses = hitsByNode.size(); @@ -77,7 +74,7 @@ public class RpcProtobufFillInvoker extends FillInvoker { var builder = ProtobufSerialization.createDocsumRequestBuilder(result.getQuery(), serverId, summaryClass, summaryNeedsQuery); for (Map.Entry> nodeHits : hitsByNode.entrySet()) { var payload = ProtobufSerialization.serializeDocsumRequest(builder, nodeHits.getValue()); - sendDocsumsRequest(nodeHits.getKey(), nodeHits.getValue(), payload, compression, result); + sendDocsumsRequest(nodeHits.getKey(), nodeHits.getValue(), payload, result); } } @@ -117,8 +114,8 @@ public class RpcProtobufFillInvoker extends FillInvoker { } /** Send a docsums request to a node. Responses will be added to the given receiver. */ - private void sendDocsumsRequest(int nodeId, List hits, byte[] payload, CompressionType compression, Result result) { - Client.NodeConnection node = resourcePool.nodeConnections().get(nodeId); + private void sendDocsumsRequest(int nodeId, List hits, byte[] payload, Result result) { + Client.NodeConnection node = resourcePool.getConnection(nodeId); if (node == null) { String error = "Could not fill hits from unknown node " + nodeId; receive(Client.ResponseOrError.fromError(error), hits); @@ -129,9 +126,9 @@ public class RpcProtobufFillInvoker extends FillInvoker { Query query = result.getQuery(); double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0; - Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, payload); - resourcePool.client().request(RPC_METHOD, node, compressionResult.type(), payload.length, compressionResult.data(), - roe -> receive(roe, hits), timeoutSeconds); + Compressor.Compression compressionResult = resourcePool.compress(query, payload); + node.request(RPC_METHOD, compressionResult.type(), payload.length, compressionResult.data(), roe -> receive(roe, hits), + timeoutSeconds); } private void processResponses(Result result, String summaryClass) throws TimeoutException { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java index 830ba45ef0f..cccf8dd3693 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java @@ -2,12 +2,20 @@ package com.yahoo.search.dispatch.rpc; import com.google.common.collect.ImmutableMap; +import com.yahoo.compress.CompressionType; import com.yahoo.compress.Compressor; +import com.yahoo.compress.Compressor.Compression; import com.yahoo.processing.request.CompoundName; +import com.yahoo.search.Query; import com.yahoo.search.dispatch.FillInvoker; +import com.yahoo.search.dispatch.rpc.Client.NodeConnection; import com.yahoo.vespa.config.search.DispatchConfig; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.Random; /** * RpcResourcePool constructs {@link FillInvoker} objects that communicate with content nodes over RPC. It also contains @@ -19,43 +27,70 @@ public class RpcResourcePool { /** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */ public final static CompoundName dispatchCompression = new CompoundName("dispatch.compression"); - private final Compressor compressor = new Compressor(); - private final Client client; + private final Compressor compressor = new Compressor(CompressionType.LZ4, 5, 0.95, 32); + private final Random random = new Random(); /** Connections to the search nodes this talks to, indexed by node id ("partid") */ - private final ImmutableMap nodeConnections; + private final ImmutableMap nodeConnectionPools; - public RpcResourcePool(Client client, Map nodeConnections) { - this.client = client; - this.nodeConnections = ImmutableMap.copyOf(nodeConnections); + public RpcResourcePool(Map nodeConnections) { + var builder = new ImmutableMap.Builder(); + nodeConnections.forEach((key, connection) -> builder.put(key, new NodeConnectionPool(Collections.singletonList(connection)))); + this.nodeConnectionPools = builder.build(); } public RpcResourcePool(DispatchConfig dispatchConfig) { - this.client = new RpcClient(); + var clients = new ArrayList(dispatchConfig.numJrtSupervisors()); + for (int i = 0; i < dispatchConfig.numJrtSupervisors(); i++) { + clients.add(new RpcClient()); + } - // Create node rpc connections, indexed by the node distribution key - ImmutableMap.Builder nodeConnectionsBuilder = new ImmutableMap.Builder<>(); - for (DispatchConfig.Node node : dispatchConfig.node()) { - nodeConnectionsBuilder.put(node.key(), client.createConnection(node.host(), node.port())); + // Create node rpc connection pools, indexed by the node distribution key + var builder = new ImmutableMap.Builder(); + for (var node : dispatchConfig.node()) { + var connections = new ArrayList(clients.size()); + clients.forEach(client -> connections.add(client.createConnection(node.host(), node.port()))); + builder.put(node.key(), new NodeConnectionPool(connections)); } - this.nodeConnections = nodeConnectionsBuilder.build(); + this.nodeConnectionPools = builder.build(); } public Compressor compressor() { return compressor; } - public Client client() { - return client; + public Compression compress(Query query, byte[] payload) { + CompressionType compression = CompressionType.valueOf(query.properties().getString(dispatchCompression, "LZ4").toUpperCase()); + return compressor.compress(compression, payload); } - public ImmutableMap nodeConnections() { - return nodeConnections; + public NodeConnection getConnection(int nodeId) { + var pool = nodeConnectionPools.get(nodeId); + if (pool == null) { + return null; + } else { + return pool.nextConnection(); + } } public void release() { - for (Client.NodeConnection nodeConnection : nodeConnections.values()) { - nodeConnection.close(); + nodeConnectionPools.values().forEach(NodeConnectionPool::release); + } + + private class NodeConnectionPool { + private final List connections; + + NodeConnectionPool(List connections) { + this.connections = connections; + } + + Client.NodeConnection nextConnection() { + int slot = random.nextInt(connections.size()); + return connections.get(slot); + } + + void release() { + connections.forEach(Client.NodeConnection::close); } } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java index d70a7d95b63..75e9b06f445 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java @@ -46,10 +46,7 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe protected void sendSearchRequest(Query query) throws IOException { this.query = query; - CompressionType compression = CompressionType - .valueOf(query.properties().getString(RpcResourcePool.dispatchCompression, "LZ4").toUpperCase()); - - Client.NodeConnection nodeConnection = resourcePool.nodeConnections().get(node.key()); + Client.NodeConnection nodeConnection = resourcePool.getConnection(node.key()); if (nodeConnection == null) { responses.add(Client.ResponseOrError.fromError("Could not send search to unknown node " + node.key())); responseAvailable(); @@ -59,9 +56,8 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe var payload = ProtobufSerialization.serializeSearchRequest(query, searcher.getServerId()); double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0; - Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, payload); - resourcePool.client().request(RPC_METHOD, nodeConnection, compressionResult.type(), payload.length, compressionResult.data(), this, - timeoutSeconds); + Compressor.Compression compressionResult = resourcePool.compress(query, payload); + nodeConnection.request(RPC_METHOD, compressionResult.type(), payload.length, compressionResult.data(), this, timeoutSeconds); } @Override diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java index f4be2943f5f..497f05e8341 100644 --- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java +++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java @@ -154,7 +154,8 @@ public class FastSearcherTestCase { doFill(fastSearcher, result); ErrorMessage error = result.hits().getError(); assertEquals("Since we don't actually run summary backends we get this error when the Dispatcher is used", - "Error response from rpc node connection to host1:0: Connection error", error.getDetailedMessage()); + "Error response from rpc node connection to hostX:0: Connection error", + error.getDetailedMessage().replaceAll("host[12]", "hostX")); } } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/FillTestCase.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/FillTestCase.java index e059008acac..6d1f19eeaf2 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/FillTestCase.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/FillTestCase.java @@ -22,7 +22,6 @@ import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; - /** * Tests using a dispatcher to fill a result * @@ -38,7 +37,7 @@ public class FillTestCase { nodes.put(0, client.createConnection("host0", 123)); nodes.put(1, client.createConnection("host1", 123)); nodes.put(2, client.createConnection("host2", 123)); - RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes); + RpcResourcePool rpcResourcePool = new RpcResourcePool(nodes); RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null, true); Query query = new Query(); @@ -75,7 +74,7 @@ public class FillTestCase { nodes.put(0, client.createConnection("host0", 123)); nodes.put(1, client.createConnection("host1", 123)); nodes.put(2, client.createConnection("host2", 123)); - RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes); + RpcResourcePool rpcResourcePool = new RpcResourcePool(nodes); RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null, true); Query query = new Query(); @@ -90,7 +89,7 @@ public class FillTestCase { client.setDocsumReponse("host2", 1, "summaryClass1", map("field1", "s.2.1", "field2", 1)); client.setDocsumReponse("host1", 2, "summaryClass1", new HashMap<>()); client.setDocsumReponse("host2", 3, "summaryClass1", map("field1", "s.2.3", "field2", 3)); - client.setDocsumReponse("host0", 4, "summaryClass1",new HashMap<>()); + client.setDocsumReponse("host0", 4, "summaryClass1", new HashMap<>()); factory.createFillInvoker(db()).fill(result, "summaryClass1"); @@ -115,7 +114,7 @@ public class FillTestCase { Map nodes = new HashMap<>(); nodes.put(0, client.createConnection("host0", 123)); - RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes); + RpcResourcePool rpcResourcePool = new RpcResourcePool(nodes); RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null, true); Query query = new Query(); @@ -133,7 +132,7 @@ public class FillTestCase { Map nodes = new HashMap<>(); nodes.put(0, client.createConnection("host0", 123)); - RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes); + RpcResourcePool rpcResourcePool = new RpcResourcePool(nodes); RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null, true); Query query = new Query(); @@ -141,7 +140,6 @@ public class FillTestCase { result.hits().add(createHit(0, 0)); result.hits().add(createHit(1, 1)); - factory.createFillInvoker(db()).fill(result, "summaryClass1"); assertEquals("Could not fill hits from unknown node 1", result.hits().getError().getDetailedMessage()); @@ -151,8 +149,7 @@ public class FillTestCase { List fields = new ArrayList<>(); fields.add(DocsumField.create("field1", "string")); fields.add(DocsumField.create("field2", "int64")); - DocsumDefinitionSet docsums = new DocsumDefinitionSet(Collections.singleton(new DocsumDefinition("summaryClass1", - fields))); + DocsumDefinitionSet docsums = new DocsumDefinitionSet(Collections.singleton(new DocsumDefinition("summaryClass1", fields))); return new DocumentDatabase("default", docsums, Collections.emptySet()); } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java index 687d3e728c0..3cc3257194c 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java @@ -36,62 +36,6 @@ public class MockClient implements Client { return new MockNodeConnection(hostname, port); } - @Override - public void getDocsums(List hitsContext, NodeConnection node, CompressionType compression, - int uncompressedSize, byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, - double timeoutSeconds) { - if (malfunctioning) { - responseReceiver.receive(ResponseOrError.fromError("Malfunctioning")); - return; - } - - Inspector request = BinaryFormat.decode(compressor.decompress(compressedSlime, compression, uncompressedSize)).get(); - String docsumClass = request.field("class").asString(); - List> docsumsToReturn = new ArrayList<>(); - request.field("gids").traverse((ArrayTraverser)(index, gid) -> { - GlobalId docId = new GlobalId(gid.asData()); - docsumsToReturn.add(docsums.get(new DocsumKey(node.toString(), docId, docsumClass))); - }); - Slime responseSlime = new Slime(); - Cursor root = responseSlime.setObject(); - Cursor docsums = root.setArray("docsums"); - for (Map docsumFields : docsumsToReturn) { - Cursor docsumItem = docsums.addObject(); - Cursor docsum = docsumItem.setObject("docsum"); - for (Map.Entry field : docsumFields.entrySet()) { - if (field.getValue() instanceof Integer) - docsum.setLong(field.getKey(), (Integer)field.getValue()); - else if (field.getValue() instanceof String) - docsum.setString(field.getKey(), (String)field.getValue()); - else - throw new RuntimeException(); - } - } - byte[] slimeBytes = BinaryFormat.encode(responseSlime); - Compressor.Compression compressionResult = compressor.compress(compression, slimeBytes); - GetDocsumsResponse response = new GetDocsumsResponse(compressionResult.type().getCode(), slimeBytes.length, - compressionResult.data(), hitsContext); - responseReceiver.receive(ResponseOrError.fromResponse(response)); - } - - @Override - public void request(String rpcMethod, NodeConnection node, CompressionType compression, int uncompressedLength, byte[] compressedPayload, - ResponseReceiver responseReceiver, double timeoutSeconds) { - if (malfunctioning) { - responseReceiver.receive(ResponseOrError.fromError("Malfunctioning")); - return; - } - - if(searchResult == null) { - responseReceiver.receive(ResponseOrError.fromError("No result defined")); - return; - } - var payload = ProtobufSerialization.serializeResult(searchResult); - var compressionResult = compressor.compress(compression, payload); - var response = new ProtobufResponse(compressionResult.type().getCode(), payload.length, compressionResult.data()); - responseReceiver.receive(ResponseOrError.fromResponse(response)); - } - public void setDocsumReponse(String nodeId, int docId, String docsumClass, Map docsumValues) { docsums.put(new DocsumKey(nodeId, globalIdFrom(docId), docsumClass), docsumValues); } @@ -100,7 +44,7 @@ public class MockClient implements Client { return new GlobalId(new IdIdString("", "test", "", String.valueOf(hitId))); } - private static class MockNodeConnection implements Client.NodeConnection { + private class MockNodeConnection implements Client.NodeConnection { private final String hostname; @@ -108,6 +52,61 @@ public class MockClient implements Client { this.hostname = hostname; } + @Override + public void getDocsums(List hitsContext, CompressionType compression, int uncompressedSize, byte[] compressedSlime, + RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) { + if (malfunctioning) { + responseReceiver.receive(ResponseOrError.fromError("Malfunctioning")); + return; + } + + Inspector request = BinaryFormat.decode(compressor.decompress(compressedSlime, compression, uncompressedSize)).get(); + String docsumClass = request.field("class").asString(); + List> docsumsToReturn = new ArrayList<>(); + request.field("gids").traverse((ArrayTraverser) (index, gid) -> { + GlobalId docId = new GlobalId(gid.asData()); + docsumsToReturn.add(docsums.get(new DocsumKey(toString(), docId, docsumClass))); + }); + Slime responseSlime = new Slime(); + Cursor root = responseSlime.setObject(); + Cursor docsums = root.setArray("docsums"); + for (Map docsumFields : docsumsToReturn) { + Cursor docsumItem = docsums.addObject(); + Cursor docsum = docsumItem.setObject("docsum"); + for (Map.Entry field : docsumFields.entrySet()) { + if (field.getValue() instanceof Integer) + docsum.setLong(field.getKey(), (Integer) field.getValue()); + else if (field.getValue() instanceof String) + docsum.setString(field.getKey(), (String) field.getValue()); + else + throw new RuntimeException(); + } + } + byte[] slimeBytes = BinaryFormat.encode(responseSlime); + Compressor.Compression compressionResult = compressor.compress(compression, slimeBytes); + GetDocsumsResponse response = new GetDocsumsResponse(compressionResult.type().getCode(), slimeBytes.length, + compressionResult.data(), hitsContext); + responseReceiver.receive(ResponseOrError.fromResponse(response)); + } + + @Override + public void request(String rpcMethod, CompressionType compression, int uncompressedLength, byte[] compressedPayload, + ResponseReceiver responseReceiver, double timeoutSeconds) { + if (malfunctioning) { + responseReceiver.receive(ResponseOrError.fromError("Malfunctioning")); + return; + } + + if(searchResult == null) { + responseReceiver.receive(ResponseOrError.fromError("No result defined")); + return; + } + var payload = ProtobufSerialization.serializeResult(searchResult); + var compressionResult = compressor.compress(compression, payload); + var response = new ProtobufResponse(compressionResult.type().getCode(), payload.length, compressionResult.data()); + responseReceiver.receive(ResponseOrError.fromResponse(response)); + } + @Override public void close() { } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java index 64863b9a8a6..d629bd36bb1 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java @@ -34,7 +34,7 @@ public class RpcSearchInvokerTest { var payloadHolder = new AtomicReference(); var lengthHolder = new AtomicInteger(); var mockClient = parameterCollectorClient(compressionTypeHolder, payloadHolder, lengthHolder); - var mockPool = new RpcResourcePool(mockClient, ImmutableMap.of(7, () -> {})); + var mockPool = new RpcResourcePool(ImmutableMap.of(7, mockClient.createConnection("foo", 123))); @SuppressWarnings("resource") var invoker = new RpcSearchInvoker(mockSearcher(), new Node(7, "seven", 77, 1), mockPool); @@ -53,23 +53,26 @@ public class RpcSearchInvokerTest { AtomicInteger lengthHolder) { return new Client() { @Override - public void request(String rpcMethod, NodeConnection node, CompressionType compression, int uncompressedLength, - byte[] compressedPayload, ResponseReceiver responseReceiver, double timeoutSeconds) { - compressionTypeHolder.set(compression); - payloadHolder.set(compressedPayload); - lengthHolder.set(uncompressedLength); - } + public NodeConnection createConnection(String hostname, int port) { + return new NodeConnection() { + @Override + public void getDocsums(List hits, CompressionType compression, int uncompressedLength, byte[] compressedSlime, + GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) { + fail("Unexpected call"); + } - @Override - public void getDocsums(List hits, NodeConnection node, CompressionType compression, int uncompressedLength, - byte[] compressedSlime, GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) { - fail("Unexpected call"); - } + @Override + public void request(String rpcMethod, CompressionType compression, int uncompressedLength, byte[] compressedPayload, + ResponseReceiver responseReceiver, double timeoutSeconds) { + compressionTypeHolder.set(compression); + payloadHolder.set(compressedPayload); + lengthHolder.set(uncompressedLength); + } - @Override - public NodeConnection createConnection(String hostname, int port) { - fail("Unexpected call"); - return null; + @Override + public void close() { + } + }; } }; } -- cgit v1.2.3