diff options
Diffstat (limited to 'vespa-http-client/src/main')
7 files changed, 114 insertions, 56 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java index 6682f6ff1d0..2417a4acf71 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java @@ -42,6 +42,7 @@ public final class ConnectionParams { private int maxRetries = 100; private long minTimeBetweenRetriesMs = 700; private boolean dryRun = false; + private boolean runThreads = true; private int traceLevel = 0; private int traceEveryXOperation = 0; private boolean printTraceToStdErr = true; @@ -191,10 +192,8 @@ public final class ConnectionParams { } /** - * Don't send data to gateway, just pretend that everything is fine. - * - * @param dryRun true if enabled. - * @return pointer to builder. + * Set to true to skip making network connections and instead + * let requests complete successfully with no effect. */ public Builder setDryRun(boolean dryRun) { this.dryRun = dryRun; @@ -202,6 +201,15 @@ public final class ConnectionParams { } /** + * Set to false to skip starting io threads, such that any operation must be driven by a calling thread. + * Useful for testing. + */ + public Builder setRunThreads(boolean runThreads) { + this.runThreads = runThreads; + return this; + } + + /** * Set the min time between retries when temporarily failing against a gateway. * * @param minTimeBetweenRetries the min time value @@ -274,6 +282,7 @@ public final class ConnectionParams { maxRetries, minTimeBetweenRetriesMs, dryRun, + runThreads, traceLevel, traceEveryXOperation, printTraceToStdErr, @@ -293,6 +302,8 @@ public final class ConnectionParams { return dryRun; } + public boolean runThreads() { return runThreads; } + public int getMaxRetries() { return maxRetries; } @@ -345,6 +356,7 @@ public final class ConnectionParams { private final int maxRetries; private final long minTimeBetweenRetriesMs; private final boolean dryRun; + private final boolean runThreads; private final int traceLevel; private final int traceEveryXOperation; private final boolean printTraceToStdErr; @@ -364,6 +376,7 @@ public final class ConnectionParams { int maxRetries, long minTimeBetweenRetriesMs, boolean dryRun, + boolean runThreads, int traceLevel, int traceEveryXOperation, boolean printTraceToStdErr, @@ -385,6 +398,7 @@ public final class ConnectionParams { this.maxRetries = maxRetries; this.minTimeBetweenRetriesMs = minTimeBetweenRetriesMs; this.dryRun = dryRun; + this.runThreads = runThreads; this.traceLevel = traceLevel; this.traceEveryXOperation = traceEveryXOperation; this.printTraceToStdErr = printTraceToStdErr; @@ -436,6 +450,8 @@ public final class ConnectionParams { return dryRun; } + public boolean runThreads() { return runThreads; } + public int getTraceLevel() { return traceLevel; } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java index 3131206f148..bf07e3ea634 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java @@ -141,13 +141,12 @@ public final class SessionParams { private final ErrorReporter errorReport; private int throttlerMinSize; - private SessionParams( - Collection<Cluster> clusters, - FeedParams feedParams, - ConnectionParams connectionParams, - int clientQueueSize, - ErrorReporter errorReporter, - int throttlerMinSize) { + private SessionParams(Collection<Cluster> clusters, + FeedParams feedParams, + ConnectionParams connectionParams, + int clientQueueSize, + ErrorReporter errorReporter, + int throttlerMinSize) { this.clusters = Collections.unmodifiableList(new ArrayList<>(clusters)); this.feedParams = feedParams; this.connectionParams = connectionParams; diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java index a232ceeacf5..8e55e59b3f4 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java @@ -17,6 +17,7 @@ import java.io.StringWriter; import java.time.Clock; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -93,6 +94,7 @@ public class ClusterConnection implements AutoCloseable { documentQueue, feedParams.getMaxSleepTimeMs(), connectionParams.getConnectionTimeToLive(), + connectionParams.runThreads(), idlePollFrequency, clock); ioThreads.add(ioThread); @@ -167,6 +169,10 @@ public class ClusterConnection implements AutoCloseable { return stringWriter.toString(); } + public List<IOThread> ioThreads() { + return Collections.unmodifiableList(ioThreads); + } + @Override public boolean equals(Object o) { return (this == o) || (o instanceof ClusterConnection && clusterId == ((ClusterConnection) o).clusterId); diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java index cfd4c1003de..129fc000271 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java @@ -12,6 +12,7 @@ import java.nio.charset.StandardCharsets; import java.time.Clock; import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; import java.util.List; /** @@ -26,6 +27,10 @@ public class DryRunGatewayConnection implements GatewayConnection { private Instant connectionTime = null; private Instant lastPollTime = null; + /** Set to true to hold off responding with a result to any incoming operations until this is set false */ + private boolean hold = false; + private List<Document> held = new ArrayList<>(); + public DryRunGatewayConnection(Endpoint endpoint, Clock clock) { this.endpoint = endpoint; this.clock = clock; @@ -34,13 +39,23 @@ public class DryRunGatewayConnection implements GatewayConnection { @Override public InputStream write(List<Document> docs) { StringBuilder result = new StringBuilder(); - for (Document doc : docs) { - OperationStatus operationStatus = new OperationStatus("ok", doc.getOperationId(), ErrorCode.OK, false, ""); - result.append(operationStatus.render()); + if (hold) { + held.addAll(docs); + } + else { + for (Document doc : held) + result.append(okResponse(doc).render()); + held.clear(); + for (Document doc : docs) + result.append(okResponse(doc).render()); } return new ByteArrayInputStream(result.toString().getBytes(StandardCharsets.UTF_8)); } + public void hold(boolean hold) { + this.hold = hold; + } + @Override public InputStream poll() { lastPollTime = clock.instant(); @@ -75,4 +90,11 @@ public class DryRunGatewayConnection implements GatewayConnection { @Override public void close() { } + /** Returns the document currently held in this */ + public List<Document> held() { return Collections.unmodifiableList(held); } + + private OperationStatus okResponse(Document document) { + return new OperationStatus("ok", document.getOperationId(), ErrorCode.OK, false, ""); + } + } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java index c4ee2f58b65..1dd8b3bf3ec 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java @@ -33,12 +33,11 @@ class EndpointResultQueue { private final ScheduledThreadPoolExecutor timer; private final long totalTimeoutMs; - EndpointResultQueue( - OperationProcessor operationProcessor, - Endpoint endpoint, - int clusterId, - ScheduledThreadPoolExecutor timer, - long totalTimeoutMs) { + EndpointResultQueue(OperationProcessor operationProcessor, + Endpoint endpoint, + int clusterId, + ScheduledThreadPoolExecutor timer, + long totalTimeoutMs) { this.operationProcessor = operationProcessor; this.endpoint = endpoint; this.clusterId = clusterId; diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index 99aff8b4baa..c7c94587640 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -18,6 +18,7 @@ import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Optional; @@ -41,6 +42,8 @@ class IOThread implements Runnable, AutoCloseable { private final GatewayConnectionFactory connectionFactory; private final DocumentQueue documentQueue; private final EndpointResultQueue resultQueue; + + /** The thread running this, or null if it does not run a thread (meaning tick() must be called from the outside) */ private final Thread thread; private final int clusterId; private final CountDownLatch running = new CountDownLatch(1); @@ -56,6 +59,7 @@ class IOThread implements Runnable, AutoCloseable { private final Random random = new Random(); private GatewayConnection currentConnection; + private ConnectionState connectionState = ConnectionState.DISCONNECTED; /** * Previous connections on which we have sent operations and are still waiting for the result @@ -87,6 +91,7 @@ class IOThread implements Runnable, AutoCloseable { DocumentQueue documentQueue, long maxSleepTimeMs, Duration connectionTimeToLive, + boolean runThreads, double idlePollFrequency, Clock clock) { this.endpoint = endpoint; @@ -101,11 +106,18 @@ class IOThread implements Runnable, AutoCloseable { this.gatewayThrottler = new GatewayThrottler(maxSleepTimeMs); this.pollIntervalUS = Math.max(1, (long)(1000000.0/Math.max(0.1, idlePollFrequency))); // ensure range [1us, 10s] this.clock = clock; - this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint); - thread.setDaemon(true); this.localQueueTimeOut = localQueueTimeOut; - this.maxOldConnectionPollInterval = localQueueTimeOut.dividedBy(10); - thread.start(); + this.maxOldConnectionPollInterval = localQueueTimeOut.dividedBy(10).toMillis() > pollIntervalUS / 1000 + ? localQueueTimeOut.dividedBy(10) + : Duration.ofMillis(pollIntervalUS / 1000); + if (runThreads) { + this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint); + thread.setDaemon(true); + thread.start(); + } + else { + this.thread = null; + } } public Endpoint getEndpoint() { @@ -173,7 +185,7 @@ class IOThread implements Runnable, AutoCloseable { int chunkSizeBytes = 0; try { drainFirstDocumentsInQueueIfOld(); - Document doc = documentQueue.poll(maxWaitUnits, timeUnit); + Document doc = thread != null ? documentQueue.poll(maxWaitUnits, timeUnit) : documentQueue.poll(); if (doc != null) { docsForSendChunk.add(doc); chunkSizeBytes = doc.size(); @@ -371,18 +383,6 @@ class IOThread implements Runnable, AutoCloseable { } } - private boolean isStale(GatewayConnection connection) { - return connection.connectionTime() != null - && connection.connectionTime().plus(connectionTimeToLive).isBefore(clock.instant()); - } - - private ConnectionState refreshConnection(ConnectionState currentConnectionState) { - if (currentConnectionState == ConnectionState.SESSION_SYNCED) - oldConnections.add(currentConnection); - currentConnection = connectionFactory.newConnection(); - return ConnectionState.DISCONNECTED; - } - private void sleepIfProblemsGettingSyncedConnection(ConnectionState newState, ConnectionState oldState) { if (newState == ConnectionState.SESSION_SYNCED) return; if (newState == ConnectionState.CONNECTED && oldState == ConnectionState.DISCONNECTED) return; @@ -397,17 +397,19 @@ class IOThread implements Runnable, AutoCloseable { @Override public void run() { - ConnectionState connectionState = ConnectionState.DISCONNECTED; - while (stopSignal.getCount() > 0 || !documentQueue.isEmpty()) { - ConnectionState oldState = connectionState; - connectionState = cycle(connectionState); - checkOldConnections(); - sleepIfProblemsGettingSyncedConnection(connectionState, oldState); - - } + while (stopSignal.getCount() > 0 || !documentQueue.isEmpty()) + tick(); log.finer(toString() + " exiting, documentQueue.size()=" + documentQueue.size()); running.countDown(); + } + /** Do one iteration of work. Should be called from the single worker thread of this. */ + public void tick() { + ConnectionState oldState = connectionState; + connectionState = cycle(connectionState); + checkOldConnections(); + if (thread != null) + sleepIfProblemsGettingSyncedConnection(connectionState, oldState); } private void drainFirstDocumentsInQueueIfOld() { @@ -434,28 +436,33 @@ class IOThread implements Runnable, AutoCloseable { } } - private void checkOldConnections() { - if (resultQueue.getPendingSize() == 0) { - oldConnections.forEach(c -> c.close()); - oldConnections.clear(); - return; - } + private boolean isStale(GatewayConnection connection) { + return connection.connectionTime() != null + && connection.connectionTime().plus(connectionTimeToLive).isBefore(clock.instant()); + } + + private ConnectionState refreshConnection(ConnectionState currentConnectionState) { + if (currentConnectionState == ConnectionState.SESSION_SYNCED) + oldConnections.add(currentConnection); + currentConnection = connectionFactory.newConnection(); + return ConnectionState.DISCONNECTED; + } + private void checkOldConnections() { for (Iterator<GatewayConnection> i = oldConnections.iterator(); i.hasNext(); ) { GatewayConnection connection = i.next(); if (closingTime(connection).isBefore(clock.instant())) { connection.close(); - i.remove();; + i.remove(); } else if (timeToPoll(connection)) { try { processResponse(connection.poll()); } catch (Exception e) { - // Old connection; this may be ok + // Old connection; best effort } } - } } @@ -471,8 +478,8 @@ class IOThread implements Runnable, AutoCloseable { (double)localQueueTimeOut.toMillis(); if (newness < 0) return true; // connection retired prematurely if (newness > 1) return false; // closing time reached - Duration pollInterval = Duration.ofMillis(pollIntervalUS * 1000 + - (long)(newness * ( maxOldConnectionPollInterval.toMillis() - pollIntervalUS * 1000))); + Duration pollInterval = Duration.ofMillis(pollIntervalUS / 1000 + + (long)((1 - newness) * ( maxOldConnectionPollInterval.toMillis() - pollIntervalUS / 1000))); return connection.lastPollTime().plus(pollInterval).isBefore(clock.instant()); } @@ -511,4 +518,10 @@ class IOThread implements Runnable, AutoCloseable { } } + /** For testing. Returns the current connection of this. Not thread safe. */ + public GatewayConnection currentConnection() { return currentConnection; } + + /** For testing. Returns a snapshot of the old connections of this. Not thread safe. */ + public List<GatewayConnection> oldConnections() { return new ArrayList<>(oldConnections); } + } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java index 702a316422d..90d07104fef 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java @@ -17,6 +17,7 @@ import java.math.BigInteger; import java.security.SecureRandom; import java.time.Clock; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; @@ -255,6 +256,8 @@ public class OperationProcessor { } } + public List<ClusterConnection> clusters() { return Collections.unmodifiableList(clusters); } + public String getStatsAsJson() { return operationStats.getStatsAsJson(); } |