diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-06-16 16:12:30 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-06-16 16:12:30 +0200 |
commit | 9ec71e5daf80a664b781d0b3680a322e99e6b7a6 (patch) | |
tree | 816fdeea5ac26d02d3b81583449f04aa673b0a5e /container-search | |
parent | 29a9e4180ee985aee7e8daf12a1ca5fc807079c4 (diff) | |
parent | c5e68389699abc411a80db3a59974c0e5dfa493a (diff) |
Merge pull request #23132 from vespa-engine/bjorncs/dispatch-timeout
Improve timeout logic for docsum/search invokers
Diffstat (limited to 'container-search')
6 files changed, 85 insertions, 26 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java index 0a7357f4a86..22ed8b6d9fa 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java @@ -26,23 +26,30 @@ interface Client { final Optional<T> response; final Optional<String> error; + final boolean isTimeout; public static <T> ResponseOrError<T> fromResponse(T response) { return new ResponseOrError<>(response); } public static <T> ResponseOrError<T> fromError(String error) { - return new ResponseOrError<T>(error); + return new ResponseOrError<T>(error, false); + } + + public static <T> ResponseOrError<T> fromTimeoutError(String error) { + return new ResponseOrError<T>(error, true); } ResponseOrError(T response) { this.response = Optional.of(response); this.error = Optional.empty(); + this.isTimeout = false; } - ResponseOrError(String error) { + ResponseOrError(String error, boolean isTimeout) { this.response = Optional.empty(); this.error = Optional.of(error); + this.isTimeout = isTimeout; } /** Returns the response, or empty if there is an error */ @@ -50,6 +57,9 @@ interface Client { /** Returns the error or empty if there is a response */ public Optional<String> error() { return error; } + + /** @return true if error is a timeout */ + public boolean timeout() { return isTimeout; } } class GetDocsumsResponse { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java index 8b2457606ab..154defe7eb2 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java @@ -33,15 +33,14 @@ import com.yahoo.vespa.objects.BufferSerializer; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.function.Consumer; public class ProtobufSerialization { private static final int INITIAL_SERIALIZATION_BUFFER_SIZE = 10 * 1024; - static byte[] serializeSearchRequest(Query query, int hits, String serverId) { - return convertFromQuery(query, hits, serverId).toByteArray(); + static byte[] serializeSearchRequest(Query query, int hits, String serverId, double requestTimeout) { + return convertFromQuery(query, hits, serverId, requestTimeout).toByteArray(); } private static void convertSearchReplyErrors(Result target, List<SearchProtocol.Error> errors) { @@ -50,9 +49,9 @@ public class ProtobufSerialization { } } - static SearchProtocol.SearchRequest convertFromQuery(Query query, int hits, String serverId) { + static SearchProtocol.SearchRequest convertFromQuery(Query query, int hits, String serverId, double requestTimeout) { var builder = SearchProtocol.SearchRequest.newBuilder().setHits(hits).setOffset(query.getOffset()) - .setTimeout((int) query.getTimeLeft()); + .setTimeout((int) (requestTimeout * 1000)); var documentDb = query.getModel().getDocumentDb(); if (documentDb != null) { @@ -130,9 +129,10 @@ public class ProtobufSerialization { static SearchProtocol.DocsumRequest.Builder createDocsumRequestBuilder(Query query, String serverId, String summaryClass, - boolean includeQueryData) { + boolean includeQueryData, + double requestTimeout) { var builder = SearchProtocol.DocsumRequest.newBuilder() - .setTimeout((int) query.getTimeLeft()) + .setTimeout((int) (requestTimeout * 1000)) .setDumpFeatures(query.properties().getBoolean(Ranking.RANKFEATURES, false)); if (summaryClass != null) { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java index 92726ef1415..9e426cfe164 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java @@ -25,7 +25,6 @@ import com.yahoo.slime.BinaryFormat; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -80,11 +79,20 @@ public class RpcProtobufFillInvoker extends FillInvoker { outstandingResponses = hitsByNode.size(); responses = new LinkedBlockingQueue<>(outstandingResponses); - var builder = ProtobufSerialization.createDocsumRequestBuilder(result.getQuery(), serverId, summaryClass, summaryNeedsQuery); - for (Map.Entry<Integer, List<FastHit>> nodeHits : hitsByNode.entrySet()) { - var payload = ProtobufSerialization.serializeDocsumRequest(builder, nodeHits.getValue()); - sendDocsumsRequest(nodeHits.getKey(), nodeHits.getValue(), payload, result); + var timeout = TimeoutHelper.calculateTimeout(result.getQuery()); + if (timeout.timedOut()) { + // Need to produce an error response her in case of JVM system clock being adjusted + // Timeout mechanism relies on System.currentTimeMillis(), not System.nanoTime() :( + hitsByNode.forEach((nodeId, hits) -> + receive(Client.ResponseOrError.fromTimeoutError("Timed out prior to sending docsum request to " + nodeId), hits)); + return; } + var builder = ProtobufSerialization.createDocsumRequestBuilder( + result.getQuery(), serverId, summaryClass, summaryNeedsQuery, timeout.request()); + hitsByNode.forEach((nodeId, hits) -> { + var payload = ProtobufSerialization.serializeDocsumRequest(builder, hits); + sendDocsumsRequest(nodeId, hits, payload, result, timeout.client()); + }); } @Override @@ -123,7 +131,8 @@ public class RpcProtobufFillInvoker extends FillInvoker { } /** Send a docsums request to a node. Responses will be added to the given receiver. */ - private void sendDocsumsRequest(int nodeId, List<FastHit> hits, byte[] payload, Result result) { + private void sendDocsumsRequest(int nodeId, List<FastHit> hits, byte[] payload, Result result, + double clientTimeout) { Client.NodeConnection node = resourcePool.getConnection(nodeId); if (node == null) { String error = "Could not fill hits from unknown node " + nodeId; @@ -134,10 +143,9 @@ public class RpcProtobufFillInvoker extends FillInvoker { } Query query = result.getQuery(); - double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0; Compressor.Compression compressionResult = resourcePool.compress(query, payload); - node.request(RPC_METHOD, compressionResult.type(), payload.length, compressionResult.data(), roe -> receive(roe, hits), - timeoutSeconds); + node.request(RPC_METHOD, compressionResult.type(), payload.length, compressionResult.data(), + roe -> receive(roe, hits), clientTimeout); } private void processResponses(Result result, String summaryClass) throws TimeoutException { @@ -153,6 +161,9 @@ public class RpcProtobufFillInvoker extends FillInvoker { throwTimeout(); } var response = responseAndHits.getFirst(); + if (response.timeout()) { + throwTimeout(); + } var hitsContext = responseAndHits.getSecond(); skippedHits += processResponse(result, response, hitsContext, summaryClass); outstandingResponses--; diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java index 973b9093d9d..64e0dd666dd 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java @@ -56,25 +56,32 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe } query.trace(false, 5, "Sending search request with jrt/protobuf to node with dist key ", node.key()); - RpcContext context = getContext(incomingContext); - double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0; + var timeout = TimeoutHelper.calculateTimeout(query); + if (timeout.timedOut()) { + // Need to produce an error response her in case of JVM system clock being adjusted + // Timeout mechanism relies on System.currentTimeMillis(), not System.nanoTime() :( + responses.add(Client.ResponseOrError.fromTimeoutError("Timeout before sending request to " + getName())); + responseAvailable(); + return incomingContext; + } + RpcContext context = getContext(incomingContext, timeout.request()); nodeConnection.request(RPC_METHOD, context.compressedPayload.type(), context.compressedPayload.uncompressedSize(), context.compressedPayload.data(), this, - timeoutSeconds); + timeout.client()); return context; } - private RpcContext getContext(Object incomingContext) { + private RpcContext getContext(Object incomingContext, double requestTimeout) { if (incomingContext instanceof RpcContext) return (RpcContext)incomingContext; return new RpcContext(resourcePool, query, ProtobufSerialization.serializeSearchRequest(query, Math.min(query.getHits(), maxHits), - searcher.getServerId())); + searcher.getServerId(), requestTimeout)); } @Override @@ -92,6 +99,9 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe if (response == null) { return errorResult(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName())); } + if (response.timeout()) { + return errorResult(query, ErrorMessage.createTimeout(response.error().get())); + } if (response.error().isPresent()) { return errorResult(query, ErrorMessage.createBackendCommunicationError(response.error().get())); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/TimeoutHelper.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/TimeoutHelper.java new file mode 100644 index 00000000000..f3441f3c761 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/TimeoutHelper.java @@ -0,0 +1,28 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch.rpc; + +import com.yahoo.search.Query; + +/** + * @author bjorncs + */ +class TimeoutHelper { + + private TimeoutHelper() {} + + static Timeout calculateTimeout(Query q) { + long timeLeftMillis = q.getTimeLeft(); + if (timeLeftMillis <= 2) return new Timeout(0d, 0d); + // The old timeout logic subtracted 3ms to timeout for client timeout. + // 3ms equalled to 0.6% of the default query timeout (500ms). + // This accounted for cost of network and container post-processing. + // New logic subtracts 1% for client and 2% for content node (request). + double clientTimeout = Math.max(timeLeftMillis * 0.99d, 2d) / 1000d; + double requestTimeout = Math.max(timeLeftMillis * 0.98d, 1d) / 1000d; + return new Timeout(requestTimeout, clientTimeout); + } + + record Timeout(double request, double client) { + public boolean timedOut() { return request == 0 && client == 0; } + } +} diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/ProtobufSerializationTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/ProtobufSerializationTest.java index 7a7ae173766..9953d1467e1 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/ProtobufSerializationTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/ProtobufSerializationTest.java @@ -35,7 +35,7 @@ public class ProtobufSerializationTest { .setRequest("?query=test&ranking.features.query(tensor_1)=[1.200]") .build(); - SearchProtocol.SearchRequest request1 = ProtobufSerialization.convertFromQuery(query, 9,"serverId"); + SearchProtocol.SearchRequest request1 = ProtobufSerialization.convertFromQuery(query, 9,"serverId", 0.5); assertEquals(9, request1.getHits()); assertEquals(0, request1.getRankPropertiesCount()); assertEquals(0, request1.getTensorRankPropertiesCount()); @@ -47,7 +47,7 @@ public class ProtobufSerializationTest { contentsOf(request1.getTensorFeatureOverrides(1).getValue())); query.prepare(); // calling prepare() moves "overrides" to "features" - content stays the same - SearchProtocol.SearchRequest request2 = ProtobufSerialization.convertFromQuery(query, 9,"serverId"); + SearchProtocol.SearchRequest request2 = ProtobufSerialization.convertFromQuery(query, 9,"serverId", 0.5); assertEquals(9, request2.getHits()); assertEquals(0, request2.getRankPropertiesCount()); assertEquals(2, request2.getTensorRankPropertiesCount()); @@ -62,7 +62,7 @@ public class ProtobufSerializationTest { @Test public void testDocsumSerialization() { Query q = new Query("search/?query=test&hits=10&offset=3"); - var builder = ProtobufSerialization.createDocsumRequestBuilder(q, "server", "summary", true); + var builder = ProtobufSerialization.createDocsumRequestBuilder(q, "server", "summary", true, 0.5); builder.setTimeout(0); var hit = new FastHit(); hit.setGlobalId(new GlobalId(IdString.createIdString("id:ns:type::id")).getRawId()); |