diff options
author | Jon Bratseth <bratseth@gmail.com> | 2020-04-15 16:18:06 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@gmail.com> | 2020-04-15 16:18:06 +0200 |
commit | 3881d6283dc36646f958da16b948cbb2affec845 (patch) | |
tree | 3a2dfb7436e6d1c0c208eb10b0e5c75e98141ccb /vespa-http-client/src | |
parent | f6b90d5e48dfc894bb7c5299522f5cea0ec0c658 (diff) |
The document queue is always shared
Diffstat (limited to 'vespa-http-client/src')
6 files changed, 51 insertions, 52 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java index e0929e445a6..473b9494ba4 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java @@ -94,7 +94,10 @@ public class Result { this.traceMessage = null; } - /** Returns the endpoint from which the result was received. */ + /** + * Returns the endpoint from which the result was received, + * or null if this failed before being assigned an endpoint + */ public Endpoint getEndpoint() { return endpoint; } @@ -124,16 +127,16 @@ public class Result { StringBuilder b = new StringBuilder(); b.append("Detail "); b.append("resultType=").append(resultType); - if (exception != null) { + if (exception != null) b.append(" exception='").append(Exceptions.toMessageString(exception)).append("'"); - } - if (traceMessage != null && ! traceMessage.isEmpty()) { + if (traceMessage != null && ! traceMessage.isEmpty()) b.append(" trace='").append(traceMessage).append("'"); - } - b.append(" endpoint=").append(endpoint); + if (endpoint != null) + b.append(" endpoint=").append(endpoint); b.append(" resultTimeLocally=").append(timeStampMillis).append("\n"); return b.toString(); } + } @Override diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Cluster.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Cluster.java index 4707cdb705c..fccb79c77b4 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Cluster.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Cluster.java @@ -13,9 +13,7 @@ import java.util.List; */ public final class Cluster { - /** - * Builder for {@link Cluster}. - */ + /** Builder for {@link Cluster}. */ public final static class Builder { private final List<Endpoint> endpoints = new LinkedList<>(); private String route = null; @@ -23,8 +21,8 @@ public final class Cluster { /** * Adds an Endpoint (a HTTP gateway) to this Cluster. * - * @param endpoint the Endpoint to add. - * @return this, for chaining. + * @param endpoint the Endpoint to add + * @return this, for chaining */ public Builder addEndpoint(Endpoint endpoint) { endpoints.add(endpoint); @@ -34,8 +32,8 @@ public final class Cluster { /** * Sets a route specific to this cluster, which overrides the route set in {@link com.yahoo.vespa.http.client.config.FeedParams#getRoute()}. * - * @param route a route specific to this cluster. - * @return this, for chaining. + * @param route a route specific to this cluster + * @return this, for chaining */ public Builder setRoute(String route) { this.route = route; @@ -68,7 +66,7 @@ public final class Cluster { @Override public String toString() { - return "Cluster " + endpoints + ", route " + route; + return "cluster with endpoints " + endpoints + " and route '" + route + "'"; } } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java index 67f84231606..98755320d74 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java @@ -24,11 +24,19 @@ import java.util.concurrent.TimeUnit; */ public class ClusterConnection implements AutoCloseable { - private final List<IOThread> ioThreads = new ArrayList<>(); - private final int clusterId; private static final JsonFactory jsonFactory = new JsonFactory(); private static final ObjectMapper objectMapper = new ObjectMapper(); + private final List<IOThread> ioThreads = new ArrayList<>(); + private final int clusterId; + private final ThreadGroup ioThreadGroup; + + /** The shared queue of document operations the io threads will take from */ + private final DocumentQueue documentQueue; + + /** The single endpoint this sends to, or null if it will send to multiple endpoints */ + private final Endpoint singleEndpoint; + public ClusterConnection(OperationProcessor operationProcessor, FeedParams feedParams, ConnectionParams connectionParams, @@ -37,16 +45,17 @@ public class ClusterConnection implements AutoCloseable { int clientQueueSizePerCluster, ScheduledThreadPoolExecutor timeoutExecutor) { if (cluster.getEndpoints().isEmpty()) - throw new IllegalArgumentException("Cannot feed to empty cluster."); + throw new IllegalArgumentException("At least a single endpoint is required in " + cluster); this.clusterId = clusterId; int totalNumberOfEndpointsInThisCluster = cluster.getEndpoints().size() * connectionParams.getNumPersistentConnectionsPerEndpoint(); - if (totalNumberOfEndpointsInThisCluster == 0) return; - - // Lower than 1 does not make any sense. + if (totalNumberOfEndpointsInThisCluster == 0) + throw new IllegalArgumentException("At least 1 persistent connection per endpoint is required in " + cluster); int maxInFlightPerSession = Math.max(1, feedParams.getMaxInFlightRequests() / totalNumberOfEndpointsInThisCluster); - DocumentQueue documentQueue = null; + documentQueue = new DocumentQueue(clientQueueSizePerCluster); + ioThreadGroup = operationProcessor.getIoThreadGroup(); + singleEndpoint = cluster.getEndpoints().size() == 1 ? cluster.getEndpoints().get(0) : null; for (Endpoint endpoint : cluster.getEndpoints()) { EndpointResultQueue endpointResultQueue = new EndpointResultQueue(operationProcessor, endpoint, @@ -66,9 +75,6 @@ public class ClusterConnection implements AutoCloseable { operationProcessor.getClientId() ); } - if (documentQueue == null) { - documentQueue = new DocumentQueue(clientQueueSizePerCluster); - } IOThread ioThread = new IOThread(operationProcessor.getIoThreadGroup(), endpointResultQueue, gatewayConnection, @@ -88,14 +94,10 @@ public class ClusterConnection implements AutoCloseable { } public void post(Document document) throws EndpointIOException { - // The same document ID must always go to the same destination - // In noHandshakeMode this has no effect as the documentQueue is shared between the IOThreads. - int hash = document.getDocumentId().hashCode() & 0x7FFFFFFF; // Strip sign bit - IOThread ioThread = ioThreads.get(hash % ioThreads.size()); try { - ioThread.post(document); - } catch (Throwable t) { - throw new EndpointIOException(ioThread.getEndpoint(), "While sending", t); + documentQueue.put(document, Thread.currentThread().getThreadGroup() == ioThreadGroup); + } catch (Throwable t) { // InterruptedException if shutting down, IllegalStateException if already shut down + throw new EndpointIOException(singleEndpoint, "While sending", t); } } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointIOException.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointIOException.java index ae15f6ec22b..f9279f429ff 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointIOException.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointIOException.java @@ -20,8 +20,7 @@ public class EndpointIOException extends IOException { this.endpoint = endpoint; } - public Endpoint getEndpoint() { - return endpoint; - } + /** Returns the endpoint, or the failure occurred before this was assigned to a unique endpoint */ + public Endpoint getEndpoint() { return endpoint; } } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index 5f3348557a9..b7638ff0967 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -36,7 +36,6 @@ class IOThread implements Runnable, AutoCloseable { private final DocumentQueue documentQueue; private final EndpointResultQueue resultQueue; private final Thread thread; - private final ThreadGroup ioThreadGroup; private final int clusterId; private final CountDownLatch running = new CountDownLatch(1); private final CountDownLatch stopSignal = new CountDownLatch(1); @@ -74,7 +73,6 @@ class IOThread implements Runnable, AutoCloseable { this.maxInFlightRequests = maxInFlightRequests; this.gatewayThrottler = new GatewayThrottler(maxSleepTimeMs); this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint); - this.ioThreadGroup = ioThreadGroup; thread.setDaemon(true); this.localQueueTimeOut = localQueueTimeOut; thread.start(); @@ -165,8 +163,9 @@ class IOThread implements Runnable, AutoCloseable { log.fine("Session to " + endpoint + " closed."); } + /** For testing only */ public void post(Document document) throws InterruptedException { - documentQueue.put(document, Thread.currentThread().getThreadGroup() == ioThreadGroup); + documentQueue.put(document, true); } @Override diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java index 95df465c7ca..205153a7a00 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java @@ -38,30 +38,28 @@ public final class EndPointResultFactory { return results; } - public static EndpointResult createError( - Endpoint endpoint, String operationId, Exception exception) { - return new EndpointResult(operationId, new Result.Detail( - endpoint, Result.ResultType.FATAL_ERROR, null, exception)); + public static EndpointResult createError(Endpoint endpoint, String operationId, Exception exception) { + return new EndpointResult(operationId, new Result.Detail(endpoint, + Result.ResultType.FATAL_ERROR, + null, + exception)); } - public static EndpointResult createTransientError( - Endpoint endpoint, String operationId, Exception exception) { - return new EndpointResult(operationId, new Result.Detail( - endpoint, Result.ResultType.TRANSITIVE_ERROR, null, exception)); + public static EndpointResult createTransientError(Endpoint endpoint, String operationId, Exception exception) { + return new EndpointResult(operationId, new Result.Detail(endpoint, + Result.ResultType.TRANSITIVE_ERROR, + null, + exception)); } private static Result.ResultType replyToResultType(OperationStatus reply) { - final Result.ResultType resultType; // The ordering below is important, e.g. if success, it is never a transient error even if isTransient is true. - if (reply.errorCode.isSuccess()) { + if (reply.errorCode.isSuccess()) return Result.ResultType.OPERATION_EXECUTED; - } - if (reply.isConditionNotMet) { + if (reply.isConditionNotMet) return Result.ResultType.CONDITION_NOT_MET; - } - if (reply.errorCode.isTransient()) { + if (reply.errorCode.isTransient()) return Result.ResultType.TRANSITIVE_ERROR; - } return Result.ResultType.FATAL_ERROR; } |