diff options
author | Olli Virtanen <olli.virtanen@oath.com> | 2019-04-05 14:23:22 +0200 |
---|---|---|
committer | Olli Virtanen <olli.virtanen@oath.com> | 2019-04-05 14:23:22 +0200 |
commit | 9faa5a3bc2d198e619732936459e11fb14634800 (patch) | |
tree | d83fd3a9484f2b9886465abf19ab687175e71280 /container-search/src/main/java/com/yahoo/search | |
parent | 747114c0f7835bf9a3e47a19e924856227efde8d (diff) |
Optimizations for search protocol over jrt/protobuf
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search')
7 files changed, 97 insertions, 77 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java index cc37df04a62..e54e2187818 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java @@ -2,8 +2,6 @@ package com.yahoo.search.dispatch.rpc; import com.yahoo.compress.CompressionType; -import com.yahoo.compress.Compressor; -import com.yahoo.prelude.Pong; import com.yahoo.prelude.fastsearch.FastHit; import java.util.List; @@ -15,14 +13,6 @@ import java.util.Optional; * @author bratseth */ interface Client { - - void getDocsums(List<FastHit> hits, NodeConnection node, CompressionType compression, - int uncompressedLength, byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, - double timeoutSeconds); - - void request(String rpcMethod, NodeConnection node, CompressionType compression, int uncompressedLength, - byte[] compressedPayload, ResponseReceiver responseReceiver, double timeoutSeconds); - /** Creates a connection to a particular node in this */ NodeConnection createConnection(String hostname, int port); @@ -91,6 +81,11 @@ interface Client { } interface NodeConnection { + void getDocsums(List<FastHit> hits, CompressionType compression, int uncompressedLength, byte[] compressedSlime, + RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds); + + void request(String rpcMethod, CompressionType compression, int uncompressedLength, byte[] compressedPayload, + ResponseReceiver responseReceiver, double timeoutSeconds); /** Closes this connection */ void close(); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java index 2aa01b05955..7e48733106a 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java @@ -29,31 +29,6 @@ class RpcClient implements Client { return new RpcNodeConnection(hostname, port, supervisor); } - @Override - public void getDocsums(List<FastHit> hits, NodeConnection node, CompressionType compression, int uncompressedLength, - byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) { - Request request = new Request("proton.getDocsums"); - request.parameters().add(new Int8Value(compression.getCode())); - request.parameters().add(new Int32Value(uncompressedLength)); - request.parameters().add(new DataValue(compressedSlime)); - - request.setContext(hits); - RpcNodeConnection rpcNode = ((RpcNodeConnection) node); - rpcNode.invokeAsync(request, timeoutSeconds, new RpcDocsumResponseWaiter(rpcNode, responseReceiver)); - } - - @Override - public void request(String rpcMethod, NodeConnection node, CompressionType compression, int uncompressedLength, byte[] compressedPayload, - ResponseReceiver responseReceiver, double timeoutSeconds) { - Request request = new Request(rpcMethod); - request.parameters().add(new Int8Value(compression.getCode())); - request.parameters().add(new Int32Value(uncompressedLength)); - request.parameters().add(new DataValue(compressedPayload)); - - RpcNodeConnection rpcNode = ((RpcNodeConnection) node); - rpcNode.invokeAsync(request, timeoutSeconds, new RpcProtobufResponseWaiter(rpcNode, responseReceiver)); - } - private static class RpcNodeConnection implements NodeConnection { // Information about the connected node @@ -73,7 +48,30 @@ class RpcClient implements Client { description = "rpc node connection to " + hostname + ":" + port; } - public void invokeAsync(Request req, double timeout, RequestWaiter waiter) { + @Override + public void getDocsums(List<FastHit> hits, CompressionType compression, int uncompressedLength, + byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) { + Request request = new Request("proton.getDocsums"); + request.parameters().add(new Int8Value(compression.getCode())); + request.parameters().add(new Int32Value(uncompressedLength)); + request.parameters().add(new DataValue(compressedSlime)); + + request.setContext(hits); + invokeAsync(request, timeoutSeconds, new RpcDocsumResponseWaiter(this, responseReceiver)); + } + + @Override + public void request(String rpcMethod, CompressionType compression, int uncompressedLength, byte[] compressedPayload, + ResponseReceiver responseReceiver, double timeoutSeconds) { + Request request = new Request(rpcMethod); + request.parameters().add(new Int8Value(compression.getCode())); + request.parameters().add(new Int32Value(uncompressedLength)); + request.parameters().add(new DataValue(compressedPayload)); + + invokeAsync(request, timeoutSeconds, new RpcProtobufResponseWaiter(this, responseReceiver)); + } + + private void invokeAsync(Request req, double timeout, RequestWaiter waiter) { // TODO: Consider replacing this by a watcher on the target synchronized(this) { // ensure we have exactly 1 valid connection across threads if (target == null || ! target.isValid()) diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java index 760f7486923..aa72823c809 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java @@ -100,7 +100,7 @@ public class RpcFillInvoker extends FillInvoker { /** Send a getDocsums request to a node. Responses will be added to the given receiver. */ private void sendGetDocsumsRequest(int nodeId, List<FastHit> hits, String summaryClass, CompressionType compression, Result result, GetDocsumsResponseReceiver responseReceiver) { - Client.NodeConnection node = resourcePool.nodeConnections().get(nodeId); + Client.NodeConnection node = resourcePool.getConnection(nodeId); if (node == null) { String error = "Could not fill hits from unknown node " + nodeId; responseReceiver.receive(Client.ResponseOrError.fromError(error)); @@ -114,9 +114,8 @@ public class RpcFillInvoker extends FillInvoker { byte[] serializedSlime = BinaryFormat .encode(toSlime(rankProfile, summaryClass, query.getModel().getDocumentDb(), query.getSessionId(), hits)); double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0; - Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, serializedSlime); - resourcePool.client().getDocsums(hits, node, compressionResult.type(), serializedSlime.length, compressionResult.data(), - responseReceiver, timeoutSeconds); + Compressor.Compression compressionResult = resourcePool.compress(query, serializedSlime); + node.getDocsums(hits, compressionResult.type(), serializedSlime.length, compressionResult.data(), responseReceiver, timeoutSeconds); } static private Slime toSlime(String rankProfile, String summaryClass, String docType, SessionId sessionId, List<FastHit> hits) { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java index f3479e2e4a9..c001b51ef11 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java @@ -52,12 +52,12 @@ public class RpcPing implements Callable<Pong> { } private void sendPing(LinkedBlockingQueue<ResponseOrError<ProtobufResponse>> queue) { - var connection = resourcePool.nodeConnections().get(node.key()); + var connection = resourcePool.getConnection(node.key()); var ping = SearchProtocol.MonitorRequest.newBuilder().build().toByteArray(); double timeoutSeconds = ((double) clusterMonitor.getConfiguration().getRequestTimeout()) / 1000.0; Compressor.Compression compressionResult = resourcePool.compressor().compress(PING_COMPRESSION, ping); - resourcePool.client().request(RPC_METHOD, connection, compressionResult.type(), ping.length, compressionResult.data(), - rsp -> queue.add(rsp), timeoutSeconds); + connection.request(RPC_METHOD, compressionResult.type(), ping.length, compressionResult.data(), rsp -> queue.add(rsp), + timeoutSeconds); } private Pong decodeReply(ProtobufResponse response) throws InvalidProtocolBufferException { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java index 3ec821beba8..cd4ba191a7d 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java @@ -66,9 +66,6 @@ public class RpcProtobufFillInvoker extends FillInvoker { protected void sendFillRequest(Result result, String summaryClass) { ListMap<Integer, FastHit> hitsByNode = hitsByNode(result); - CompressionType compression = CompressionType - .valueOf(result.getQuery().properties().getString(RpcResourcePool.dispatchCompression, "LZ4").toUpperCase()); - result.getQuery().trace(false, 5, "Sending ", hitsByNode.size(), " summary fetch requests with jrt/protobuf"); outstandingResponses = hitsByNode.size(); @@ -77,7 +74,7 @@ public class RpcProtobufFillInvoker extends FillInvoker { var builder = ProtobufSerialization.createDocsumRequestBuilder(result.getQuery(), serverId, summaryClass, summaryNeedsQuery); for (Map.Entry<Integer, List<FastHit>> nodeHits : hitsByNode.entrySet()) { var payload = ProtobufSerialization.serializeDocsumRequest(builder, nodeHits.getValue()); - sendDocsumsRequest(nodeHits.getKey(), nodeHits.getValue(), payload, compression, result); + sendDocsumsRequest(nodeHits.getKey(), nodeHits.getValue(), payload, result); } } @@ -117,8 +114,8 @@ public class RpcProtobufFillInvoker extends FillInvoker { } /** Send a docsums request to a node. Responses will be added to the given receiver. */ - private void sendDocsumsRequest(int nodeId, List<FastHit> hits, byte[] payload, CompressionType compression, Result result) { - Client.NodeConnection node = resourcePool.nodeConnections().get(nodeId); + private void sendDocsumsRequest(int nodeId, List<FastHit> hits, byte[] payload, Result result) { + Client.NodeConnection node = resourcePool.getConnection(nodeId); if (node == null) { String error = "Could not fill hits from unknown node " + nodeId; receive(Client.ResponseOrError.fromError(error), hits); @@ -129,9 +126,9 @@ public class RpcProtobufFillInvoker extends FillInvoker { Query query = result.getQuery(); double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0; - Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, payload); - resourcePool.client().request(RPC_METHOD, node, compressionResult.type(), payload.length, compressionResult.data(), - roe -> receive(roe, hits), timeoutSeconds); + Compressor.Compression compressionResult = resourcePool.compress(query, payload); + node.request(RPC_METHOD, compressionResult.type(), payload.length, compressionResult.data(), roe -> receive(roe, hits), + timeoutSeconds); } private void processResponses(Result result, String summaryClass) throws TimeoutException { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java index 830ba45ef0f..cccf8dd3693 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java @@ -2,12 +2,20 @@ package com.yahoo.search.dispatch.rpc; import com.google.common.collect.ImmutableMap; +import com.yahoo.compress.CompressionType; import com.yahoo.compress.Compressor; +import com.yahoo.compress.Compressor.Compression; import com.yahoo.processing.request.CompoundName; +import com.yahoo.search.Query; import com.yahoo.search.dispatch.FillInvoker; +import com.yahoo.search.dispatch.rpc.Client.NodeConnection; import com.yahoo.vespa.config.search.DispatchConfig; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.Random; /** * RpcResourcePool constructs {@link FillInvoker} objects that communicate with content nodes over RPC. It also contains @@ -19,43 +27,70 @@ public class RpcResourcePool { /** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */ public final static CompoundName dispatchCompression = new CompoundName("dispatch.compression"); - private final Compressor compressor = new Compressor(); - private final Client client; + private final Compressor compressor = new Compressor(CompressionType.LZ4, 5, 0.95, 32); + private final Random random = new Random(); /** Connections to the search nodes this talks to, indexed by node id ("partid") */ - private final ImmutableMap<Integer, Client.NodeConnection> nodeConnections; + private final ImmutableMap<Integer, NodeConnectionPool> nodeConnectionPools; - public RpcResourcePool(Client client, Map<Integer, Client.NodeConnection> nodeConnections) { - this.client = client; - this.nodeConnections = ImmutableMap.copyOf(nodeConnections); + public RpcResourcePool(Map<Integer, Client.NodeConnection> nodeConnections) { + var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>(); + nodeConnections.forEach((key, connection) -> builder.put(key, new NodeConnectionPool(Collections.singletonList(connection)))); + this.nodeConnectionPools = builder.build(); } public RpcResourcePool(DispatchConfig dispatchConfig) { - this.client = new RpcClient(); + var clients = new ArrayList<RpcClient>(dispatchConfig.numJrtSupervisors()); + for (int i = 0; i < dispatchConfig.numJrtSupervisors(); i++) { + clients.add(new RpcClient()); + } - // Create node rpc connections, indexed by the node distribution key - ImmutableMap.Builder<Integer, Client.NodeConnection> nodeConnectionsBuilder = new ImmutableMap.Builder<>(); - for (DispatchConfig.Node node : dispatchConfig.node()) { - nodeConnectionsBuilder.put(node.key(), client.createConnection(node.host(), node.port())); + // Create node rpc connection pools, indexed by the node distribution key + var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>(); + for (var node : dispatchConfig.node()) { + var connections = new ArrayList<Client.NodeConnection>(clients.size()); + clients.forEach(client -> connections.add(client.createConnection(node.host(), node.port()))); + builder.put(node.key(), new NodeConnectionPool(connections)); } - this.nodeConnections = nodeConnectionsBuilder.build(); + this.nodeConnectionPools = builder.build(); } public Compressor compressor() { return compressor; } - public Client client() { - return client; + public Compression compress(Query query, byte[] payload) { + CompressionType compression = CompressionType.valueOf(query.properties().getString(dispatchCompression, "LZ4").toUpperCase()); + return compressor.compress(compression, payload); } - public ImmutableMap<Integer, Client.NodeConnection> nodeConnections() { - return nodeConnections; + public NodeConnection getConnection(int nodeId) { + var pool = nodeConnectionPools.get(nodeId); + if (pool == null) { + return null; + } else { + return pool.nextConnection(); + } } public void release() { - for (Client.NodeConnection nodeConnection : nodeConnections.values()) { - nodeConnection.close(); + nodeConnectionPools.values().forEach(NodeConnectionPool::release); + } + + private class NodeConnectionPool { + private final List<Client.NodeConnection> connections; + + NodeConnectionPool(List<Client.NodeConnection> connections) { + this.connections = connections; + } + + Client.NodeConnection nextConnection() { + int slot = random.nextInt(connections.size()); + return connections.get(slot); + } + + void release() { + connections.forEach(Client.NodeConnection::close); } } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java index d70a7d95b63..75e9b06f445 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java @@ -46,10 +46,7 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe protected void sendSearchRequest(Query query) throws IOException { this.query = query; - CompressionType compression = CompressionType - .valueOf(query.properties().getString(RpcResourcePool.dispatchCompression, "LZ4").toUpperCase()); - - Client.NodeConnection nodeConnection = resourcePool.nodeConnections().get(node.key()); + Client.NodeConnection nodeConnection = resourcePool.getConnection(node.key()); if (nodeConnection == null) { responses.add(Client.ResponseOrError.fromError("Could not send search to unknown node " + node.key())); responseAvailable(); @@ -59,9 +56,8 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe var payload = ProtobufSerialization.serializeSearchRequest(query, searcher.getServerId()); double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0; - Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, payload); - resourcePool.client().request(RPC_METHOD, nodeConnection, compressionResult.type(), payload.length, compressionResult.data(), this, - timeoutSeconds); + Compressor.Compression compressionResult = resourcePool.compress(query, payload); + nodeConnection.request(RPC_METHOD, compressionResult.type(), payload.length, compressionResult.data(), this, timeoutSeconds); } @Override |