diff options
author | Jon Bratseth <bratseth@verizonmedia.com> | 2019-07-03 10:25:54 -0700 |
---|---|---|
committer | Jon Bratseth <bratseth@verizonmedia.com> | 2019-07-03 10:25:54 -0700 |
commit | a38afd806ece1174f7dc3cabd9ca0ac22f5a5801 (patch) | |
tree | d49c683872582e08409bb258e381be337f074b26 /vespa-http-client | |
parent | 0a0f2cc1692fc9419accae1570d885a186380e3d (diff) |
Nonfunctional changes only
Diffstat (limited to 'vespa-http-client')
3 files changed, 67 insertions, 87 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java index da45acc5687..560bbd536e5 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java @@ -8,7 +8,6 @@ import com.yahoo.vespa.http.client.config.Cluster; import com.yahoo.vespa.http.client.config.ConnectionParams; import com.yahoo.vespa.http.client.config.Endpoint; import com.yahoo.vespa.http.client.config.FeedParams; -import com.yahoo.vespa.http.client.config.SessionParams; import com.yahoo.vespa.http.client.core.Document; import com.yahoo.vespa.http.client.core.Exceptions; import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor; @@ -25,45 +24,35 @@ import java.util.concurrent.TimeUnit; */ public class ClusterConnection implements AutoCloseable { - private final OperationProcessor operationProcessor; private final List<IOThread> ioThreads = new ArrayList<>(); private final int clusterId; - private final SessionParams.ErrorReporter errorReporter; private static JsonFactory jsonFactory = new JsonFactory(); private static ObjectMapper objectMapper = new ObjectMapper(); - public ClusterConnection( - OperationProcessor operationProcessor, - FeedParams feedParams, - ConnectionParams connectionParams, - SessionParams.ErrorReporter errorReporter, - Cluster cluster, - int clusterId, - int clientQueueSizePerCluster, - ScheduledThreadPoolExecutor timeoutExecutor) { - this.errorReporter = errorReporter; - if (cluster.getEndpoints().isEmpty()) { + public ClusterConnection(OperationProcessor operationProcessor, + FeedParams feedParams, + ConnectionParams connectionParams, + Cluster cluster, + int clusterId, + int clientQueueSizePerCluster, + ScheduledThreadPoolExecutor timeoutExecutor) { + if (cluster.getEndpoints().isEmpty()) throw new IllegalArgumentException("Cannot feed to empty cluster."); - } - this.operationProcessor = operationProcessor; + this.clusterId = clusterId; - final int totalNumberOfEndpointsInThisCluster = cluster.getEndpoints().size() - * connectionParams.getNumPersistentConnectionsPerEndpoint(); - if (totalNumberOfEndpointsInThisCluster == 0) { - return; - } + int totalNumberOfEndpointsInThisCluster = cluster.getEndpoints().size() * connectionParams.getNumPersistentConnectionsPerEndpoint(); + if (totalNumberOfEndpointsInThisCluster == 0) return; + // Lower than 1 does not make any sense. - final int maxInFlightPerSession = Math.max( - 1, feedParams.getMaxInFlightRequests() / totalNumberOfEndpointsInThisCluster); + int maxInFlightPerSession = Math.max(1, feedParams.getMaxInFlightRequests() / totalNumberOfEndpointsInThisCluster); + DocumentQueue documentQueue = null; for (Endpoint endpoint : cluster.getEndpoints()) { - final EndpointResultQueue endpointResultQueue = new EndpointResultQueue( - operationProcessor, - endpoint, - clusterId, - timeoutExecutor, - feedParams.getServerTimeout(TimeUnit.MILLISECONDS) - + feedParams.getClientTimeout(TimeUnit.MILLISECONDS)); + EndpointResultQueue endpointResultQueue = new EndpointResultQueue(operationProcessor, + endpoint, + clusterId, + timeoutExecutor, + feedParams.getServerTimeout(TimeUnit.MILLISECONDS) + feedParams.getClientTimeout(TimeUnit.MILLISECONDS)); for (int i = 0; i < connectionParams.getNumPersistentConnectionsPerEndpoint(); i++) { GatewayConnection gatewayConnection; if (connectionParams.isDryRun()) { @@ -74,15 +63,14 @@ public class ClusterConnection implements AutoCloseable { feedParams, cluster.getRoute(), connectionParams, - new ApacheGatewayConnection.HttpClientFactory( - connectionParams, endpoint.isUseSsl()), + new ApacheGatewayConnection.HttpClientFactory(connectionParams, endpoint.isUseSsl()), operationProcessor.getClientId() ); } if (documentQueue == null) { documentQueue = new DocumentQueue(clientQueueSizePerCluster); } - final IOThread ioThread = new IOThread( + IOThread ioThread = new IOThread( operationProcessor.getIoThreadGroup(), endpointResultQueue, gatewayConnection, @@ -103,9 +91,9 @@ public class ClusterConnection implements AutoCloseable { public void post(Document document) throws EndpointIOException { String documentIdStr = document.getDocumentId(); - //the same document ID must always go to the same destination + // The same document ID must always go to the same destination // In noHandshakeMode this has no effect as the documentQueue is shared between the IOThreads. - int hash = documentIdStr.hashCode() & 0x7FFFFFFF; //strip sign bit + int hash = documentIdStr.hashCode() & 0x7FFFFFFF; // Strip sign bit IOThread ioThread = ioThreads.get(hash % ioThreads.size()); try { ioThread.post(document); @@ -148,7 +136,7 @@ public class ClusterConnection implements AutoCloseable { } public String getStatsAsJSon() throws IOException { - final StringWriter stringWriter = new StringWriter(); + StringWriter stringWriter = new StringWriter(); JsonGenerator jsonGenerator = jsonFactory.createGenerator(stringWriter); jsonGenerator.writeStartObject(); jsonGenerator.writeArrayFieldStart("session"); diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index 8c4ff3ae108..25b0ef20e81 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -24,7 +24,7 @@ import java.util.logging.Level; import java.util.logging.Logger; /** - * Class for handling asynchronous feeding of new documents and processing of results. + * Thread which feeds document operations asynchronously and processes the results. * * @author Einar M R Rosenvinge */ @@ -56,16 +56,15 @@ class IOThread implements Runnable, AutoCloseable { private final AtomicInteger successfullHandshakes = new AtomicInteger(0); private final AtomicInteger lastGatewayProcessTimeMillis = new AtomicInteger(0); - IOThread( - ThreadGroup ioThreadGroup, - EndpointResultQueue endpointResultQueue, - GatewayConnection client, - int clusterId, - int maxChunkSizeBytes, - int maxInFlightRequests, - long localQueueTimeOut, - DocumentQueue documentQueue, - long maxSleepTimeMs) { + IOThread(ThreadGroup ioThreadGroup, + EndpointResultQueue endpointResultQueue, + GatewayConnection client, + int clusterId, + int maxChunkSizeBytes, + int maxInFlightRequests, + long localQueueTimeOut, + DocumentQueue documentQueue, + long maxSleepTimeMs) { this.documentQueue = documentQueue; this.endpoint = client.getEndpoint(); this.client = client; @@ -85,27 +84,26 @@ class IOThread implements Runnable, AutoCloseable { return endpoint; } - public static class ConnectionStats { - public final int wrongSessionDetectedCounter; - public final int wrongVersionDetectedCounter; - public final int problemStatusCodeFromServerCounter; - public final int executeProblemsCounter; - public final int docsReceivedCounter; - public final int statusReceivedCounter; - public final int pendingDocumentStatusCount; - public final int successfullHandshakes; - public final int lastGatewayProcessTimeMillis; - - protected ConnectionStats( - final int wrongSessionDetectedCounter, - final int wrongVersionDetectedCounter, - final int problemStatusCodeFromServerCounter, - final int executeProblemsCounter, - final int docsReceivedCounter, - final int statusReceivedCounter, - final int pendingDocumentStatusCount, - final int successfullHandshakes, - final int lastGatewayProcessTimeMillis) { + static class ConnectionStats { + final int wrongSessionDetectedCounter; + final int wrongVersionDetectedCounter; + final int problemStatusCodeFromServerCounter; + final int executeProblemsCounter; + final int docsReceivedCounter; + final int statusReceivedCounter; + final int pendingDocumentStatusCount; + final int successfullHandshakes; + final int lastGatewayProcessTimeMillis; + + ConnectionStats(int wrongSessionDetectedCounter, + int wrongVersionDetectedCounter, + int problemStatusCodeFromServerCounter, + int executeProblemsCounter, + int docsReceivedCounter, + int statusReceivedCounter, + int pendingDocumentStatusCount, + int successfullHandshakes, + int lastGatewayProcessTimeMillis) { this.wrongSessionDetectedCounter = wrongSessionDetectedCounter; this.wrongVersionDetectedCounter = wrongVersionDetectedCounter; this.problemStatusCodeFromServerCounter = problemStatusCodeFromServerCounter; @@ -137,9 +135,7 @@ class IOThread implements Runnable, AutoCloseable { @Override public void close() { documentQueue.close(); - if (stopSignal.getCount() == 0) { - return; - } + if (stopSignal.getCount() == 0) return; stopSignal.countDown(); log.finer("Closed called."); @@ -166,8 +162,7 @@ class IOThread implements Runnable, AutoCloseable { log.fine("Session to " + endpoint + " closed."); } - - public void post(final Document document) throws InterruptedException { + public void post(Document document) throws InterruptedException { documentQueue.put(document, Thread.currentThread().getThreadGroup() == ioThreadGroup); } @@ -178,7 +173,7 @@ class IOThread implements Runnable, AutoCloseable { List<Document> getNextDocsForFeeding(int maxWaitUnits, TimeUnit timeUnit) { - final List<Document> docsForSendChunk = new ArrayList<>(); + List<Document> docsForSendChunk = new ArrayList<>(); int chunkSizeBytes = 0; try { drainFirstDocumentsInQueueIfOld(); @@ -214,8 +209,7 @@ class IOThread implements Runnable, AutoCloseable { } } - private void markDocumentAsFailed( - List<Document> docs, ServerResponseException servletException) { + private void markDocumentAsFailed(List<Document> docs, ServerResponseException servletException) { for (Document doc : docs) { resultQueue.failOperation( EndPointResultFactory.createTransientError( @@ -223,8 +217,7 @@ class IOThread implements Runnable, AutoCloseable { } } - private InputStream sendAndReceive(List<Document> docs) - throws IOException, ServerResponseException { + private InputStream sendAndReceive(List<Document> docs) throws IOException, ServerResponseException { try { // Post the new docs and get async responses for other posts. return client.writeOperations(docs); @@ -238,17 +231,19 @@ class IOThread implements Runnable, AutoCloseable { } private static class ProcessResponse { + private final int transitiveErrorCount; private final int processResultsCount; + ProcessResponse(int transitiveErrorCount, int processResultsCount) { this.transitiveErrorCount = transitiveErrorCount; this.processResultsCount = processResultsCount; } + } private ProcessResponse processResponse(InputStream serverResponse) throws IOException { - final Collection<EndpointResult> endpointResults = - EndPointResultFactory.createResult(endpoint, serverResponse); + Collection<EndpointResult> endpointResults = EndPointResultFactory.createResult(endpoint, serverResponse); statusReceivedCounter.addAndGet(endpointResults.size()); int transientErrors = 0; for (EndpointResult endpointResult : endpointResults) { @@ -271,9 +266,8 @@ class IOThread implements Runnable, AutoCloseable { return processResponse; } - private ProcessResponse pullAndProcessData(int maxWaitTimeMilliSecs) - throws ServerResponseException, IOException { - final int pendingResultQueueSize = resultQueue.getPendingSize(); + private ProcessResponse pullAndProcessData(int maxWaitTimeMilliSecs) throws ServerResponseException, IOException { + int pendingResultQueueSize = resultQueue.getPendingSize(); pendingDocumentStatusCount.set(pendingResultQueueSize); List<Document> nextDocsForFeeding = (pendingResultQueueSize > maxInFlightRequests) @@ -387,9 +381,8 @@ class IOThread implements Runnable, AutoCloseable { private void drainFirstDocumentsInQueueIfOld() { while (true) { Optional<Document> document = documentQueue.pollDocumentIfTimedoutInQueue(localQueueTimeOut); - if (! document.isPresent()) { - return; - } + if ( ! document.isPresent()) return; + EndpointResult endpointResult = EndPointResultFactory.createTransientError( endpoint, document.get().getOperationId(), new Exception("Not sending document operation, timed out in queue after " diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java index 45133901567..274edcf2047 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java @@ -252,7 +252,6 @@ public class OperationProcessor { } private void sendToClusters(Document document) { - synchronized (monitor) { boolean traceThisDoc = traceEveryXOperation > 0 && traceCounter++ % traceEveryXOperation == 0; docSendInfoByOperationId.put(document.getOperationId(), new DocumentSendInfo(document, traceThisDoc)); |