aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-http-client
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@gmail.com>2020-09-01 15:15:51 +0200
committerJon Bratseth <bratseth@gmail.com>2020-09-01 15:15:51 +0200
commitd4a63b0f6f20cfc86ab246029ed72d5f2e1c7ed8 (patch)
treedd9b10cb26cd0680345f6be3eadc8e5a02f76c54 /vespa-http-client
parentdd3b8fac4d08abcc713b2d8a886ae622c063fd1d (diff)
Migrate IOThread tests
Diffstat (limited to 'vespa-http-client')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java1
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java45
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java1
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java38
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java400
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java196
6 files changed, 268 insertions, 413 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java
index d3e4b4ed762..3a67c80224f 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java
@@ -11,6 +11,7 @@ import com.yahoo.vespa.http.client.config.Endpoint;
* @author bjorncs
*/
public abstract class FeedEndpointException extends RuntimeException {
+
private final Endpoint endpoint;
protected FeedEndpointException(String message, Throwable cause, Endpoint endpoint) {
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
index 129fc000271..623ea543ffb 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
@@ -5,8 +5,10 @@ import com.yahoo.vespa.http.client.config.Endpoint;
import com.yahoo.vespa.http.client.core.Document;
import com.yahoo.vespa.http.client.core.ErrorCode;
import com.yahoo.vespa.http.client.core.OperationStatus;
+import com.yahoo.vespa.http.client.core.ServerResponseException;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
@@ -29,7 +31,13 @@ public class DryRunGatewayConnection implements GatewayConnection {
/** Set to true to hold off responding with a result to any incoming operations until this is set false */
private boolean hold = false;
- private List<Document> held = new ArrayList<>();
+ private final List<Document> held = new ArrayList<>();
+
+ /** If this is set, handshake operations will throw this exception */
+ private ServerResponseException throwThisOnHandshake = null;
+
+ /** If this is set, all write operations will throw this exception */
+ private IOException throwThisOnWrite = null;
public DryRunGatewayConnection(Endpoint endpoint, Clock clock) {
this.endpoint = endpoint;
@@ -37,27 +45,27 @@ public class DryRunGatewayConnection implements GatewayConnection {
}
@Override
- public InputStream write(List<Document> docs) {
- StringBuilder result = new StringBuilder();
+ public InputStream write(List<Document> docs) throws IOException {
+ if (throwThisOnWrite != null)
+ throw throwThisOnWrite;
+
if (hold) {
held.addAll(docs);
+ return new ByteArrayInputStream("".getBytes(StandardCharsets.UTF_8));
}
else {
+ StringBuilder result = new StringBuilder();
for (Document doc : held)
result.append(okResponse(doc).render());
held.clear();
for (Document doc : docs)
result.append(okResponse(doc).render());
+ return new ByteArrayInputStream(result.toString().getBytes(StandardCharsets.UTF_8));
}
- return new ByteArrayInputStream(result.toString().getBytes(StandardCharsets.UTF_8));
- }
-
- public void hold(boolean hold) {
- this.hold = hold;
}
@Override
- public InputStream poll() {
+ public InputStream poll() throws IOException {
lastPollTime = clock.instant();
return write(new ArrayList<>());
}
@@ -66,7 +74,7 @@ public class DryRunGatewayConnection implements GatewayConnection {
public Instant lastPollTime() { return lastPollTime; }
@Override
- public InputStream drain() {
+ public InputStream drain() throws IOException {
return write(new ArrayList<>());
}
@@ -85,14 +93,29 @@ public class DryRunGatewayConnection implements GatewayConnection {
}
@Override
- public void handshake() { }
+ public void handshake() throws ServerResponseException {
+ if (throwThisOnHandshake != null)
+ throw throwThisOnHandshake;
+ }
@Override
public void close() { }
+ public void hold(boolean hold) {
+ this.hold = hold;
+ }
+
/** Returns the document currently held in this */
public List<Document> held() { return Collections.unmodifiableList(held); }
+ public void throwOnWrite(IOException throwThisOnWrite) {
+ this.throwThisOnWrite = throwThisOnWrite;
+ }
+
+ public void throwOnHandshake(ServerResponseException throwThisOnHandshake) {
+ this.throwThisOnHandshake = throwThisOnHandshake;
+ }
+
private OperationStatus okResponse(Document document) {
return new OperationStatus("ok", document.getOperationId(), ErrorCode.OK, false, "");
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
index 1dd8b3bf3ec..a5a37a31665 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
@@ -65,7 +65,6 @@ class EndpointResultQueue {
private synchronized void resultReceived(EndpointResult result, int clusterId, boolean duplicateGivesWarning) {
operationProcessor.resultReceived(result, clusterId);
-
TimerFuture timerFuture = futureByOperation.remove(result.getOperationId());
if (timerFuture == null) {
if (duplicateGivesWarning) {
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index 8f052a1b3f6..4796f4bb4a8 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -51,7 +51,6 @@ class IOThread implements Runnable, AutoCloseable {
private final int maxChunkSizeBytes;
private final int maxInFlightRequests;
private final Duration localQueueTimeOut;
- private final Duration maxOldConnectionPollInterval;
private final GatewayThrottler gatewayThrottler;
private final Duration connectionTimeToLive;
private final long pollIntervalUS;
@@ -107,9 +106,6 @@ class IOThread implements Runnable, AutoCloseable {
this.pollIntervalUS = Math.max(1, (long)(1000000.0/Math.max(0.1, idlePollFrequency))); // ensure range [1us, 10s]
this.clock = clock;
this.localQueueTimeOut = localQueueTimeOut;
- this.maxOldConnectionPollInterval = localQueueTimeOut.dividedBy(10).toMillis() > pollIntervalUS / 1000
- ? localQueueTimeOut.dividedBy(10)
- : Duration.ofMillis(pollIntervalUS / 1000);
if (runThreads) {
this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint);
thread.setDaemon(true);
@@ -231,9 +227,10 @@ class IOThread implements Runnable, AutoCloseable {
private void markDocumentAsFailed(List<Document> docs, ServerResponseException servletException) {
for (Document doc : docs) {
- resultQueue.failOperation(
- EndPointResultFactory.createTransientError(
- endpoint, doc.getOperationId(), servletException), clusterId);
+ resultQueue.failOperation(EndPointResultFactory.createTransientError(endpoint,
+ doc.getOperationId(),
+ servletException),
+ clusterId);
}
}
@@ -327,8 +324,8 @@ class IOThread implements Runnable, AutoCloseable {
} catch (Throwable throwable1) {
drainFirstDocumentsInQueueIfOld();
- log.log(Level.INFO, "Failed connecting to endpoint: '" + endpoint
- + "'. Will re-try connecting. Failed with '" + Exceptions.toMessageString(throwable1) + "'",throwable1);
+ log.log(Level.INFO, "Failed connecting to endpoint: '" + endpoint + "'. Will re-try connecting.",
+ throwable1);
executeProblemsCounter.incrementAndGet();
return ConnectionState.DISCONNECTED;
}
@@ -341,8 +338,9 @@ class IOThread implements Runnable, AutoCloseable {
} catch (ServerResponseException ser) {
executeProblemsCounter.incrementAndGet();
- log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + endpoint
- + "' failed. Will re-try handshake. Failed with '" + Exceptions.toMessageString(ser) + "'",ser);
+ log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + endpoint +
+ "' failed. Will re-try handshake.",
+ ser);
drainFirstDocumentsInQueueIfOld();
resultQueue.onEndpointError(new FeedProtocolException(ser.getResponseCode(), ser.getResponseString(), ser, endpoint));
@@ -350,8 +348,9 @@ class IOThread implements Runnable, AutoCloseable {
} catch (Throwable throwable) { // This cover IOException as well
executeProblemsCounter.incrementAndGet();
resultQueue.onEndpointError(new FeedConnectException(throwable, endpoint));
- log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + endpoint
- + "' failed. Will re-try handshake. Failed with '" + Exceptions.toMessageString(throwable) + "'",throwable);
+ log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" +
+ endpoint + "' failed. Will re-try handshake.",
+ throwable);
drainFirstDocumentsInQueueIfOld();
currentConnection.close();
return ConnectionState.DISCONNECTED;
@@ -366,14 +365,14 @@ class IOThread implements Runnable, AutoCloseable {
}
catch (ServerResponseException ser) {
log.log(Level.INFO, "Problems while handing data over to endpoint '" + endpoint +
- "'. Will re-try. Endpoint responded with an unexpected HTTP response code. '"
- + Exceptions.toMessageString(ser) + "'",ser);
+ "'. Will re-try. Endpoint responded with an unexpected HTTP response code.",
+ ser);
return ConnectionState.CONNECTED;
}
catch (Throwable e) {
- log.log(Level.INFO, "Problems while handing data over to endpoint '" + endpoint +
- "'. Will re-try. Connection level error. Failed with '" +
- Exceptions.toMessageString(e) + "'", e);
+ log.log(Level.INFO,
+ "Connection level error handing data over to endpoint '" + endpoint + "'. Will re-try.",
+ e);
currentConnection.close();
return ConnectionState.DISCONNECTED;
}
@@ -532,4 +531,7 @@ class IOThread implements Runnable, AutoCloseable {
/** For testing. Returns a snapshot of the old connections of this. Not thread safe. */
public List<GatewayConnection> oldConnections() { return new ArrayList<>(oldConnections); }
+ /** For testing */
+ public EndpointResultQueue resultQueue() { return resultQueue; }
+
}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
index 59fb968906f..a36a08b0c26 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
@@ -1,231 +1,257 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.http.client.core.communication;
-import com.yahoo.vespa.http.client.FeedConnectException;
+import com.yahoo.vespa.http.client.FeedClient;
import com.yahoo.vespa.http.client.FeedEndpointException;
-import com.yahoo.vespa.http.client.FeedProtocolException;
import com.yahoo.vespa.http.client.ManualClock;
import com.yahoo.vespa.http.client.Result;
-import com.yahoo.vespa.http.client.V3HttpAPITest;
+import com.yahoo.vespa.http.client.config.Cluster;
+import com.yahoo.vespa.http.client.config.ConnectionParams;
import com.yahoo.vespa.http.client.config.Endpoint;
+import com.yahoo.vespa.http.client.config.SessionParams;
import com.yahoo.vespa.http.client.core.Document;
-import com.yahoo.vespa.http.client.core.EndpointResult;
import com.yahoo.vespa.http.client.core.ServerResponseException;
+import com.yahoo.vespa.http.client.core.ThrottlePolicy;
+import com.yahoo.vespa.http.client.core.operationProcessor.IncompleteResultsThrottler;
+import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;
import org.junit.Test;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.time.Clock;
import java.time.Duration;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-// DO NOT ADD TESTS HERE, add to NewIOThreadTest
+import java.time.Instant;
+import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+
+/**
+ * TODO: Migrate IOThreadTests here.
+ *
+ * @author bratseth
+ */
public class IOThreadTest {
- private static final Endpoint ENDPOINT = Endpoint.create("myhost");
-
- final Clock clock = Clock.systemUTC();
- final EndpointResultQueue endpointResultQueue = mock(EndpointResultQueue.class);
- final ApacheGatewayConnection apacheGatewayConnection = mock(ApacheGatewayConnection.class);
- final String exceptionMessage = "SOME EXCEPTION FOO";
- CountDownLatch latch = new CountDownLatch(1);
- String docId1 = V3HttpAPITest.documents.get(0).getDocumentId();
- Document doc1 = new Document(V3HttpAPITest.documents.get(0).getDocumentId(),
- V3HttpAPITest.documents.get(0).getContents(),
- null,
- clock.instant());
- String docId2 = V3HttpAPITest.documents.get(1).getDocumentId();
- Document doc2 = new Document(V3HttpAPITest.documents.get(1).getDocumentId(),
- V3HttpAPITest.documents.get(1).getContents(),
- null,
- clock.instant());
- DocumentQueue documentQueue = new DocumentQueue(4, clock);
-
- public IOThreadTest() {
- when(apacheGatewayConnection.getEndpoint()).thenReturn(ENDPOINT);
+ @Test
+ public void testSuccessfulWriting() {
+ OperationProcessorTester tester = new OperationProcessorTester();
+ assertEquals(0, tester.incomplete());
+ assertEquals(0, tester.success());
+ assertEquals(0, tester.failures());
+ tester.send("doc1");
+ tester.send("doc2");
+ tester.send("doc3");
+ assertEquals(3, tester.incomplete());
+ assertEquals(0, tester.success());
+ assertEquals(0, tester.failures());
+ tester.tick(1); // connect
+ assertEquals(3, tester.incomplete());
+ tester.tick(1); // sync
+ assertEquals(3, tester.incomplete());
+ tester.tick(1); // process queue
+ assertEquals(0, tester.incomplete());
+ assertEquals(3, tester.success());
+ assertEquals(0, tester.failures());
}
- /**
- * Set up mock so that it can handle both failDocument() and resultReceived().
- *
- * @param expectedDocIdFail on failure, this has to be the doc id, or the mock will fail.
- * @param expectedDocIdOk on ok, this has to be the doc id, or the mock will fail.
- * @param isTransient checked on failure, if different, the mock will fail.
- * @param expectedException checked on failure, if exception toString is different, the mock will fail.
- */
- void setupEndpointResultQueueMock(String expectedDocIdFail, String expectedDocIdOk, boolean isTransient, String expectedException) {
- doAnswer(invocation -> {
- EndpointResult endpointResult = (EndpointResult) invocation.getArguments()[0];
- assertThat(endpointResult.getOperationId(), is(expectedDocIdFail));
- assertThat(endpointResult.getDetail().getException().toString(), containsString(expectedException));
- assertThat(endpointResult.getDetail().getResultType(), is(isTransient ? Result.ResultType.TRANSITIVE_ERROR : Result.ResultType.FATAL_ERROR));
-
- latch.countDown();
- return null;
- }).when(endpointResultQueue).failOperation(any(), eq(0));
-
- doAnswer(invocation -> {
- EndpointResult endpointResult = (EndpointResult) invocation.getArguments()[0];
- assertThat(endpointResult.getOperationId(), is(expectedDocIdOk));
- assertThat(endpointResult.getDetail().getResultType(), is(Result.ResultType.OPERATION_EXECUTED));
- latch.countDown();
- return null;
- }).when(endpointResultQueue).resultReceived(any(), eq(0));
- }
+ @Test
+ public void testExceptionOnConnect() {
+ OperationProcessorTester tester = new OperationProcessorTester();
+ IOThread ioThread = tester.getSingleIOThread();
+ DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection();
+ firstConnection.throwOnHandshake(new ServerResponseException(403, "Not authorized"));
- private IOThread createIOThread(int maxInFlightRequests, long localQueueTimeOut) {
- return new IOThread(null,
- ENDPOINT,
- endpointResultQueue,
- new SingletonGatewayConnectionFactory(apacheGatewayConnection),
- 0,
- 0,
- maxInFlightRequests,
- Duration.ofMillis(localQueueTimeOut),
- documentQueue,
- 0,
- Duration.ofSeconds(15),
- true,
- 10,
- clock);
+ tester.send("doc1");
+ tester.tick(3);
+ assertEquals(1, tester.incomplete());
+ assertEquals(0, ioThread.resultQueue().getPendingSize());
+ assertEquals(0, tester.success());
+ assertEquals("Awaiting retry", 0, tester.failures());
}
@Test
- public void singleDocumentSuccess() throws Exception {
- when(apacheGatewayConnection.connect()).thenReturn(true);
- InputStream serverResponse = new ByteArrayInputStream(
- (docId1 + " OK Doc{20}fed").getBytes(StandardCharsets.UTF_8));
- when(apacheGatewayConnection.write(any())).thenReturn(serverResponse);
- setupEndpointResultQueueMock( "nope", docId1, true, exceptionMessage);
- try (IOThread ioThread = createIOThread(10000, 10000)) {
- ioThread.post(doc1);
- assert (latch.await(120, TimeUnit.SECONDS));
- }
+ public void testExceptionOnHandshake() {
+ OperationProcessorTester tester = new OperationProcessorTester();
+ IOThread ioThread = tester.getSingleIOThread();
+ DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection();
+ firstConnection.throwOnHandshake(new ServerResponseException(403, "Not authorized"));
+
+ tester.send("doc1");
+ tester.tick(3);
+ assertEquals(1, tester.incomplete());
+ assertEquals(0, ioThread.resultQueue().getPendingSize());
+ assertEquals(0, tester.success());
+ assertEquals("Awaiting retry", 0, tester.failures());
}
@Test
- public void testDocumentWriteError() throws Exception {
- when(apacheGatewayConnection.connect()).thenReturn(true);
- when(apacheGatewayConnection.write(any())).thenThrow(new IOException(exceptionMessage));
- setupEndpointResultQueueMock(doc1.getOperationId(), "nope", true, exceptionMessage);
- try (IOThread ioThread = createIOThread(10000, 10000)) {
- ioThread.post(doc1);
- assert (latch.await(120, TimeUnit.SECONDS));
- }
+ public void testExceptionOnWrite() {
+ OperationProcessorTester tester = new OperationProcessorTester();
+ IOThread ioThread = tester.getSingleIOThread();
+ DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection();
+ firstConnection.throwOnWrite(new IOException("Test failure"));
+
+ tester.send("doc1");
+ tester.tick(3);
+ assertEquals(1, tester.incomplete());
+ assertEquals(0, ioThread.resultQueue().getPendingSize());
+ assertEquals(0, tester.success());
+ assertEquals("Awaiting retry since write exceptions is a transient failure",
+ 0, tester.failures());
}
@Test
- public void testTwoDocumentsFirstWriteErrorSecondOk() throws Exception {
- when(apacheGatewayConnection.connect()).thenReturn(true);
- InputStream serverResponse = new ByteArrayInputStream(
- (docId2 + " OK Doc{20}fed").getBytes(StandardCharsets.UTF_8));
- when(apacheGatewayConnection.write(any()))
- .thenThrow(new IOException(exceptionMessage))
- .thenReturn(serverResponse);
- latch = new CountDownLatch(2);
- setupEndpointResultQueueMock(doc1.getOperationId(), doc2.getDocumentId(), true, exceptionMessage);
-
- try (IOThread ioThread = createIOThread(10000, 10000)) {
- ioThread.post(doc1);
- ioThread.post(doc2);
- assert (latch.await(120, TimeUnit.SECONDS));
- }
+ public void testPollingOldConnections() {
+ OperationProcessorTester tester = new OperationProcessorTester();
+ tester.tick(3);
+
+ IOThread ioThread = tester.getSingleIOThread();
+ DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection();
+ assertEquals(0, ioThread.oldConnections().size());
+
+ firstConnection.hold(true);
+ tester.send("doc1");
+ tester.tick(1);
+
+ tester.clock().advance(Duration.ofSeconds(16)); // Default connection ttl is 15
+ tester.tick(3);
+
+ assertEquals(1, ioThread.oldConnections().size());
+ assertEquals(firstConnection, ioThread.oldConnections().get(0));
+ assertNotSame(firstConnection, ioThread.currentConnection());
+ assertEquals(16, firstConnection.lastPollTime().toEpochMilli() / 1000);
+
+ // Check old connection poll pattern (exponential backoff)
+ assertLastPollTimeWhenAdvancing(16, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+
+ tester.clock().advance(Duration.ofSeconds(200));
+ tester.tick(1);
+ assertEquals("Old connection is eventually removed", 0, ioThread.oldConnections().size());
}
- @Test
- public void testQueueTimeOutNoNoConnectionToServer() throws Exception {
- when(apacheGatewayConnection.connect()).thenReturn(false);
- InputStream serverResponse = new ByteArrayInputStream(("").getBytes(StandardCharsets.UTF_8));
- when(apacheGatewayConnection.write(any())).thenReturn(serverResponse);
- setupEndpointResultQueueMock(doc1.getOperationId(), "nope", true,
- "java.lang.Exception: Not sending document operation, timed out in queue after");
- try (IOThread ioThread = createIOThread(10, 10)) {
- ioThread.post(doc1);
- assert (latch.await(120, TimeUnit.SECONDS));
- }
+ private void assertLastPollTimeWhenAdvancing(int lastPollTimeSeconds,
+ int advanceSeconds,
+ DryRunGatewayConnection connection,
+ OperationProcessorTester tester) {
+ tester.clock().advance(Duration.ofSeconds(advanceSeconds));
+ tester.tick(1);
+ assertEquals(lastPollTimeSeconds, connection.lastPollTime().toEpochMilli() / 1000);
}
- @Test
- public void testEndpointProtocolExceptionPropagation()
- throws IOException, ServerResponseException, InterruptedException, TimeoutException, ExecutionException {
- when(apacheGatewayConnection.connect()).thenReturn(true);
- int errorCode = 403;
- String errorMessage = "Not authorized";
- doThrow(new ServerResponseException(errorCode, errorMessage)).when(apacheGatewayConnection).handshake();
- Future<FeedEndpointException> futureException = endpointErrorCapturer(endpointResultQueue);
-
- try (IOThread ioThread = createIOThread(10, 10)) {
- ioThread.post(doc1);
- FeedEndpointException reportedException = futureException.get(120, TimeUnit.SECONDS);
- assertThat(reportedException, instanceOf(FeedProtocolException.class));
- FeedProtocolException actualException = (FeedProtocolException) reportedException;
- assertThat(actualException.getHttpStatusCode(), equalTo(errorCode));
- assertThat(actualException.getHttpResponseMessage(), equalTo(errorMessage));
- assertThat(actualException.getEndpoint(), equalTo(ENDPOINT));
- assertThat(actualException.getMessage(), equalTo("Endpoint 'myhost:4080' returned an error on handshake: 403 - Not authorized"));
+ private static class OperationProcessorTester {
+
+ private final Endpoint endpoint;
+ private final int clusterId = 0;
+ private final ManualClock clock;
+ private final TestResultCallback resultCallback;
+ private final OperationProcessor operationProcessor;
+
+ public OperationProcessorTester() {
+ endpoint = Endpoint.create("test-endpoint");
+ SessionParams.Builder params = new SessionParams.Builder();
+ Cluster.Builder clusterParams = new Cluster.Builder();
+ clusterParams.addEndpoint(endpoint);
+ params.addCluster(clusterParams.build());
+ ConnectionParams.Builder connectionParams = new ConnectionParams.Builder();
+ connectionParams.setDryRun(true);
+ connectionParams.setRunThreads(false);
+ params.setConnectionParams(connectionParams.build());
+
+ clock = new ManualClock(Instant.ofEpochMilli(0));
+ resultCallback = new TestResultCallback();
+ operationProcessor = new OperationProcessor(new IncompleteResultsThrottler(1, 100, clock, new ThrottlePolicy()),
+ resultCallback,
+ params.build(),
+ new ScheduledThreadPoolExecutor(1),
+ clock);
}
- }
- @Test
- public void testEndpointConnectExceptionsPropagation()
- throws IOException, ServerResponseException, InterruptedException, TimeoutException, ExecutionException {
- when(apacheGatewayConnection.connect()).thenReturn(true);
- String errorMessage = "generic error message";
- IOException cause = new IOException(errorMessage);
- doThrow(cause).when(apacheGatewayConnection).handshake();
- Future<FeedEndpointException> futureException = endpointErrorCapturer(endpointResultQueue);
-
- try (IOThread ioThread = createIOThread(10, 10)) {
- ioThread.post(doc1);
- FeedEndpointException reportedException = futureException.get(120, TimeUnit.SECONDS);
- assertThat(reportedException, instanceOf(FeedConnectException.class));
- FeedConnectException actualException = (FeedConnectException) reportedException;
- assertThat(actualException.getCause(), equalTo(cause));
- assertThat(actualException.getEndpoint(), equalTo(ENDPOINT));
- assertThat(actualException.getMessage(), equalTo("Handshake to endpoint 'myhost:4080' failed: generic error message"));
+ public ManualClock clock() { return clock; }
+
+ /** Asserts that this has but a single IOThread and returns it */
+ public IOThread getSingleIOThread() {
+ assertEquals(1, clusterConnections().size());
+ assertEquals(1, clusterConnections().get(0).ioThreads().size());
+ return clusterConnections().get(0).ioThreads().get(0);
+ }
+
+ /** Do n iteration of work in all io threads of this */
+ public void tick(int n) {
+ for (int i = 0; i < n; i++)
+ for (ClusterConnection cluster : operationProcessor.clusters())
+ for (IOThread thread : cluster.ioThreads())
+ thread.tick();
+ }
+
+ public void send(String documentId) {
+ operationProcessor.sendDocument(new Document(documentId, documentId, "data of " + documentId, null, clock.instant()));
+ }
+
+ public int incomplete() {
+ return operationProcessor.getIncompleteResultQueueSize();
+ }
+
+ public int success() {
+ return resultCallback.successes;
+ }
+
+ public List<ClusterConnection> clusterConnections() {
+ return operationProcessor.clusters();
+ }
+
+ public int failures() {
+ return resultCallback.failures;
+ }
+
+ public int endpointExceptions() {
+ return resultCallback.endpointExceptions;
+ }
+
+ public Result lastResult() {
+ return resultCallback.lastResult;
}
- }
- private static Future<FeedEndpointException> endpointErrorCapturer(EndpointResultQueue endpointResultQueue) {
- CompletableFuture<FeedEndpointException> futureResult = new CompletableFuture<>();
- doAnswer(invocation -> {
- if (futureResult.isDone()) return null;
- FeedEndpointException reportedException = (FeedEndpointException) invocation.getArguments()[0];
- futureResult.complete(reportedException);
- return null;
- }).when(endpointResultQueue).onEndpointError(any());
- return futureResult;
}
- private static final class SingletonGatewayConnectionFactory implements GatewayConnectionFactory {
+ private static class TestResultCallback implements FeedClient.ResultCallback {
- private final GatewayConnection singletonConnection;
+ private int successes = 0;
+ private int failures = 0;
+ private int endpointExceptions = 0;
+ private Result lastResult;
- SingletonGatewayConnectionFactory(GatewayConnection singletonConnection) {
- this.singletonConnection = singletonConnection;
+ @Override
+ public void onCompletion(String docId, Result documentResult) {
+ this.lastResult = documentResult;
+ if (documentResult.isSuccess())
+ successes++;
+ else
+ failures++;
}
@Override
- public GatewayConnection newConnection() { return singletonConnection; }
+ public void onEndpointException(FeedEndpointException exception) {
+ endpointExceptions++;
+ }
}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java
deleted file mode 100644
index 7bac8f9cd9d..00000000000
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core.communication;
-
-import com.yahoo.vespa.http.client.FeedClient;
-import com.yahoo.vespa.http.client.FeedEndpointException;
-import com.yahoo.vespa.http.client.ManualClock;
-import com.yahoo.vespa.http.client.Result;
-import com.yahoo.vespa.http.client.config.Cluster;
-import com.yahoo.vespa.http.client.config.ConnectionParams;
-import com.yahoo.vespa.http.client.config.Endpoint;
-import com.yahoo.vespa.http.client.config.SessionParams;
-import com.yahoo.vespa.http.client.core.Document;
-import com.yahoo.vespa.http.client.core.EndpointResult;
-import com.yahoo.vespa.http.client.core.ThrottlePolicy;
-import com.yahoo.vespa.http.client.core.operationProcessor.IncompleteResultsThrottler;
-import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;
-import org.junit.Test;
-
-import java.time.Duration;
-import java.time.Instant;
-import java.util.List;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-
-/**
- * TODO: Migrate IOThreadTests here.
- *
- * @author bratseth
- */
-public class NewIOThreadTest {
-
- @Test
- public void testBasics() {
- OperationProcessorTester tester = new OperationProcessorTester();
- assertEquals(0, tester.inflight());
- assertEquals(0, tester.success());
- assertEquals(0, tester.failures());
- tester.send("doc1");
- tester.send("doc2");
- tester.send("doc3");
- assertEquals(3, tester.inflight());
- assertEquals(0, tester.success());
- assertEquals(0, tester.failures());
- tester.success("doc1");
- tester.success("doc2");
- tester.success("doc3");
- assertEquals(0, tester.inflight());
- assertEquals(3, tester.success());
- assertEquals(0, tester.failures());
- }
-
- @Test
- public void testPollingOldConnections() {
- OperationProcessorTester tester = new OperationProcessorTester();
- tester.tick(3);
-
- assertEquals(1, tester.clusterConnections().size());
- assertEquals(1, tester.clusterConnections().get(0).ioThreads().size());
- IOThread ioThread = tester.clusterConnections().get(0).ioThreads().get(0);
- DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection();
- assertEquals(0, ioThread.oldConnections().size());
-
- firstConnection.hold(true);
- tester.send("doc1");
- tester.tick(1);
-
- tester.clock().advance(Duration.ofSeconds(16)); // Default connection ttl is 15
- tester.tick(3);
-
- assertEquals(1, ioThread.oldConnections().size());
- assertEquals(firstConnection, ioThread.oldConnections().get(0));
- assertNotSame(firstConnection, ioThread.currentConnection());
- assertEquals(16, firstConnection.lastPollTime().toEpochMilli() / 1000);
-
- // Check old connection poll pattern (exponential backoff)
- assertLastPollTimeWhenAdvancing(16, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
-
- tester.clock().advance(Duration.ofSeconds(200));
- tester.tick(1);
- assertEquals("Old connection is eventually removed", 0, ioThread.oldConnections().size());
- }
-
- private void assertLastPollTimeWhenAdvancing(int lastPollTimeSeconds,
- int advanceSeconds,
- DryRunGatewayConnection connection,
- OperationProcessorTester tester) {
- tester.clock().advance(Duration.ofSeconds(advanceSeconds));
- tester.tick(1);
- assertEquals(lastPollTimeSeconds, connection.lastPollTime().toEpochMilli() / 1000);
- }
-
- private static class OperationProcessorTester {
-
- private final Endpoint endpoint;
- private final int clusterId = 0;
- private final ManualClock clock;
- private final TestResultCallback resultCallback;
- private final OperationProcessor operationProcessor;
-
- public OperationProcessorTester() {
- endpoint = Endpoint.create("test-endpoint");
- SessionParams.Builder params = new SessionParams.Builder();
- Cluster.Builder clusterParams = new Cluster.Builder();
- clusterParams.addEndpoint(endpoint);
- params.addCluster(clusterParams.build());
- ConnectionParams.Builder connectionParams = new ConnectionParams.Builder();
- connectionParams.setDryRun(true);
- connectionParams.setRunThreads(false);
- params.setConnectionParams(connectionParams.build());
-
- clock = new ManualClock(Instant.ofEpochMilli(0));
- resultCallback = new TestResultCallback();
- operationProcessor = new OperationProcessor(new IncompleteResultsThrottler(1, 100, clock, new ThrottlePolicy()),
- resultCallback,
- params.build(),
- new ScheduledThreadPoolExecutor(1),
- clock);
- }
-
- public ManualClock clock() { return clock; }
-
- /** Do n iteration of work in all io threads of this */
- public void tick(int n) {
- for (int i = 0; i < n; i++)
- for (ClusterConnection cluster : operationProcessor.clusters())
- for (IOThread thread : cluster.ioThreads())
- thread.tick();
- }
-
- public void send(String documentId) {
- operationProcessor.sendDocument(new Document(documentId, documentId, "data of " + documentId, null, clock.instant()));
- }
-
- public void success(String documentId) {
- operationProcessor.resultReceived(new EndpointResult(documentId, new Result.Detail(endpoint)), clusterId);
- }
-
- public int inflight() {
- return operationProcessor.getIncompleteResultQueueSize();
- }
-
- public int success() {
- return resultCallback.successes;
- }
-
- public List<ClusterConnection> clusterConnections() {
- return operationProcessor.clusters();
- }
-
- public int failures() {
- return resultCallback.failures;
- }
-
- }
-
- private static class TestResultCallback implements FeedClient.ResultCallback {
-
- private int successes = 0;
- private int failures = 0;
-
- @Override
- public void onCompletion(String docId, Result documentResult) {
- successes++;
- }
-
- @Override
- public void onEndpointException(FeedEndpointException exception) {
- failures++;
- }
-
-
- }
-
-}