summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search
diff options
context:
space:
mode:
authorOlli Virtanen <olli.virtanen@oath.com>2019-04-05 14:23:22 +0200
committerOlli Virtanen <olli.virtanen@oath.com>2019-04-05 14:23:22 +0200
commit9faa5a3bc2d198e619732936459e11fb14634800 (patch)
treed83fd3a9484f2b9886465abf19ab687175e71280 /container-search/src/main/java/com/yahoo/search
parent747114c0f7835bf9a3e47a19e924856227efde8d (diff)
Optimizations for search protocol over jrt/protobuf
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java15
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java50
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java7
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java6
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java15
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java71
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java10
7 files changed, 97 insertions, 77 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
index cc37df04a62..e54e2187818 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
@@ -2,8 +2,6 @@
package com.yahoo.search.dispatch.rpc;
import com.yahoo.compress.CompressionType;
-import com.yahoo.compress.Compressor;
-import com.yahoo.prelude.Pong;
import com.yahoo.prelude.fastsearch.FastHit;
import java.util.List;
@@ -15,14 +13,6 @@ import java.util.Optional;
* @author bratseth
*/
interface Client {
-
- void getDocsums(List<FastHit> hits, NodeConnection node, CompressionType compression,
- int uncompressedLength, byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver,
- double timeoutSeconds);
-
- void request(String rpcMethod, NodeConnection node, CompressionType compression, int uncompressedLength,
- byte[] compressedPayload, ResponseReceiver responseReceiver, double timeoutSeconds);
-
/** Creates a connection to a particular node in this */
NodeConnection createConnection(String hostname, int port);
@@ -91,6 +81,11 @@ interface Client {
}
interface NodeConnection {
+ void getDocsums(List<FastHit> hits, CompressionType compression, int uncompressedLength, byte[] compressedSlime,
+ RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds);
+
+ void request(String rpcMethod, CompressionType compression, int uncompressedLength, byte[] compressedPayload,
+ ResponseReceiver responseReceiver, double timeoutSeconds);
/** Closes this connection */
void close();
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
index 2aa01b05955..7e48733106a 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
@@ -29,31 +29,6 @@ class RpcClient implements Client {
return new RpcNodeConnection(hostname, port, supervisor);
}
- @Override
- public void getDocsums(List<FastHit> hits, NodeConnection node, CompressionType compression, int uncompressedLength,
- byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) {
- Request request = new Request("proton.getDocsums");
- request.parameters().add(new Int8Value(compression.getCode()));
- request.parameters().add(new Int32Value(uncompressedLength));
- request.parameters().add(new DataValue(compressedSlime));
-
- request.setContext(hits);
- RpcNodeConnection rpcNode = ((RpcNodeConnection) node);
- rpcNode.invokeAsync(request, timeoutSeconds, new RpcDocsumResponseWaiter(rpcNode, responseReceiver));
- }
-
- @Override
- public void request(String rpcMethod, NodeConnection node, CompressionType compression, int uncompressedLength, byte[] compressedPayload,
- ResponseReceiver responseReceiver, double timeoutSeconds) {
- Request request = new Request(rpcMethod);
- request.parameters().add(new Int8Value(compression.getCode()));
- request.parameters().add(new Int32Value(uncompressedLength));
- request.parameters().add(new DataValue(compressedPayload));
-
- RpcNodeConnection rpcNode = ((RpcNodeConnection) node);
- rpcNode.invokeAsync(request, timeoutSeconds, new RpcProtobufResponseWaiter(rpcNode, responseReceiver));
- }
-
private static class RpcNodeConnection implements NodeConnection {
// Information about the connected node
@@ -73,7 +48,30 @@ class RpcClient implements Client {
description = "rpc node connection to " + hostname + ":" + port;
}
- public void invokeAsync(Request req, double timeout, RequestWaiter waiter) {
+ @Override
+ public void getDocsums(List<FastHit> hits, CompressionType compression, int uncompressedLength,
+ byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) {
+ Request request = new Request("proton.getDocsums");
+ request.parameters().add(new Int8Value(compression.getCode()));
+ request.parameters().add(new Int32Value(uncompressedLength));
+ request.parameters().add(new DataValue(compressedSlime));
+
+ request.setContext(hits);
+ invokeAsync(request, timeoutSeconds, new RpcDocsumResponseWaiter(this, responseReceiver));
+ }
+
+ @Override
+ public void request(String rpcMethod, CompressionType compression, int uncompressedLength, byte[] compressedPayload,
+ ResponseReceiver responseReceiver, double timeoutSeconds) {
+ Request request = new Request(rpcMethod);
+ request.parameters().add(new Int8Value(compression.getCode()));
+ request.parameters().add(new Int32Value(uncompressedLength));
+ request.parameters().add(new DataValue(compressedPayload));
+
+ invokeAsync(request, timeoutSeconds, new RpcProtobufResponseWaiter(this, responseReceiver));
+ }
+
+ private void invokeAsync(Request req, double timeout, RequestWaiter waiter) {
// TODO: Consider replacing this by a watcher on the target
synchronized(this) { // ensure we have exactly 1 valid connection across threads
if (target == null || ! target.isValid())
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java
index 760f7486923..aa72823c809 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java
@@ -100,7 +100,7 @@ public class RpcFillInvoker extends FillInvoker {
/** Send a getDocsums request to a node. Responses will be added to the given receiver. */
private void sendGetDocsumsRequest(int nodeId, List<FastHit> hits, String summaryClass, CompressionType compression,
Result result, GetDocsumsResponseReceiver responseReceiver) {
- Client.NodeConnection node = resourcePool.nodeConnections().get(nodeId);
+ Client.NodeConnection node = resourcePool.getConnection(nodeId);
if (node == null) {
String error = "Could not fill hits from unknown node " + nodeId;
responseReceiver.receive(Client.ResponseOrError.fromError(error));
@@ -114,9 +114,8 @@ public class RpcFillInvoker extends FillInvoker {
byte[] serializedSlime = BinaryFormat
.encode(toSlime(rankProfile, summaryClass, query.getModel().getDocumentDb(), query.getSessionId(), hits));
double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0;
- Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, serializedSlime);
- resourcePool.client().getDocsums(hits, node, compressionResult.type(), serializedSlime.length, compressionResult.data(),
- responseReceiver, timeoutSeconds);
+ Compressor.Compression compressionResult = resourcePool.compress(query, serializedSlime);
+ node.getDocsums(hits, compressionResult.type(), serializedSlime.length, compressionResult.data(), responseReceiver, timeoutSeconds);
}
static private Slime toSlime(String rankProfile, String summaryClass, String docType, SessionId sessionId, List<FastHit> hits) {
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
index f3479e2e4a9..c001b51ef11 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
@@ -52,12 +52,12 @@ public class RpcPing implements Callable<Pong> {
}
private void sendPing(LinkedBlockingQueue<ResponseOrError<ProtobufResponse>> queue) {
- var connection = resourcePool.nodeConnections().get(node.key());
+ var connection = resourcePool.getConnection(node.key());
var ping = SearchProtocol.MonitorRequest.newBuilder().build().toByteArray();
double timeoutSeconds = ((double) clusterMonitor.getConfiguration().getRequestTimeout()) / 1000.0;
Compressor.Compression compressionResult = resourcePool.compressor().compress(PING_COMPRESSION, ping);
- resourcePool.client().request(RPC_METHOD, connection, compressionResult.type(), ping.length, compressionResult.data(),
- rsp -> queue.add(rsp), timeoutSeconds);
+ connection.request(RPC_METHOD, compressionResult.type(), ping.length, compressionResult.data(), rsp -> queue.add(rsp),
+ timeoutSeconds);
}
private Pong decodeReply(ProtobufResponse response) throws InvalidProtocolBufferException {
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
index 3ec821beba8..cd4ba191a7d 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
@@ -66,9 +66,6 @@ public class RpcProtobufFillInvoker extends FillInvoker {
protected void sendFillRequest(Result result, String summaryClass) {
ListMap<Integer, FastHit> hitsByNode = hitsByNode(result);
- CompressionType compression = CompressionType
- .valueOf(result.getQuery().properties().getString(RpcResourcePool.dispatchCompression, "LZ4").toUpperCase());
-
result.getQuery().trace(false, 5, "Sending ", hitsByNode.size(), " summary fetch requests with jrt/protobuf");
outstandingResponses = hitsByNode.size();
@@ -77,7 +74,7 @@ public class RpcProtobufFillInvoker extends FillInvoker {
var builder = ProtobufSerialization.createDocsumRequestBuilder(result.getQuery(), serverId, summaryClass, summaryNeedsQuery);
for (Map.Entry<Integer, List<FastHit>> nodeHits : hitsByNode.entrySet()) {
var payload = ProtobufSerialization.serializeDocsumRequest(builder, nodeHits.getValue());
- sendDocsumsRequest(nodeHits.getKey(), nodeHits.getValue(), payload, compression, result);
+ sendDocsumsRequest(nodeHits.getKey(), nodeHits.getValue(), payload, result);
}
}
@@ -117,8 +114,8 @@ public class RpcProtobufFillInvoker extends FillInvoker {
}
/** Send a docsums request to a node. Responses will be added to the given receiver. */
- private void sendDocsumsRequest(int nodeId, List<FastHit> hits, byte[] payload, CompressionType compression, Result result) {
- Client.NodeConnection node = resourcePool.nodeConnections().get(nodeId);
+ private void sendDocsumsRequest(int nodeId, List<FastHit> hits, byte[] payload, Result result) {
+ Client.NodeConnection node = resourcePool.getConnection(nodeId);
if (node == null) {
String error = "Could not fill hits from unknown node " + nodeId;
receive(Client.ResponseOrError.fromError(error), hits);
@@ -129,9 +126,9 @@ public class RpcProtobufFillInvoker extends FillInvoker {
Query query = result.getQuery();
double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0;
- Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, payload);
- resourcePool.client().request(RPC_METHOD, node, compressionResult.type(), payload.length, compressionResult.data(),
- roe -> receive(roe, hits), timeoutSeconds);
+ Compressor.Compression compressionResult = resourcePool.compress(query, payload);
+ node.request(RPC_METHOD, compressionResult.type(), payload.length, compressionResult.data(), roe -> receive(roe, hits),
+ timeoutSeconds);
}
private void processResponses(Result result, String summaryClass) throws TimeoutException {
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
index 830ba45ef0f..cccf8dd3693 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
@@ -2,12 +2,20 @@
package com.yahoo.search.dispatch.rpc;
import com.google.common.collect.ImmutableMap;
+import com.yahoo.compress.CompressionType;
import com.yahoo.compress.Compressor;
+import com.yahoo.compress.Compressor.Compression;
import com.yahoo.processing.request.CompoundName;
+import com.yahoo.search.Query;
import com.yahoo.search.dispatch.FillInvoker;
+import com.yahoo.search.dispatch.rpc.Client.NodeConnection;
import com.yahoo.vespa.config.search.DispatchConfig;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.Random;
/**
* RpcResourcePool constructs {@link FillInvoker} objects that communicate with content nodes over RPC. It also contains
@@ -19,43 +27,70 @@ public class RpcResourcePool {
/** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */
public final static CompoundName dispatchCompression = new CompoundName("dispatch.compression");
- private final Compressor compressor = new Compressor();
- private final Client client;
+ private final Compressor compressor = new Compressor(CompressionType.LZ4, 5, 0.95, 32);
+ private final Random random = new Random();
/** Connections to the search nodes this talks to, indexed by node id ("partid") */
- private final ImmutableMap<Integer, Client.NodeConnection> nodeConnections;
+ private final ImmutableMap<Integer, NodeConnectionPool> nodeConnectionPools;
- public RpcResourcePool(Client client, Map<Integer, Client.NodeConnection> nodeConnections) {
- this.client = client;
- this.nodeConnections = ImmutableMap.copyOf(nodeConnections);
+ public RpcResourcePool(Map<Integer, Client.NodeConnection> nodeConnections) {
+ var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>();
+ nodeConnections.forEach((key, connection) -> builder.put(key, new NodeConnectionPool(Collections.singletonList(connection))));
+ this.nodeConnectionPools = builder.build();
}
public RpcResourcePool(DispatchConfig dispatchConfig) {
- this.client = new RpcClient();
+ var clients = new ArrayList<RpcClient>(dispatchConfig.numJrtSupervisors());
+ for (int i = 0; i < dispatchConfig.numJrtSupervisors(); i++) {
+ clients.add(new RpcClient());
+ }
- // Create node rpc connections, indexed by the node distribution key
- ImmutableMap.Builder<Integer, Client.NodeConnection> nodeConnectionsBuilder = new ImmutableMap.Builder<>();
- for (DispatchConfig.Node node : dispatchConfig.node()) {
- nodeConnectionsBuilder.put(node.key(), client.createConnection(node.host(), node.port()));
+ // Create node rpc connection pools, indexed by the node distribution key
+ var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>();
+ for (var node : dispatchConfig.node()) {
+ var connections = new ArrayList<Client.NodeConnection>(clients.size());
+ clients.forEach(client -> connections.add(client.createConnection(node.host(), node.port())));
+ builder.put(node.key(), new NodeConnectionPool(connections));
}
- this.nodeConnections = nodeConnectionsBuilder.build();
+ this.nodeConnectionPools = builder.build();
}
public Compressor compressor() {
return compressor;
}
- public Client client() {
- return client;
+ public Compression compress(Query query, byte[] payload) {
+ CompressionType compression = CompressionType.valueOf(query.properties().getString(dispatchCompression, "LZ4").toUpperCase());
+ return compressor.compress(compression, payload);
}
- public ImmutableMap<Integer, Client.NodeConnection> nodeConnections() {
- return nodeConnections;
+ public NodeConnection getConnection(int nodeId) {
+ var pool = nodeConnectionPools.get(nodeId);
+ if (pool == null) {
+ return null;
+ } else {
+ return pool.nextConnection();
+ }
}
public void release() {
- for (Client.NodeConnection nodeConnection : nodeConnections.values()) {
- nodeConnection.close();
+ nodeConnectionPools.values().forEach(NodeConnectionPool::release);
+ }
+
+ private class NodeConnectionPool {
+ private final List<Client.NodeConnection> connections;
+
+ NodeConnectionPool(List<Client.NodeConnection> connections) {
+ this.connections = connections;
+ }
+
+ Client.NodeConnection nextConnection() {
+ int slot = random.nextInt(connections.size());
+ return connections.get(slot);
+ }
+
+ void release() {
+ connections.forEach(Client.NodeConnection::close);
}
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
index d70a7d95b63..75e9b06f445 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
@@ -46,10 +46,7 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe
protected void sendSearchRequest(Query query) throws IOException {
this.query = query;
- CompressionType compression = CompressionType
- .valueOf(query.properties().getString(RpcResourcePool.dispatchCompression, "LZ4").toUpperCase());
-
- Client.NodeConnection nodeConnection = resourcePool.nodeConnections().get(node.key());
+ Client.NodeConnection nodeConnection = resourcePool.getConnection(node.key());
if (nodeConnection == null) {
responses.add(Client.ResponseOrError.fromError("Could not send search to unknown node " + node.key()));
responseAvailable();
@@ -59,9 +56,8 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe
var payload = ProtobufSerialization.serializeSearchRequest(query, searcher.getServerId());
double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0;
- Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, payload);
- resourcePool.client().request(RPC_METHOD, nodeConnection, compressionResult.type(), payload.length, compressionResult.data(), this,
- timeoutSeconds);
+ Compressor.Compression compressionResult = resourcePool.compress(query, payload);
+ nodeConnection.request(RPC_METHOD, compressionResult.type(), payload.length, compressionResult.data(), this, timeoutSeconds);
}
@Override