aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-http-client
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@oath.com>2020-09-02 19:55:02 +0200
committerGitHub <noreply@github.com>2020-09-02 19:55:02 +0200
commit51f266785a4d6f1b3ac3e88ac897adae2ab94459 (patch)
treeb42e04b9a2433228509bc6accfb211d7d7ad840f /vespa-http-client
parented218c46326f21ff78a03bffe425c4b0284dcb22 (diff)
parent500e792bd97b3086a65cc4fe24f318371bcfd4de (diff)
Merge pull request #14229 from vespa-engine/bratseth/http-client-tests
Bratseth/http client tests
Diffstat (limited to 'vespa-http-client')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java1
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ThrottlePolicy.java5
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java45
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java1
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java40
-rw-r--r--vespa-http-client/src/test/java/ExampleUsageFeedClientTest.java2
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java382
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPIMultiClusterTest.java1042
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java200
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/OperationProcessorTester.java125
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java322
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java196
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java11
13 files changed, 308 insertions, 2064 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java
index d3e4b4ed762..3a67c80224f 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java
@@ -11,6 +11,7 @@ import com.yahoo.vespa.http.client.config.Endpoint;
* @author bjorncs
*/
public abstract class FeedEndpointException extends RuntimeException {
+
private final Endpoint endpoint;
protected FeedEndpointException(String message, Throwable cause, Endpoint endpoint) {
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ThrottlePolicy.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ThrottlePolicy.java
index a4bd5d51496..ee107a7d4d2 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ThrottlePolicy.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ThrottlePolicy.java
@@ -7,6 +7,7 @@ import static java.lang.Math.min;
/**
* Class that has a method for finding next maxInFlight.
+ *
* @author dybis
*/
public class ThrottlePolicy {
@@ -16,6 +17,7 @@ public class ThrottlePolicy {
/**
* Generate nex in-flight value for throttling.
+ *
* @param maxPerformanceChange This value limit the dynamics of the algorithm.
* @param numOk number of success in last phase
* @param previousNumOk number of success in previous (before last) phase.
@@ -27,8 +29,7 @@ public class ThrottlePolicy {
public int calcNewMaxInFlight(double maxPerformanceChange, int numOk, int previousNumOk, int previousMaxInFlight,
int maxInFlightNow, boolean messagesQueued) {
- double difference = calculateRuleBasedDifference(
- maxPerformanceChange, numOk, previousNumOk, previousMaxInFlight, maxInFlightNow);
+ double difference = calculateRuleBasedDifference(maxPerformanceChange, numOk, previousNumOk, previousMaxInFlight, maxInFlightNow);
boolean previousRunWasBetter = numOk < previousNumOk;
boolean previousRunHadLessInFlight = previousMaxInFlight < maxInFlightNow;
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
index 129fc000271..623ea543ffb 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
@@ -5,8 +5,10 @@ import com.yahoo.vespa.http.client.config.Endpoint;
import com.yahoo.vespa.http.client.core.Document;
import com.yahoo.vespa.http.client.core.ErrorCode;
import com.yahoo.vespa.http.client.core.OperationStatus;
+import com.yahoo.vespa.http.client.core.ServerResponseException;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
@@ -29,7 +31,13 @@ public class DryRunGatewayConnection implements GatewayConnection {
/** Set to true to hold off responding with a result to any incoming operations until this is set false */
private boolean hold = false;
- private List<Document> held = new ArrayList<>();
+ private final List<Document> held = new ArrayList<>();
+
+ /** If this is set, handshake operations will throw this exception */
+ private ServerResponseException throwThisOnHandshake = null;
+
+ /** If this is set, all write operations will throw this exception */
+ private IOException throwThisOnWrite = null;
public DryRunGatewayConnection(Endpoint endpoint, Clock clock) {
this.endpoint = endpoint;
@@ -37,27 +45,27 @@ public class DryRunGatewayConnection implements GatewayConnection {
}
@Override
- public InputStream write(List<Document> docs) {
- StringBuilder result = new StringBuilder();
+ public InputStream write(List<Document> docs) throws IOException {
+ if (throwThisOnWrite != null)
+ throw throwThisOnWrite;
+
if (hold) {
held.addAll(docs);
+ return new ByteArrayInputStream("".getBytes(StandardCharsets.UTF_8));
}
else {
+ StringBuilder result = new StringBuilder();
for (Document doc : held)
result.append(okResponse(doc).render());
held.clear();
for (Document doc : docs)
result.append(okResponse(doc).render());
+ return new ByteArrayInputStream(result.toString().getBytes(StandardCharsets.UTF_8));
}
- return new ByteArrayInputStream(result.toString().getBytes(StandardCharsets.UTF_8));
- }
-
- public void hold(boolean hold) {
- this.hold = hold;
}
@Override
- public InputStream poll() {
+ public InputStream poll() throws IOException {
lastPollTime = clock.instant();
return write(new ArrayList<>());
}
@@ -66,7 +74,7 @@ public class DryRunGatewayConnection implements GatewayConnection {
public Instant lastPollTime() { return lastPollTime; }
@Override
- public InputStream drain() {
+ public InputStream drain() throws IOException {
return write(new ArrayList<>());
}
@@ -85,14 +93,29 @@ public class DryRunGatewayConnection implements GatewayConnection {
}
@Override
- public void handshake() { }
+ public void handshake() throws ServerResponseException {
+ if (throwThisOnHandshake != null)
+ throw throwThisOnHandshake;
+ }
@Override
public void close() { }
+ public void hold(boolean hold) {
+ this.hold = hold;
+ }
+
/** Returns the document currently held in this */
public List<Document> held() { return Collections.unmodifiableList(held); }
+ public void throwOnWrite(IOException throwThisOnWrite) {
+ this.throwThisOnWrite = throwThisOnWrite;
+ }
+
+ public void throwOnHandshake(ServerResponseException throwThisOnHandshake) {
+ this.throwThisOnHandshake = throwThisOnHandshake;
+ }
+
private OperationStatus okResponse(Document document) {
return new OperationStatus("ok", document.getOperationId(), ErrorCode.OK, false, "");
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
index 1dd8b3bf3ec..a5a37a31665 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
@@ -65,7 +65,6 @@ class EndpointResultQueue {
private synchronized void resultReceived(EndpointResult result, int clusterId, boolean duplicateGivesWarning) {
operationProcessor.resultReceived(result, clusterId);
-
TimerFuture timerFuture = futureByOperation.remove(result.getOperationId());
if (timerFuture == null) {
if (duplicateGivesWarning) {
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index 8f052a1b3f6..652f65489db 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -34,7 +34,7 @@ import java.util.logging.Logger;
*
* @author Einar M R Rosenvinge
*/
-class IOThread implements Runnable, AutoCloseable {
+public class IOThread implements Runnable, AutoCloseable {
private static final Logger log = Logger.getLogger(IOThread.class.getName());
@@ -51,7 +51,6 @@ class IOThread implements Runnable, AutoCloseable {
private final int maxChunkSizeBytes;
private final int maxInFlightRequests;
private final Duration localQueueTimeOut;
- private final Duration maxOldConnectionPollInterval;
private final GatewayThrottler gatewayThrottler;
private final Duration connectionTimeToLive;
private final long pollIntervalUS;
@@ -107,9 +106,6 @@ class IOThread implements Runnable, AutoCloseable {
this.pollIntervalUS = Math.max(1, (long)(1000000.0/Math.max(0.1, idlePollFrequency))); // ensure range [1us, 10s]
this.clock = clock;
this.localQueueTimeOut = localQueueTimeOut;
- this.maxOldConnectionPollInterval = localQueueTimeOut.dividedBy(10).toMillis() > pollIntervalUS / 1000
- ? localQueueTimeOut.dividedBy(10)
- : Duration.ofMillis(pollIntervalUS / 1000);
if (runThreads) {
this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint);
thread.setDaemon(true);
@@ -231,9 +227,10 @@ class IOThread implements Runnable, AutoCloseable {
private void markDocumentAsFailed(List<Document> docs, ServerResponseException servletException) {
for (Document doc : docs) {
- resultQueue.failOperation(
- EndPointResultFactory.createTransientError(
- endpoint, doc.getOperationId(), servletException), clusterId);
+ resultQueue.failOperation(EndPointResultFactory.createTransientError(endpoint,
+ doc.getOperationId(),
+ servletException),
+ clusterId);
}
}
@@ -327,8 +324,8 @@ class IOThread implements Runnable, AutoCloseable {
} catch (Throwable throwable1) {
drainFirstDocumentsInQueueIfOld();
- log.log(Level.INFO, "Failed connecting to endpoint: '" + endpoint
- + "'. Will re-try connecting. Failed with '" + Exceptions.toMessageString(throwable1) + "'",throwable1);
+ log.log(Level.INFO, "Failed connecting to endpoint: '" + endpoint + "'. Will re-try connecting.",
+ throwable1);
executeProblemsCounter.incrementAndGet();
return ConnectionState.DISCONNECTED;
}
@@ -341,8 +338,9 @@ class IOThread implements Runnable, AutoCloseable {
} catch (ServerResponseException ser) {
executeProblemsCounter.incrementAndGet();
- log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + endpoint
- + "' failed. Will re-try handshake. Failed with '" + Exceptions.toMessageString(ser) + "'",ser);
+ log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + endpoint +
+ "' failed. Will re-try handshake.",
+ ser);
drainFirstDocumentsInQueueIfOld();
resultQueue.onEndpointError(new FeedProtocolException(ser.getResponseCode(), ser.getResponseString(), ser, endpoint));
@@ -350,8 +348,9 @@ class IOThread implements Runnable, AutoCloseable {
} catch (Throwable throwable) { // This cover IOException as well
executeProblemsCounter.incrementAndGet();
resultQueue.onEndpointError(new FeedConnectException(throwable, endpoint));
- log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + endpoint
- + "' failed. Will re-try handshake. Failed with '" + Exceptions.toMessageString(throwable) + "'",throwable);
+ log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" +
+ endpoint + "' failed. Will re-try handshake.",
+ throwable);
drainFirstDocumentsInQueueIfOld();
currentConnection.close();
return ConnectionState.DISCONNECTED;
@@ -366,14 +365,14 @@ class IOThread implements Runnable, AutoCloseable {
}
catch (ServerResponseException ser) {
log.log(Level.INFO, "Problems while handing data over to endpoint '" + endpoint +
- "'. Will re-try. Endpoint responded with an unexpected HTTP response code. '"
- + Exceptions.toMessageString(ser) + "'",ser);
+ "'. Will re-try. Endpoint responded with an unexpected HTTP response code.",
+ ser);
return ConnectionState.CONNECTED;
}
catch (Throwable e) {
- log.log(Level.INFO, "Problems while handing data over to endpoint '" + endpoint +
- "'. Will re-try. Connection level error. Failed with '" +
- Exceptions.toMessageString(e) + "'", e);
+ log.log(Level.INFO,
+ "Connection level error handing data over to endpoint '" + endpoint + "'. Will re-try.",
+ e);
currentConnection.close();
return ConnectionState.DISCONNECTED;
}
@@ -532,4 +531,7 @@ class IOThread implements Runnable, AutoCloseable {
/** For testing. Returns a snapshot of the old connections of this. Not thread safe. */
public List<GatewayConnection> oldConnections() { return new ArrayList<>(oldConnections); }
+ /** For testing */
+ public EndpointResultQueue resultQueue() { return resultQueue; }
+
}
diff --git a/vespa-http-client/src/test/java/ExampleUsageFeedClientTest.java b/vespa-http-client/src/test/java/ExampleUsageFeedClientTest.java
index 909131a8979..6110c04e85f 100644
--- a/vespa-http-client/src/test/java/ExampleUsageFeedClientTest.java
+++ b/vespa-http-client/src/test/java/ExampleUsageFeedClientTest.java
@@ -28,13 +28,11 @@ public class ExampleUsageFeedClientTest {
Server serverB =
new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
-
exampleCode("localhost", serverA.getPort(), "localhost", serverB.getPort());
serverA.close();
serverB.close();
}
-
private static CharSequence generateDocument(String docId) {
// Just a dummy example of an update document operation.
return "{\"update\": \""+ docId + "\","
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java
deleted file mode 100644
index 0813cb36078..00000000000
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java
+++ /dev/null
@@ -1,382 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client;
-
-import com.yahoo.vespa.http.client.config.Cluster;
-import com.yahoo.vespa.http.client.config.ConnectionParams;
-import com.yahoo.vespa.http.client.config.Endpoint;
-import com.yahoo.vespa.http.client.config.FeedParams;
-import com.yahoo.vespa.http.client.config.SessionParams;
-import com.yahoo.vespa.http.client.handlers.V3MockParsingRequestHandler;
-import org.junit.Test;
-
-import javax.servlet.http.HttpServletResponse;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.time.Clock;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import static com.yahoo.vespa.http.client.TestUtils.writeDocument;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
-import static org.hamcrest.core.IsEqual.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
-/**
- *
- * @author Einar M R Rosenvinge
- */
-@SuppressWarnings("deprecation")
-public class QueueBoundsTest {
-
- public static final List<TestDocument> documents;
-
- static {
- List<TestDocument> docs = new ArrayList<>();
- docs.add(new TestDocument("id:music:music::http://music.yahoo.com/bobdylan/BestOf",
- ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/bobdylan/BestOf\">\n" +
- " <title>Best of Bob Dylan</title>\n" +
- "</document>\n").getBytes(StandardCharsets.UTF_8)));
- docs.add(new TestDocument("id:music:music::http://music.yahoo.com/oleivars/BestOf",
- ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/oleivars/BestOf\">\n" +
- " <title>Best of Ole Ivars</title>\n" +
- "</document>\n").getBytes(StandardCharsets.UTF_8)));
- docs.add(new TestDocument("id:music:music::http://music.yahoo.com/bjarnefritjofs/BestOf",
- ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/bjarnefritjofs/BestOf\">\n" +
- " <title>Best of Bjarne Fritjofs</title>\n" +
- "</document>\n").getBytes(StandardCharsets.UTF_8)));
- docs.add(new TestDocument("id:music:music::http://music.yahoo.com/larryingvars/BestOf",
- ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/larryingvars/BestOf\">\n" +
- " <title>Best of Larry Ingvars</title>\n" +
- "</document>\n").getBytes(StandardCharsets.UTF_8)));
- documents = Collections.unmodifiableList(docs);
- }
-
- @Test
- public void requireThatFullInputQueueBlocksAndUnblocks() throws Exception {
- V3MockParsingRequestHandler mockXmlParsingRequestHandler = new V3MockParsingRequestHandler();
- mockXmlParsingRequestHandler.setScenario(V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION);
- try (Server server = new Server(mockXmlParsingRequestHandler, 0);
- Session session =
- new com.yahoo.vespa.http.client.core.api.SessionImpl(
- new SessionParams.Builder()
- .addCluster(new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", server.getPort(), false))
- .build())
- .setFeedParams(new FeedParams.Builder()
- .setMaxChunkSizeBytes(1)
- .setMaxInFlightRequests(1)
- .setLocalQueueTimeOut(40000)
- .build())
- .setConnectionParams(new ConnectionParams.Builder()
- .setNumPersistentConnectionsPerEndpoint(1)
- .build())
- .setClientQueueSize(2)
- .build(),
- SessionFactory.createTimeoutExecutor(),
- Clock.systemUTC())) {
- FeederThread feeder = new FeederThread(session);
- try {
- feeder.start();
- assertFeedNotBlocking(feeder, 0);
- assertThat(session.results().size(), is(0));
- assertFeedNotBlocking(feeder, 1);
- assertThat(session.results().size(), is(0));
- CountDownLatch lastPostFeed = assertFeedBlocking(feeder, 2);
- assertThat(session.results().size(), is(0));
-
- mockXmlParsingRequestHandler.setScenario(V3MockParsingRequestHandler.Scenario.ALL_OK);
- lastPostFeed.await(60, TimeUnit.SECONDS);
- assertThat(lastPostFeed.getCount(), equalTo(0L));
- assertResultQueueSize(session, 3, 60, TimeUnit.SECONDS);
- } finally {
- feeder.stop();
- }
- }
- }
-
- @Test
- public void requireThatFullIntermediateQueueBlocksAndUnblocks() throws Exception {
- V3MockParsingRequestHandler slowHandler =
- new V3MockParsingRequestHandler("B", HttpServletResponse.SC_OK,
- V3MockParsingRequestHandler.Scenario.DELAYED_RESPONSE);
-
- try (Server serverA = new Server(new V3MockParsingRequestHandler("A"), 0);
- Server serverB = new Server(slowHandler, 0);
- com.yahoo.vespa.http.client.core.api.SessionImpl session = new com.yahoo.vespa.http.client.core.api.SessionImpl(
- new SessionParams.Builder()
- .addCluster(new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
- .build())
- .addCluster(new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
- .build())
- .setFeedParams(new FeedParams.Builder()
- .setMaxChunkSizeBytes(1)
- .build())
- .setConnectionParams(new ConnectionParams.Builder()
- .setNumPersistentConnectionsPerEndpoint(1)
- .build())
- .setClientQueueSize(6) //3 per cluster
- .build(), SessionFactory.createTimeoutExecutor(),
- Clock.systemUTC())) {
-
- FeederThread feeder = new FeederThread(session);
- try {
- feeder.start();
- assertFeedNotBlocking(feeder, 0);
- assertFeedNotBlocking(feeder, 1);
- assertFeedNotBlocking(feeder, 2);
-
- //A input queue size now: 3
- //B input queue size now: 3
- //feeder thread not blocked
- //intermediate result queue size now: 3
- //result queue size now: 0
- assertResultQueueSize(session, 0, 60, TimeUnit.SECONDS);
- assertIncompleteResultQueueSize(session, 3, 60, TimeUnit.SECONDS);
-
- assertResultQueueSize(session, 0, 60, TimeUnit.SECONDS);
- assertIncompleteResultQueueSize(session, 3, 60, TimeUnit.SECONDS);
-
- //server B is slow, server A is fast
-
-
- //A input queue size now: 0
- //B input queue size now: 2, IOThread writing 1 and blocked because of no response yet
- //feeder thread still not blocked
- //intermediate result queue size now: 3
- //result queue size now: 0
- assertResultQueueSize(session, 0, 60, TimeUnit.SECONDS);
- assertIncompleteResultQueueSize(session, 3, 60, TimeUnit.SECONDS);
-
- CountDownLatch lastPostFeed = assertFeedBlocking(feeder, 3);
-
- //A input queue size now: 0
- //B input queue size now: 2, IOThread writing 1 and blocked because of no response yet
- //feeder thread blocking with 1 op
- //intermediate result queue size now: 3
- //result queue size now: 0
-
- slowHandler.poke();
-
- //A input queue size now: 0
- //B input queue size now: 2, IOThread writing 1 and blocked because of no response again
- //feeder thread unblocked
- //intermediate result queue size now: 3
- //result queue size now: 1
-
- lastPostFeed.await(60, TimeUnit.SECONDS);
- assertThat(lastPostFeed.getCount(), equalTo(0L));
-
- slowHandler.pokeAllAndUnblockFromNowOn();
-
- //A input queue size now: 0
- //B input queue size now: 0, IOThread not blocked
- //feeder thread unblocked
- //intermediate result queue size now: 0
- //result queue size now: 4
-
- assertResultQueueSize(session, 4, 60, TimeUnit.SECONDS);
- } finally {
- slowHandler.pokeAllAndUnblockFromNowOn();
- feeder.stop();
- }
- }
- }
-
- @Test
- public void testErrorRecovery() throws Exception {
- V3MockParsingRequestHandler mockXmlParsingRequestHandler = new V3MockParsingRequestHandler();
- mockXmlParsingRequestHandler.setScenario(V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION);
- try (Server server = new Server(mockXmlParsingRequestHandler, 0);
- Session session =
- new com.yahoo.vespa.http.client.core.api.SessionImpl(
- new SessionParams.Builder()
- .addCluster(new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", server.getPort(), false))
- .build())
- .setFeedParams(new FeedParams.Builder()
- .setMaxChunkSizeBytes(1)
- .setMaxInFlightRequests(1)
- .setLocalQueueTimeOut(3000)
- .build())
- .setConnectionParams(new ConnectionParams.Builder()
- .setNumPersistentConnectionsPerEndpoint(1)
- .setMaxRetries(1)
- .build())
- .setClientQueueSize(1)
- .build(),
- SessionFactory.createTimeoutExecutor(),
- Clock.systemUTC())) {
- FeederThread feeder = new FeederThread(session);
- feeder.start();
- try {
- {
- System.out.println("We start with failed connection, post a document.");
- assertFeedNotBlocking(feeder, 0);
- assertEquals(0, session.results().size());
-
- CountDownLatch lastPostFeed = assertFeedBlocking(feeder, 1);
- System.out.println("No result so far.");
- assertEquals(0, session.results().size());
- System.out.println("Make connection ok.");
- mockXmlParsingRequestHandler.setScenario(V3MockParsingRequestHandler.Scenario.ALL_OK);
- assert(lastPostFeed.await(120, TimeUnit.SECONDS));
- assertEquals(0L, lastPostFeed.getCount());
- assertResultQueueSize(session, 2, 120, TimeUnit.SECONDS);
- }
-
- System.out.println("Take down connection");
-
- mockXmlParsingRequestHandler.setScenario(V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION);
- {
- assertFeedNotBlocking(feeder, 2);
- System.out.println("Fed one document, fit in queue.");
- assertEquals(2, session.results().size());
- System.out.println("Fed one document more, wait for failure.");
-
- assertFeedNotBlocking(feeder, 3);
- System.out.println("Wait for results for all three documents.");
- while (session.results().size() != 3) {
- Thread.sleep(1);
- }
- System.out.println("Back to ok, test feeding again.");
- mockXmlParsingRequestHandler.setScenario(V3MockParsingRequestHandler.Scenario.ALL_OK);
- assertResultQueueSize(session, 4, 120, TimeUnit.SECONDS);
- }
- int errors = 0;
- for (Result result : session.results()) {
- assertEquals(1, result.getDetails().size());
- if (! result.isSuccess()) {
- errors++;
- }
- }
- assertEquals(1, errors);
- } finally {
- feeder.stop();
- }
- }
- }
-
- private void assertResultQueueSize(Session session, int size, long timeout, TimeUnit timeUnit) throws InterruptedException {
- long timeoutMs = TimeUnit.MILLISECONDS.convert(timeout, timeUnit);
- long waitTimeMs = 0;
- while (true) {
- if (session.results().size() == size) {
- break;
- }
- Thread.sleep(100);
- waitTimeMs += 100;
- if (waitTimeMs > timeoutMs) {
- fail("Queue never reached size " + size + " before timeout of " + timeout + " " + timeUnit + ". Size now: " + session.results().size());
- }
- }
- }
-
- private void assertIncompleteResultQueueSize(com.yahoo.vespa.http.client.core.api.SessionImpl session, int size, long timeout, TimeUnit timeUnit) throws InterruptedException {
- long timeoutMs = TimeUnit.MILLISECONDS.convert(timeout, timeUnit);
- long waitTimeMs = 0;
- while (true) {
- if (session.getIncompleteResultQueueSize() == size) {
- break;
- }
- Thread.sleep(100);
- waitTimeMs += 100;
- if (waitTimeMs > timeoutMs) {
- fail("Queue of incomplete results never reached size " + size + " before timeout of " + timeout + " " + timeUnit + ". Size now: " + session.getIncompleteResultQueueSize());
- }
- }
- }
-
- private CountDownLatch assertFeedBlocking(FeederThread feeder, int idx) throws InterruptedException {
- CountDownLatch preFeed = new CountDownLatch(1);
- CountDownLatch postFeed = new CountDownLatch(1);
- feeder.documents.add(new Triplet<>(preFeed, documents.get(idx), postFeed));
- preFeed.await(60, TimeUnit.SECONDS);
- assertThat(preFeed.getCount(), equalTo(0L));
- postFeed.await(2, TimeUnit.SECONDS);
- assertThat(feeder.getState(), not(equalTo(Thread.State.RUNNABLE)));
- assertThat(postFeed.getCount(), equalTo(1L));
- return postFeed;
- }
-
- private void assertFeedNotBlocking(FeederThread feeder, int idx) throws InterruptedException {
- CountDownLatch preFeed = new CountDownLatch(1);
- CountDownLatch postFeed = new CountDownLatch(1);
- feeder.documents.add(new Triplet<>(preFeed, documents.get(idx), postFeed));
- preFeed.await(60, TimeUnit.SECONDS);
- assertThat(preFeed.getCount(), equalTo(0L));
- postFeed.await(60, TimeUnit.SECONDS);
- assertThat(postFeed.getCount(), equalTo(0L));
- }
-
- public static class Triplet<F, S, T> {
- public final F first;
- public final S second;
- public final T third;
-
- public Triplet(final F first, final S second, final T third) {
- this.first = first;
- this.second = second;
- this.third = third;
- }
- }
-
- private class FeederThread implements Runnable {
- private final Session session;
- private final BlockingQueue<Triplet<CountDownLatch, TestDocument, CountDownLatch>> documents = new LinkedBlockingQueue<>();
- private final Thread thread;
- private volatile boolean shouldRun = true;
-
- FeederThread(Session session) {
- this.session = session;
- this.thread = new Thread(this);
- }
-
- public void start() {
- this.thread.start();
- }
-
- public void stop() {
- shouldRun = false;
- thread.interrupt();
- }
-
- public Thread.State getState() {
- return thread.getState();
- }
-
- @Override
- public void run() {
- try {
- while (shouldRun) {
- Triplet<CountDownLatch, TestDocument, CountDownLatch> triplet = documents.poll(200, TimeUnit.MILLISECONDS);
- if (triplet == null) {
- continue;
- }
- triplet.first.countDown();
- writeDocument(session, triplet.second);
- triplet.third.countDown();
- }
- } catch (IOException ioe) {
- ioe.printStackTrace();
- } catch (InterruptedException e) {
- //ignore
- } catch (RuntimeException re) {
- if (re.getCause() == null || (!(re.getCause() instanceof InterruptedException))) {
- re.printStackTrace();
- }
- }
-
- }
-
- }
-}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPIMultiClusterTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPIMultiClusterTest.java
deleted file mode 100644
index 5fbc367c620..00000000000
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPIMultiClusterTest.java
+++ /dev/null
@@ -1,1042 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client;
-
-import com.yahoo.vespa.http.client.config.Cluster;
-import com.yahoo.vespa.http.client.config.ConnectionParams;
-import com.yahoo.vespa.http.client.config.Endpoint;
-import com.yahoo.vespa.http.client.config.FeedParams;
-import com.yahoo.vespa.http.client.config.SessionParams;
-import com.yahoo.vespa.http.client.handlers.V3MockParsingRequestHandler;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static com.yahoo.vespa.http.client.TestUtils.getResults;
-import static com.yahoo.vespa.http.client.V3HttpAPITest.documents;
-import static org.hamcrest.CoreMatchers.*;
-import static org.junit.Assert.assertThat;
-
-/**
- *
- * @author Einar M R Rosenvinge
- */
-@SuppressWarnings("deprecation")
-public class V3HttpAPIMultiClusterTest {
-
- private void writeDocuments(Session session) throws IOException {
- TestUtils.writeDocuments(session, documents);
- }
-
- private void writeDocument(Session session) throws IOException {
- TestUtils.writeDocuments(session, Collections.<TestDocument>singletonList(documents.get(0)));
- }
-
- private void waitForHandshakesOk(int handshakes, Session session) throws InterruptedException {
- int waitedTimeMs = 0;
- while (session.getStatsAsJson().split("\"successfullHandshakes\":1").length != 1+ handshakes) {
- waitedTimeMs += 100;
- assert(waitedTimeMs < 300000);
- Thread.sleep(100);
- }
- }
-
- @Test
- public void testOpenClose() throws Exception {
- try (Server serverA = new Server(
- new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
- Session session = SessionFactory.create(new SessionParams.Builder()
- .addCluster(new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
- .build())
- .build())) {
- assertThat(session, notNullValue());
- }
- }
-
- @Test
- public void testPriorityAndTraceFlag() throws Exception {
- try (Server serverA = new Server(new V3MockParsingRequestHandler(
- 200, V3MockParsingRequestHandler.Scenario.EXPECT_HIGHEST_PRIORITY_AND_TRACELEVEL_123), 0);
- Session session = SessionFactory.create(
- new SessionParams.Builder()
- .addCluster(new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
- .build())
- .setConnectionParams(new ConnectionParams.Builder()
- .setTraceLevel(123)
- .build())
- .setFeedParams(new FeedParams.Builder()
- .setMaxSleepTimeMs(0)
- .setPriority("HIGHEST")
- .build())
- .build())) {
- writeDocuments(session);
- Map<String, Result> results = getResults(session, documents.size());
- assertThat("Results received: " + results.values(), results.size(), is(documents.size()));
- for (TestDocument document : documents) {
- Result r = results.remove(document.getDocumentId());
- assertThat(r.getDetails().toString(), r.isSuccess(), is(true));
- }
- }
- }
-
- @Test
- public void testRetries() throws Exception {
- V3MockParsingRequestHandler unstableServer =
- new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK);
- V3MockParsingRequestHandler failedServer =
- new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK);
- try (Server serverA = new Server(failedServer, 0);
- Server serverB = new Server(unstableServer, 0);
- Session session = SessionFactory.create(
- new SessionParams.Builder()
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
- .build()
- )
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
- .build()
- )
- .setConnectionParams(
- new ConnectionParams.Builder()
- .setNumPersistentConnectionsPerEndpoint(1)
- .setMaxRetries(2)
- .setMinTimeBetweenRetries(200, TimeUnit.MILLISECONDS)
- .build()
- )
- .build()
- )) {
- waitForHandshakesOk(2, session);
- // Both servers worked fine so far, handshake established. Now fail transient when trying to send
- // data on both. This should cause the OperationProcessor to retry the document. One of the server
- // will come up, but not the other.
- unstableServer.setScenario(V3MockParsingRequestHandler.Scenario.SERVER_ERROR_TWICE_THEN_OK);
- // This will cause retries, but it will not come up.
- failedServer.setScenario(V3MockParsingRequestHandler.Scenario.COULD_NOT_FEED);
- writeDocuments(session);
- Map<String, Result> results = getResults(session, documents.size());
- assertThat("Results received: " + results.values(), results.size(), is(documents.size()));
-
- for (TestDocument document : documents) {
- Result r = results.remove(document.getDocumentId());
- assertThat(r, not(nullValue()));
- assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
- assertThat(r.getDetails().size(), is(2));
- assert(r.getDetails().get(0).getResultType() != r.getDetails().get(1).getResultType());
- }
- assertThat(results.isEmpty(), is(true));
- }
- }
-
- @Test
- public void requireThatFeedingWorks() throws Exception {
- try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
- Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
- Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
- Session session = SessionFactory.create(
- new SessionParams.Builder()
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
- .build())
- .build())) {
-
- writeDocuments(session);
- Map<String, Result> results = getResults(session, documents.size());
- assertThat("Results received: " + results.values(), results.size(), is(documents.size()));
-
- for (TestDocument document : documents) {
- Result r = results.remove(document.getDocumentId());
- assertThat(r, not(nullValue()));
- assertThat(r.getDetails().toString(), r.isSuccess(), is(true));
- assertThat(r.getDetails().size(), is(3));
- assertThat(r.getDetails().get(0).getTraceMessage(), is("Trace message"));
- }
- assertThat(results.isEmpty(), is(true));
- final String stats = session.getStatsAsJson();
- assertThat(stats, containsString("maxChunkSizeBytes\":51200"));
- }
- }
-
- @Test
- public void requireThatOneImmediateDisconnectWorks() throws Exception {
- try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
- Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
- Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DISCONNECT_IMMEDIATELY), 0);
- Session session = SessionFactory.create(
- new SessionParams.Builder()
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
- .build())
- .setFeedParams(
- new FeedParams.Builder()
- .setMaxSleepTimeMs(0)
- .setMaxChunkSizeBytes(1)
- .setLocalQueueTimeOut(1000)
- .build())
- .setConnectionParams(
- new ConnectionParams.Builder()
- .setNumPersistentConnectionsPerEndpoint(1)
- .setMaxRetries(1)
- .build())
- .build())) {
-
- writeDocuments(session); //cannot fail here, they are just enqueued
-
- Map<String, Result> results = getResults(session, documents.size());
- assertThat(results.size(), is(documents.size()));
-
- for (TestDocument document : documents) {
- Result r = results.remove(document.getDocumentId());
- assertThat(r, not(nullValue()));
- assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
- assertThat(r.getDetails().size(), is(3));
-
- Map<Endpoint, Result.Detail> details = new HashMap<>();
- for (Result.Detail detail : r.getDetails()) {
- details.put(detail.getEndpoint(), detail);
- }
- assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED));
- assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED));
- assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR));
- }
- assertThat(results.isEmpty(), is(true));
- }
- }
-
-
-
- @Test
- public void requireThatAllImmediateDisconnectWorks() throws Exception {
- try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DISCONNECT_IMMEDIATELY), 0);
- Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DISCONNECT_IMMEDIATELY), 0);
- Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DISCONNECT_IMMEDIATELY), 0);
- Session session = SessionFactory.create(
- new SessionParams.Builder()
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
- .build())
- .setFeedParams(
- new FeedParams.Builder()
- .setMaxSleepTimeMs(0)
- .setMaxChunkSizeBytes(1)
- .setLocalQueueTimeOut(1000)
- .build())
- .setConnectionParams(
- new ConnectionParams.Builder()
- .setNumPersistentConnectionsPerEndpoint(1)
- .setMaxRetries(1)
- .build())
- .build())) {
-
- writeDocuments(session); //cannot fail here, they are just enqueued
-
- Map<String, Result> results = getResults(session, documents.size());
- assertThat(results.size(), is(documents.size()));
-
- for (TestDocument document : documents) {
- Result r = results.remove(document.getDocumentId());
- assertThat(r, not(nullValue()));
- assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
- assertThat(r.getDetails().size(), is(3));
-
- Map<Endpoint, Result.Detail> details = new HashMap<>();
- for (Result.Detail detail : r.getDetails()) {
- details.put(detail.getEndpoint(), detail);
- }
- assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR));
- assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR));
- assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR));
- }
- assertThat(results.isEmpty(), is(true));
- }
- }
-
-
- @Test
- public void requireThatOneTimeoutWorks() throws Exception {
- try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
- Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
- Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.NEVER_RETURN_ANY_RESULTS), 0);
- Session session = SessionFactory.create(
- new SessionParams.Builder()
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
- .build())
- .setFeedParams(
- new FeedParams.Builder()
- .setMaxSleepTimeMs(0)
- .setMaxChunkSizeBytes(10000)
- .setServerTimeout(2, TimeUnit.SECONDS)
- .setClientTimeout(2, TimeUnit.SECONDS)
- .setLocalQueueTimeOut(2000)
- .build())
- .setConnectionParams(
- new ConnectionParams.Builder()
- .setNumPersistentConnectionsPerEndpoint(1)
- .setMaxRetries(1)
- .build())
- .build())) {
-
- waitForHandshakesOk(3, session);
- writeDocuments(session);
- Map<String, Result> results = getResults(session, documents.size());
- assertThat(results.size(), is(documents.size()));
-
- for (TestDocument document : documents) {
- Result r = results.remove(document.getDocumentId());
- assertThat(r, not(nullValue()));
- //assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
- assertThat(r.getDetails().size(), is(3));
-
- Map<Endpoint, Result.Detail> details = new HashMap<>();
- for (Result.Detail detail : r.getDetails()) {
- details.put(detail.getEndpoint(), detail);
- }
- assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED));
- assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED));
- assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR));
- assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR));
- }
- assertThat(results.isEmpty(), is(true));
- }
- }
-
- @Test
- public void requireThatOneWrongSessionIdWorks() throws Exception {
- try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
- Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
- Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_WRONG_SESSION_ID), 0);
- Session session = SessionFactory.create(
- new SessionParams.Builder()
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
- .build())
- .setFeedParams(
- new FeedParams.Builder()
- .setMaxSleepTimeMs(0)
- .setMaxChunkSizeBytes(1000)
- .setServerTimeout(2, TimeUnit.SECONDS)
- .setClientTimeout(2, TimeUnit.SECONDS)
- .setLocalQueueTimeOut(2000)
- .build())
- .setConnectionParams(
- new ConnectionParams.Builder()
- .setNumPersistentConnectionsPerEndpoint(1)
- .setMaxRetries(0)
- .build())
- .build())) {
-
- waitForHandshakesOk(2, session);
- writeDocuments(session);
-
- Map<String, Result> results = getResults(session, documents.size());
- assertThat(results.size(), is(documents.size()));
-
- for (TestDocument document : documents) {
- Result r = results.remove(document.getDocumentId());
- assertThat(r, not(nullValue()));
- assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
- assertThat(r.getDetails().size(), is(3));
-
- Map<Endpoint, Result.Detail> details = new HashMap<>();
- for (Result.Detail detail : r.getDetails()) {
- details.put(detail.getEndpoint(), detail);
- }
- assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED));
- assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED));
- assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR));
- }
- assertThat(results.isEmpty(), is(true));
- }
- }
-
- @Test
- public void requireThatAllWrongSessionIdWorks() throws Exception {
- try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_WRONG_SESSION_ID), 0);
- Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_WRONG_SESSION_ID), 0);
- Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_WRONG_SESSION_ID), 0);
- Session session = SessionFactory.create(
- new SessionParams.Builder()
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
- .build())
- .setFeedParams(
- new FeedParams.Builder()
- .setMaxSleepTimeMs(0)
- .setMaxChunkSizeBytes(1)
- .setServerTimeout(2, TimeUnit.SECONDS)
- .setClientTimeout(2, TimeUnit.SECONDS)
- .setLocalQueueTimeOut(2000)
- .build())
- .setConnectionParams(
- new ConnectionParams.Builder()
- .setNumPersistentConnectionsPerEndpoint(1)
- .setMaxRetries(0)
- .build())
- .build())) {
-
- writeDocuments(session);
- Map<String, Result> results = getResults(session, documents.size());
- assertThat(results.size(), is(documents.size()));
-
- for (TestDocument document : documents) {
- Result r = results.remove(document.getDocumentId());
- assertThat(r, not(nullValue()));
- //assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
- assertThat(r.getDetails().size(), is(3));
-
- Map<Endpoint, Result.Detail> details = new HashMap<>();
- for (Result.Detail detail : r.getDetails()) {
- details.put(detail.getEndpoint(), detail);
- }
- assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR));
- assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR));
- assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR));
- }
- assertThat(results.isEmpty(), is(true));
- }
- }
-
- @Test
- public void requireThatOneNonAcceptedVersionWorks() throws Exception {
- try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
- Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
- Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION), 0);
- Session session = SessionFactory.create(
- new SessionParams.Builder()
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
- .build())
- .setFeedParams(
- new FeedParams.Builder()
- .setMaxSleepTimeMs(0)
- .setMaxChunkSizeBytes(1)
- .setLocalQueueTimeOut(2000)
- .build())
- .setConnectionParams(
- new ConnectionParams.Builder()
- .setNumPersistentConnectionsPerEndpoint(1)
- .setMaxRetries(0)
- .build())
- .build())) {
-
- writeDocument(session);
-
- Map<String, Result> results = getResults(session, 1);
- assertThat(results.size(), is(1));
- Result r = results.values().iterator().next();
- assertThat(r, not(nullValue()));
- assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
- Map<Endpoint, Result.Detail> details = new HashMap<>();
- for (Result.Detail detail : r.getDetails()) {
- details.put(detail.getEndpoint(), detail);
- }
- Result.Detail failed = details.remove(Endpoint.create("localhost", serverC.getPort(), false));
- assertThat(failed.getResultType(), is(Result.ResultType.TRANSITIVE_ERROR));
- for (Result.Detail detail : details.values()) {
- assertThat(detail.getResultType(), is(Result.ResultType.OPERATION_EXECUTED));
- }
- }
- }
-
- @Test
- public void requireThatAllNonAcceptedVersionWorks() throws Exception {
- try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION), 0);
- Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION), 0);
- Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION), 0);
- Session session = SessionFactory.create(
- new SessionParams.Builder()
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
- .build())
- .setFeedParams(
- new FeedParams.Builder()
- .setMaxSleepTimeMs(0)
- .setMaxChunkSizeBytes(1)
- .setLocalQueueTimeOut(2000)
- .build())
- .setConnectionParams(
- new ConnectionParams.Builder()
- .setNumPersistentConnectionsPerEndpoint(1)
- .setMaxRetries(1)
- .build())
- .build())) {
-
- writeDocuments(session); //cannot fail here, they are just enqueued
-
- Map<String, Result> results = getResults(session, documents.size());
- assertThat(results.size(), is(documents.size()));
-
- for (TestDocument document : documents) {
- Result r = results.remove(document.getDocumentId());
- assertThat(r, not(nullValue()));
- //assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
- assertThat(r.getDetails().size(), is(3));
-
- Map<Endpoint, Result.Detail> details = new HashMap<>();
- for (Result.Detail detail : r.getDetails()) {
- details.put(detail.getEndpoint(), detail);
- }
- assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR));
- assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR));
- assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR));
- }
- assertThat(results.isEmpty(), is(true));
- }
- }
-
- @Test
- public void requireThatOneUnexpectedVersionWorks() throws Exception {
- try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
- Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
- Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_UNEXPECTED_VERSION), 0);
- Session session = SessionFactory.create(
- new SessionParams.Builder()
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
- .build())
- .setFeedParams(
- new FeedParams.Builder()
- .setMaxSleepTimeMs(0)
- .setMaxChunkSizeBytes(1)
- .setLocalQueueTimeOut(2000)
- .build())
- .setConnectionParams(
- new ConnectionParams.Builder()
- .setNumPersistentConnectionsPerEndpoint(1)
- .setMaxRetries(0)
- .build())
- .build())) {
-
- writeDocuments(session); //cannot fail here, they are just enqueued
-
- Map<String, Result> results = getResults(session, documents.size());
- assertThat(results.size(), is(documents.size()));
-
- for (TestDocument document : documents) {
- Result r = results.remove(document.getDocumentId());
- assertThat(r, not(nullValue()));
- assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
- assertThat(r.getDetails().size(), is(3));
-
- Map<Endpoint, Result.Detail> details = new HashMap<>();
- for (Result.Detail detail : r.getDetails()) {
- details.put(detail.getEndpoint(), detail);
- }
- assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED));
- assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED));
- assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR));
- }
- assertThat(results.isEmpty(), is(true));
- }
- }
-
- @Test
- public void requireThatAllUnexpectedVersionWorks() throws Exception {
- try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_UNEXPECTED_VERSION), 0);
- Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_UNEXPECTED_VERSION), 0);
- Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_UNEXPECTED_VERSION), 0);
- Session session = SessionFactory.create(
- new SessionParams.Builder()
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
- .build())
- .setFeedParams(
- new FeedParams.Builder()
- .setMaxSleepTimeMs(0)
- .setMaxChunkSizeBytes(1)
- .setLocalQueueTimeOut(2000)
- .build())
- .setConnectionParams(
- new ConnectionParams.Builder()
- .setNumPersistentConnectionsPerEndpoint(1)
- .setMaxRetries(0)
- .build())
- .build())) {
-
- writeDocument(session);
-
- Map<String, Result> results = getResults(session, 1);
- assertThat(results.size(), is(1));
- Result r = results.values().iterator().next();
- assertThat(r, not(nullValue()));
- assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
- for (Result.Detail detail : r.getDetails()) {
- assertThat(detail.getResultType(), is(Result.ResultType.TRANSITIVE_ERROR));
- }
- }
- }
-
- @Test
- public void requireThatOneInternalServerErrorWorks() throws Exception {
- try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
- Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
- Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.INTERNAL_SERVER_ERROR), 0);
- Session session = SessionFactory.create(
- new SessionParams.Builder()
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
- .build())
- .setFeedParams(
- new FeedParams.Builder()
- .setMaxSleepTimeMs(0)
- .setMaxChunkSizeBytes(1)
- .setLocalQueueTimeOut(2000)
- .build())
- .setConnectionParams(
- new ConnectionParams.Builder()
- .setNumPersistentConnectionsPerEndpoint(1)
- .setMaxRetries(0)
- .build())
- .build())) {
-
- writeDocument(session);
-
- Map<String, Result> results = getResults(session, 1);
- assertThat(results.size(), is(1));
- Result r = results.values().iterator().next();
- assertThat(r, not(nullValue()));
- assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
- Map<Endpoint, Result.Detail> details = new HashMap<>();
- for (Result.Detail detail : r.getDetails()) {
- details.put(detail.getEndpoint(), detail);
- }
- Result.Detail failed = details.remove(Endpoint.create("localhost", serverC.getPort(), false));
- assertThat(failed.getResultType(), is(Result.ResultType.TRANSITIVE_ERROR));
- for (Result.Detail detail : details.values()) {
- assertThat(detail.getResultType(), is(Result.ResultType.OPERATION_EXECUTED));
- }
- }
- }
-
- @Test
- public void requireThatAllInternalServerErrorWorks() throws Exception {
- try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.INTERNAL_SERVER_ERROR), 0);
- Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.INTERNAL_SERVER_ERROR), 0);
- Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.INTERNAL_SERVER_ERROR), 0);
- Session session = SessionFactory.create(
- new SessionParams.Builder()
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
- .build())
- .setFeedParams(
- new FeedParams.Builder()
- .setMaxSleepTimeMs(0)
- .setMaxChunkSizeBytes(1)
- .setLocalQueueTimeOut(2000)
- .build())
- .setConnectionParams(
- new ConnectionParams.Builder()
- .setNumPersistentConnectionsPerEndpoint(1)
- .setMaxRetries(0)
- .build())
- .build())) {
-
- writeDocument(session);
-
- Map<String, Result> results = getResults(session, 1);
- assertThat(results.size(), is(1));
- Result r = results.values().iterator().next();
- assertThat(r, not(nullValue()));
- assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
- for (Result.Detail detail : r.getDetails()) {
- assertThat(detail.getResultType(), is(Result.ResultType.TRANSITIVE_ERROR));
- }
- }
- }
-
- @Test
- public void requireThatOneCouldNotFeedErrorWorks() throws Exception {
- try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
- Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
- Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.COULD_NOT_FEED), 0);
- Session session = SessionFactory.create(
- new SessionParams.Builder()
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
- .build())
- .setFeedParams(
- new FeedParams.Builder()
- .setMaxSleepTimeMs(0)
- .setMaxChunkSizeBytes(1)
- .build())
- .setConnectionParams(
- new ConnectionParams.Builder()
- .setNumPersistentConnectionsPerEndpoint(1)
- .setMaxRetries(1)
- .build())
- .build())) {
-
- writeDocuments(session);
- Map<String, Result> results = getResults(session, documents.size());
- assertThat(results.size(), is(documents.size()));
-
- for (TestDocument document : documents) {
- Result r = results.remove(document.getDocumentId());
- assertThat(r, not(nullValue()));
- assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
- assertThat(r.getDetails().size(), is(3));
-
- Map<Endpoint, Result.Detail> details = new HashMap<>();
- for (Result.Detail detail : r.getDetails()) {
- details.put(detail.getEndpoint(), detail);
- }
- assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED));
- assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED));
- assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR));
- }
- assertThat(results.isEmpty(), is(true));
- }
- }
-
- @Test
- public void requireThatAllCouldNotFeedErrorWorks() throws Exception {
- try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.COULD_NOT_FEED), 0);
- Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.COULD_NOT_FEED), 0);
- Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.COULD_NOT_FEED), 0);
- Session session = SessionFactory.create(
- new SessionParams.Builder()
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
- .build())
- .setFeedParams(
- new FeedParams.Builder()
- .setMaxSleepTimeMs(0)
- .setMaxChunkSizeBytes(1)
- .build())
- .setConnectionParams(
- new ConnectionParams.Builder()
- .setNumPersistentConnectionsPerEndpoint(1)
- .setMaxRetries(1)
- .build())
- .build())) {
-
- writeDocuments(session);
- Map<String, Result> results = getResults(session, documents.size());
- assertThat(results.size(), is(documents.size()));
-
- for (TestDocument document : documents) {
- Result r = results.remove(document.getDocumentId());
- assertThat(r, not(nullValue()));
- assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
- assertThat(r.getDetails().size(), is(3));
-
- Map<Endpoint, Result.Detail> details = new HashMap<>();
- for (Result.Detail detail : r.getDetails()) {
- details.put(detail.getEndpoint(), detail);
- }
- assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR));
- assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR));
- assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR));
- }
- assertThat(results.isEmpty(), is(true));
- }
- }
-
- @Test
- public void requireThatOneMbusErrorWorks() throws Exception {
- final V3MockParsingRequestHandler unstableServer = new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.MBUS_RETURNED_ERROR);
- try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
- Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
- Server serverC = new Server(unstableServer, 0);
- Session session = SessionFactory.create(
- new SessionParams.Builder()
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
- .build())
- .setFeedParams(
- new FeedParams.Builder()
- .setMaxSleepTimeMs(0)
- .setMaxChunkSizeBytes(1)
- .build())
- .setConnectionParams(
- new ConnectionParams.Builder()
- .setNumPersistentConnectionsPerEndpoint(1)
- .setMaxRetries(100)
- .build())
- .build())) {
-
- writeDocuments(session);
-
- // Make it fail, but it should still retry since it is a MBUS error that is transitive
- // for the client even though fatal for message bus.
- Thread.sleep(1000);
- assertThat(session.results().size(), is(0));
- unstableServer.setScenario(V3MockParsingRequestHandler.Scenario.ALL_OK);
-
- Map<String, Result> results = getResults(session, documents.size());
- assertThat(results.size(), is(documents.size()));
-
- for (TestDocument document : documents) {
- Result r = results.remove(document.getDocumentId());
- assertThat(r, not(nullValue()));
- assertThat(r.getDetails().toString(), r.isSuccess(), is(true));
- assertThat(r.getDetails().size(), is(3));
-
- Map<Endpoint, Result.Detail> details = new HashMap<>();
- for (Result.Detail detail : r.getDetails()) {
- details.put(detail.getEndpoint(), detail);
- }
- assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED));
- assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED));
- assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED));
-
- }
- assertThat(results.isEmpty(), is(true));
- }
- }
-
- @Test
- public void requireThatAllMbusErrorWorks() throws Exception {
- try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.MBUS_RETURNED_ERROR), 0);
- Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.MBUS_RETURNED_ERROR), 0);
- Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.MBUS_RETURNED_ERROR), 0);
- Session session = SessionFactory.create(
- new SessionParams.Builder()
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
- .build()
- )
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
- .build())
- .setFeedParams(
- new FeedParams.Builder()
- .setMaxSleepTimeMs(0)
- .setMaxChunkSizeBytes(1)
- .build()
- )
- .setConnectionParams(
- new ConnectionParams.Builder()
- .setNumPersistentConnectionsPerEndpoint(1)
- .setMaxRetries(0)
- .build()
- )
- .build())) {
-
- writeDocuments(session); //cannot fail here, they are just enqueued
-
- Map<String, Result> results = getResults(session, documents.size());
- assertThat(results.size(), is(documents.size()));
-
- for (TestDocument document : documents) {
- Result r = results.remove(document.getDocumentId());
- assertThat(r, not(nullValue()));
- assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
- assertThat(r.getDetails().size(), is(3));
-
- Map<Endpoint, Result.Detail> details = new HashMap<>();
- for (Result.Detail detail : r.getDetails()) {
- details.put(detail.getEndpoint(), detail);
- }
- assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.FATAL_ERROR));
- assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.FATAL_ERROR));
- assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.FATAL_ERROR));
- }
- assertThat(results.isEmpty(), is(true));
- }
- }
-
- @Test
- public void requireThatBadVipBehaviorDoesNotFailBadly() throws Exception {
- V3MockParsingRequestHandler handlerA = new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.BAD_REQUEST);
- V3MockParsingRequestHandler handlerB = new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.BAD_REQUEST);
-
- try (Server serverA = new Server(handlerA, 0);
- Server serverB = new Server(handlerB, 0);
- Session session = SessionFactory.create(
- new SessionParams.Builder()
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
- .build())
- .setFeedParams(
- new FeedParams.Builder()
- .setMaxSleepTimeMs(0)
- .setMaxChunkSizeBytes(1)
- .setLocalQueueTimeOut(1000)
- .build())
- .setConnectionParams(
- new ConnectionParams.Builder()
- .setNumPersistentConnectionsPerEndpoint(1)
- .setMaxRetries(1)
- .build())
- .build())) {
-
- //Set A in bad state => A returns bad request.
- handlerA.badRequestScenarioShouldReturnBadRequest.set(true);
-
- //write one document, should fail
- writeDocument(session);
- Map<String, Result> results = getResults(session, 1);
- assertThat(results.size(), is(1));
- Result r = results.values().iterator().next();
- assertThat(r, not(nullValue()));
- assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
- assertThat(r.getDetails().size(), is(2));
- Map<Endpoint, Result.Detail> details = new HashMap<>();
- for (Result.Detail detail : r.getDetails()) {
- details.put(detail.getEndpoint(), detail);
- }
- assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR));
- assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED));
-
-
- //Set B in bad state => B returns bad request.
- handlerB.badRequestScenarioShouldReturnBadRequest.set(true);
- } //try to close session
-
- }
-}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java
deleted file mode 100644
index 780de3e695c..00000000000
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java
+++ /dev/null
@@ -1,200 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client;
-
-import com.yahoo.vespa.http.client.config.Cluster;
-import com.yahoo.vespa.http.client.config.ConnectionParams;
-import com.yahoo.vespa.http.client.config.Endpoint;
-import com.yahoo.vespa.http.client.config.FeedParams;
-import com.yahoo.vespa.http.client.config.SessionParams;
-import com.yahoo.vespa.http.client.handlers.V3MockParsingRequestHandler;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static com.yahoo.vespa.http.client.TestUtils.getResults;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- *
- * @author Einar M R Rosenvinge
- */
-@SuppressWarnings("deprecation")
-public class V3HttpAPITest {
-
- public static final List<TestDocument> documents;
-
- static {
- List<TestDocument> docs = new ArrayList<>();
- docs.add(new TestDocument("id:music:music::http://music.yahoo.com/bobdylan/BestOf",
- ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/bobdylan/BestOf\">\n" +
- " <title>Best of Bob Dylan</title>\n" +
- "</document>\n").getBytes(StandardCharsets.UTF_8)));
- docs.add(new TestDocument("id:music:music::http://music.yahoo.com/oleivars/BestOf",
- ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/oleivars/BestOf\">\n" +
- " <title>Best of Ole Ivars</title>\n" +
- "</document>\n").getBytes(StandardCharsets.UTF_8)));
- docs.add(new TestDocument("id:music:music::http://music.yahoo.com/bjarnefritjofs/BestOf",
- ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/bjarnefritjofs/BestOf\">\n" +
- " <title>Best of Bjarne Fritjofs</title>\n" +
- "</document>\n").getBytes(StandardCharsets.UTF_8)));
- documents = Collections.unmodifiableList(docs);
- }
-
- private void writeDocuments(Session session) throws IOException {
- TestUtils.writeDocuments(session, documents);
- }
-
- private void writeDocument(Session session) throws IOException {
- TestUtils.writeDocuments(session, Collections.<TestDocument>singletonList(documents.get(0)));
- }
-
- private void testServerWithMock(V3MockParsingRequestHandler serverMock, boolean failFast, boolean conditionNotMet) throws Exception {
- try (Server server = new Server(serverMock, 0);
- Session session = SessionFactory.create(
- new SessionParams.Builder()
- .setConnectionParams(
- new ConnectionParams.Builder()
- .setNumPersistentConnectionsPerEndpoint(1)
- .setMaxRetries(0)
- .build())
- .setFeedParams(new FeedParams.Builder()
- .setLocalQueueTimeOut(failFast ? 0 : 120000)
- .setServerTimeout(120, TimeUnit.SECONDS)
- .setClientTimeout(120, TimeUnit.SECONDS)
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", server.getPort(), false))
- .build())
- .build())) {
-
- writeDocument(session);
- Map<String, Result> results = getResults(session, 1);
- assertEquals(1, results.size());
-
- TestDocument document = documents.get(0);
- Result r = results.remove(document.getDocumentId());
- assertNotNull(r);
- if (conditionNotMet)
- assertEquals(Result.ResultType.CONDITION_NOT_MET, r.getDetails().iterator().next().getResultType());
- assertFalse(r.getDetails().toString(), r.isSuccess());
- assertTrue(results.isEmpty());
- }
- }
-
- @Test
- public void testSingleDestination() throws Exception {
- try (Server server = new Server(new V3MockParsingRequestHandler(), 0);
- Session session = SessionFactory.create(Endpoint.create("localhost", server.getPort(), false))) {
-
- writeDocuments(session);
- Map<String, Result> results = getResults(session, documents.size());
- assertEquals(documents.size(), results.size());
-
- for (TestDocument document : documents) {
- Result r = results.remove(document.getDocumentId());
- assertNotNull(r);
- assertTrue(r.getDetails().toString(), r.isSuccess());
- }
- assertTrue(results.isEmpty());
- }
- }
-
- @Test
- public void requireThatBadResponseCodeFails() throws Exception {
- testServerWithMock(new V3MockParsingRequestHandler(401/*Unauthorized*/), true, false);
- testServerWithMock(new V3MockParsingRequestHandler(403/*Forbidden*/), true, false);
- testServerWithMock(new V3MockParsingRequestHandler(407/*Proxy Authentication Required*/), true, false);
- }
-
- @Test
- public void requireThatUnexpectedVersionIsHandledProperly() throws Exception {
- testServerWithMock(new V3MockParsingRequestHandler(
- 200, V3MockParsingRequestHandler.Scenario.RETURN_UNEXPECTED_VERSION), true, false);
- }
-
- @Test
- public void requireThatNonAcceptedVersionIsHandledProperly() throws Exception {
- testServerWithMock(new V3MockParsingRequestHandler(
- 200, V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION), true, false);
- }
-
- @Test
- public void requireThatNon200OkIsHandledProperly() throws Exception {
- testServerWithMock(new V3MockParsingRequestHandler(
- 200, V3MockParsingRequestHandler.Scenario.INTERNAL_SERVER_ERROR), true, false);
- }
-
- @Test
- public void requireThatMbusErrorIsHandledProperly() throws Exception {
- testServerWithMock(new V3MockParsingRequestHandler(
- 200, V3MockParsingRequestHandler.Scenario.MBUS_RETURNED_ERROR), false, false);
- }
-
- @Test
- public void requireThatTimeoutWorks() throws Exception {
- try (Server server = new Server(new V3MockParsingRequestHandler(
- 200, V3MockParsingRequestHandler.Scenario.NEVER_RETURN_ANY_RESULTS), 0);
- Session session = SessionFactory.create(
- new SessionParams.Builder()
- .setFeedParams(new FeedParams.Builder()
- .setLocalQueueTimeOut(0)
- .build())
- .setConnectionParams(
- new ConnectionParams.Builder()
- .setNumPersistentConnectionsPerEndpoint(1)
- .setMaxRetries(2)
- .build())
- .setFeedParams(
- new FeedParams.Builder()
- .setServerTimeout(500, TimeUnit.MILLISECONDS)
- .setClientTimeout(500, TimeUnit.MILLISECONDS)
- .build())
- .addCluster(
- new Cluster.Builder()
- .addEndpoint(Endpoint.create("localhost", server.getPort(), false))
- .build())
- .build())) {
-
- writeDocuments(session);
-
- Map<String, Result> results = getResults(session, documents.size());
- assertEquals(documents.size(), results.size());
-
- for (TestDocument document : documents) {
- Result r = results.remove(document.getDocumentId());
- assertNotNull(r);
- assertFalse(r.getDetails().toString(), r.isSuccess());
- assertEquals(Result.ResultType.TRANSITIVE_ERROR, r.getDetails().iterator().next().getResultType());
- }
- assertTrue(results.isEmpty());
- }
- }
-
- @Test
- public void requireThatCouldNotFeedErrorIsHandledProperly() throws Exception {
- testServerWithMock(new V3MockParsingRequestHandler(
- 200, V3MockParsingRequestHandler.Scenario.COULD_NOT_FEED), false, false);
- }
-
- @Test
- public void requireThatImmediateDisconnectIsHandledProperly() throws Exception {
- testServerWithMock(new V3MockParsingRequestHandler(
- 200, V3MockParsingRequestHandler.Scenario.DISCONNECT_IMMEDIATELY), true, false);
- }
- @Test
- public void testConditionNotMet() throws Exception {
- testServerWithMock(new V3MockParsingRequestHandler(
- 200, V3MockParsingRequestHandler.Scenario.CONDITON_NOT_MET), false, true);
- }
-
-}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/OperationProcessorTester.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/OperationProcessorTester.java
new file mode 100644
index 00000000000..9b563e193d5
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/OperationProcessorTester.java
@@ -0,0 +1,125 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.core;
+
+import com.yahoo.vespa.http.client.FeedClient;
+import com.yahoo.vespa.http.client.FeedEndpointException;
+import com.yahoo.vespa.http.client.ManualClock;
+import com.yahoo.vespa.http.client.Result;
+import com.yahoo.vespa.http.client.config.Cluster;
+import com.yahoo.vespa.http.client.config.ConnectionParams;
+import com.yahoo.vespa.http.client.config.Endpoint;
+import com.yahoo.vespa.http.client.config.SessionParams;
+import com.yahoo.vespa.http.client.core.communication.ClusterConnection;
+import com.yahoo.vespa.http.client.core.communication.IOThread;
+import com.yahoo.vespa.http.client.core.communication.IOThreadTest;
+import com.yahoo.vespa.http.client.core.operationProcessor.IncompleteResultsThrottler;
+import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Helper for testing with an operation processor
+ *
+ * @author bratseth
+ */
+public class OperationProcessorTester {
+
+ private final Endpoint endpoint;
+ private final int clusterId = 0;
+ private final ManualClock clock;
+ private final TestResultCallback resultCallback;
+ private final OperationProcessor operationProcessor;
+
+ public OperationProcessorTester() {
+ endpoint = Endpoint.create("test-endpoint");
+ SessionParams.Builder params = new SessionParams.Builder();
+ Cluster.Builder clusterParams = new Cluster.Builder();
+ clusterParams.addEndpoint(endpoint);
+ params.addCluster(clusterParams.build());
+ ConnectionParams.Builder connectionParams = new ConnectionParams.Builder();
+ connectionParams.setDryRun(true);
+ connectionParams.setRunThreads(false);
+ params.setConnectionParams(connectionParams.build());
+
+ clock = new ManualClock(Instant.ofEpochMilli(0));
+ resultCallback = new TestResultCallback();
+ operationProcessor = new OperationProcessor(new IncompleteResultsThrottler(1, 100, clock, new ThrottlePolicy()),
+ resultCallback,
+ params.build(),
+ new ScheduledThreadPoolExecutor(1),
+ clock);
+ }
+
+ public ManualClock clock() { return clock; }
+
+ /** Asserts that this has but a single IOThread and returns it */
+ public IOThread getSingleIOThread() {
+ assertEquals(1, clusterConnections().size());
+ assertEquals(1, clusterConnections().get(0).ioThreads().size());
+ return clusterConnections().get(0).ioThreads().get(0);
+ }
+
+ /** Do n iteration of work in all io threads of this */
+ public void tick(int n) {
+ for (int i = 0; i < n; i++)
+ for (ClusterConnection cluster : operationProcessor.clusters())
+ for (IOThread thread : cluster.ioThreads())
+ thread.tick();
+ }
+
+ public void send(String documentId) {
+ operationProcessor.sendDocument(new Document(documentId, documentId, "data of " + documentId, null, clock.instant()));
+ }
+
+ public int incomplete() {
+ return operationProcessor.getIncompleteResultQueueSize();
+ }
+
+ public int success() {
+ return resultCallback.successes;
+ }
+
+ public List<ClusterConnection> clusterConnections() {
+ return operationProcessor.clusters();
+ }
+
+ public int failures() {
+ return resultCallback.failures;
+ }
+
+ public int endpointExceptions() {
+ return resultCallback.endpointExceptions;
+ }
+
+ public Result lastResult() {
+ return resultCallback.lastResult;
+ }
+
+ private static class TestResultCallback implements FeedClient.ResultCallback {
+
+ private int successes = 0;
+ private int failures = 0;
+ private int endpointExceptions = 0;
+ private Result lastResult;
+
+ @Override
+ public void onCompletion(String docId, Result documentResult) {
+ this.lastResult = documentResult;
+ if (documentResult.isSuccess())
+ successes++;
+ else
+ failures++;
+ }
+
+ @Override
+ public void onEndpointException(FeedEndpointException exception) {
+ endpointExceptions++;
+ }
+
+ }
+
+}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
index 59fb968906f..e684c929fda 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
@@ -1,232 +1,150 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.http.client.core.communication;
-import com.yahoo.vespa.http.client.FeedConnectException;
+import com.yahoo.vespa.http.client.FeedClient;
import com.yahoo.vespa.http.client.FeedEndpointException;
-import com.yahoo.vespa.http.client.FeedProtocolException;
-import com.yahoo.vespa.http.client.ManualClock;
import com.yahoo.vespa.http.client.Result;
-import com.yahoo.vespa.http.client.V3HttpAPITest;
-import com.yahoo.vespa.http.client.config.Endpoint;
-import com.yahoo.vespa.http.client.core.Document;
-import com.yahoo.vespa.http.client.core.EndpointResult;
+import com.yahoo.vespa.http.client.core.OperationProcessorTester;
import com.yahoo.vespa.http.client.core.ServerResponseException;
import org.junit.Test;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.time.Clock;
import java.time.Duration;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-// DO NOT ADD TESTS HERE, add to NewIOThreadTest
-public class IOThreadTest {
-
- private static final Endpoint ENDPOINT = Endpoint.create("myhost");
-
- final Clock clock = Clock.systemUTC();
- final EndpointResultQueue endpointResultQueue = mock(EndpointResultQueue.class);
- final ApacheGatewayConnection apacheGatewayConnection = mock(ApacheGatewayConnection.class);
- final String exceptionMessage = "SOME EXCEPTION FOO";
- CountDownLatch latch = new CountDownLatch(1);
- String docId1 = V3HttpAPITest.documents.get(0).getDocumentId();
- Document doc1 = new Document(V3HttpAPITest.documents.get(0).getDocumentId(),
- V3HttpAPITest.documents.get(0).getContents(),
- null,
- clock.instant());
- String docId2 = V3HttpAPITest.documents.get(1).getDocumentId();
- Document doc2 = new Document(V3HttpAPITest.documents.get(1).getDocumentId(),
- V3HttpAPITest.documents.get(1).getContents(),
- null,
- clock.instant());
- DocumentQueue documentQueue = new DocumentQueue(4, clock);
-
- public IOThreadTest() {
- when(apacheGatewayConnection.getEndpoint()).thenReturn(ENDPOINT);
- }
-
- /**
- * Set up mock so that it can handle both failDocument() and resultReceived().
- *
- * @param expectedDocIdFail on failure, this has to be the doc id, or the mock will fail.
- * @param expectedDocIdOk on ok, this has to be the doc id, or the mock will fail.
- * @param isTransient checked on failure, if different, the mock will fail.
- * @param expectedException checked on failure, if exception toString is different, the mock will fail.
- */
- void setupEndpointResultQueueMock(String expectedDocIdFail, String expectedDocIdOk, boolean isTransient, String expectedException) {
- doAnswer(invocation -> {
- EndpointResult endpointResult = (EndpointResult) invocation.getArguments()[0];
- assertThat(endpointResult.getOperationId(), is(expectedDocIdFail));
- assertThat(endpointResult.getDetail().getException().toString(), containsString(expectedException));
- assertThat(endpointResult.getDetail().getResultType(), is(isTransient ? Result.ResultType.TRANSITIVE_ERROR : Result.ResultType.FATAL_ERROR));
-
- latch.countDown();
- return null;
- }).when(endpointResultQueue).failOperation(any(), eq(0));
-
- doAnswer(invocation -> {
- EndpointResult endpointResult = (EndpointResult) invocation.getArguments()[0];
- assertThat(endpointResult.getOperationId(), is(expectedDocIdOk));
- assertThat(endpointResult.getDetail().getResultType(), is(Result.ResultType.OPERATION_EXECUTED));
- latch.countDown();
- return null;
- }).when(endpointResultQueue).resultReceived(any(), eq(0));
- }
- private IOThread createIOThread(int maxInFlightRequests, long localQueueTimeOut) {
- return new IOThread(null,
- ENDPOINT,
- endpointResultQueue,
- new SingletonGatewayConnectionFactory(apacheGatewayConnection),
- 0,
- 0,
- maxInFlightRequests,
- Duration.ofMillis(localQueueTimeOut),
- documentQueue,
- 0,
- Duration.ofSeconds(15),
- true,
- 10,
- clock);
- }
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
- @Test
- public void singleDocumentSuccess() throws Exception {
- when(apacheGatewayConnection.connect()).thenReturn(true);
- InputStream serverResponse = new ByteArrayInputStream(
- (docId1 + " OK Doc{20}fed").getBytes(StandardCharsets.UTF_8));
- when(apacheGatewayConnection.write(any())).thenReturn(serverResponse);
- setupEndpointResultQueueMock( "nope", docId1, true, exceptionMessage);
- try (IOThread ioThread = createIOThread(10000, 10000)) {
- ioThread.post(doc1);
- assert (latch.await(120, TimeUnit.SECONDS));
- }
- }
+/**
+ * TODO: Migrate IOThreadTests here.
+ *
+ * @author bratseth
+ */
+public class IOThreadTest {
@Test
- public void testDocumentWriteError() throws Exception {
- when(apacheGatewayConnection.connect()).thenReturn(true);
- when(apacheGatewayConnection.write(any())).thenThrow(new IOException(exceptionMessage));
- setupEndpointResultQueueMock(doc1.getOperationId(), "nope", true, exceptionMessage);
- try (IOThread ioThread = createIOThread(10000, 10000)) {
- ioThread.post(doc1);
- assert (latch.await(120, TimeUnit.SECONDS));
- }
+ public void testSuccessfulWriting() {
+ OperationProcessorTester tester = new OperationProcessorTester();
+ assertEquals(0, tester.incomplete());
+ assertEquals(0, tester.success());
+ assertEquals(0, tester.failures());
+ tester.send("doc1");
+ tester.send("doc2");
+ tester.send("doc3");
+ assertEquals(3, tester.incomplete());
+ assertEquals(0, tester.success());
+ assertEquals(0, tester.failures());
+ tester.tick(1); // connect
+ assertEquals(3, tester.incomplete());
+ tester.tick(1); // sync
+ assertEquals(3, tester.incomplete());
+ tester.tick(1); // process queue
+ assertEquals(0, tester.incomplete());
+ assertEquals(3, tester.success());
+ assertEquals(0, tester.failures());
}
@Test
- public void testTwoDocumentsFirstWriteErrorSecondOk() throws Exception {
- when(apacheGatewayConnection.connect()).thenReturn(true);
- InputStream serverResponse = new ByteArrayInputStream(
- (docId2 + " OK Doc{20}fed").getBytes(StandardCharsets.UTF_8));
- when(apacheGatewayConnection.write(any()))
- .thenThrow(new IOException(exceptionMessage))
- .thenReturn(serverResponse);
- latch = new CountDownLatch(2);
- setupEndpointResultQueueMock(doc1.getOperationId(), doc2.getDocumentId(), true, exceptionMessage);
-
- try (IOThread ioThread = createIOThread(10000, 10000)) {
- ioThread.post(doc1);
- ioThread.post(doc2);
- assert (latch.await(120, TimeUnit.SECONDS));
- }
+ public void testExceptionOnConnect() {
+ OperationProcessorTester tester = new OperationProcessorTester();
+ IOThread ioThread = tester.getSingleIOThread();
+ DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection();
+ firstConnection.throwOnHandshake(new ServerResponseException(403, "Not authorized"));
+
+ tester.send("doc1");
+ tester.tick(3);
+ assertEquals(1, tester.incomplete());
+ assertEquals(0, ioThread.resultQueue().getPendingSize());
+ assertEquals(0, tester.success());
+ assertEquals("Awaiting retry", 0, tester.failures());
}
@Test
- public void testQueueTimeOutNoNoConnectionToServer() throws Exception {
- when(apacheGatewayConnection.connect()).thenReturn(false);
- InputStream serverResponse = new ByteArrayInputStream(("").getBytes(StandardCharsets.UTF_8));
- when(apacheGatewayConnection.write(any())).thenReturn(serverResponse);
- setupEndpointResultQueueMock(doc1.getOperationId(), "nope", true,
- "java.lang.Exception: Not sending document operation, timed out in queue after");
- try (IOThread ioThread = createIOThread(10, 10)) {
- ioThread.post(doc1);
- assert (latch.await(120, TimeUnit.SECONDS));
- }
+ public void testExceptionOnHandshake() {
+ OperationProcessorTester tester = new OperationProcessorTester();
+ IOThread ioThread = tester.getSingleIOThread();
+ DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection();
+ firstConnection.throwOnHandshake(new ServerResponseException(403, "Not authorized"));
+
+ tester.send("doc1");
+ tester.tick(3);
+ assertEquals(1, tester.incomplete());
+ assertEquals(0, ioThread.resultQueue().getPendingSize());
+ assertEquals(0, tester.success());
+ assertEquals("Awaiting retry", 0, tester.failures());
}
@Test
- public void testEndpointProtocolExceptionPropagation()
- throws IOException, ServerResponseException, InterruptedException, TimeoutException, ExecutionException {
- when(apacheGatewayConnection.connect()).thenReturn(true);
- int errorCode = 403;
- String errorMessage = "Not authorized";
- doThrow(new ServerResponseException(errorCode, errorMessage)).when(apacheGatewayConnection).handshake();
- Future<FeedEndpointException> futureException = endpointErrorCapturer(endpointResultQueue);
-
- try (IOThread ioThread = createIOThread(10, 10)) {
- ioThread.post(doc1);
- FeedEndpointException reportedException = futureException.get(120, TimeUnit.SECONDS);
- assertThat(reportedException, instanceOf(FeedProtocolException.class));
- FeedProtocolException actualException = (FeedProtocolException) reportedException;
- assertThat(actualException.getHttpStatusCode(), equalTo(errorCode));
- assertThat(actualException.getHttpResponseMessage(), equalTo(errorMessage));
- assertThat(actualException.getEndpoint(), equalTo(ENDPOINT));
- assertThat(actualException.getMessage(), equalTo("Endpoint 'myhost:4080' returned an error on handshake: 403 - Not authorized"));
- }
+ public void testExceptionOnWrite() {
+ OperationProcessorTester tester = new OperationProcessorTester();
+ IOThread ioThread = tester.getSingleIOThread();
+ DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection();
+ firstConnection.throwOnWrite(new IOException("Test failure"));
+
+ tester.send("doc1");
+ tester.tick(3);
+ assertEquals(1, tester.incomplete());
+ assertEquals(0, ioThread.resultQueue().getPendingSize());
+ assertEquals(0, tester.success());
+ assertEquals("Awaiting retry since write exceptions is a transient failure",
+ 0, tester.failures());
}
@Test
- public void testEndpointConnectExceptionsPropagation()
- throws IOException, ServerResponseException, InterruptedException, TimeoutException, ExecutionException {
- when(apacheGatewayConnection.connect()).thenReturn(true);
- String errorMessage = "generic error message";
- IOException cause = new IOException(errorMessage);
- doThrow(cause).when(apacheGatewayConnection).handshake();
- Future<FeedEndpointException> futureException = endpointErrorCapturer(endpointResultQueue);
-
- try (IOThread ioThread = createIOThread(10, 10)) {
- ioThread.post(doc1);
- FeedEndpointException reportedException = futureException.get(120, TimeUnit.SECONDS);
- assertThat(reportedException, instanceOf(FeedConnectException.class));
- FeedConnectException actualException = (FeedConnectException) reportedException;
- assertThat(actualException.getCause(), equalTo(cause));
- assertThat(actualException.getEndpoint(), equalTo(ENDPOINT));
- assertThat(actualException.getMessage(), equalTo("Handshake to endpoint 'myhost:4080' failed: generic error message"));
- }
- }
-
- private static Future<FeedEndpointException> endpointErrorCapturer(EndpointResultQueue endpointResultQueue) {
- CompletableFuture<FeedEndpointException> futureResult = new CompletableFuture<>();
- doAnswer(invocation -> {
- if (futureResult.isDone()) return null;
- FeedEndpointException reportedException = (FeedEndpointException) invocation.getArguments()[0];
- futureResult.complete(reportedException);
- return null;
- }).when(endpointResultQueue).onEndpointError(any());
- return futureResult;
+ public void testPollingOldConnections() {
+ OperationProcessorTester tester = new OperationProcessorTester();
+ tester.tick(3);
+
+ IOThread ioThread = tester.getSingleIOThread();
+ DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection();
+ assertEquals(0, ioThread.oldConnections().size());
+
+ firstConnection.hold(true);
+ tester.send("doc1");
+ tester.tick(1);
+
+ tester.clock().advance(Duration.ofSeconds(16)); // Default connection ttl is 15
+ tester.tick(3);
+
+ assertEquals(1, ioThread.oldConnections().size());
+ assertEquals(firstConnection, ioThread.oldConnections().get(0));
+ assertNotSame(firstConnection, ioThread.currentConnection());
+ assertEquals(16, firstConnection.lastPollTime().toEpochMilli() / 1000);
+
+ // Check old connection poll pattern (exponential backoff)
+ assertLastPollTimeWhenAdvancing(16, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+
+ tester.clock().advance(Duration.ofSeconds(200));
+ tester.tick(1);
+ assertEquals("Old connection is eventually removed", 0, ioThread.oldConnections().size());
}
- private static final class SingletonGatewayConnectionFactory implements GatewayConnectionFactory {
-
- private final GatewayConnection singletonConnection;
-
- SingletonGatewayConnectionFactory(GatewayConnection singletonConnection) {
- this.singletonConnection = singletonConnection;
- }
-
- @Override
- public GatewayConnection newConnection() { return singletonConnection; }
-
+ private void assertLastPollTimeWhenAdvancing(int lastPollTimeSeconds,
+ int advanceSeconds,
+ DryRunGatewayConnection connection,
+ OperationProcessorTester tester) {
+ tester.clock().advance(Duration.ofSeconds(advanceSeconds));
+ tester.tick(1);
+ assertEquals(lastPollTimeSeconds, connection.lastPollTime().toEpochMilli() / 1000);
}
}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java
deleted file mode 100644
index 7bac8f9cd9d..00000000000
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core.communication;
-
-import com.yahoo.vespa.http.client.FeedClient;
-import com.yahoo.vespa.http.client.FeedEndpointException;
-import com.yahoo.vespa.http.client.ManualClock;
-import com.yahoo.vespa.http.client.Result;
-import com.yahoo.vespa.http.client.config.Cluster;
-import com.yahoo.vespa.http.client.config.ConnectionParams;
-import com.yahoo.vespa.http.client.config.Endpoint;
-import com.yahoo.vespa.http.client.config.SessionParams;
-import com.yahoo.vespa.http.client.core.Document;
-import com.yahoo.vespa.http.client.core.EndpointResult;
-import com.yahoo.vespa.http.client.core.ThrottlePolicy;
-import com.yahoo.vespa.http.client.core.operationProcessor.IncompleteResultsThrottler;
-import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;
-import org.junit.Test;
-
-import java.time.Duration;
-import java.time.Instant;
-import java.util.List;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-
-/**
- * TODO: Migrate IOThreadTests here.
- *
- * @author bratseth
- */
-public class NewIOThreadTest {
-
- @Test
- public void testBasics() {
- OperationProcessorTester tester = new OperationProcessorTester();
- assertEquals(0, tester.inflight());
- assertEquals(0, tester.success());
- assertEquals(0, tester.failures());
- tester.send("doc1");
- tester.send("doc2");
- tester.send("doc3");
- assertEquals(3, tester.inflight());
- assertEquals(0, tester.success());
- assertEquals(0, tester.failures());
- tester.success("doc1");
- tester.success("doc2");
- tester.success("doc3");
- assertEquals(0, tester.inflight());
- assertEquals(3, tester.success());
- assertEquals(0, tester.failures());
- }
-
- @Test
- public void testPollingOldConnections() {
- OperationProcessorTester tester = new OperationProcessorTester();
- tester.tick(3);
-
- assertEquals(1, tester.clusterConnections().size());
- assertEquals(1, tester.clusterConnections().get(0).ioThreads().size());
- IOThread ioThread = tester.clusterConnections().get(0).ioThreads().get(0);
- DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection();
- assertEquals(0, ioThread.oldConnections().size());
-
- firstConnection.hold(true);
- tester.send("doc1");
- tester.tick(1);
-
- tester.clock().advance(Duration.ofSeconds(16)); // Default connection ttl is 15
- tester.tick(3);
-
- assertEquals(1, ioThread.oldConnections().size());
- assertEquals(firstConnection, ioThread.oldConnections().get(0));
- assertNotSame(firstConnection, ioThread.currentConnection());
- assertEquals(16, firstConnection.lastPollTime().toEpochMilli() / 1000);
-
- // Check old connection poll pattern (exponential backoff)
- assertLastPollTimeWhenAdvancing(16, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
-
- tester.clock().advance(Duration.ofSeconds(200));
- tester.tick(1);
- assertEquals("Old connection is eventually removed", 0, ioThread.oldConnections().size());
- }
-
- private void assertLastPollTimeWhenAdvancing(int lastPollTimeSeconds,
- int advanceSeconds,
- DryRunGatewayConnection connection,
- OperationProcessorTester tester) {
- tester.clock().advance(Duration.ofSeconds(advanceSeconds));
- tester.tick(1);
- assertEquals(lastPollTimeSeconds, connection.lastPollTime().toEpochMilli() / 1000);
- }
-
- private static class OperationProcessorTester {
-
- private final Endpoint endpoint;
- private final int clusterId = 0;
- private final ManualClock clock;
- private final TestResultCallback resultCallback;
- private final OperationProcessor operationProcessor;
-
- public OperationProcessorTester() {
- endpoint = Endpoint.create("test-endpoint");
- SessionParams.Builder params = new SessionParams.Builder();
- Cluster.Builder clusterParams = new Cluster.Builder();
- clusterParams.addEndpoint(endpoint);
- params.addCluster(clusterParams.build());
- ConnectionParams.Builder connectionParams = new ConnectionParams.Builder();
- connectionParams.setDryRun(true);
- connectionParams.setRunThreads(false);
- params.setConnectionParams(connectionParams.build());
-
- clock = new ManualClock(Instant.ofEpochMilli(0));
- resultCallback = new TestResultCallback();
- operationProcessor = new OperationProcessor(new IncompleteResultsThrottler(1, 100, clock, new ThrottlePolicy()),
- resultCallback,
- params.build(),
- new ScheduledThreadPoolExecutor(1),
- clock);
- }
-
- public ManualClock clock() { return clock; }
-
- /** Do n iteration of work in all io threads of this */
- public void tick(int n) {
- for (int i = 0; i < n; i++)
- for (ClusterConnection cluster : operationProcessor.clusters())
- for (IOThread thread : cluster.ioThreads())
- thread.tick();
- }
-
- public void send(String documentId) {
- operationProcessor.sendDocument(new Document(documentId, documentId, "data of " + documentId, null, clock.instant()));
- }
-
- public void success(String documentId) {
- operationProcessor.resultReceived(new EndpointResult(documentId, new Result.Detail(endpoint)), clusterId);
- }
-
- public int inflight() {
- return operationProcessor.getIncompleteResultQueueSize();
- }
-
- public int success() {
- return resultCallback.successes;
- }
-
- public List<ClusterConnection> clusterConnections() {
- return operationProcessor.clusters();
- }
-
- public int failures() {
- return resultCallback.failures;
- }
-
- }
-
- private static class TestResultCallback implements FeedClient.ResultCallback {
-
- private int successes = 0;
- private int failures = 0;
-
- @Override
- public void onCompletion(String docId, Result documentResult) {
- successes++;
- }
-
- @Override
- public void onEndpointException(FeedEndpointException exception) {
- failures++;
- }
-
-
- }
-
-}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java
index ec929d68efb..9ea66b57fb3 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java
@@ -12,11 +12,8 @@ import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
-import java.util.concurrent.atomic.AtomicLong;
-import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.eq;
@@ -28,14 +25,14 @@ public class IncompleteResultsThrottlerTest {
@Test
public void simpleStaticQueueSizeTest() {
IncompleteResultsThrottler incompleteResultsThrottler = new IncompleteResultsThrottler(2, 2, null, null);
- assertThat(incompleteResultsThrottler.waitingThreads(), is(0));
+ assertEquals(0, incompleteResultsThrottler.waitingThreads());
incompleteResultsThrottler.operationStart();
incompleteResultsThrottler.operationStart();
- assertThat(incompleteResultsThrottler.waitingThreads(), is(2));
+ assertEquals(2, incompleteResultsThrottler.waitingThreads());
incompleteResultsThrottler.resultReady(true);
- assertThat(incompleteResultsThrottler.waitingThreads(), is(1));
+ assertEquals(1, incompleteResultsThrottler.waitingThreads());
incompleteResultsThrottler.resultReady(true);
- assertThat(incompleteResultsThrottler.waitingThreads(), is(0));
+ assertEquals(0, incompleteResultsThrottler.waitingThreads());
}
/**