summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@oath.com>2019-07-05 03:31:10 -0700
committerGitHub <noreply@github.com>2019-07-05 03:31:10 -0700
commit10e4c09fa1db4ad4c10b12d216bba6cc33593dc7 (patch)
treef0157414c89ad1a792acded843322af837113ed7
parentf1f227311ee6748bff1dd83a4aef8a3e10001cd7 (diff)
parent80fed4ea6e58d2778463743b7bcd88607412d31e (diff)
Merge pull request #9954 from vespa-engine/bratseth/dont-wait
Bratseth/dont wait
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java34
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java13
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java77
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java91
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java83
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SyncFeedClientTest.java2
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java2
7 files changed, 146 insertions, 156 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java
index 008f3b63a89..fff0aa910d5 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java
@@ -13,13 +13,9 @@ import java.util.concurrent.TimeUnit;
*/
public final class FeedParams {
- public boolean getDenyIfBusyV3() {
- return denyIfBusyV3;
- }
+ public boolean getDenyIfBusyV3() { return denyIfBusyV3; }
- public long getMaxSleepTimeMs() {
- return maxSleepTimeMs;
- }
+ public long getMaxSleepTimeMs() { return maxSleepTimeMs; }
public boolean getSilentUpgrade() { return silentUpgrade; }
@@ -36,6 +32,7 @@ public final class FeedParams {
* Mutable class used to instantiate a {@link FeedParams}.
*/
public static final class Builder {
+
private DataFormat dataFormat = DataFormat.JSON_UTF8;
private long serverTimeout = TimeUnit.SECONDS.toMillis(180);
private long clientTimeout = TimeUnit.SECONDS.toMillis(20);
@@ -57,7 +54,7 @@ public final class FeedParams {
* @return this, for chaining
*/
@Beta
- public Builder withSilentUpgrade(boolean silentUpgrade) {
+ public Builder setSilentUpgrade(boolean silentUpgrade) {
this.silentUpgrade = silentUpgrade;
return this;
}
@@ -165,6 +162,7 @@ public final class FeedParams {
/**
* Sets the maximum number of operations to be in-flight.
+ *
* @param maxInFlightRequests max number of operations.
* @return this, for chaining
*/
@@ -246,11 +244,14 @@ public final class FeedParams {
return maxChunkSizeBytes;
}
- public int getmaxInFlightRequests() {
+ public int getMaxInFlightRequests() {
return maxInFlightRequests;
}
+
}
+ // NOTE! See toBuilder at the end of this class if you add fields here
+
private final DataFormat dataFormat;
private final long serverTimeoutMillis;
private final long clientTimeoutMillis;
@@ -263,7 +264,6 @@ public final class FeedParams {
private final long maxSleepTimeMs;
private final boolean silentUpgrade;
-
private FeedParams(DataFormat dataFormat, long serverTimeout, long clientTimeout, String route,
int maxChunkSizeBytes, final int maxInFlightRequests,
long localQueueTimeOut, String priority, boolean denyIfBusyV3, long maxSleepTimeMs,
@@ -319,4 +319,20 @@ public final class FeedParams {
return localQueueTimeOut;
}
+ /** Returns a builder initialized to the values of this */
+ public FeedParams.Builder toBuilder() {
+ Builder b = new Builder();
+ b.setDataFormat(dataFormat);
+ b.setServerTimeout(serverTimeoutMillis, TimeUnit.MILLISECONDS);
+ b.setClientTimeout(clientTimeoutMillis, TimeUnit.MILLISECONDS);
+ b.setRoute(route);
+ b.setMaxChunkSizeBytes(maxChunkSizeBytes);
+ b.setMaxInFlightRequests(maxInFlightRequests);
+ b.setPriority(priority);
+ b.setDenyIfBusyV3(denyIfBusyV3);
+ b.setMaxSleepTimeMs(maxSleepTimeMs);
+ b.setSilentUpgrade(silentUpgrade);
+ return b;
+ }
+
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java
index 48fd21e2b1f..4e1406ab966 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java
@@ -133,6 +133,8 @@ public final class SessionParams {
}
}
+ // NOTE! See toBuilder at the end of this class if you add fields here
+
private final List<Cluster> clusters;
private final FeedParams feedParams;
private final ConnectionParams connectionParams;
@@ -179,4 +181,15 @@ public final class SessionParams {
return errorReport;
}
+ public Builder toBuilder() {
+ Builder b = new Builder();
+ clusters.forEach(c -> b.addCluster(c));
+ b.setFeedParams(feedParams);
+ b.setConnectionParams(connectionParams);
+ b.setClientQueueSize(clientQueueSize);
+ b.setErrorReporter(errorReport);
+ b.setThrottlerMinSize(throttlerMinSize);
+ return b;
+ }
+
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
index da45acc5687..6e1f3419e8e 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
@@ -8,7 +8,6 @@ import com.yahoo.vespa.http.client.config.Cluster;
import com.yahoo.vespa.http.client.config.ConnectionParams;
import com.yahoo.vespa.http.client.config.Endpoint;
import com.yahoo.vespa.http.client.config.FeedParams;
-import com.yahoo.vespa.http.client.config.SessionParams;
import com.yahoo.vespa.http.client.core.Document;
import com.yahoo.vespa.http.client.core.Exceptions;
import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;
@@ -25,45 +24,35 @@ import java.util.concurrent.TimeUnit;
*/
public class ClusterConnection implements AutoCloseable {
- private final OperationProcessor operationProcessor;
private final List<IOThread> ioThreads = new ArrayList<>();
private final int clusterId;
- private final SessionParams.ErrorReporter errorReporter;
private static JsonFactory jsonFactory = new JsonFactory();
private static ObjectMapper objectMapper = new ObjectMapper();
- public ClusterConnection(
- OperationProcessor operationProcessor,
- FeedParams feedParams,
- ConnectionParams connectionParams,
- SessionParams.ErrorReporter errorReporter,
- Cluster cluster,
- int clusterId,
- int clientQueueSizePerCluster,
- ScheduledThreadPoolExecutor timeoutExecutor) {
- this.errorReporter = errorReporter;
- if (cluster.getEndpoints().isEmpty()) {
+ public ClusterConnection(OperationProcessor operationProcessor,
+ FeedParams feedParams,
+ ConnectionParams connectionParams,
+ Cluster cluster,
+ int clusterId,
+ int clientQueueSizePerCluster,
+ ScheduledThreadPoolExecutor timeoutExecutor) {
+ if (cluster.getEndpoints().isEmpty())
throw new IllegalArgumentException("Cannot feed to empty cluster.");
- }
- this.operationProcessor = operationProcessor;
+
this.clusterId = clusterId;
- final int totalNumberOfEndpointsInThisCluster = cluster.getEndpoints().size()
- * connectionParams.getNumPersistentConnectionsPerEndpoint();
- if (totalNumberOfEndpointsInThisCluster == 0) {
- return;
- }
+ int totalNumberOfEndpointsInThisCluster = cluster.getEndpoints().size() * connectionParams.getNumPersistentConnectionsPerEndpoint();
+ if (totalNumberOfEndpointsInThisCluster == 0) return;
+
// Lower than 1 does not make any sense.
- final int maxInFlightPerSession = Math.max(
- 1, feedParams.getMaxInFlightRequests() / totalNumberOfEndpointsInThisCluster);
+ int maxInFlightPerSession = Math.max(1, feedParams.getMaxInFlightRequests() / totalNumberOfEndpointsInThisCluster);
+
DocumentQueue documentQueue = null;
for (Endpoint endpoint : cluster.getEndpoints()) {
- final EndpointResultQueue endpointResultQueue = new EndpointResultQueue(
- operationProcessor,
- endpoint,
- clusterId,
- timeoutExecutor,
- feedParams.getServerTimeout(TimeUnit.MILLISECONDS)
- + feedParams.getClientTimeout(TimeUnit.MILLISECONDS));
+ EndpointResultQueue endpointResultQueue = new EndpointResultQueue(operationProcessor,
+ endpoint,
+ clusterId,
+ timeoutExecutor,
+ feedParams.getServerTimeout(TimeUnit.MILLISECONDS) + feedParams.getClientTimeout(TimeUnit.MILLISECONDS));
for (int i = 0; i < connectionParams.getNumPersistentConnectionsPerEndpoint(); i++) {
GatewayConnection gatewayConnection;
if (connectionParams.isDryRun()) {
@@ -74,24 +63,22 @@ public class ClusterConnection implements AutoCloseable {
feedParams,
cluster.getRoute(),
connectionParams,
- new ApacheGatewayConnection.HttpClientFactory(
- connectionParams, endpoint.isUseSsl()),
+ new ApacheGatewayConnection.HttpClientFactory(connectionParams, endpoint.isUseSsl()),
operationProcessor.getClientId()
);
}
if (documentQueue == null) {
documentQueue = new DocumentQueue(clientQueueSizePerCluster);
}
- final IOThread ioThread = new IOThread(
- operationProcessor.getIoThreadGroup(),
- endpointResultQueue,
- gatewayConnection,
- clusterId,
- feedParams.getMaxChunkSizeBytes(),
- maxInFlightPerSession,
- feedParams.getLocalQueueTimeOut(),
- documentQueue,
- feedParams.getMaxSleepTimeMs());
+ IOThread ioThread = new IOThread(operationProcessor.getIoThreadGroup(),
+ endpointResultQueue,
+ gatewayConnection,
+ clusterId,
+ feedParams.getMaxChunkSizeBytes(),
+ maxInFlightPerSession,
+ feedParams.getLocalQueueTimeOut(),
+ documentQueue,
+ feedParams.getMaxSleepTimeMs());
ioThreads.add(ioThread);
}
}
@@ -103,9 +90,9 @@ public class ClusterConnection implements AutoCloseable {
public void post(Document document) throws EndpointIOException {
String documentIdStr = document.getDocumentId();
- //the same document ID must always go to the same destination
+ // The same document ID must always go to the same destination
// In noHandshakeMode this has no effect as the documentQueue is shared between the IOThreads.
- int hash = documentIdStr.hashCode() & 0x7FFFFFFF; //strip sign bit
+ int hash = documentIdStr.hashCode() & 0x7FFFFFFF; // Strip sign bit
IOThread ioThread = ioThreads.get(hash % ioThreads.size());
try {
ioThread.post(document);
@@ -148,7 +135,7 @@ public class ClusterConnection implements AutoCloseable {
}
public String getStatsAsJSon() throws IOException {
- final StringWriter stringWriter = new StringWriter();
+ StringWriter stringWriter = new StringWriter();
JsonGenerator jsonGenerator = jsonFactory.createGenerator(stringWriter);
jsonGenerator.writeStartObject();
jsonGenerator.writeArrayFieldStart("session");
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index 8c4ff3ae108..8ec4f6cb7f4 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -24,7 +24,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
/**
- * Class for handling asynchronous feeding of new documents and processing of results.
+ * Thread which feeds document operations asynchronously and processes the results.
*
* @author Einar M R Rosenvinge
*/
@@ -53,19 +53,18 @@ class IOThread implements Runnable, AutoCloseable {
private final AtomicInteger docsReceivedCounter = new AtomicInteger(0);
private final AtomicInteger statusReceivedCounter = new AtomicInteger(0);
private final AtomicInteger pendingDocumentStatusCount = new AtomicInteger(0);
- private final AtomicInteger successfullHandshakes = new AtomicInteger(0);
+ private final AtomicInteger successfulHandshakes = new AtomicInteger(0);
private final AtomicInteger lastGatewayProcessTimeMillis = new AtomicInteger(0);
- IOThread(
- ThreadGroup ioThreadGroup,
- EndpointResultQueue endpointResultQueue,
- GatewayConnection client,
- int clusterId,
- int maxChunkSizeBytes,
- int maxInFlightRequests,
- long localQueueTimeOut,
- DocumentQueue documentQueue,
- long maxSleepTimeMs) {
+ IOThread(ThreadGroup ioThreadGroup,
+ EndpointResultQueue endpointResultQueue,
+ GatewayConnection client,
+ int clusterId,
+ int maxChunkSizeBytes,
+ int maxInFlightRequests,
+ long localQueueTimeOut,
+ DocumentQueue documentQueue,
+ long maxSleepTimeMs) {
this.documentQueue = documentQueue;
this.endpoint = client.getEndpoint();
this.client = client;
@@ -86,6 +85,9 @@ class IOThread implements Runnable, AutoCloseable {
}
public static class ConnectionStats {
+
+ // NOTE: These fields are accessed by reflection in JSON serialization
+
public final int wrongSessionDetectedCounter;
public final int wrongVersionDetectedCounter;
public final int problemStatusCodeFromServerCounter;
@@ -96,16 +98,15 @@ class IOThread implements Runnable, AutoCloseable {
public final int successfullHandshakes;
public final int lastGatewayProcessTimeMillis;
- protected ConnectionStats(
- final int wrongSessionDetectedCounter,
- final int wrongVersionDetectedCounter,
- final int problemStatusCodeFromServerCounter,
- final int executeProblemsCounter,
- final int docsReceivedCounter,
- final int statusReceivedCounter,
- final int pendingDocumentStatusCount,
- final int successfullHandshakes,
- final int lastGatewayProcessTimeMillis) {
+ ConnectionStats(int wrongSessionDetectedCounter,
+ int wrongVersionDetectedCounter,
+ int problemStatusCodeFromServerCounter,
+ int executeProblemsCounter,
+ int docsReceivedCounter,
+ int statusReceivedCounter,
+ int pendingDocumentStatusCount,
+ int successfullHandshakes,
+ int lastGatewayProcessTimeMillis) {
this.wrongSessionDetectedCounter = wrongSessionDetectedCounter;
this.wrongVersionDetectedCounter = wrongVersionDetectedCounter;
this.problemStatusCodeFromServerCounter = problemStatusCodeFromServerCounter;
@@ -130,16 +131,14 @@ class IOThread implements Runnable, AutoCloseable {
docsReceivedCounter.get(),
statusReceivedCounter.get(),
pendingDocumentStatusCount.get(),
- successfullHandshakes.get(),
+ successfulHandshakes.get(),
lastGatewayProcessTimeMillis.get());
}
@Override
public void close() {
documentQueue.close();
- if (stopSignal.getCount() == 0) {
- return;
- }
+ if (stopSignal.getCount() == 0) return;
stopSignal.countDown();
log.finer("Closed called.");
@@ -166,8 +165,7 @@ class IOThread implements Runnable, AutoCloseable {
log.fine("Session to " + endpoint + " closed.");
}
-
- public void post(final Document document) throws InterruptedException {
+ public void post(Document document) throws InterruptedException {
documentQueue.put(document, Thread.currentThread().getThreadGroup() == ioThreadGroup);
}
@@ -177,8 +175,8 @@ class IOThread implements Runnable, AutoCloseable {
}
- List<Document> getNextDocsForFeeding(int maxWaitUnits, TimeUnit timeUnit) {
- final List<Document> docsForSendChunk = new ArrayList<>();
+ List<Document> getNextDocsForFeeding(long maxWaitUnits, TimeUnit timeUnit) {
+ List<Document> docsForSendChunk = new ArrayList<>();
int chunkSizeBytes = 0;
try {
drainFirstDocumentsInQueueIfOld();
@@ -214,8 +212,7 @@ class IOThread implements Runnable, AutoCloseable {
}
}
- private void markDocumentAsFailed(
- List<Document> docs, ServerResponseException servletException) {
+ private void markDocumentAsFailed(List<Document> docs, ServerResponseException servletException) {
for (Document doc : docs) {
resultQueue.failOperation(
EndPointResultFactory.createTransientError(
@@ -223,8 +220,7 @@ class IOThread implements Runnable, AutoCloseable {
}
}
- private InputStream sendAndReceive(List<Document> docs)
- throws IOException, ServerResponseException {
+ private InputStream sendAndReceive(List<Document> docs) throws IOException, ServerResponseException {
try {
// Post the new docs and get async responses for other posts.
return client.writeOperations(docs);
@@ -238,17 +234,19 @@ class IOThread implements Runnable, AutoCloseable {
}
private static class ProcessResponse {
+
private final int transitiveErrorCount;
private final int processResultsCount;
+
ProcessResponse(int transitiveErrorCount, int processResultsCount) {
this.transitiveErrorCount = transitiveErrorCount;
this.processResultsCount = processResultsCount;
}
+
}
private ProcessResponse processResponse(InputStream serverResponse) throws IOException {
- final Collection<EndpointResult> endpointResults =
- EndPointResultFactory.createResult(endpoint, serverResponse);
+ Collection<EndpointResult> endpointResults = EndPointResultFactory.createResult(endpoint, serverResponse);
statusReceivedCounter.addAndGet(endpointResults.size());
int transientErrors = 0;
for (EndpointResult endpointResult : endpointResults) {
@@ -271,15 +269,14 @@ class IOThread implements Runnable, AutoCloseable {
return processResponse;
}
- private ProcessResponse pullAndProcessData(int maxWaitTimeMilliSecs)
- throws ServerResponseException, IOException {
- final int pendingResultQueueSize = resultQueue.getPendingSize();
+ private ProcessResponse pullAndProcessData(long maxWaitTimeMs) throws ServerResponseException, IOException {
+ int pendingResultQueueSize = resultQueue.getPendingSize();
pendingDocumentStatusCount.set(pendingResultQueueSize);
- List<Document> nextDocsForFeeding = (pendingResultQueueSize > maxInFlightRequests)
+ List<Document> nextDocsForFeeding =
+ (pendingResultQueueSize > maxInFlightRequests)
? new ArrayList<>() // The queue is full, will not send more documents.
- : getNextDocsForFeeding(maxWaitTimeMilliSecs, TimeUnit.MILLISECONDS);
-
+ : getNextDocsForFeeding(maxWaitTimeMs, TimeUnit.MILLISECONDS);
if (nextDocsForFeeding.isEmpty() && pendingResultQueueSize == 0) {
//we have no unfinished business with the server now.
@@ -288,6 +285,7 @@ class IOThread implements Runnable, AutoCloseable {
}
log.finest("Awaiting " + pendingResultQueueSize + " results.");
ProcessResponse processResponse = feedDocumentAndProcessResults(nextDocsForFeeding);
+
if (pendingResultQueueSize > maxInFlightRequests && processResponse.processResultsCount == 0) {
try {
// Max outstanding document operations, no more results on server side, wait a bit
@@ -319,7 +317,7 @@ class IOThread implements Runnable, AutoCloseable {
case CONNECTED:
try {
client.handshake();
- successfullHandshakes.getAndIncrement();
+ successfulHandshakes.getAndIncrement();
} catch (ServerResponseException ser) {
executeProblemsCounter.incrementAndGet();
log.info("Handshake did not work out " + endpoint + ": " + Exceptions.toMessageString(ser));
@@ -337,7 +335,7 @@ class IOThread implements Runnable, AutoCloseable {
return ThreadState.SESSION_SYNCED;
case SESSION_SYNCED:
try {
- ProcessResponse processResponse = pullAndProcessData(100);
+ ProcessResponse processResponse = pullAndProcessData(1);
gatewayThrottler.handleCall(processResponse.transitiveErrorCount);
}
catch (ServerResponseException ser) {
@@ -387,9 +385,8 @@ class IOThread implements Runnable, AutoCloseable {
private void drainFirstDocumentsInQueueIfOld() {
while (true) {
Optional<Document> document = documentQueue.pollDocumentIfTimedoutInQueue(localQueueTimeOut);
- if (! document.isPresent()) {
- return;
- }
+ if ( ! document.isPresent()) return;
+
EndpointResult endpointResult = EndPointResultFactory.createTransientError(
endpoint, document.get().getOperationId(),
new Exception("Not sending document operation, timed out in queue after "
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
index 45133901567..692d90abe50 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
@@ -56,41 +56,34 @@ public class OperationProcessor {
private final ThreadGroup ioThreadGroup;
private final String clientId = new BigInteger(130, random).toString(32);
- public OperationProcessor(
- IncompleteResultsThrottler incompleteResultsThrottler,
- FeedClient.ResultCallback resultCallback,
- SessionParams sessionParams,
- ScheduledThreadPoolExecutor timeoutExecutor) {
+ public OperationProcessor(IncompleteResultsThrottler incompleteResultsThrottler,
+ FeedClient.ResultCallback resultCallback,
+ SessionParams sessionParams,
+ ScheduledThreadPoolExecutor timeoutExecutor) {
this.numDestinations = sessionParams.getClusters().size();
this.resultCallback = resultCallback;
this.incompleteResultsThrottler = incompleteResultsThrottler;
this.timeoutExecutor = timeoutExecutor;
this.ioThreadGroup = new ThreadGroup("operationprocessor");
- if (sessionParams.getClusters().isEmpty()) {
+ if (sessionParams.getClusters().isEmpty())
throw new IllegalArgumentException("Cannot feed to 0 clusters.");
- }
for (Cluster cluster : sessionParams.getClusters()) {
- if (cluster.getEndpoints().isEmpty()) {
+ if (cluster.getEndpoints().isEmpty())
throw new IllegalArgumentException("Cannot feed to empty cluster.");
- }
}
for (int i = 0; i < sessionParams.getClusters().size(); i++) {
Cluster cluster = sessionParams.getClusters().get(i);
-
- clusters.add(new ClusterConnection(
- this,
- sessionParams.getFeedParams(),
- sessionParams.getConnectionParams(),
- sessionParams.getErrorReport(),
- cluster,
- i,
- sessionParams.getClientQueueSize() / sessionParams.getClusters().size(),
- timeoutExecutor));
-
- }
+ clusters.add(new ClusterConnection(this,
+ sessionParams.getFeedParams(),
+ sessionParams.getConnectionParams(),
+ cluster,
+ i,
+ sessionParams.getClientQueueSize() / sessionParams.getClusters().size(),
+ timeoutExecutor));
+ }
operationStats = new OperationStats(sessionParams, clusters, incompleteResultsThrottler);
maxRetries = sessionParams.getConnectionParams().getMaxRetries();
minTimeBetweenRetriesMs = sessionParams.getConnectionParams().getMinTimeBetweenRetriesMs();
@@ -122,21 +115,16 @@ public class OperationProcessor {
}
private boolean retriedThis(EndpointResult endpointResult, DocumentSendInfo documentSendInfo, int clusterId) {
- final Result.Detail detail = endpointResult.getDetail();
- // If success, no retries to do.
- if (detail.getResultType() == Result.ResultType.OPERATION_EXECUTED) {
- return false;
- }
+ Result.Detail detail = endpointResult.getDetail();
+ if (detail.getResultType() == Result.ResultType.OPERATION_EXECUTED) return false; // Success: No retries
int retries = documentSendInfo.incRetries(clusterId, detail);
- if (retries > maxRetries) {
- return false;
- }
+ if (retries > maxRetries) return false;
String exceptionMessage = detail.getException() == null ? "" : detail.getException().getMessage();
- if (exceptionMessage == null) {
+ if (exceptionMessage == null)
exceptionMessage = "";
- }
+
// TODO: Return proper error code in structured data in next version of internal API.
// Error codes from messagebus/src/cpp/messagebus/errorcode.h
boolean retryThisOperation =
@@ -151,12 +139,10 @@ public class OperationProcessor {
if (retryThisOperation) {
int waitTime = (int) (minTimeBetweenRetriesMs * (1 + random.nextDouble() / 3));
- log.finest("Retrying due to " + detail.toString() + " attempt " + retries
- + " in " + waitTime + " ms.");
- timeoutExecutor.schedule(
- () -> postToCluster(clusters.get(clusterId), documentSendInfo.getDocument()),
- waitTime,
- TimeUnit.MILLISECONDS);
+ log.finest("Retrying due to " + detail.toString() + " attempt " + retries + " in " + waitTime + " ms.");
+ timeoutExecutor.schedule(() -> postToCluster(clusters.get(clusterId), documentSendInfo.getDocument()),
+ waitTime,
+ TimeUnit.MILLISECONDS);
return true;
}
@@ -173,28 +159,20 @@ public class OperationProcessor {
}
DocumentSendInfo documentSendInfo = docSendInfoByOperationId.get(endpointResult.getOperationId());
- if (retriedThis(endpointResult, documentSendInfo, clusterId)) {
- return null;
- }
+ if (retriedThis(endpointResult, documentSendInfo, clusterId)) return null;
- if (!documentSendInfo.addIfNotAlreadyThere(endpointResult.getDetail(), clusterId)) {
- // Duplicate message, we have seen this operation before.
- return null;
- }
+ // Duplicate message
+ if ( ! documentSendInfo.addIfNotAlreadyThere(endpointResult.getDetail(), clusterId)) return null;
// Is this the last operation we are waiting for?
- if (documentSendInfo.detailCount() != numDestinations) {
- return null;
- }
+ if (documentSendInfo.detailCount() != numDestinations) return null;
result = documentSendInfo.createResult();
docSendInfoByOperationId.remove(endpointResult.getOperationId());
String documentId = documentSendInfo.getDocument().getDocumentId();
- /**
- * If we got a pending operation against this document
- * dont't remove it from inflightDocuments and send blocked document operation
- */
+ // If we got a pending operation against this document
+ // dont't remove it from inflightDocuments and send blocked document operation
List<Document> blockedDocuments = blockedDocumentsByDocumentId.get(documentId);
if (blockedDocuments.isEmpty()) {
inflightDocumentIds.remove(documentId);
@@ -210,7 +188,6 @@ public class OperationProcessor {
public void resultReceived(EndpointResult endpointResult, int clusterId) {
Result result = process(endpointResult, clusterId);
-
if (result != null) {
incompleteResultsThrottler.resultReady(result.isSuccess());
resultCallback.onCompletion(result.getDocumentId(), result);
@@ -252,7 +229,6 @@ public class OperationProcessor {
}
private void sendToClusters(Document document) {
-
synchronized (monitor) {
boolean traceThisDoc = traceEveryXOperation > 0 && traceCounter++ % traceEveryXOperation == 0;
docSendInfoByOperationId.put(document.getOperationId(), new DocumentSendInfo(document, traceThisDoc));
@@ -319,4 +295,5 @@ public class OperationProcessor {
throw new RuntimeException("Did not manage to shut down retry threads. Please report problem.");
}
}
+
}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SyncFeedClientTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SyncFeedClientTest.java
index a2d5b18999e..388c71087ec 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SyncFeedClientTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SyncFeedClientTest.java
@@ -3,6 +3,7 @@ package com.yahoo.vespa.http.client;
import com.yahoo.vespa.http.client.config.Cluster;
import com.yahoo.vespa.http.client.config.ConnectionParams;
import com.yahoo.vespa.http.client.config.Endpoint;
+import com.yahoo.vespa.http.client.config.FeedParams;
import com.yahoo.vespa.http.client.config.SessionParams;
import org.junit.Test;
@@ -36,7 +37,6 @@ public class SyncFeedClientTest {
.build();
SyncFeedClient feedClient = new SyncFeedClient(sessionParams);
-
assertFeedSuccessful(feedClient);
assertFeedSuccessful(feedClient); // ensure the client can be reused
feedClient.close();
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
index 3143282081b..5a4c6d05185 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
@@ -162,7 +162,7 @@ public class IOThreadTest {
@Test
public void requireThatEndpointConnectExceptionsArePropagated()
- throws IOException, ServerResponseException, InterruptedException, TimeoutException, ExecutionException {
+ throws IOException, ServerResponseException, InterruptedException, TimeoutException, ExecutionException {
when(apacheGatewayConnection.connect()).thenReturn(true);
String errorMessage = "generic error message";
IOException cause = new IOException(errorMessage);