diff options
12 files changed, 327 insertions, 80 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java index 6682f6ff1d0..2417a4acf71 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java @@ -42,6 +42,7 @@ public final class ConnectionParams { private int maxRetries = 100; private long minTimeBetweenRetriesMs = 700; private boolean dryRun = false; + private boolean runThreads = true; private int traceLevel = 0; private int traceEveryXOperation = 0; private boolean printTraceToStdErr = true; @@ -191,10 +192,8 @@ public final class ConnectionParams { } /** - * Don't send data to gateway, just pretend that everything is fine. - * - * @param dryRun true if enabled. - * @return pointer to builder. + * Set to true to skip making network connections and instead + * let requests complete successfully with no effect. */ public Builder setDryRun(boolean dryRun) { this.dryRun = dryRun; @@ -202,6 +201,15 @@ public final class ConnectionParams { } /** + * Set to false to skip starting io threads, such that any operation must be driven by a calling thread. + * Useful for testing. + */ + public Builder setRunThreads(boolean runThreads) { + this.runThreads = runThreads; + return this; + } + + /** * Set the min time between retries when temporarily failing against a gateway. * * @param minTimeBetweenRetries the min time value @@ -274,6 +282,7 @@ public final class ConnectionParams { maxRetries, minTimeBetweenRetriesMs, dryRun, + runThreads, traceLevel, traceEveryXOperation, printTraceToStdErr, @@ -293,6 +302,8 @@ public final class ConnectionParams { return dryRun; } + public boolean runThreads() { return runThreads; } + public int getMaxRetries() { return maxRetries; } @@ -345,6 +356,7 @@ public final class ConnectionParams { private final int maxRetries; private final long minTimeBetweenRetriesMs; private final boolean dryRun; + private final boolean runThreads; private final int traceLevel; private final int traceEveryXOperation; private final boolean printTraceToStdErr; @@ -364,6 +376,7 @@ public final class ConnectionParams { int maxRetries, long minTimeBetweenRetriesMs, boolean dryRun, + boolean runThreads, int traceLevel, int traceEveryXOperation, boolean printTraceToStdErr, @@ -385,6 +398,7 @@ public final class ConnectionParams { this.maxRetries = maxRetries; this.minTimeBetweenRetriesMs = minTimeBetweenRetriesMs; this.dryRun = dryRun; + this.runThreads = runThreads; this.traceLevel = traceLevel; this.traceEveryXOperation = traceEveryXOperation; this.printTraceToStdErr = printTraceToStdErr; @@ -436,6 +450,8 @@ public final class ConnectionParams { return dryRun; } + public boolean runThreads() { return runThreads; } + public int getTraceLevel() { return traceLevel; } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java index 3131206f148..bf07e3ea634 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java @@ -141,13 +141,12 @@ public final class SessionParams { private final ErrorReporter errorReport; private int throttlerMinSize; - private SessionParams( - Collection<Cluster> clusters, - FeedParams feedParams, - ConnectionParams connectionParams, - int clientQueueSize, - ErrorReporter errorReporter, - int throttlerMinSize) { + private SessionParams(Collection<Cluster> clusters, + FeedParams feedParams, + ConnectionParams connectionParams, + int clientQueueSize, + ErrorReporter errorReporter, + int throttlerMinSize) { this.clusters = Collections.unmodifiableList(new ArrayList<>(clusters)); this.feedParams = feedParams; this.connectionParams = connectionParams; diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java index a232ceeacf5..8e55e59b3f4 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java @@ -17,6 +17,7 @@ import java.io.StringWriter; import java.time.Clock; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -93,6 +94,7 @@ public class ClusterConnection implements AutoCloseable { documentQueue, feedParams.getMaxSleepTimeMs(), connectionParams.getConnectionTimeToLive(), + connectionParams.runThreads(), idlePollFrequency, clock); ioThreads.add(ioThread); @@ -167,6 +169,10 @@ public class ClusterConnection implements AutoCloseable { return stringWriter.toString(); } + public List<IOThread> ioThreads() { + return Collections.unmodifiableList(ioThreads); + } + @Override public boolean equals(Object o) { return (this == o) || (o instanceof ClusterConnection && clusterId == ((ClusterConnection) o).clusterId); diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java index cfd4c1003de..129fc000271 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java @@ -12,6 +12,7 @@ import java.nio.charset.StandardCharsets; import java.time.Clock; import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; import java.util.List; /** @@ -26,6 +27,10 @@ public class DryRunGatewayConnection implements GatewayConnection { private Instant connectionTime = null; private Instant lastPollTime = null; + /** Set to true to hold off responding with a result to any incoming operations until this is set false */ + private boolean hold = false; + private List<Document> held = new ArrayList<>(); + public DryRunGatewayConnection(Endpoint endpoint, Clock clock) { this.endpoint = endpoint; this.clock = clock; @@ -34,13 +39,23 @@ public class DryRunGatewayConnection implements GatewayConnection { @Override public InputStream write(List<Document> docs) { StringBuilder result = new StringBuilder(); - for (Document doc : docs) { - OperationStatus operationStatus = new OperationStatus("ok", doc.getOperationId(), ErrorCode.OK, false, ""); - result.append(operationStatus.render()); + if (hold) { + held.addAll(docs); + } + else { + for (Document doc : held) + result.append(okResponse(doc).render()); + held.clear(); + for (Document doc : docs) + result.append(okResponse(doc).render()); } return new ByteArrayInputStream(result.toString().getBytes(StandardCharsets.UTF_8)); } + public void hold(boolean hold) { + this.hold = hold; + } + @Override public InputStream poll() { lastPollTime = clock.instant(); @@ -75,4 +90,11 @@ public class DryRunGatewayConnection implements GatewayConnection { @Override public void close() { } + /** Returns the document currently held in this */ + public List<Document> held() { return Collections.unmodifiableList(held); } + + private OperationStatus okResponse(Document document) { + return new OperationStatus("ok", document.getOperationId(), ErrorCode.OK, false, ""); + } + } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java index c4ee2f58b65..1dd8b3bf3ec 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java @@ -33,12 +33,11 @@ class EndpointResultQueue { private final ScheduledThreadPoolExecutor timer; private final long totalTimeoutMs; - EndpointResultQueue( - OperationProcessor operationProcessor, - Endpoint endpoint, - int clusterId, - ScheduledThreadPoolExecutor timer, - long totalTimeoutMs) { + EndpointResultQueue(OperationProcessor operationProcessor, + Endpoint endpoint, + int clusterId, + ScheduledThreadPoolExecutor timer, + long totalTimeoutMs) { this.operationProcessor = operationProcessor; this.endpoint = endpoint; this.clusterId = clusterId; diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index 99aff8b4baa..c7c94587640 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -18,6 +18,7 @@ import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Optional; @@ -41,6 +42,8 @@ class IOThread implements Runnable, AutoCloseable { private final GatewayConnectionFactory connectionFactory; private final DocumentQueue documentQueue; private final EndpointResultQueue resultQueue; + + /** The thread running this, or null if it does not run a thread (meaning tick() must be called from the outside) */ private final Thread thread; private final int clusterId; private final CountDownLatch running = new CountDownLatch(1); @@ -56,6 +59,7 @@ class IOThread implements Runnable, AutoCloseable { private final Random random = new Random(); private GatewayConnection currentConnection; + private ConnectionState connectionState = ConnectionState.DISCONNECTED; /** * Previous connections on which we have sent operations and are still waiting for the result @@ -87,6 +91,7 @@ class IOThread implements Runnable, AutoCloseable { DocumentQueue documentQueue, long maxSleepTimeMs, Duration connectionTimeToLive, + boolean runThreads, double idlePollFrequency, Clock clock) { this.endpoint = endpoint; @@ -101,11 +106,18 @@ class IOThread implements Runnable, AutoCloseable { this.gatewayThrottler = new GatewayThrottler(maxSleepTimeMs); this.pollIntervalUS = Math.max(1, (long)(1000000.0/Math.max(0.1, idlePollFrequency))); // ensure range [1us, 10s] this.clock = clock; - this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint); - thread.setDaemon(true); this.localQueueTimeOut = localQueueTimeOut; - this.maxOldConnectionPollInterval = localQueueTimeOut.dividedBy(10); - thread.start(); + this.maxOldConnectionPollInterval = localQueueTimeOut.dividedBy(10).toMillis() > pollIntervalUS / 1000 + ? localQueueTimeOut.dividedBy(10) + : Duration.ofMillis(pollIntervalUS / 1000); + if (runThreads) { + this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint); + thread.setDaemon(true); + thread.start(); + } + else { + this.thread = null; + } } public Endpoint getEndpoint() { @@ -173,7 +185,7 @@ class IOThread implements Runnable, AutoCloseable { int chunkSizeBytes = 0; try { drainFirstDocumentsInQueueIfOld(); - Document doc = documentQueue.poll(maxWaitUnits, timeUnit); + Document doc = thread != null ? documentQueue.poll(maxWaitUnits, timeUnit) : documentQueue.poll(); if (doc != null) { docsForSendChunk.add(doc); chunkSizeBytes = doc.size(); @@ -371,18 +383,6 @@ class IOThread implements Runnable, AutoCloseable { } } - private boolean isStale(GatewayConnection connection) { - return connection.connectionTime() != null - && connection.connectionTime().plus(connectionTimeToLive).isBefore(clock.instant()); - } - - private ConnectionState refreshConnection(ConnectionState currentConnectionState) { - if (currentConnectionState == ConnectionState.SESSION_SYNCED) - oldConnections.add(currentConnection); - currentConnection = connectionFactory.newConnection(); - return ConnectionState.DISCONNECTED; - } - private void sleepIfProblemsGettingSyncedConnection(ConnectionState newState, ConnectionState oldState) { if (newState == ConnectionState.SESSION_SYNCED) return; if (newState == ConnectionState.CONNECTED && oldState == ConnectionState.DISCONNECTED) return; @@ -397,17 +397,19 @@ class IOThread implements Runnable, AutoCloseable { @Override public void run() { - ConnectionState connectionState = ConnectionState.DISCONNECTED; - while (stopSignal.getCount() > 0 || !documentQueue.isEmpty()) { - ConnectionState oldState = connectionState; - connectionState = cycle(connectionState); - checkOldConnections(); - sleepIfProblemsGettingSyncedConnection(connectionState, oldState); - - } + while (stopSignal.getCount() > 0 || !documentQueue.isEmpty()) + tick(); log.finer(toString() + " exiting, documentQueue.size()=" + documentQueue.size()); running.countDown(); + } + /** Do one iteration of work. Should be called from the single worker thread of this. */ + public void tick() { + ConnectionState oldState = connectionState; + connectionState = cycle(connectionState); + checkOldConnections(); + if (thread != null) + sleepIfProblemsGettingSyncedConnection(connectionState, oldState); } private void drainFirstDocumentsInQueueIfOld() { @@ -434,28 +436,33 @@ class IOThread implements Runnable, AutoCloseable { } } - private void checkOldConnections() { - if (resultQueue.getPendingSize() == 0) { - oldConnections.forEach(c -> c.close()); - oldConnections.clear(); - return; - } + private boolean isStale(GatewayConnection connection) { + return connection.connectionTime() != null + && connection.connectionTime().plus(connectionTimeToLive).isBefore(clock.instant()); + } + + private ConnectionState refreshConnection(ConnectionState currentConnectionState) { + if (currentConnectionState == ConnectionState.SESSION_SYNCED) + oldConnections.add(currentConnection); + currentConnection = connectionFactory.newConnection(); + return ConnectionState.DISCONNECTED; + } + private void checkOldConnections() { for (Iterator<GatewayConnection> i = oldConnections.iterator(); i.hasNext(); ) { GatewayConnection connection = i.next(); if (closingTime(connection).isBefore(clock.instant())) { connection.close(); - i.remove();; + i.remove(); } else if (timeToPoll(connection)) { try { processResponse(connection.poll()); } catch (Exception e) { - // Old connection; this may be ok + // Old connection; best effort } } - } } @@ -471,8 +478,8 @@ class IOThread implements Runnable, AutoCloseable { (double)localQueueTimeOut.toMillis(); if (newness < 0) return true; // connection retired prematurely if (newness > 1) return false; // closing time reached - Duration pollInterval = Duration.ofMillis(pollIntervalUS * 1000 + - (long)(newness * ( maxOldConnectionPollInterval.toMillis() - pollIntervalUS * 1000))); + Duration pollInterval = Duration.ofMillis(pollIntervalUS / 1000 + + (long)((1 - newness) * ( maxOldConnectionPollInterval.toMillis() - pollIntervalUS / 1000))); return connection.lastPollTime().plus(pollInterval).isBefore(clock.instant()); } @@ -511,4 +518,10 @@ class IOThread implements Runnable, AutoCloseable { } } + /** For testing. Returns the current connection of this. Not thread safe. */ + public GatewayConnection currentConnection() { return currentConnection; } + + /** For testing. Returns a snapshot of the old connections of this. Not thread safe. */ + public List<GatewayConnection> oldConnections() { return new ArrayList<>(oldConnections); } + } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java index 702a316422d..90d07104fef 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java @@ -17,6 +17,7 @@ import java.math.BigInteger; import java.security.SecureRandom; import java.time.Clock; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; @@ -255,6 +256,8 @@ public class OperationProcessor { } } + public List<ClusterConnection> clusters() { return Collections.unmodifiableList(clusters); } + public String getStatsAsJson() { return operationStats.getStatsAsJson(); } diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/Server.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/Server.java index 0821fa55e06..79a91d0b5f3 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/Server.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/Server.java @@ -5,8 +5,7 @@ import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.handler.AbstractHandler; /** - * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> - * @since 5.1.20 + * @author Einar M R Rosenvinge */ public final class Server implements AutoCloseable { diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java index 0b03f3338c9..780de3e695c 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java @@ -18,13 +18,9 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import static com.yahoo.vespa.http.client.TestUtils.getResults; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.not; -import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; /** @@ -96,9 +92,9 @@ public class V3HttpAPITest { } @Test - public void requireThatSingleDestinationWorks() throws Exception { + public void testSingleDestination() throws Exception { try (Server server = new Server(new V3MockParsingRequestHandler(), 0); - Session session = SessionFactory.create(Endpoint.create("localhost", server.getPort(), false))) { + Session session = SessionFactory.create(Endpoint.create("localhost", server.getPort(), false))) { writeDocuments(session); Map<String, Result> results = getResults(session, documents.size()); @@ -106,8 +102,8 @@ public class V3HttpAPITest { for (TestDocument document : documents) { Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(true)); + assertNotNull(r); + assertTrue(r.getDetails().toString(), r.isSuccess()); } assertTrue(results.isEmpty()); } diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java index 0005bddeb73..da82079e992 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java @@ -20,8 +20,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; /** - * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> - * @since 5.1.22 + * @author Einar M R Rosenvinge */ public class EndpointResultQueueTest { diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java index 8eb9513065e..59fb968906f 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java @@ -4,6 +4,7 @@ package com.yahoo.vespa.http.client.core.communication; import com.yahoo.vespa.http.client.FeedConnectException; import com.yahoo.vespa.http.client.FeedEndpointException; import com.yahoo.vespa.http.client.FeedProtocolException; +import com.yahoo.vespa.http.client.ManualClock; import com.yahoo.vespa.http.client.Result; import com.yahoo.vespa.http.client.V3HttpAPITest; import com.yahoo.vespa.http.client.config.Endpoint; @@ -37,10 +38,12 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +// DO NOT ADD TESTS HERE, add to NewIOThreadTest public class IOThreadTest { private static final Endpoint ENDPOINT = Endpoint.create("myhost"); + final Clock clock = Clock.systemUTC(); final EndpointResultQueue endpointResultQueue = mock(EndpointResultQueue.class); final ApacheGatewayConnection apacheGatewayConnection = mock(ApacheGatewayConnection.class); final String exceptionMessage = "SOME EXCEPTION FOO"; @@ -49,13 +52,13 @@ public class IOThreadTest { Document doc1 = new Document(V3HttpAPITest.documents.get(0).getDocumentId(), V3HttpAPITest.documents.get(0).getContents(), null, - Clock.systemUTC().instant()); + clock.instant()); String docId2 = V3HttpAPITest.documents.get(1).getDocumentId(); Document doc2 = new Document(V3HttpAPITest.documents.get(1).getDocumentId(), V3HttpAPITest.documents.get(1).getContents(), null, - Clock.systemUTC().instant()); - DocumentQueue documentQueue = new DocumentQueue(4, Clock.systemUTC()); + clock.instant()); + DocumentQueue documentQueue = new DocumentQueue(4, clock); public IOThreadTest() { when(apacheGatewayConnection.getEndpoint()).thenReturn(ENDPOINT); @@ -63,6 +66,7 @@ public class IOThreadTest { /** * Set up mock so that it can handle both failDocument() and resultReceived(). + * * @param expectedDocIdFail on failure, this has to be the doc id, or the mock will fail. * @param expectedDocIdOk on ok, this has to be the doc id, or the mock will fail. * @param isTransient checked on failure, if different, the mock will fail. @@ -100,8 +104,9 @@ public class IOThreadTest { documentQueue, 0, Duration.ofSeconds(15), + true, 10, - Clock.systemUTC()); + clock); } @Test @@ -118,7 +123,7 @@ public class IOThreadTest { } @Test - public void requireThatSingleDocumentWriteErrorIsHandledProperly() throws Exception { + public void testDocumentWriteError() throws Exception { when(apacheGatewayConnection.connect()).thenReturn(true); when(apacheGatewayConnection.write(any())).thenThrow(new IOException(exceptionMessage)); setupEndpointResultQueueMock(doc1.getOperationId(), "nope", true, exceptionMessage); @@ -129,7 +134,7 @@ public class IOThreadTest { } @Test - public void requireThatTwoDocumentsFirstWriteErrorSecondOkIsHandledProperly() throws Exception { + public void testTwoDocumentsFirstWriteErrorSecondOk() throws Exception { when(apacheGatewayConnection.connect()).thenReturn(true); InputStream serverResponse = new ByteArrayInputStream( (docId2 + " OK Doc{20}fed").getBytes(StandardCharsets.UTF_8)); @@ -149,10 +154,8 @@ public class IOThreadTest { @Test public void testQueueTimeOutNoNoConnectionToServer() throws Exception { when(apacheGatewayConnection.connect()).thenReturn(false); - InputStream serverResponse = new ByteArrayInputStream( - ("").getBytes(StandardCharsets.UTF_8)); - when(apacheGatewayConnection.write(any())) - .thenReturn(serverResponse); + InputStream serverResponse = new ByteArrayInputStream(("").getBytes(StandardCharsets.UTF_8)); + when(apacheGatewayConnection.write(any())).thenReturn(serverResponse); setupEndpointResultQueueMock(doc1.getOperationId(), "nope", true, "java.lang.Exception: Not sending document operation, timed out in queue after"); try (IOThread ioThread = createIOThread(10, 10)) { @@ -162,7 +165,7 @@ public class IOThreadTest { } @Test - public void requireThatEndpointProtocolExceptionsArePropagated() + public void testEndpointProtocolExceptionPropagation() throws IOException, ServerResponseException, InterruptedException, TimeoutException, ExecutionException { when(apacheGatewayConnection.connect()).thenReturn(true); int errorCode = 403; @@ -183,7 +186,7 @@ public class IOThreadTest { } @Test - public void requireThatEndpointConnectExceptionsArePropagated() + public void testEndpointConnectExceptionsPropagation() throws IOException, ServerResponseException, InterruptedException, TimeoutException, ExecutionException { when(apacheGatewayConnection.connect()).thenReturn(true); String errorMessage = "generic error message"; diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java new file mode 100644 index 00000000000..615fa22a6cf --- /dev/null +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java @@ -0,0 +1,192 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.http.client.core.communication; + +import com.yahoo.vespa.http.client.FeedClient; +import com.yahoo.vespa.http.client.FeedEndpointException; +import com.yahoo.vespa.http.client.ManualClock; +import com.yahoo.vespa.http.client.Result; +import com.yahoo.vespa.http.client.config.Cluster; +import com.yahoo.vespa.http.client.config.ConnectionParams; +import com.yahoo.vespa.http.client.config.Endpoint; +import com.yahoo.vespa.http.client.config.SessionParams; +import com.yahoo.vespa.http.client.core.Document; +import com.yahoo.vespa.http.client.core.EndpointResult; +import com.yahoo.vespa.http.client.core.ThrottlePolicy; +import com.yahoo.vespa.http.client.core.operationProcessor.IncompleteResultsThrottler; +import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor; +import org.junit.Test; + +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; + +/** + * TODO: Migrate IOThreadTests here. + * + * @author bratseth + */ +public class NewIOThreadTest { + + @Test + public void testBasics() { + OperationProcessorTester tester = new OperationProcessorTester(); + assertEquals(0, tester.inflight()); + assertEquals(0, tester.success()); + assertEquals(0, tester.failures()); + tester.send("doc1"); + tester.send("doc2"); + tester.send("doc3"); + assertEquals(3, tester.inflight()); + assertEquals(0, tester.success()); + assertEquals(0, tester.failures()); + tester.success("doc1"); + tester.success("doc2"); + tester.success("doc3"); + assertEquals(0, tester.inflight()); + assertEquals(3, tester.success()); + assertEquals(0, tester.failures()); + } + + @Test + public void testPollingOldConnections() { + OperationProcessorTester tester = new OperationProcessorTester(); + tester.tick(3); + + assertEquals(1, tester.clusterConnections().size()); + assertEquals(1, tester.clusterConnections().get(0).ioThreads().size()); + IOThread ioThread = tester.clusterConnections().get(0).ioThreads().get(0); + DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection(); + assertEquals(0, ioThread.oldConnections().size()); + + firstConnection.hold(true); + tester.send("doc1"); + tester.tick(1); + + tester.clock().advance(Duration.ofSeconds(20)); // Default connection ttl is 15 + tester.tick(3); + + assertEquals(1, ioThread.oldConnections().size()); + assertEquals(firstConnection, ioThread.oldConnections().get(0)); + assertNotSame(firstConnection, ioThread.currentConnection()); + assertEquals(20, firstConnection.lastPollTime().toEpochMilli() / 1000); + + // Check old connection poll pattern (linear backoff) + assertLastPollTimeWhenAdvancing(21, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(23, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(24, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(24, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(26, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(26, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(28, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(28, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(32, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(32, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(34, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(34, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(34, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(37, 1, firstConnection, tester); + + tester.clock().advance(Duration.ofSeconds(200)); + tester.tick(1); + assertEquals("Old connection is eventually removed", 0, ioThread.oldConnections().size()); + } + + private void assertLastPollTimeWhenAdvancing(int lastPollTimeSeconds, + int advanceSeconds, + DryRunGatewayConnection connection, + OperationProcessorTester tester) { + tester.clock().advance(Duration.ofSeconds(advanceSeconds)); + tester.tick(1); + assertEquals(lastPollTimeSeconds, connection.lastPollTime().toEpochMilli() / 1000); + } + + private static class OperationProcessorTester { + + private final Endpoint endpoint; + private final int clusterId = 0; + private final ManualClock clock; + private final TestResultCallback resultCallback; + private final OperationProcessor operationProcessor; + + public OperationProcessorTester() { + endpoint = Endpoint.create("test-endpoint"); + SessionParams.Builder params = new SessionParams.Builder(); + Cluster.Builder clusterParams = new Cluster.Builder(); + clusterParams.addEndpoint(endpoint); + params.addCluster(clusterParams.build()); + ConnectionParams.Builder connectionParams = new ConnectionParams.Builder(); + connectionParams.setDryRun(true); + connectionParams.setRunThreads(false); + params.setConnectionParams(connectionParams.build()); + + clock = new ManualClock(Instant.ofEpochMilli(0)); + resultCallback = new TestResultCallback(); + operationProcessor = new OperationProcessor(new IncompleteResultsThrottler(1, 100, clock, new ThrottlePolicy()), + resultCallback, + params.build(), + new ScheduledThreadPoolExecutor(1), + clock); + } + + public ManualClock clock() { return clock; } + + /** Do n iteration of work in all io threads of this */ + public void tick(int n) { + for (int i = 0; i < n; i++) + for (ClusterConnection cluster : operationProcessor.clusters()) + for (IOThread thread : cluster.ioThreads()) + thread.tick(); + } + + public void send(String documentId) { + operationProcessor.sendDocument(new Document(documentId, documentId, "data of " + documentId, null, clock.instant())); + } + + public void success(String documentId) { + operationProcessor.resultReceived(new EndpointResult(documentId, new Result.Detail(endpoint)), clusterId); + } + + public int inflight() { + return operationProcessor.getIncompleteResultQueueSize(); + } + + public int success() { + return resultCallback.successes; + } + + public List<ClusterConnection> clusterConnections() { + return operationProcessor.clusters(); + } + + public int failures() { + return resultCallback.failures; + } + + } + + private static class TestResultCallback implements FeedClient.ResultCallback { + + private int successes = 0; + private int failures = 0; + + @Override + public void onCompletion(String docId, Result documentResult) { + successes++; + } + + @Override + public void onEndpointException(FeedEndpointException exception) { + failures++; + } + + + } + +} |