summaryrefslogtreecommitdiffstats
path: root/vespa-http-client
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@gmail.com>2020-04-15 16:18:06 +0200
committerJon Bratseth <bratseth@gmail.com>2020-04-15 16:18:06 +0200
commit3881d6283dc36646f958da16b948cbb2affec845 (patch)
tree3a2dfb7436e6d1c0c208eb10b0e5c75e98141ccb /vespa-http-client
parentf6b90d5e48dfc894bb7c5299522f5cea0ec0c658 (diff)
The document queue is always shared
Diffstat (limited to 'vespa-http-client')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java15
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Cluster.java14
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java36
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointIOException.java5
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java5
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java28
6 files changed, 51 insertions, 52 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java
index e0929e445a6..473b9494ba4 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java
@@ -94,7 +94,10 @@ public class Result {
this.traceMessage = null;
}
- /** Returns the endpoint from which the result was received. */
+ /**
+ * Returns the endpoint from which the result was received,
+ * or null if this failed before being assigned an endpoint
+ */
public Endpoint getEndpoint() {
return endpoint;
}
@@ -124,16 +127,16 @@ public class Result {
StringBuilder b = new StringBuilder();
b.append("Detail ");
b.append("resultType=").append(resultType);
- if (exception != null) {
+ if (exception != null)
b.append(" exception='").append(Exceptions.toMessageString(exception)).append("'");
- }
- if (traceMessage != null && ! traceMessage.isEmpty()) {
+ if (traceMessage != null && ! traceMessage.isEmpty())
b.append(" trace='").append(traceMessage).append("'");
- }
- b.append(" endpoint=").append(endpoint);
+ if (endpoint != null)
+ b.append(" endpoint=").append(endpoint);
b.append(" resultTimeLocally=").append(timeStampMillis).append("\n");
return b.toString();
}
+
}
@Override
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Cluster.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Cluster.java
index 4707cdb705c..fccb79c77b4 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Cluster.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Cluster.java
@@ -13,9 +13,7 @@ import java.util.List;
*/
public final class Cluster {
- /**
- * Builder for {@link Cluster}.
- */
+ /** Builder for {@link Cluster}. */
public final static class Builder {
private final List<Endpoint> endpoints = new LinkedList<>();
private String route = null;
@@ -23,8 +21,8 @@ public final class Cluster {
/**
* Adds an Endpoint (a HTTP gateway) to this Cluster.
*
- * @param endpoint the Endpoint to add.
- * @return this, for chaining.
+ * @param endpoint the Endpoint to add
+ * @return this, for chaining
*/
public Builder addEndpoint(Endpoint endpoint) {
endpoints.add(endpoint);
@@ -34,8 +32,8 @@ public final class Cluster {
/**
* Sets a route specific to this cluster, which overrides the route set in {@link com.yahoo.vespa.http.client.config.FeedParams#getRoute()}.
*
- * @param route a route specific to this cluster.
- * @return this, for chaining.
+ * @param route a route specific to this cluster
+ * @return this, for chaining
*/
public Builder setRoute(String route) {
this.route = route;
@@ -68,7 +66,7 @@ public final class Cluster {
@Override
public String toString() {
- return "Cluster " + endpoints + ", route " + route;
+ return "cluster with endpoints " + endpoints + " and route '" + route + "'";
}
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
index 67f84231606..98755320d74 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
@@ -24,11 +24,19 @@ import java.util.concurrent.TimeUnit;
*/
public class ClusterConnection implements AutoCloseable {
- private final List<IOThread> ioThreads = new ArrayList<>();
- private final int clusterId;
private static final JsonFactory jsonFactory = new JsonFactory();
private static final ObjectMapper objectMapper = new ObjectMapper();
+ private final List<IOThread> ioThreads = new ArrayList<>();
+ private final int clusterId;
+ private final ThreadGroup ioThreadGroup;
+
+ /** The shared queue of document operations the io threads will take from */
+ private final DocumentQueue documentQueue;
+
+ /** The single endpoint this sends to, or null if it will send to multiple endpoints */
+ private final Endpoint singleEndpoint;
+
public ClusterConnection(OperationProcessor operationProcessor,
FeedParams feedParams,
ConnectionParams connectionParams,
@@ -37,16 +45,17 @@ public class ClusterConnection implements AutoCloseable {
int clientQueueSizePerCluster,
ScheduledThreadPoolExecutor timeoutExecutor) {
if (cluster.getEndpoints().isEmpty())
- throw new IllegalArgumentException("Cannot feed to empty cluster.");
+ throw new IllegalArgumentException("At least a single endpoint is required in " + cluster);
this.clusterId = clusterId;
int totalNumberOfEndpointsInThisCluster = cluster.getEndpoints().size() * connectionParams.getNumPersistentConnectionsPerEndpoint();
- if (totalNumberOfEndpointsInThisCluster == 0) return;
-
- // Lower than 1 does not make any sense.
+ if (totalNumberOfEndpointsInThisCluster == 0)
+ throw new IllegalArgumentException("At least 1 persistent connection per endpoint is required in " + cluster);
int maxInFlightPerSession = Math.max(1, feedParams.getMaxInFlightRequests() / totalNumberOfEndpointsInThisCluster);
- DocumentQueue documentQueue = null;
+ documentQueue = new DocumentQueue(clientQueueSizePerCluster);
+ ioThreadGroup = operationProcessor.getIoThreadGroup();
+ singleEndpoint = cluster.getEndpoints().size() == 1 ? cluster.getEndpoints().get(0) : null;
for (Endpoint endpoint : cluster.getEndpoints()) {
EndpointResultQueue endpointResultQueue = new EndpointResultQueue(operationProcessor,
endpoint,
@@ -66,9 +75,6 @@ public class ClusterConnection implements AutoCloseable {
operationProcessor.getClientId()
);
}
- if (documentQueue == null) {
- documentQueue = new DocumentQueue(clientQueueSizePerCluster);
- }
IOThread ioThread = new IOThread(operationProcessor.getIoThreadGroup(),
endpointResultQueue,
gatewayConnection,
@@ -88,14 +94,10 @@ public class ClusterConnection implements AutoCloseable {
}
public void post(Document document) throws EndpointIOException {
- // The same document ID must always go to the same destination
- // In noHandshakeMode this has no effect as the documentQueue is shared between the IOThreads.
- int hash = document.getDocumentId().hashCode() & 0x7FFFFFFF; // Strip sign bit
- IOThread ioThread = ioThreads.get(hash % ioThreads.size());
try {
- ioThread.post(document);
- } catch (Throwable t) {
- throw new EndpointIOException(ioThread.getEndpoint(), "While sending", t);
+ documentQueue.put(document, Thread.currentThread().getThreadGroup() == ioThreadGroup);
+ } catch (Throwable t) { // InterruptedException if shutting down, IllegalStateException if already shut down
+ throw new EndpointIOException(singleEndpoint, "While sending", t);
}
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointIOException.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointIOException.java
index ae15f6ec22b..f9279f429ff 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointIOException.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointIOException.java
@@ -20,8 +20,7 @@ public class EndpointIOException extends IOException {
this.endpoint = endpoint;
}
- public Endpoint getEndpoint() {
- return endpoint;
- }
+ /** Returns the endpoint, or the failure occurred before this was assigned to a unique endpoint */
+ public Endpoint getEndpoint() { return endpoint; }
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index 5f3348557a9..b7638ff0967 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -36,7 +36,6 @@ class IOThread implements Runnable, AutoCloseable {
private final DocumentQueue documentQueue;
private final EndpointResultQueue resultQueue;
private final Thread thread;
- private final ThreadGroup ioThreadGroup;
private final int clusterId;
private final CountDownLatch running = new CountDownLatch(1);
private final CountDownLatch stopSignal = new CountDownLatch(1);
@@ -74,7 +73,6 @@ class IOThread implements Runnable, AutoCloseable {
this.maxInFlightRequests = maxInFlightRequests;
this.gatewayThrottler = new GatewayThrottler(maxSleepTimeMs);
this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint);
- this.ioThreadGroup = ioThreadGroup;
thread.setDaemon(true);
this.localQueueTimeOut = localQueueTimeOut;
thread.start();
@@ -165,8 +163,9 @@ class IOThread implements Runnable, AutoCloseable {
log.fine("Session to " + endpoint + " closed.");
}
+ /** For testing only */
public void post(Document document) throws InterruptedException {
- documentQueue.put(document, Thread.currentThread().getThreadGroup() == ioThreadGroup);
+ documentQueue.put(document, true);
}
@Override
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java
index 95df465c7ca..205153a7a00 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java
@@ -38,30 +38,28 @@ public final class EndPointResultFactory {
return results;
}
- public static EndpointResult createError(
- Endpoint endpoint, String operationId, Exception exception) {
- return new EndpointResult(operationId, new Result.Detail(
- endpoint, Result.ResultType.FATAL_ERROR, null, exception));
+ public static EndpointResult createError(Endpoint endpoint, String operationId, Exception exception) {
+ return new EndpointResult(operationId, new Result.Detail(endpoint,
+ Result.ResultType.FATAL_ERROR,
+ null,
+ exception));
}
- public static EndpointResult createTransientError(
- Endpoint endpoint, String operationId, Exception exception) {
- return new EndpointResult(operationId, new Result.Detail(
- endpoint, Result.ResultType.TRANSITIVE_ERROR, null, exception));
+ public static EndpointResult createTransientError(Endpoint endpoint, String operationId, Exception exception) {
+ return new EndpointResult(operationId, new Result.Detail(endpoint,
+ Result.ResultType.TRANSITIVE_ERROR,
+ null,
+ exception));
}
private static Result.ResultType replyToResultType(OperationStatus reply) {
- final Result.ResultType resultType;
// The ordering below is important, e.g. if success, it is never a transient error even if isTransient is true.
- if (reply.errorCode.isSuccess()) {
+ if (reply.errorCode.isSuccess())
return Result.ResultType.OPERATION_EXECUTED;
- }
- if (reply.isConditionNotMet) {
+ if (reply.isConditionNotMet)
return Result.ResultType.CONDITION_NOT_MET;
- }
- if (reply.errorCode.isTransient()) {
+ if (reply.errorCode.isTransient())
return Result.ResultType.TRANSITIVE_ERROR;
- }
return Result.ResultType.FATAL_ERROR;
}