diff options
author | Jon Bratseth <bratseth@verizonmedia.com> | 2019-07-03 10:29:56 -0700 |
---|---|---|
committer | Jon Bratseth <bratseth@verizonmedia.com> | 2019-07-03 10:29:56 -0700 |
commit | fd65e3d63e7bbb7bacf05a760811f5d814799767 (patch) | |
tree | b4afdbbc141b33174c326cd34507d3e92a7cb6e0 /vespa-http-client/src | |
parent | a38afd806ece1174f7dc3cabd9ca0ac22f5a5801 (diff) |
Nonfunctional changes only
Diffstat (limited to 'vespa-http-client/src')
-rw-r--r-- | vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java | 82 |
1 files changed, 30 insertions, 52 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java index 274edcf2047..692d90abe50 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java @@ -56,41 +56,34 @@ public class OperationProcessor { private final ThreadGroup ioThreadGroup; private final String clientId = new BigInteger(130, random).toString(32); - public OperationProcessor( - IncompleteResultsThrottler incompleteResultsThrottler, - FeedClient.ResultCallback resultCallback, - SessionParams sessionParams, - ScheduledThreadPoolExecutor timeoutExecutor) { + public OperationProcessor(IncompleteResultsThrottler incompleteResultsThrottler, + FeedClient.ResultCallback resultCallback, + SessionParams sessionParams, + ScheduledThreadPoolExecutor timeoutExecutor) { this.numDestinations = sessionParams.getClusters().size(); this.resultCallback = resultCallback; this.incompleteResultsThrottler = incompleteResultsThrottler; this.timeoutExecutor = timeoutExecutor; this.ioThreadGroup = new ThreadGroup("operationprocessor"); - if (sessionParams.getClusters().isEmpty()) { + if (sessionParams.getClusters().isEmpty()) throw new IllegalArgumentException("Cannot feed to 0 clusters."); - } for (Cluster cluster : sessionParams.getClusters()) { - if (cluster.getEndpoints().isEmpty()) { + if (cluster.getEndpoints().isEmpty()) throw new IllegalArgumentException("Cannot feed to empty cluster."); - } } for (int i = 0; i < sessionParams.getClusters().size(); i++) { Cluster cluster = sessionParams.getClusters().get(i); - - clusters.add(new ClusterConnection( - this, - sessionParams.getFeedParams(), - sessionParams.getConnectionParams(), - sessionParams.getErrorReport(), - cluster, - i, - sessionParams.getClientQueueSize() / sessionParams.getClusters().size(), - timeoutExecutor)); - - } + clusters.add(new ClusterConnection(this, + sessionParams.getFeedParams(), + sessionParams.getConnectionParams(), + cluster, + i, + sessionParams.getClientQueueSize() / sessionParams.getClusters().size(), + timeoutExecutor)); + } operationStats = new OperationStats(sessionParams, clusters, incompleteResultsThrottler); maxRetries = sessionParams.getConnectionParams().getMaxRetries(); minTimeBetweenRetriesMs = sessionParams.getConnectionParams().getMinTimeBetweenRetriesMs(); @@ -122,21 +115,16 @@ public class OperationProcessor { } private boolean retriedThis(EndpointResult endpointResult, DocumentSendInfo documentSendInfo, int clusterId) { - final Result.Detail detail = endpointResult.getDetail(); - // If success, no retries to do. - if (detail.getResultType() == Result.ResultType.OPERATION_EXECUTED) { - return false; - } + Result.Detail detail = endpointResult.getDetail(); + if (detail.getResultType() == Result.ResultType.OPERATION_EXECUTED) return false; // Success: No retries int retries = documentSendInfo.incRetries(clusterId, detail); - if (retries > maxRetries) { - return false; - } + if (retries > maxRetries) return false; String exceptionMessage = detail.getException() == null ? "" : detail.getException().getMessage(); - if (exceptionMessage == null) { + if (exceptionMessage == null) exceptionMessage = ""; - } + // TODO: Return proper error code in structured data in next version of internal API. // Error codes from messagebus/src/cpp/messagebus/errorcode.h boolean retryThisOperation = @@ -151,12 +139,10 @@ public class OperationProcessor { if (retryThisOperation) { int waitTime = (int) (minTimeBetweenRetriesMs * (1 + random.nextDouble() / 3)); - log.finest("Retrying due to " + detail.toString() + " attempt " + retries - + " in " + waitTime + " ms."); - timeoutExecutor.schedule( - () -> postToCluster(clusters.get(clusterId), documentSendInfo.getDocument()), - waitTime, - TimeUnit.MILLISECONDS); + log.finest("Retrying due to " + detail.toString() + " attempt " + retries + " in " + waitTime + " ms."); + timeoutExecutor.schedule(() -> postToCluster(clusters.get(clusterId), documentSendInfo.getDocument()), + waitTime, + TimeUnit.MILLISECONDS); return true; } @@ -173,28 +159,20 @@ public class OperationProcessor { } DocumentSendInfo documentSendInfo = docSendInfoByOperationId.get(endpointResult.getOperationId()); - if (retriedThis(endpointResult, documentSendInfo, clusterId)) { - return null; - } + if (retriedThis(endpointResult, documentSendInfo, clusterId)) return null; - if (!documentSendInfo.addIfNotAlreadyThere(endpointResult.getDetail(), clusterId)) { - // Duplicate message, we have seen this operation before. - return null; - } + // Duplicate message + if ( ! documentSendInfo.addIfNotAlreadyThere(endpointResult.getDetail(), clusterId)) return null; // Is this the last operation we are waiting for? - if (documentSendInfo.detailCount() != numDestinations) { - return null; - } + if (documentSendInfo.detailCount() != numDestinations) return null; result = documentSendInfo.createResult(); docSendInfoByOperationId.remove(endpointResult.getOperationId()); String documentId = documentSendInfo.getDocument().getDocumentId(); - /** - * If we got a pending operation against this document - * dont't remove it from inflightDocuments and send blocked document operation - */ + // If we got a pending operation against this document + // dont't remove it from inflightDocuments and send blocked document operation List<Document> blockedDocuments = blockedDocumentsByDocumentId.get(documentId); if (blockedDocuments.isEmpty()) { inflightDocumentIds.remove(documentId); @@ -210,7 +188,6 @@ public class OperationProcessor { public void resultReceived(EndpointResult endpointResult, int clusterId) { Result result = process(endpointResult, clusterId); - if (result != null) { incompleteResultsThrottler.resultReady(result.isSuccess()); resultCallback.onCompletion(result.getDocumentId(), result); @@ -318,4 +295,5 @@ public class OperationProcessor { throw new RuntimeException("Did not manage to shut down retry threads. Please report problem."); } } + } |