summaryrefslogtreecommitdiffstats
path: root/vespa-http-client
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@verizonmedia.com>2019-07-03 10:25:54 -0700
committerJon Bratseth <bratseth@verizonmedia.com>2019-07-03 10:25:54 -0700
commita38afd806ece1174f7dc3cabd9ca0ac22f5a5801 (patch)
treed49c683872582e08409bb258e381be337f074b26 /vespa-http-client
parent0a0f2cc1692fc9419accae1570d885a186380e3d (diff)
Nonfunctional changes only
Diffstat (limited to 'vespa-http-client')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java60
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java93
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java1
3 files changed, 67 insertions, 87 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
index da45acc5687..560bbd536e5 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
@@ -8,7 +8,6 @@ import com.yahoo.vespa.http.client.config.Cluster;
import com.yahoo.vespa.http.client.config.ConnectionParams;
import com.yahoo.vespa.http.client.config.Endpoint;
import com.yahoo.vespa.http.client.config.FeedParams;
-import com.yahoo.vespa.http.client.config.SessionParams;
import com.yahoo.vespa.http.client.core.Document;
import com.yahoo.vespa.http.client.core.Exceptions;
import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;
@@ -25,45 +24,35 @@ import java.util.concurrent.TimeUnit;
*/
public class ClusterConnection implements AutoCloseable {
- private final OperationProcessor operationProcessor;
private final List<IOThread> ioThreads = new ArrayList<>();
private final int clusterId;
- private final SessionParams.ErrorReporter errorReporter;
private static JsonFactory jsonFactory = new JsonFactory();
private static ObjectMapper objectMapper = new ObjectMapper();
- public ClusterConnection(
- OperationProcessor operationProcessor,
- FeedParams feedParams,
- ConnectionParams connectionParams,
- SessionParams.ErrorReporter errorReporter,
- Cluster cluster,
- int clusterId,
- int clientQueueSizePerCluster,
- ScheduledThreadPoolExecutor timeoutExecutor) {
- this.errorReporter = errorReporter;
- if (cluster.getEndpoints().isEmpty()) {
+ public ClusterConnection(OperationProcessor operationProcessor,
+ FeedParams feedParams,
+ ConnectionParams connectionParams,
+ Cluster cluster,
+ int clusterId,
+ int clientQueueSizePerCluster,
+ ScheduledThreadPoolExecutor timeoutExecutor) {
+ if (cluster.getEndpoints().isEmpty())
throw new IllegalArgumentException("Cannot feed to empty cluster.");
- }
- this.operationProcessor = operationProcessor;
+
this.clusterId = clusterId;
- final int totalNumberOfEndpointsInThisCluster = cluster.getEndpoints().size()
- * connectionParams.getNumPersistentConnectionsPerEndpoint();
- if (totalNumberOfEndpointsInThisCluster == 0) {
- return;
- }
+ int totalNumberOfEndpointsInThisCluster = cluster.getEndpoints().size() * connectionParams.getNumPersistentConnectionsPerEndpoint();
+ if (totalNumberOfEndpointsInThisCluster == 0) return;
+
// Lower than 1 does not make any sense.
- final int maxInFlightPerSession = Math.max(
- 1, feedParams.getMaxInFlightRequests() / totalNumberOfEndpointsInThisCluster);
+ int maxInFlightPerSession = Math.max(1, feedParams.getMaxInFlightRequests() / totalNumberOfEndpointsInThisCluster);
+
DocumentQueue documentQueue = null;
for (Endpoint endpoint : cluster.getEndpoints()) {
- final EndpointResultQueue endpointResultQueue = new EndpointResultQueue(
- operationProcessor,
- endpoint,
- clusterId,
- timeoutExecutor,
- feedParams.getServerTimeout(TimeUnit.MILLISECONDS)
- + feedParams.getClientTimeout(TimeUnit.MILLISECONDS));
+ EndpointResultQueue endpointResultQueue = new EndpointResultQueue(operationProcessor,
+ endpoint,
+ clusterId,
+ timeoutExecutor,
+ feedParams.getServerTimeout(TimeUnit.MILLISECONDS) + feedParams.getClientTimeout(TimeUnit.MILLISECONDS));
for (int i = 0; i < connectionParams.getNumPersistentConnectionsPerEndpoint(); i++) {
GatewayConnection gatewayConnection;
if (connectionParams.isDryRun()) {
@@ -74,15 +63,14 @@ public class ClusterConnection implements AutoCloseable {
feedParams,
cluster.getRoute(),
connectionParams,
- new ApacheGatewayConnection.HttpClientFactory(
- connectionParams, endpoint.isUseSsl()),
+ new ApacheGatewayConnection.HttpClientFactory(connectionParams, endpoint.isUseSsl()),
operationProcessor.getClientId()
);
}
if (documentQueue == null) {
documentQueue = new DocumentQueue(clientQueueSizePerCluster);
}
- final IOThread ioThread = new IOThread(
+ IOThread ioThread = new IOThread(
operationProcessor.getIoThreadGroup(),
endpointResultQueue,
gatewayConnection,
@@ -103,9 +91,9 @@ public class ClusterConnection implements AutoCloseable {
public void post(Document document) throws EndpointIOException {
String documentIdStr = document.getDocumentId();
- //the same document ID must always go to the same destination
+ // The same document ID must always go to the same destination
// In noHandshakeMode this has no effect as the documentQueue is shared between the IOThreads.
- int hash = documentIdStr.hashCode() & 0x7FFFFFFF; //strip sign bit
+ int hash = documentIdStr.hashCode() & 0x7FFFFFFF; // Strip sign bit
IOThread ioThread = ioThreads.get(hash % ioThreads.size());
try {
ioThread.post(document);
@@ -148,7 +136,7 @@ public class ClusterConnection implements AutoCloseable {
}
public String getStatsAsJSon() throws IOException {
- final StringWriter stringWriter = new StringWriter();
+ StringWriter stringWriter = new StringWriter();
JsonGenerator jsonGenerator = jsonFactory.createGenerator(stringWriter);
jsonGenerator.writeStartObject();
jsonGenerator.writeArrayFieldStart("session");
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index 8c4ff3ae108..25b0ef20e81 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -24,7 +24,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
/**
- * Class for handling asynchronous feeding of new documents and processing of results.
+ * Thread which feeds document operations asynchronously and processes the results.
*
* @author Einar M R Rosenvinge
*/
@@ -56,16 +56,15 @@ class IOThread implements Runnable, AutoCloseable {
private final AtomicInteger successfullHandshakes = new AtomicInteger(0);
private final AtomicInteger lastGatewayProcessTimeMillis = new AtomicInteger(0);
- IOThread(
- ThreadGroup ioThreadGroup,
- EndpointResultQueue endpointResultQueue,
- GatewayConnection client,
- int clusterId,
- int maxChunkSizeBytes,
- int maxInFlightRequests,
- long localQueueTimeOut,
- DocumentQueue documentQueue,
- long maxSleepTimeMs) {
+ IOThread(ThreadGroup ioThreadGroup,
+ EndpointResultQueue endpointResultQueue,
+ GatewayConnection client,
+ int clusterId,
+ int maxChunkSizeBytes,
+ int maxInFlightRequests,
+ long localQueueTimeOut,
+ DocumentQueue documentQueue,
+ long maxSleepTimeMs) {
this.documentQueue = documentQueue;
this.endpoint = client.getEndpoint();
this.client = client;
@@ -85,27 +84,26 @@ class IOThread implements Runnable, AutoCloseable {
return endpoint;
}
- public static class ConnectionStats {
- public final int wrongSessionDetectedCounter;
- public final int wrongVersionDetectedCounter;
- public final int problemStatusCodeFromServerCounter;
- public final int executeProblemsCounter;
- public final int docsReceivedCounter;
- public final int statusReceivedCounter;
- public final int pendingDocumentStatusCount;
- public final int successfullHandshakes;
- public final int lastGatewayProcessTimeMillis;
-
- protected ConnectionStats(
- final int wrongSessionDetectedCounter,
- final int wrongVersionDetectedCounter,
- final int problemStatusCodeFromServerCounter,
- final int executeProblemsCounter,
- final int docsReceivedCounter,
- final int statusReceivedCounter,
- final int pendingDocumentStatusCount,
- final int successfullHandshakes,
- final int lastGatewayProcessTimeMillis) {
+ static class ConnectionStats {
+ final int wrongSessionDetectedCounter;
+ final int wrongVersionDetectedCounter;
+ final int problemStatusCodeFromServerCounter;
+ final int executeProblemsCounter;
+ final int docsReceivedCounter;
+ final int statusReceivedCounter;
+ final int pendingDocumentStatusCount;
+ final int successfullHandshakes;
+ final int lastGatewayProcessTimeMillis;
+
+ ConnectionStats(int wrongSessionDetectedCounter,
+ int wrongVersionDetectedCounter,
+ int problemStatusCodeFromServerCounter,
+ int executeProblemsCounter,
+ int docsReceivedCounter,
+ int statusReceivedCounter,
+ int pendingDocumentStatusCount,
+ int successfullHandshakes,
+ int lastGatewayProcessTimeMillis) {
this.wrongSessionDetectedCounter = wrongSessionDetectedCounter;
this.wrongVersionDetectedCounter = wrongVersionDetectedCounter;
this.problemStatusCodeFromServerCounter = problemStatusCodeFromServerCounter;
@@ -137,9 +135,7 @@ class IOThread implements Runnable, AutoCloseable {
@Override
public void close() {
documentQueue.close();
- if (stopSignal.getCount() == 0) {
- return;
- }
+ if (stopSignal.getCount() == 0) return;
stopSignal.countDown();
log.finer("Closed called.");
@@ -166,8 +162,7 @@ class IOThread implements Runnable, AutoCloseable {
log.fine("Session to " + endpoint + " closed.");
}
-
- public void post(final Document document) throws InterruptedException {
+ public void post(Document document) throws InterruptedException {
documentQueue.put(document, Thread.currentThread().getThreadGroup() == ioThreadGroup);
}
@@ -178,7 +173,7 @@ class IOThread implements Runnable, AutoCloseable {
List<Document> getNextDocsForFeeding(int maxWaitUnits, TimeUnit timeUnit) {
- final List<Document> docsForSendChunk = new ArrayList<>();
+ List<Document> docsForSendChunk = new ArrayList<>();
int chunkSizeBytes = 0;
try {
drainFirstDocumentsInQueueIfOld();
@@ -214,8 +209,7 @@ class IOThread implements Runnable, AutoCloseable {
}
}
- private void markDocumentAsFailed(
- List<Document> docs, ServerResponseException servletException) {
+ private void markDocumentAsFailed(List<Document> docs, ServerResponseException servletException) {
for (Document doc : docs) {
resultQueue.failOperation(
EndPointResultFactory.createTransientError(
@@ -223,8 +217,7 @@ class IOThread implements Runnable, AutoCloseable {
}
}
- private InputStream sendAndReceive(List<Document> docs)
- throws IOException, ServerResponseException {
+ private InputStream sendAndReceive(List<Document> docs) throws IOException, ServerResponseException {
try {
// Post the new docs and get async responses for other posts.
return client.writeOperations(docs);
@@ -238,17 +231,19 @@ class IOThread implements Runnable, AutoCloseable {
}
private static class ProcessResponse {
+
private final int transitiveErrorCount;
private final int processResultsCount;
+
ProcessResponse(int transitiveErrorCount, int processResultsCount) {
this.transitiveErrorCount = transitiveErrorCount;
this.processResultsCount = processResultsCount;
}
+
}
private ProcessResponse processResponse(InputStream serverResponse) throws IOException {
- final Collection<EndpointResult> endpointResults =
- EndPointResultFactory.createResult(endpoint, serverResponse);
+ Collection<EndpointResult> endpointResults = EndPointResultFactory.createResult(endpoint, serverResponse);
statusReceivedCounter.addAndGet(endpointResults.size());
int transientErrors = 0;
for (EndpointResult endpointResult : endpointResults) {
@@ -271,9 +266,8 @@ class IOThread implements Runnable, AutoCloseable {
return processResponse;
}
- private ProcessResponse pullAndProcessData(int maxWaitTimeMilliSecs)
- throws ServerResponseException, IOException {
- final int pendingResultQueueSize = resultQueue.getPendingSize();
+ private ProcessResponse pullAndProcessData(int maxWaitTimeMilliSecs) throws ServerResponseException, IOException {
+ int pendingResultQueueSize = resultQueue.getPendingSize();
pendingDocumentStatusCount.set(pendingResultQueueSize);
List<Document> nextDocsForFeeding = (pendingResultQueueSize > maxInFlightRequests)
@@ -387,9 +381,8 @@ class IOThread implements Runnable, AutoCloseable {
private void drainFirstDocumentsInQueueIfOld() {
while (true) {
Optional<Document> document = documentQueue.pollDocumentIfTimedoutInQueue(localQueueTimeOut);
- if (! document.isPresent()) {
- return;
- }
+ if ( ! document.isPresent()) return;
+
EndpointResult endpointResult = EndPointResultFactory.createTransientError(
endpoint, document.get().getOperationId(),
new Exception("Not sending document operation, timed out in queue after "
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
index 45133901567..274edcf2047 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
@@ -252,7 +252,6 @@ public class OperationProcessor {
}
private void sendToClusters(Document document) {
-
synchronized (monitor) {
boolean traceThisDoc = traceEveryXOperation > 0 && traceCounter++ % traceEveryXOperation == 0;
docSendInfoByOperationId.put(document.getOperationId(), new DocumentSendInfo(document, traceThisDoc));