summaryrefslogtreecommitdiffstats
path: root/vespa-http-client
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@verizonmedia.com>2019-07-03 10:29:56 -0700
committerJon Bratseth <bratseth@verizonmedia.com>2019-07-03 10:29:56 -0700
commitfd65e3d63e7bbb7bacf05a760811f5d814799767 (patch)
treeb4afdbbc141b33174c326cd34507d3e92a7cb6e0 /vespa-http-client
parenta38afd806ece1174f7dc3cabd9ca0ac22f5a5801 (diff)
Nonfunctional changes only
Diffstat (limited to 'vespa-http-client')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java82
1 files changed, 30 insertions, 52 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
index 274edcf2047..692d90abe50 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
@@ -56,41 +56,34 @@ public class OperationProcessor {
private final ThreadGroup ioThreadGroup;
private final String clientId = new BigInteger(130, random).toString(32);
- public OperationProcessor(
- IncompleteResultsThrottler incompleteResultsThrottler,
- FeedClient.ResultCallback resultCallback,
- SessionParams sessionParams,
- ScheduledThreadPoolExecutor timeoutExecutor) {
+ public OperationProcessor(IncompleteResultsThrottler incompleteResultsThrottler,
+ FeedClient.ResultCallback resultCallback,
+ SessionParams sessionParams,
+ ScheduledThreadPoolExecutor timeoutExecutor) {
this.numDestinations = sessionParams.getClusters().size();
this.resultCallback = resultCallback;
this.incompleteResultsThrottler = incompleteResultsThrottler;
this.timeoutExecutor = timeoutExecutor;
this.ioThreadGroup = new ThreadGroup("operationprocessor");
- if (sessionParams.getClusters().isEmpty()) {
+ if (sessionParams.getClusters().isEmpty())
throw new IllegalArgumentException("Cannot feed to 0 clusters.");
- }
for (Cluster cluster : sessionParams.getClusters()) {
- if (cluster.getEndpoints().isEmpty()) {
+ if (cluster.getEndpoints().isEmpty())
throw new IllegalArgumentException("Cannot feed to empty cluster.");
- }
}
for (int i = 0; i < sessionParams.getClusters().size(); i++) {
Cluster cluster = sessionParams.getClusters().get(i);
-
- clusters.add(new ClusterConnection(
- this,
- sessionParams.getFeedParams(),
- sessionParams.getConnectionParams(),
- sessionParams.getErrorReport(),
- cluster,
- i,
- sessionParams.getClientQueueSize() / sessionParams.getClusters().size(),
- timeoutExecutor));
-
- }
+ clusters.add(new ClusterConnection(this,
+ sessionParams.getFeedParams(),
+ sessionParams.getConnectionParams(),
+ cluster,
+ i,
+ sessionParams.getClientQueueSize() / sessionParams.getClusters().size(),
+ timeoutExecutor));
+ }
operationStats = new OperationStats(sessionParams, clusters, incompleteResultsThrottler);
maxRetries = sessionParams.getConnectionParams().getMaxRetries();
minTimeBetweenRetriesMs = sessionParams.getConnectionParams().getMinTimeBetweenRetriesMs();
@@ -122,21 +115,16 @@ public class OperationProcessor {
}
private boolean retriedThis(EndpointResult endpointResult, DocumentSendInfo documentSendInfo, int clusterId) {
- final Result.Detail detail = endpointResult.getDetail();
- // If success, no retries to do.
- if (detail.getResultType() == Result.ResultType.OPERATION_EXECUTED) {
- return false;
- }
+ Result.Detail detail = endpointResult.getDetail();
+ if (detail.getResultType() == Result.ResultType.OPERATION_EXECUTED) return false; // Success: No retries
int retries = documentSendInfo.incRetries(clusterId, detail);
- if (retries > maxRetries) {
- return false;
- }
+ if (retries > maxRetries) return false;
String exceptionMessage = detail.getException() == null ? "" : detail.getException().getMessage();
- if (exceptionMessage == null) {
+ if (exceptionMessage == null)
exceptionMessage = "";
- }
+
// TODO: Return proper error code in structured data in next version of internal API.
// Error codes from messagebus/src/cpp/messagebus/errorcode.h
boolean retryThisOperation =
@@ -151,12 +139,10 @@ public class OperationProcessor {
if (retryThisOperation) {
int waitTime = (int) (minTimeBetweenRetriesMs * (1 + random.nextDouble() / 3));
- log.finest("Retrying due to " + detail.toString() + " attempt " + retries
- + " in " + waitTime + " ms.");
- timeoutExecutor.schedule(
- () -> postToCluster(clusters.get(clusterId), documentSendInfo.getDocument()),
- waitTime,
- TimeUnit.MILLISECONDS);
+ log.finest("Retrying due to " + detail.toString() + " attempt " + retries + " in " + waitTime + " ms.");
+ timeoutExecutor.schedule(() -> postToCluster(clusters.get(clusterId), documentSendInfo.getDocument()),
+ waitTime,
+ TimeUnit.MILLISECONDS);
return true;
}
@@ -173,28 +159,20 @@ public class OperationProcessor {
}
DocumentSendInfo documentSendInfo = docSendInfoByOperationId.get(endpointResult.getOperationId());
- if (retriedThis(endpointResult, documentSendInfo, clusterId)) {
- return null;
- }
+ if (retriedThis(endpointResult, documentSendInfo, clusterId)) return null;
- if (!documentSendInfo.addIfNotAlreadyThere(endpointResult.getDetail(), clusterId)) {
- // Duplicate message, we have seen this operation before.
- return null;
- }
+ // Duplicate message
+ if ( ! documentSendInfo.addIfNotAlreadyThere(endpointResult.getDetail(), clusterId)) return null;
// Is this the last operation we are waiting for?
- if (documentSendInfo.detailCount() != numDestinations) {
- return null;
- }
+ if (documentSendInfo.detailCount() != numDestinations) return null;
result = documentSendInfo.createResult();
docSendInfoByOperationId.remove(endpointResult.getOperationId());
String documentId = documentSendInfo.getDocument().getDocumentId();
- /**
- * If we got a pending operation against this document
- * dont't remove it from inflightDocuments and send blocked document operation
- */
+ // If we got a pending operation against this document
+ // dont't remove it from inflightDocuments and send blocked document operation
List<Document> blockedDocuments = blockedDocumentsByDocumentId.get(documentId);
if (blockedDocuments.isEmpty()) {
inflightDocumentIds.remove(documentId);
@@ -210,7 +188,6 @@ public class OperationProcessor {
public void resultReceived(EndpointResult endpointResult, int clusterId) {
Result result = process(endpointResult, clusterId);
-
if (result != null) {
incompleteResultsThrottler.resultReady(result.isSuccess());
resultCallback.onCompletion(result.getDocumentId(), result);
@@ -318,4 +295,5 @@ public class OperationProcessor {
throw new RuntimeException("Did not manage to shut down retry threads. Please report problem.");
}
}
+
}