summaryrefslogtreecommitdiffstats
path: root/container-search
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-06-16 16:12:30 +0200
committerGitHub <noreply@github.com>2022-06-16 16:12:30 +0200
commit9ec71e5daf80a664b781d0b3680a322e99e6b7a6 (patch)
tree816fdeea5ac26d02d3b81583449f04aa673b0a5e /container-search
parent29a9e4180ee985aee7e8daf12a1ca5fc807079c4 (diff)
parentc5e68389699abc411a80db3a59974c0e5dfa493a (diff)
Merge pull request #23132 from vespa-engine/bjorncs/dispatch-timeout
Improve timeout logic for docsum/search invokers
Diffstat (limited to 'container-search')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java14
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java14
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java29
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java20
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/TimeoutHelper.java28
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/rpc/ProtobufSerializationTest.java6
6 files changed, 85 insertions, 26 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
index 0a7357f4a86..22ed8b6d9fa 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
@@ -26,23 +26,30 @@ interface Client {
final Optional<T> response;
final Optional<String> error;
+ final boolean isTimeout;
public static <T> ResponseOrError<T> fromResponse(T response) {
return new ResponseOrError<>(response);
}
public static <T> ResponseOrError<T> fromError(String error) {
- return new ResponseOrError<T>(error);
+ return new ResponseOrError<T>(error, false);
+ }
+
+ public static <T> ResponseOrError<T> fromTimeoutError(String error) {
+ return new ResponseOrError<T>(error, true);
}
ResponseOrError(T response) {
this.response = Optional.of(response);
this.error = Optional.empty();
+ this.isTimeout = false;
}
- ResponseOrError(String error) {
+ ResponseOrError(String error, boolean isTimeout) {
this.response = Optional.empty();
this.error = Optional.of(error);
+ this.isTimeout = isTimeout;
}
/** Returns the response, or empty if there is an error */
@@ -50,6 +57,9 @@ interface Client {
/** Returns the error or empty if there is a response */
public Optional<String> error() { return error; }
+
+ /** @return true if error is a timeout */
+ public boolean timeout() { return isTimeout; }
}
class GetDocsumsResponse {
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java
index 8b2457606ab..154defe7eb2 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java
@@ -33,15 +33,14 @@ import com.yahoo.vespa.objects.BufferSerializer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.Optional;
import java.util.function.Consumer;
public class ProtobufSerialization {
private static final int INITIAL_SERIALIZATION_BUFFER_SIZE = 10 * 1024;
- static byte[] serializeSearchRequest(Query query, int hits, String serverId) {
- return convertFromQuery(query, hits, serverId).toByteArray();
+ static byte[] serializeSearchRequest(Query query, int hits, String serverId, double requestTimeout) {
+ return convertFromQuery(query, hits, serverId, requestTimeout).toByteArray();
}
private static void convertSearchReplyErrors(Result target, List<SearchProtocol.Error> errors) {
@@ -50,9 +49,9 @@ public class ProtobufSerialization {
}
}
- static SearchProtocol.SearchRequest convertFromQuery(Query query, int hits, String serverId) {
+ static SearchProtocol.SearchRequest convertFromQuery(Query query, int hits, String serverId, double requestTimeout) {
var builder = SearchProtocol.SearchRequest.newBuilder().setHits(hits).setOffset(query.getOffset())
- .setTimeout((int) query.getTimeLeft());
+ .setTimeout((int) (requestTimeout * 1000));
var documentDb = query.getModel().getDocumentDb();
if (documentDb != null) {
@@ -130,9 +129,10 @@ public class ProtobufSerialization {
static SearchProtocol.DocsumRequest.Builder createDocsumRequestBuilder(Query query,
String serverId,
String summaryClass,
- boolean includeQueryData) {
+ boolean includeQueryData,
+ double requestTimeout) {
var builder = SearchProtocol.DocsumRequest.newBuilder()
- .setTimeout((int) query.getTimeLeft())
+ .setTimeout((int) (requestTimeout * 1000))
.setDumpFeatures(query.properties().getBoolean(Ranking.RANKFEATURES, false));
if (summaryClass != null) {
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
index 92726ef1415..9e426cfe164 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
@@ -25,7 +25,6 @@ import com.yahoo.slime.BinaryFormat;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -80,11 +79,20 @@ public class RpcProtobufFillInvoker extends FillInvoker {
outstandingResponses = hitsByNode.size();
responses = new LinkedBlockingQueue<>(outstandingResponses);
- var builder = ProtobufSerialization.createDocsumRequestBuilder(result.getQuery(), serverId, summaryClass, summaryNeedsQuery);
- for (Map.Entry<Integer, List<FastHit>> nodeHits : hitsByNode.entrySet()) {
- var payload = ProtobufSerialization.serializeDocsumRequest(builder, nodeHits.getValue());
- sendDocsumsRequest(nodeHits.getKey(), nodeHits.getValue(), payload, result);
+ var timeout = TimeoutHelper.calculateTimeout(result.getQuery());
+ if (timeout.timedOut()) {
+ // Need to produce an error response her in case of JVM system clock being adjusted
+ // Timeout mechanism relies on System.currentTimeMillis(), not System.nanoTime() :(
+ hitsByNode.forEach((nodeId, hits) ->
+ receive(Client.ResponseOrError.fromTimeoutError("Timed out prior to sending docsum request to " + nodeId), hits));
+ return;
}
+ var builder = ProtobufSerialization.createDocsumRequestBuilder(
+ result.getQuery(), serverId, summaryClass, summaryNeedsQuery, timeout.request());
+ hitsByNode.forEach((nodeId, hits) -> {
+ var payload = ProtobufSerialization.serializeDocsumRequest(builder, hits);
+ sendDocsumsRequest(nodeId, hits, payload, result, timeout.client());
+ });
}
@Override
@@ -123,7 +131,8 @@ public class RpcProtobufFillInvoker extends FillInvoker {
}
/** Send a docsums request to a node. Responses will be added to the given receiver. */
- private void sendDocsumsRequest(int nodeId, List<FastHit> hits, byte[] payload, Result result) {
+ private void sendDocsumsRequest(int nodeId, List<FastHit> hits, byte[] payload, Result result,
+ double clientTimeout) {
Client.NodeConnection node = resourcePool.getConnection(nodeId);
if (node == null) {
String error = "Could not fill hits from unknown node " + nodeId;
@@ -134,10 +143,9 @@ public class RpcProtobufFillInvoker extends FillInvoker {
}
Query query = result.getQuery();
- double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0;
Compressor.Compression compressionResult = resourcePool.compress(query, payload);
- node.request(RPC_METHOD, compressionResult.type(), payload.length, compressionResult.data(), roe -> receive(roe, hits),
- timeoutSeconds);
+ node.request(RPC_METHOD, compressionResult.type(), payload.length, compressionResult.data(),
+ roe -> receive(roe, hits), clientTimeout);
}
private void processResponses(Result result, String summaryClass) throws TimeoutException {
@@ -153,6 +161,9 @@ public class RpcProtobufFillInvoker extends FillInvoker {
throwTimeout();
}
var response = responseAndHits.getFirst();
+ if (response.timeout()) {
+ throwTimeout();
+ }
var hitsContext = responseAndHits.getSecond();
skippedHits += processResponse(result, response, hitsContext, summaryClass);
outstandingResponses--;
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
index 973b9093d9d..64e0dd666dd 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
@@ -56,25 +56,32 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe
}
query.trace(false, 5, "Sending search request with jrt/protobuf to node with dist key ", node.key());
- RpcContext context = getContext(incomingContext);
- double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0;
+ var timeout = TimeoutHelper.calculateTimeout(query);
+ if (timeout.timedOut()) {
+ // Need to produce an error response her in case of JVM system clock being adjusted
+ // Timeout mechanism relies on System.currentTimeMillis(), not System.nanoTime() :(
+ responses.add(Client.ResponseOrError.fromTimeoutError("Timeout before sending request to " + getName()));
+ responseAvailable();
+ return incomingContext;
+ }
+ RpcContext context = getContext(incomingContext, timeout.request());
nodeConnection.request(RPC_METHOD,
context.compressedPayload.type(),
context.compressedPayload.uncompressedSize(),
context.compressedPayload.data(),
this,
- timeoutSeconds);
+ timeout.client());
return context;
}
- private RpcContext getContext(Object incomingContext) {
+ private RpcContext getContext(Object incomingContext, double requestTimeout) {
if (incomingContext instanceof RpcContext)
return (RpcContext)incomingContext;
return new RpcContext(resourcePool, query,
ProtobufSerialization.serializeSearchRequest(query,
Math.min(query.getHits(), maxHits),
- searcher.getServerId()));
+ searcher.getServerId(), requestTimeout));
}
@Override
@@ -92,6 +99,9 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe
if (response == null) {
return errorResult(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName()));
}
+ if (response.timeout()) {
+ return errorResult(query, ErrorMessage.createTimeout(response.error().get()));
+ }
if (response.error().isPresent()) {
return errorResult(query, ErrorMessage.createBackendCommunicationError(response.error().get()));
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/TimeoutHelper.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/TimeoutHelper.java
new file mode 100644
index 00000000000..f3441f3c761
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/TimeoutHelper.java
@@ -0,0 +1,28 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch.rpc;
+
+import com.yahoo.search.Query;
+
+/**
+ * @author bjorncs
+ */
+class TimeoutHelper {
+
+ private TimeoutHelper() {}
+
+ static Timeout calculateTimeout(Query q) {
+ long timeLeftMillis = q.getTimeLeft();
+ if (timeLeftMillis <= 2) return new Timeout(0d, 0d);
+ // The old timeout logic subtracted 3ms to timeout for client timeout.
+ // 3ms equalled to 0.6% of the default query timeout (500ms).
+ // This accounted for cost of network and container post-processing.
+ // New logic subtracts 1% for client and 2% for content node (request).
+ double clientTimeout = Math.max(timeLeftMillis * 0.99d, 2d) / 1000d;
+ double requestTimeout = Math.max(timeLeftMillis * 0.98d, 1d) / 1000d;
+ return new Timeout(requestTimeout, clientTimeout);
+ }
+
+ record Timeout(double request, double client) {
+ public boolean timedOut() { return request == 0 && client == 0; }
+ }
+}
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/ProtobufSerializationTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/ProtobufSerializationTest.java
index 7a7ae173766..9953d1467e1 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/ProtobufSerializationTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/ProtobufSerializationTest.java
@@ -35,7 +35,7 @@ public class ProtobufSerializationTest {
.setRequest("?query=test&ranking.features.query(tensor_1)=[1.200]")
.build();
- SearchProtocol.SearchRequest request1 = ProtobufSerialization.convertFromQuery(query, 9,"serverId");
+ SearchProtocol.SearchRequest request1 = ProtobufSerialization.convertFromQuery(query, 9,"serverId", 0.5);
assertEquals(9, request1.getHits());
assertEquals(0, request1.getRankPropertiesCount());
assertEquals(0, request1.getTensorRankPropertiesCount());
@@ -47,7 +47,7 @@ public class ProtobufSerializationTest {
contentsOf(request1.getTensorFeatureOverrides(1).getValue()));
query.prepare(); // calling prepare() moves "overrides" to "features" - content stays the same
- SearchProtocol.SearchRequest request2 = ProtobufSerialization.convertFromQuery(query, 9,"serverId");
+ SearchProtocol.SearchRequest request2 = ProtobufSerialization.convertFromQuery(query, 9,"serverId", 0.5);
assertEquals(9, request2.getHits());
assertEquals(0, request2.getRankPropertiesCount());
assertEquals(2, request2.getTensorRankPropertiesCount());
@@ -62,7 +62,7 @@ public class ProtobufSerializationTest {
@Test
public void testDocsumSerialization() {
Query q = new Query("search/?query=test&hits=10&offset=3");
- var builder = ProtobufSerialization.createDocsumRequestBuilder(q, "server", "summary", true);
+ var builder = ProtobufSerialization.createDocsumRequestBuilder(q, "server", "summary", true, 0.5);
builder.setTimeout(0);
var hit = new FastHit();
hit.setGlobalId(new GlobalId(IdString.createIdString("id:ns:type::id")).getRawId());