aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-http-client/src/main/java/com
diff options
context:
space:
mode:
Diffstat (limited to 'vespa-http-client/src/main/java/com')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java24
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java13
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java6
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java28
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java11
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java85
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java3
7 files changed, 114 insertions, 56 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java
index 6682f6ff1d0..2417a4acf71 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java
@@ -42,6 +42,7 @@ public final class ConnectionParams {
private int maxRetries = 100;
private long minTimeBetweenRetriesMs = 700;
private boolean dryRun = false;
+ private boolean runThreads = true;
private int traceLevel = 0;
private int traceEveryXOperation = 0;
private boolean printTraceToStdErr = true;
@@ -191,10 +192,8 @@ public final class ConnectionParams {
}
/**
- * Don't send data to gateway, just pretend that everything is fine.
- *
- * @param dryRun true if enabled.
- * @return pointer to builder.
+ * Set to true to skip making network connections and instead
+ * let requests complete successfully with no effect.
*/
public Builder setDryRun(boolean dryRun) {
this.dryRun = dryRun;
@@ -202,6 +201,15 @@ public final class ConnectionParams {
}
/**
+ * Set to false to skip starting io threads, such that any operation must be driven by a calling thread.
+ * Useful for testing.
+ */
+ public Builder setRunThreads(boolean runThreads) {
+ this.runThreads = runThreads;
+ return this;
+ }
+
+ /**
* Set the min time between retries when temporarily failing against a gateway.
*
* @param minTimeBetweenRetries the min time value
@@ -274,6 +282,7 @@ public final class ConnectionParams {
maxRetries,
minTimeBetweenRetriesMs,
dryRun,
+ runThreads,
traceLevel,
traceEveryXOperation,
printTraceToStdErr,
@@ -293,6 +302,8 @@ public final class ConnectionParams {
return dryRun;
}
+ public boolean runThreads() { return runThreads; }
+
public int getMaxRetries() {
return maxRetries;
}
@@ -345,6 +356,7 @@ public final class ConnectionParams {
private final int maxRetries;
private final long minTimeBetweenRetriesMs;
private final boolean dryRun;
+ private final boolean runThreads;
private final int traceLevel;
private final int traceEveryXOperation;
private final boolean printTraceToStdErr;
@@ -364,6 +376,7 @@ public final class ConnectionParams {
int maxRetries,
long minTimeBetweenRetriesMs,
boolean dryRun,
+ boolean runThreads,
int traceLevel,
int traceEveryXOperation,
boolean printTraceToStdErr,
@@ -385,6 +398,7 @@ public final class ConnectionParams {
this.maxRetries = maxRetries;
this.minTimeBetweenRetriesMs = minTimeBetweenRetriesMs;
this.dryRun = dryRun;
+ this.runThreads = runThreads;
this.traceLevel = traceLevel;
this.traceEveryXOperation = traceEveryXOperation;
this.printTraceToStdErr = printTraceToStdErr;
@@ -436,6 +450,8 @@ public final class ConnectionParams {
return dryRun;
}
+ public boolean runThreads() { return runThreads; }
+
public int getTraceLevel() {
return traceLevel;
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java
index 3131206f148..bf07e3ea634 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java
@@ -141,13 +141,12 @@ public final class SessionParams {
private final ErrorReporter errorReport;
private int throttlerMinSize;
- private SessionParams(
- Collection<Cluster> clusters,
- FeedParams feedParams,
- ConnectionParams connectionParams,
- int clientQueueSize,
- ErrorReporter errorReporter,
- int throttlerMinSize) {
+ private SessionParams(Collection<Cluster> clusters,
+ FeedParams feedParams,
+ ConnectionParams connectionParams,
+ int clientQueueSize,
+ ErrorReporter errorReporter,
+ int throttlerMinSize) {
this.clusters = Collections.unmodifiableList(new ArrayList<>(clusters));
this.feedParams = feedParams;
this.connectionParams = connectionParams;
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
index a232ceeacf5..8e55e59b3f4 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
@@ -17,6 +17,7 @@ import java.io.StringWriter;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -93,6 +94,7 @@ public class ClusterConnection implements AutoCloseable {
documentQueue,
feedParams.getMaxSleepTimeMs(),
connectionParams.getConnectionTimeToLive(),
+ connectionParams.runThreads(),
idlePollFrequency,
clock);
ioThreads.add(ioThread);
@@ -167,6 +169,10 @@ public class ClusterConnection implements AutoCloseable {
return stringWriter.toString();
}
+ public List<IOThread> ioThreads() {
+ return Collections.unmodifiableList(ioThreads);
+ }
+
@Override
public boolean equals(Object o) {
return (this == o) || (o instanceof ClusterConnection && clusterId == ((ClusterConnection) o).clusterId);
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
index cfd4c1003de..129fc000271 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
@@ -12,6 +12,7 @@ import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
/**
@@ -26,6 +27,10 @@ public class DryRunGatewayConnection implements GatewayConnection {
private Instant connectionTime = null;
private Instant lastPollTime = null;
+ /** Set to true to hold off responding with a result to any incoming operations until this is set false */
+ private boolean hold = false;
+ private List<Document> held = new ArrayList<>();
+
public DryRunGatewayConnection(Endpoint endpoint, Clock clock) {
this.endpoint = endpoint;
this.clock = clock;
@@ -34,13 +39,23 @@ public class DryRunGatewayConnection implements GatewayConnection {
@Override
public InputStream write(List<Document> docs) {
StringBuilder result = new StringBuilder();
- for (Document doc : docs) {
- OperationStatus operationStatus = new OperationStatus("ok", doc.getOperationId(), ErrorCode.OK, false, "");
- result.append(operationStatus.render());
+ if (hold) {
+ held.addAll(docs);
+ }
+ else {
+ for (Document doc : held)
+ result.append(okResponse(doc).render());
+ held.clear();
+ for (Document doc : docs)
+ result.append(okResponse(doc).render());
}
return new ByteArrayInputStream(result.toString().getBytes(StandardCharsets.UTF_8));
}
+ public void hold(boolean hold) {
+ this.hold = hold;
+ }
+
@Override
public InputStream poll() {
lastPollTime = clock.instant();
@@ -75,4 +90,11 @@ public class DryRunGatewayConnection implements GatewayConnection {
@Override
public void close() { }
+ /** Returns the document currently held in this */
+ public List<Document> held() { return Collections.unmodifiableList(held); }
+
+ private OperationStatus okResponse(Document document) {
+ return new OperationStatus("ok", document.getOperationId(), ErrorCode.OK, false, "");
+ }
+
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
index c4ee2f58b65..1dd8b3bf3ec 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
@@ -33,12 +33,11 @@ class EndpointResultQueue {
private final ScheduledThreadPoolExecutor timer;
private final long totalTimeoutMs;
- EndpointResultQueue(
- OperationProcessor operationProcessor,
- Endpoint endpoint,
- int clusterId,
- ScheduledThreadPoolExecutor timer,
- long totalTimeoutMs) {
+ EndpointResultQueue(OperationProcessor operationProcessor,
+ Endpoint endpoint,
+ int clusterId,
+ ScheduledThreadPoolExecutor timer,
+ long totalTimeoutMs) {
this.operationProcessor = operationProcessor;
this.endpoint = endpoint;
this.clusterId = clusterId;
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index 99aff8b4baa..c7c94587640 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -18,6 +18,7 @@ import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
@@ -41,6 +42,8 @@ class IOThread implements Runnable, AutoCloseable {
private final GatewayConnectionFactory connectionFactory;
private final DocumentQueue documentQueue;
private final EndpointResultQueue resultQueue;
+
+ /** The thread running this, or null if it does not run a thread (meaning tick() must be called from the outside) */
private final Thread thread;
private final int clusterId;
private final CountDownLatch running = new CountDownLatch(1);
@@ -56,6 +59,7 @@ class IOThread implements Runnable, AutoCloseable {
private final Random random = new Random();
private GatewayConnection currentConnection;
+ private ConnectionState connectionState = ConnectionState.DISCONNECTED;
/**
* Previous connections on which we have sent operations and are still waiting for the result
@@ -87,6 +91,7 @@ class IOThread implements Runnable, AutoCloseable {
DocumentQueue documentQueue,
long maxSleepTimeMs,
Duration connectionTimeToLive,
+ boolean runThreads,
double idlePollFrequency,
Clock clock) {
this.endpoint = endpoint;
@@ -101,11 +106,18 @@ class IOThread implements Runnable, AutoCloseable {
this.gatewayThrottler = new GatewayThrottler(maxSleepTimeMs);
this.pollIntervalUS = Math.max(1, (long)(1000000.0/Math.max(0.1, idlePollFrequency))); // ensure range [1us, 10s]
this.clock = clock;
- this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint);
- thread.setDaemon(true);
this.localQueueTimeOut = localQueueTimeOut;
- this.maxOldConnectionPollInterval = localQueueTimeOut.dividedBy(10);
- thread.start();
+ this.maxOldConnectionPollInterval = localQueueTimeOut.dividedBy(10).toMillis() > pollIntervalUS / 1000
+ ? localQueueTimeOut.dividedBy(10)
+ : Duration.ofMillis(pollIntervalUS / 1000);
+ if (runThreads) {
+ this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint);
+ thread.setDaemon(true);
+ thread.start();
+ }
+ else {
+ this.thread = null;
+ }
}
public Endpoint getEndpoint() {
@@ -173,7 +185,7 @@ class IOThread implements Runnable, AutoCloseable {
int chunkSizeBytes = 0;
try {
drainFirstDocumentsInQueueIfOld();
- Document doc = documentQueue.poll(maxWaitUnits, timeUnit);
+ Document doc = thread != null ? documentQueue.poll(maxWaitUnits, timeUnit) : documentQueue.poll();
if (doc != null) {
docsForSendChunk.add(doc);
chunkSizeBytes = doc.size();
@@ -371,18 +383,6 @@ class IOThread implements Runnable, AutoCloseable {
}
}
- private boolean isStale(GatewayConnection connection) {
- return connection.connectionTime() != null
- && connection.connectionTime().plus(connectionTimeToLive).isBefore(clock.instant());
- }
-
- private ConnectionState refreshConnection(ConnectionState currentConnectionState) {
- if (currentConnectionState == ConnectionState.SESSION_SYNCED)
- oldConnections.add(currentConnection);
- currentConnection = connectionFactory.newConnection();
- return ConnectionState.DISCONNECTED;
- }
-
private void sleepIfProblemsGettingSyncedConnection(ConnectionState newState, ConnectionState oldState) {
if (newState == ConnectionState.SESSION_SYNCED) return;
if (newState == ConnectionState.CONNECTED && oldState == ConnectionState.DISCONNECTED) return;
@@ -397,17 +397,19 @@ class IOThread implements Runnable, AutoCloseable {
@Override
public void run() {
- ConnectionState connectionState = ConnectionState.DISCONNECTED;
- while (stopSignal.getCount() > 0 || !documentQueue.isEmpty()) {
- ConnectionState oldState = connectionState;
- connectionState = cycle(connectionState);
- checkOldConnections();
- sleepIfProblemsGettingSyncedConnection(connectionState, oldState);
-
- }
+ while (stopSignal.getCount() > 0 || !documentQueue.isEmpty())
+ tick();
log.finer(toString() + " exiting, documentQueue.size()=" + documentQueue.size());
running.countDown();
+ }
+ /** Do one iteration of work. Should be called from the single worker thread of this. */
+ public void tick() {
+ ConnectionState oldState = connectionState;
+ connectionState = cycle(connectionState);
+ checkOldConnections();
+ if (thread != null)
+ sleepIfProblemsGettingSyncedConnection(connectionState, oldState);
}
private void drainFirstDocumentsInQueueIfOld() {
@@ -434,28 +436,33 @@ class IOThread implements Runnable, AutoCloseable {
}
}
- private void checkOldConnections() {
- if (resultQueue.getPendingSize() == 0) {
- oldConnections.forEach(c -> c.close());
- oldConnections.clear();
- return;
- }
+ private boolean isStale(GatewayConnection connection) {
+ return connection.connectionTime() != null
+ && connection.connectionTime().plus(connectionTimeToLive).isBefore(clock.instant());
+ }
+
+ private ConnectionState refreshConnection(ConnectionState currentConnectionState) {
+ if (currentConnectionState == ConnectionState.SESSION_SYNCED)
+ oldConnections.add(currentConnection);
+ currentConnection = connectionFactory.newConnection();
+ return ConnectionState.DISCONNECTED;
+ }
+ private void checkOldConnections() {
for (Iterator<GatewayConnection> i = oldConnections.iterator(); i.hasNext(); ) {
GatewayConnection connection = i.next();
if (closingTime(connection).isBefore(clock.instant())) {
connection.close();
- i.remove();;
+ i.remove();
}
else if (timeToPoll(connection)) {
try {
processResponse(connection.poll());
}
catch (Exception e) {
- // Old connection; this may be ok
+ // Old connection; best effort
}
}
-
}
}
@@ -471,8 +478,8 @@ class IOThread implements Runnable, AutoCloseable {
(double)localQueueTimeOut.toMillis();
if (newness < 0) return true; // connection retired prematurely
if (newness > 1) return false; // closing time reached
- Duration pollInterval = Duration.ofMillis(pollIntervalUS * 1000 +
- (long)(newness * ( maxOldConnectionPollInterval.toMillis() - pollIntervalUS * 1000)));
+ Duration pollInterval = Duration.ofMillis(pollIntervalUS / 1000 +
+ (long)((1 - newness) * ( maxOldConnectionPollInterval.toMillis() - pollIntervalUS / 1000)));
return connection.lastPollTime().plus(pollInterval).isBefore(clock.instant());
}
@@ -511,4 +518,10 @@ class IOThread implements Runnable, AutoCloseable {
}
}
+ /** For testing. Returns the current connection of this. Not thread safe. */
+ public GatewayConnection currentConnection() { return currentConnection; }
+
+ /** For testing. Returns a snapshot of the old connections of this. Not thread safe. */
+ public List<GatewayConnection> oldConnections() { return new ArrayList<>(oldConnections); }
+
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
index 702a316422d..90d07104fef 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
@@ -17,6 +17,7 @@ import java.math.BigInteger;
import java.security.SecureRandom;
import java.time.Clock;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -255,6 +256,8 @@ public class OperationProcessor {
}
}
+ public List<ClusterConnection> clusters() { return Collections.unmodifiableList(clusters); }
+
public String getStatsAsJson() {
return operationStats.getStatsAsJson();
}