aboutsummaryrefslogtreecommitdiffstats
path: root/container-search
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2022-06-16 15:21:04 +0200
committerBjørn Christian Seime <bjorncs@verizonmedia.com>2022-06-16 15:21:04 +0200
commit5ef7b0995a0334b8040eb64c75fda00e564b2e58 (patch)
tree90f1a3856f7cac32e1528129b7a8695bab49b4c7 /container-search
parentef4e78c4cda02ec8c38cafed13a385eeb30bb650 (diff)
Fail-fast if already timed out before RPC invoke
Diffstat (limited to 'container-search')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java14
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java19
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java10
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/TimeoutHelper.java4
4 files changed, 39 insertions, 8 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
index 0a7357f4a86..22ed8b6d9fa 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
@@ -26,23 +26,30 @@ interface Client {
final Optional<T> response;
final Optional<String> error;
+ final boolean isTimeout;
public static <T> ResponseOrError<T> fromResponse(T response) {
return new ResponseOrError<>(response);
}
public static <T> ResponseOrError<T> fromError(String error) {
- return new ResponseOrError<T>(error);
+ return new ResponseOrError<T>(error, false);
+ }
+
+ public static <T> ResponseOrError<T> fromTimeoutError(String error) {
+ return new ResponseOrError<T>(error, true);
}
ResponseOrError(T response) {
this.response = Optional.of(response);
this.error = Optional.empty();
+ this.isTimeout = false;
}
- ResponseOrError(String error) {
+ ResponseOrError(String error, boolean isTimeout) {
this.response = Optional.empty();
this.error = Optional.of(error);
+ this.isTimeout = isTimeout;
}
/** Returns the response, or empty if there is an error */
@@ -50,6 +57,9 @@ interface Client {
/** Returns the error or empty if there is a response */
public Optional<String> error() { return error; }
+
+ /** @return true if error is a timeout */
+ public boolean timeout() { return isTimeout; }
}
class GetDocsumsResponse {
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
index 4e13e265905..3de83e2fe9e 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
@@ -25,7 +25,6 @@ import com.yahoo.slime.BinaryFormat;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -81,12 +80,19 @@ public class RpcProtobufFillInvoker extends FillInvoker {
responses = new LinkedBlockingQueue<>(outstandingResponses);
var timeout = TimeoutHelper.calculateTimeout(result.getQuery());
+ if (timeout.timedOut()) {
+ // Need to produce an error response her in case of JVM system clock being adjusted
+ // Timeout mechanism relies on System.currentTimeMillis(), not System.nanoTime() :(
+ hitsByNode.forEach((nodeId, hits) ->
+ receive(Client.ResponseOrError.fromTimeoutError("Timed out waiting for summary data from " + nodeId), hits));
+ return;
+ }
var builder = ProtobufSerialization.createDocsumRequestBuilder(
result.getQuery(), serverId, summaryClass, summaryNeedsQuery, timeout.request());
- for (Map.Entry<Integer, List<FastHit>> nodeHits : hitsByNode.entrySet()) {
- var payload = ProtobufSerialization.serializeDocsumRequest(builder, nodeHits.getValue());
- sendDocsumsRequest(nodeHits.getKey(), nodeHits.getValue(), payload, result, timeout.client());
- }
+ hitsByNode.forEach((nodeId, hits) -> {
+ var payload = ProtobufSerialization.serializeDocsumRequest(builder, hits);
+ sendDocsumsRequest(nodeId, hits, payload, result, timeout.client());
+ });
}
@Override
@@ -155,6 +161,9 @@ public class RpcProtobufFillInvoker extends FillInvoker {
throwTimeout();
}
var response = responseAndHits.getFirst();
+ if (response.timeout()) {
+ throwTimeout();
+ }
var hitsContext = responseAndHits.getSecond();
skippedHits += processResponse(result, response, hitsContext, summaryClass);
outstandingResponses--;
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
index 51127dc5416..f80a40f95c8 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
@@ -57,6 +57,13 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe
query.trace(false, 5, "Sending search request with jrt/protobuf to node with dist key ", node.key());
var timeout = TimeoutHelper.calculateTimeout(query);
+ if (timeout.timedOut()) {
+ // Need to produce an error response her in case of JVM system clock being adjusted
+ // Timeout mechanism relies on System.currentTimeMillis(), not System.nanoTime() :(
+ responses.add(Client.ResponseOrError.fromTimeoutError("Timeout while waiting for " + getName()));
+ responseAvailable();
+ return incomingContext;
+ }
RpcContext context = getContext(incomingContext, timeout.request());
nodeConnection.request(RPC_METHOD,
context.compressedPayload.type(),
@@ -92,6 +99,9 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe
if (response == null) {
return errorResult(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName()));
}
+ if (response.timeout()) {
+ return errorResult(query, ErrorMessage.createTimeout(response.error().get()));
+ }
if (response.error().isPresent()) {
return errorResult(query, ErrorMessage.createBackendCommunicationError(response.error().get()));
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/TimeoutHelper.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/TimeoutHelper.java
index a48ff8cc1a7..f3441f3c761 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/TimeoutHelper.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/TimeoutHelper.java
@@ -22,5 +22,7 @@ class TimeoutHelper {
return new Timeout(requestTimeout, clientTimeout);
}
- record Timeout(double request, double client) {}
+ record Timeout(double request, double client) {
+ public boolean timedOut() { return request == 0 && client == 0; }
+ }
}