diff options
author | Jon Bratseth <bratseth@oath.com> | 2019-07-05 03:31:10 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-07-05 03:31:10 -0700 |
commit | 10e4c09fa1db4ad4c10b12d216bba6cc33593dc7 (patch) | |
tree | f0157414c89ad1a792acded843322af837113ed7 | |
parent | f1f227311ee6748bff1dd83a4aef8a3e10001cd7 (diff) | |
parent | 80fed4ea6e58d2778463743b7bcd88607412d31e (diff) |
Merge pull request #9954 from vespa-engine/bratseth/dont-wait
Bratseth/dont wait
7 files changed, 146 insertions, 156 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java index 008f3b63a89..fff0aa910d5 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java @@ -13,13 +13,9 @@ import java.util.concurrent.TimeUnit; */ public final class FeedParams { - public boolean getDenyIfBusyV3() { - return denyIfBusyV3; - } + public boolean getDenyIfBusyV3() { return denyIfBusyV3; } - public long getMaxSleepTimeMs() { - return maxSleepTimeMs; - } + public long getMaxSleepTimeMs() { return maxSleepTimeMs; } public boolean getSilentUpgrade() { return silentUpgrade; } @@ -36,6 +32,7 @@ public final class FeedParams { * Mutable class used to instantiate a {@link FeedParams}. */ public static final class Builder { + private DataFormat dataFormat = DataFormat.JSON_UTF8; private long serverTimeout = TimeUnit.SECONDS.toMillis(180); private long clientTimeout = TimeUnit.SECONDS.toMillis(20); @@ -57,7 +54,7 @@ public final class FeedParams { * @return this, for chaining */ @Beta - public Builder withSilentUpgrade(boolean silentUpgrade) { + public Builder setSilentUpgrade(boolean silentUpgrade) { this.silentUpgrade = silentUpgrade; return this; } @@ -165,6 +162,7 @@ public final class FeedParams { /** * Sets the maximum number of operations to be in-flight. + * * @param maxInFlightRequests max number of operations. * @return this, for chaining */ @@ -246,11 +244,14 @@ public final class FeedParams { return maxChunkSizeBytes; } - public int getmaxInFlightRequests() { + public int getMaxInFlightRequests() { return maxInFlightRequests; } + } + // NOTE! See toBuilder at the end of this class if you add fields here + private final DataFormat dataFormat; private final long serverTimeoutMillis; private final long clientTimeoutMillis; @@ -263,7 +264,6 @@ public final class FeedParams { private final long maxSleepTimeMs; private final boolean silentUpgrade; - private FeedParams(DataFormat dataFormat, long serverTimeout, long clientTimeout, String route, int maxChunkSizeBytes, final int maxInFlightRequests, long localQueueTimeOut, String priority, boolean denyIfBusyV3, long maxSleepTimeMs, @@ -319,4 +319,20 @@ public final class FeedParams { return localQueueTimeOut; } + /** Returns a builder initialized to the values of this */ + public FeedParams.Builder toBuilder() { + Builder b = new Builder(); + b.setDataFormat(dataFormat); + b.setServerTimeout(serverTimeoutMillis, TimeUnit.MILLISECONDS); + b.setClientTimeout(clientTimeoutMillis, TimeUnit.MILLISECONDS); + b.setRoute(route); + b.setMaxChunkSizeBytes(maxChunkSizeBytes); + b.setMaxInFlightRequests(maxInFlightRequests); + b.setPriority(priority); + b.setDenyIfBusyV3(denyIfBusyV3); + b.setMaxSleepTimeMs(maxSleepTimeMs); + b.setSilentUpgrade(silentUpgrade); + return b; + } + } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java index 48fd21e2b1f..4e1406ab966 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java @@ -133,6 +133,8 @@ public final class SessionParams { } } + // NOTE! See toBuilder at the end of this class if you add fields here + private final List<Cluster> clusters; private final FeedParams feedParams; private final ConnectionParams connectionParams; @@ -179,4 +181,15 @@ public final class SessionParams { return errorReport; } + public Builder toBuilder() { + Builder b = new Builder(); + clusters.forEach(c -> b.addCluster(c)); + b.setFeedParams(feedParams); + b.setConnectionParams(connectionParams); + b.setClientQueueSize(clientQueueSize); + b.setErrorReporter(errorReport); + b.setThrottlerMinSize(throttlerMinSize); + return b; + } + } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java index da45acc5687..6e1f3419e8e 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java @@ -8,7 +8,6 @@ import com.yahoo.vespa.http.client.config.Cluster; import com.yahoo.vespa.http.client.config.ConnectionParams; import com.yahoo.vespa.http.client.config.Endpoint; import com.yahoo.vespa.http.client.config.FeedParams; -import com.yahoo.vespa.http.client.config.SessionParams; import com.yahoo.vespa.http.client.core.Document; import com.yahoo.vespa.http.client.core.Exceptions; import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor; @@ -25,45 +24,35 @@ import java.util.concurrent.TimeUnit; */ public class ClusterConnection implements AutoCloseable { - private final OperationProcessor operationProcessor; private final List<IOThread> ioThreads = new ArrayList<>(); private final int clusterId; - private final SessionParams.ErrorReporter errorReporter; private static JsonFactory jsonFactory = new JsonFactory(); private static ObjectMapper objectMapper = new ObjectMapper(); - public ClusterConnection( - OperationProcessor operationProcessor, - FeedParams feedParams, - ConnectionParams connectionParams, - SessionParams.ErrorReporter errorReporter, - Cluster cluster, - int clusterId, - int clientQueueSizePerCluster, - ScheduledThreadPoolExecutor timeoutExecutor) { - this.errorReporter = errorReporter; - if (cluster.getEndpoints().isEmpty()) { + public ClusterConnection(OperationProcessor operationProcessor, + FeedParams feedParams, + ConnectionParams connectionParams, + Cluster cluster, + int clusterId, + int clientQueueSizePerCluster, + ScheduledThreadPoolExecutor timeoutExecutor) { + if (cluster.getEndpoints().isEmpty()) throw new IllegalArgumentException("Cannot feed to empty cluster."); - } - this.operationProcessor = operationProcessor; + this.clusterId = clusterId; - final int totalNumberOfEndpointsInThisCluster = cluster.getEndpoints().size() - * connectionParams.getNumPersistentConnectionsPerEndpoint(); - if (totalNumberOfEndpointsInThisCluster == 0) { - return; - } + int totalNumberOfEndpointsInThisCluster = cluster.getEndpoints().size() * connectionParams.getNumPersistentConnectionsPerEndpoint(); + if (totalNumberOfEndpointsInThisCluster == 0) return; + // Lower than 1 does not make any sense. - final int maxInFlightPerSession = Math.max( - 1, feedParams.getMaxInFlightRequests() / totalNumberOfEndpointsInThisCluster); + int maxInFlightPerSession = Math.max(1, feedParams.getMaxInFlightRequests() / totalNumberOfEndpointsInThisCluster); + DocumentQueue documentQueue = null; for (Endpoint endpoint : cluster.getEndpoints()) { - final EndpointResultQueue endpointResultQueue = new EndpointResultQueue( - operationProcessor, - endpoint, - clusterId, - timeoutExecutor, - feedParams.getServerTimeout(TimeUnit.MILLISECONDS) - + feedParams.getClientTimeout(TimeUnit.MILLISECONDS)); + EndpointResultQueue endpointResultQueue = new EndpointResultQueue(operationProcessor, + endpoint, + clusterId, + timeoutExecutor, + feedParams.getServerTimeout(TimeUnit.MILLISECONDS) + feedParams.getClientTimeout(TimeUnit.MILLISECONDS)); for (int i = 0; i < connectionParams.getNumPersistentConnectionsPerEndpoint(); i++) { GatewayConnection gatewayConnection; if (connectionParams.isDryRun()) { @@ -74,24 +63,22 @@ public class ClusterConnection implements AutoCloseable { feedParams, cluster.getRoute(), connectionParams, - new ApacheGatewayConnection.HttpClientFactory( - connectionParams, endpoint.isUseSsl()), + new ApacheGatewayConnection.HttpClientFactory(connectionParams, endpoint.isUseSsl()), operationProcessor.getClientId() ); } if (documentQueue == null) { documentQueue = new DocumentQueue(clientQueueSizePerCluster); } - final IOThread ioThread = new IOThread( - operationProcessor.getIoThreadGroup(), - endpointResultQueue, - gatewayConnection, - clusterId, - feedParams.getMaxChunkSizeBytes(), - maxInFlightPerSession, - feedParams.getLocalQueueTimeOut(), - documentQueue, - feedParams.getMaxSleepTimeMs()); + IOThread ioThread = new IOThread(operationProcessor.getIoThreadGroup(), + endpointResultQueue, + gatewayConnection, + clusterId, + feedParams.getMaxChunkSizeBytes(), + maxInFlightPerSession, + feedParams.getLocalQueueTimeOut(), + documentQueue, + feedParams.getMaxSleepTimeMs()); ioThreads.add(ioThread); } } @@ -103,9 +90,9 @@ public class ClusterConnection implements AutoCloseable { public void post(Document document) throws EndpointIOException { String documentIdStr = document.getDocumentId(); - //the same document ID must always go to the same destination + // The same document ID must always go to the same destination // In noHandshakeMode this has no effect as the documentQueue is shared between the IOThreads. - int hash = documentIdStr.hashCode() & 0x7FFFFFFF; //strip sign bit + int hash = documentIdStr.hashCode() & 0x7FFFFFFF; // Strip sign bit IOThread ioThread = ioThreads.get(hash % ioThreads.size()); try { ioThread.post(document); @@ -148,7 +135,7 @@ public class ClusterConnection implements AutoCloseable { } public String getStatsAsJSon() throws IOException { - final StringWriter stringWriter = new StringWriter(); + StringWriter stringWriter = new StringWriter(); JsonGenerator jsonGenerator = jsonFactory.createGenerator(stringWriter); jsonGenerator.writeStartObject(); jsonGenerator.writeArrayFieldStart("session"); diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index 8c4ff3ae108..8ec4f6cb7f4 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -24,7 +24,7 @@ import java.util.logging.Level; import java.util.logging.Logger; /** - * Class for handling asynchronous feeding of new documents and processing of results. + * Thread which feeds document operations asynchronously and processes the results. * * @author Einar M R Rosenvinge */ @@ -53,19 +53,18 @@ class IOThread implements Runnable, AutoCloseable { private final AtomicInteger docsReceivedCounter = new AtomicInteger(0); private final AtomicInteger statusReceivedCounter = new AtomicInteger(0); private final AtomicInteger pendingDocumentStatusCount = new AtomicInteger(0); - private final AtomicInteger successfullHandshakes = new AtomicInteger(0); + private final AtomicInteger successfulHandshakes = new AtomicInteger(0); private final AtomicInteger lastGatewayProcessTimeMillis = new AtomicInteger(0); - IOThread( - ThreadGroup ioThreadGroup, - EndpointResultQueue endpointResultQueue, - GatewayConnection client, - int clusterId, - int maxChunkSizeBytes, - int maxInFlightRequests, - long localQueueTimeOut, - DocumentQueue documentQueue, - long maxSleepTimeMs) { + IOThread(ThreadGroup ioThreadGroup, + EndpointResultQueue endpointResultQueue, + GatewayConnection client, + int clusterId, + int maxChunkSizeBytes, + int maxInFlightRequests, + long localQueueTimeOut, + DocumentQueue documentQueue, + long maxSleepTimeMs) { this.documentQueue = documentQueue; this.endpoint = client.getEndpoint(); this.client = client; @@ -86,6 +85,9 @@ class IOThread implements Runnable, AutoCloseable { } public static class ConnectionStats { + + // NOTE: These fields are accessed by reflection in JSON serialization + public final int wrongSessionDetectedCounter; public final int wrongVersionDetectedCounter; public final int problemStatusCodeFromServerCounter; @@ -96,16 +98,15 @@ class IOThread implements Runnable, AutoCloseable { public final int successfullHandshakes; public final int lastGatewayProcessTimeMillis; - protected ConnectionStats( - final int wrongSessionDetectedCounter, - final int wrongVersionDetectedCounter, - final int problemStatusCodeFromServerCounter, - final int executeProblemsCounter, - final int docsReceivedCounter, - final int statusReceivedCounter, - final int pendingDocumentStatusCount, - final int successfullHandshakes, - final int lastGatewayProcessTimeMillis) { + ConnectionStats(int wrongSessionDetectedCounter, + int wrongVersionDetectedCounter, + int problemStatusCodeFromServerCounter, + int executeProblemsCounter, + int docsReceivedCounter, + int statusReceivedCounter, + int pendingDocumentStatusCount, + int successfullHandshakes, + int lastGatewayProcessTimeMillis) { this.wrongSessionDetectedCounter = wrongSessionDetectedCounter; this.wrongVersionDetectedCounter = wrongVersionDetectedCounter; this.problemStatusCodeFromServerCounter = problemStatusCodeFromServerCounter; @@ -130,16 +131,14 @@ class IOThread implements Runnable, AutoCloseable { docsReceivedCounter.get(), statusReceivedCounter.get(), pendingDocumentStatusCount.get(), - successfullHandshakes.get(), + successfulHandshakes.get(), lastGatewayProcessTimeMillis.get()); } @Override public void close() { documentQueue.close(); - if (stopSignal.getCount() == 0) { - return; - } + if (stopSignal.getCount() == 0) return; stopSignal.countDown(); log.finer("Closed called."); @@ -166,8 +165,7 @@ class IOThread implements Runnable, AutoCloseable { log.fine("Session to " + endpoint + " closed."); } - - public void post(final Document document) throws InterruptedException { + public void post(Document document) throws InterruptedException { documentQueue.put(document, Thread.currentThread().getThreadGroup() == ioThreadGroup); } @@ -177,8 +175,8 @@ class IOThread implements Runnable, AutoCloseable { } - List<Document> getNextDocsForFeeding(int maxWaitUnits, TimeUnit timeUnit) { - final List<Document> docsForSendChunk = new ArrayList<>(); + List<Document> getNextDocsForFeeding(long maxWaitUnits, TimeUnit timeUnit) { + List<Document> docsForSendChunk = new ArrayList<>(); int chunkSizeBytes = 0; try { drainFirstDocumentsInQueueIfOld(); @@ -214,8 +212,7 @@ class IOThread implements Runnable, AutoCloseable { } } - private void markDocumentAsFailed( - List<Document> docs, ServerResponseException servletException) { + private void markDocumentAsFailed(List<Document> docs, ServerResponseException servletException) { for (Document doc : docs) { resultQueue.failOperation( EndPointResultFactory.createTransientError( @@ -223,8 +220,7 @@ class IOThread implements Runnable, AutoCloseable { } } - private InputStream sendAndReceive(List<Document> docs) - throws IOException, ServerResponseException { + private InputStream sendAndReceive(List<Document> docs) throws IOException, ServerResponseException { try { // Post the new docs and get async responses for other posts. return client.writeOperations(docs); @@ -238,17 +234,19 @@ class IOThread implements Runnable, AutoCloseable { } private static class ProcessResponse { + private final int transitiveErrorCount; private final int processResultsCount; + ProcessResponse(int transitiveErrorCount, int processResultsCount) { this.transitiveErrorCount = transitiveErrorCount; this.processResultsCount = processResultsCount; } + } private ProcessResponse processResponse(InputStream serverResponse) throws IOException { - final Collection<EndpointResult> endpointResults = - EndPointResultFactory.createResult(endpoint, serverResponse); + Collection<EndpointResult> endpointResults = EndPointResultFactory.createResult(endpoint, serverResponse); statusReceivedCounter.addAndGet(endpointResults.size()); int transientErrors = 0; for (EndpointResult endpointResult : endpointResults) { @@ -271,15 +269,14 @@ class IOThread implements Runnable, AutoCloseable { return processResponse; } - private ProcessResponse pullAndProcessData(int maxWaitTimeMilliSecs) - throws ServerResponseException, IOException { - final int pendingResultQueueSize = resultQueue.getPendingSize(); + private ProcessResponse pullAndProcessData(long maxWaitTimeMs) throws ServerResponseException, IOException { + int pendingResultQueueSize = resultQueue.getPendingSize(); pendingDocumentStatusCount.set(pendingResultQueueSize); - List<Document> nextDocsForFeeding = (pendingResultQueueSize > maxInFlightRequests) + List<Document> nextDocsForFeeding = + (pendingResultQueueSize > maxInFlightRequests) ? new ArrayList<>() // The queue is full, will not send more documents. - : getNextDocsForFeeding(maxWaitTimeMilliSecs, TimeUnit.MILLISECONDS); - + : getNextDocsForFeeding(maxWaitTimeMs, TimeUnit.MILLISECONDS); if (nextDocsForFeeding.isEmpty() && pendingResultQueueSize == 0) { //we have no unfinished business with the server now. @@ -288,6 +285,7 @@ class IOThread implements Runnable, AutoCloseable { } log.finest("Awaiting " + pendingResultQueueSize + " results."); ProcessResponse processResponse = feedDocumentAndProcessResults(nextDocsForFeeding); + if (pendingResultQueueSize > maxInFlightRequests && processResponse.processResultsCount == 0) { try { // Max outstanding document operations, no more results on server side, wait a bit @@ -319,7 +317,7 @@ class IOThread implements Runnable, AutoCloseable { case CONNECTED: try { client.handshake(); - successfullHandshakes.getAndIncrement(); + successfulHandshakes.getAndIncrement(); } catch (ServerResponseException ser) { executeProblemsCounter.incrementAndGet(); log.info("Handshake did not work out " + endpoint + ": " + Exceptions.toMessageString(ser)); @@ -337,7 +335,7 @@ class IOThread implements Runnable, AutoCloseable { return ThreadState.SESSION_SYNCED; case SESSION_SYNCED: try { - ProcessResponse processResponse = pullAndProcessData(100); + ProcessResponse processResponse = pullAndProcessData(1); gatewayThrottler.handleCall(processResponse.transitiveErrorCount); } catch (ServerResponseException ser) { @@ -387,9 +385,8 @@ class IOThread implements Runnable, AutoCloseable { private void drainFirstDocumentsInQueueIfOld() { while (true) { Optional<Document> document = documentQueue.pollDocumentIfTimedoutInQueue(localQueueTimeOut); - if (! document.isPresent()) { - return; - } + if ( ! document.isPresent()) return; + EndpointResult endpointResult = EndPointResultFactory.createTransientError( endpoint, document.get().getOperationId(), new Exception("Not sending document operation, timed out in queue after " diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java index 45133901567..692d90abe50 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java @@ -56,41 +56,34 @@ public class OperationProcessor { private final ThreadGroup ioThreadGroup; private final String clientId = new BigInteger(130, random).toString(32); - public OperationProcessor( - IncompleteResultsThrottler incompleteResultsThrottler, - FeedClient.ResultCallback resultCallback, - SessionParams sessionParams, - ScheduledThreadPoolExecutor timeoutExecutor) { + public OperationProcessor(IncompleteResultsThrottler incompleteResultsThrottler, + FeedClient.ResultCallback resultCallback, + SessionParams sessionParams, + ScheduledThreadPoolExecutor timeoutExecutor) { this.numDestinations = sessionParams.getClusters().size(); this.resultCallback = resultCallback; this.incompleteResultsThrottler = incompleteResultsThrottler; this.timeoutExecutor = timeoutExecutor; this.ioThreadGroup = new ThreadGroup("operationprocessor"); - if (sessionParams.getClusters().isEmpty()) { + if (sessionParams.getClusters().isEmpty()) throw new IllegalArgumentException("Cannot feed to 0 clusters."); - } for (Cluster cluster : sessionParams.getClusters()) { - if (cluster.getEndpoints().isEmpty()) { + if (cluster.getEndpoints().isEmpty()) throw new IllegalArgumentException("Cannot feed to empty cluster."); - } } for (int i = 0; i < sessionParams.getClusters().size(); i++) { Cluster cluster = sessionParams.getClusters().get(i); - - clusters.add(new ClusterConnection( - this, - sessionParams.getFeedParams(), - sessionParams.getConnectionParams(), - sessionParams.getErrorReport(), - cluster, - i, - sessionParams.getClientQueueSize() / sessionParams.getClusters().size(), - timeoutExecutor)); - - } + clusters.add(new ClusterConnection(this, + sessionParams.getFeedParams(), + sessionParams.getConnectionParams(), + cluster, + i, + sessionParams.getClientQueueSize() / sessionParams.getClusters().size(), + timeoutExecutor)); + } operationStats = new OperationStats(sessionParams, clusters, incompleteResultsThrottler); maxRetries = sessionParams.getConnectionParams().getMaxRetries(); minTimeBetweenRetriesMs = sessionParams.getConnectionParams().getMinTimeBetweenRetriesMs(); @@ -122,21 +115,16 @@ public class OperationProcessor { } private boolean retriedThis(EndpointResult endpointResult, DocumentSendInfo documentSendInfo, int clusterId) { - final Result.Detail detail = endpointResult.getDetail(); - // If success, no retries to do. - if (detail.getResultType() == Result.ResultType.OPERATION_EXECUTED) { - return false; - } + Result.Detail detail = endpointResult.getDetail(); + if (detail.getResultType() == Result.ResultType.OPERATION_EXECUTED) return false; // Success: No retries int retries = documentSendInfo.incRetries(clusterId, detail); - if (retries > maxRetries) { - return false; - } + if (retries > maxRetries) return false; String exceptionMessage = detail.getException() == null ? "" : detail.getException().getMessage(); - if (exceptionMessage == null) { + if (exceptionMessage == null) exceptionMessage = ""; - } + // TODO: Return proper error code in structured data in next version of internal API. // Error codes from messagebus/src/cpp/messagebus/errorcode.h boolean retryThisOperation = @@ -151,12 +139,10 @@ public class OperationProcessor { if (retryThisOperation) { int waitTime = (int) (minTimeBetweenRetriesMs * (1 + random.nextDouble() / 3)); - log.finest("Retrying due to " + detail.toString() + " attempt " + retries - + " in " + waitTime + " ms."); - timeoutExecutor.schedule( - () -> postToCluster(clusters.get(clusterId), documentSendInfo.getDocument()), - waitTime, - TimeUnit.MILLISECONDS); + log.finest("Retrying due to " + detail.toString() + " attempt " + retries + " in " + waitTime + " ms."); + timeoutExecutor.schedule(() -> postToCluster(clusters.get(clusterId), documentSendInfo.getDocument()), + waitTime, + TimeUnit.MILLISECONDS); return true; } @@ -173,28 +159,20 @@ public class OperationProcessor { } DocumentSendInfo documentSendInfo = docSendInfoByOperationId.get(endpointResult.getOperationId()); - if (retriedThis(endpointResult, documentSendInfo, clusterId)) { - return null; - } + if (retriedThis(endpointResult, documentSendInfo, clusterId)) return null; - if (!documentSendInfo.addIfNotAlreadyThere(endpointResult.getDetail(), clusterId)) { - // Duplicate message, we have seen this operation before. - return null; - } + // Duplicate message + if ( ! documentSendInfo.addIfNotAlreadyThere(endpointResult.getDetail(), clusterId)) return null; // Is this the last operation we are waiting for? - if (documentSendInfo.detailCount() != numDestinations) { - return null; - } + if (documentSendInfo.detailCount() != numDestinations) return null; result = documentSendInfo.createResult(); docSendInfoByOperationId.remove(endpointResult.getOperationId()); String documentId = documentSendInfo.getDocument().getDocumentId(); - /** - * If we got a pending operation against this document - * dont't remove it from inflightDocuments and send blocked document operation - */ + // If we got a pending operation against this document + // dont't remove it from inflightDocuments and send blocked document operation List<Document> blockedDocuments = blockedDocumentsByDocumentId.get(documentId); if (blockedDocuments.isEmpty()) { inflightDocumentIds.remove(documentId); @@ -210,7 +188,6 @@ public class OperationProcessor { public void resultReceived(EndpointResult endpointResult, int clusterId) { Result result = process(endpointResult, clusterId); - if (result != null) { incompleteResultsThrottler.resultReady(result.isSuccess()); resultCallback.onCompletion(result.getDocumentId(), result); @@ -252,7 +229,6 @@ public class OperationProcessor { } private void sendToClusters(Document document) { - synchronized (monitor) { boolean traceThisDoc = traceEveryXOperation > 0 && traceCounter++ % traceEveryXOperation == 0; docSendInfoByOperationId.put(document.getOperationId(), new DocumentSendInfo(document, traceThisDoc)); @@ -319,4 +295,5 @@ public class OperationProcessor { throw new RuntimeException("Did not manage to shut down retry threads. Please report problem."); } } + } diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SyncFeedClientTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SyncFeedClientTest.java index a2d5b18999e..388c71087ec 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SyncFeedClientTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SyncFeedClientTest.java @@ -3,6 +3,7 @@ package com.yahoo.vespa.http.client; import com.yahoo.vespa.http.client.config.Cluster; import com.yahoo.vespa.http.client.config.ConnectionParams; import com.yahoo.vespa.http.client.config.Endpoint; +import com.yahoo.vespa.http.client.config.FeedParams; import com.yahoo.vespa.http.client.config.SessionParams; import org.junit.Test; @@ -36,7 +37,6 @@ public class SyncFeedClientTest { .build(); SyncFeedClient feedClient = new SyncFeedClient(sessionParams); - assertFeedSuccessful(feedClient); assertFeedSuccessful(feedClient); // ensure the client can be reused feedClient.close(); diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java index 3143282081b..5a4c6d05185 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java @@ -162,7 +162,7 @@ public class IOThreadTest { @Test public void requireThatEndpointConnectExceptionsArePropagated() - throws IOException, ServerResponseException, InterruptedException, TimeoutException, ExecutionException { + throws IOException, ServerResponseException, InterruptedException, TimeoutException, ExecutionException { when(apacheGatewayConnection.connect()).thenReturn(true); String errorMessage = "generic error message"; IOException cause = new IOException(errorMessage); |