diff options
Diffstat (limited to 'vespa-http-client/src')
13 files changed, 308 insertions, 2064 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java index d3e4b4ed762..3a67c80224f 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java @@ -11,6 +11,7 @@ import com.yahoo.vespa.http.client.config.Endpoint; * @author bjorncs */ public abstract class FeedEndpointException extends RuntimeException { + private final Endpoint endpoint; protected FeedEndpointException(String message, Throwable cause, Endpoint endpoint) { diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ThrottlePolicy.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ThrottlePolicy.java index a4bd5d51496..ee107a7d4d2 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ThrottlePolicy.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ThrottlePolicy.java @@ -7,6 +7,7 @@ import static java.lang.Math.min; /** * Class that has a method for finding next maxInFlight. + * * @author dybis */ public class ThrottlePolicy { @@ -16,6 +17,7 @@ public class ThrottlePolicy { /** * Generate nex in-flight value for throttling. + * * @param maxPerformanceChange This value limit the dynamics of the algorithm. * @param numOk number of success in last phase * @param previousNumOk number of success in previous (before last) phase. @@ -27,8 +29,7 @@ public class ThrottlePolicy { public int calcNewMaxInFlight(double maxPerformanceChange, int numOk, int previousNumOk, int previousMaxInFlight, int maxInFlightNow, boolean messagesQueued) { - double difference = calculateRuleBasedDifference( - maxPerformanceChange, numOk, previousNumOk, previousMaxInFlight, maxInFlightNow); + double difference = calculateRuleBasedDifference(maxPerformanceChange, numOk, previousNumOk, previousMaxInFlight, maxInFlightNow); boolean previousRunWasBetter = numOk < previousNumOk; boolean previousRunHadLessInFlight = previousMaxInFlight < maxInFlightNow; diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java index 129fc000271..623ea543ffb 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java @@ -5,8 +5,10 @@ import com.yahoo.vespa.http.client.config.Endpoint; import com.yahoo.vespa.http.client.core.Document; import com.yahoo.vespa.http.client.core.ErrorCode; import com.yahoo.vespa.http.client.core.OperationStatus; +import com.yahoo.vespa.http.client.core.ServerResponseException; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.time.Clock; @@ -29,7 +31,13 @@ public class DryRunGatewayConnection implements GatewayConnection { /** Set to true to hold off responding with a result to any incoming operations until this is set false */ private boolean hold = false; - private List<Document> held = new ArrayList<>(); + private final List<Document> held = new ArrayList<>(); + + /** If this is set, handshake operations will throw this exception */ + private ServerResponseException throwThisOnHandshake = null; + + /** If this is set, all write operations will throw this exception */ + private IOException throwThisOnWrite = null; public DryRunGatewayConnection(Endpoint endpoint, Clock clock) { this.endpoint = endpoint; @@ -37,27 +45,27 @@ public class DryRunGatewayConnection implements GatewayConnection { } @Override - public InputStream write(List<Document> docs) { - StringBuilder result = new StringBuilder(); + public InputStream write(List<Document> docs) throws IOException { + if (throwThisOnWrite != null) + throw throwThisOnWrite; + if (hold) { held.addAll(docs); + return new ByteArrayInputStream("".getBytes(StandardCharsets.UTF_8)); } else { + StringBuilder result = new StringBuilder(); for (Document doc : held) result.append(okResponse(doc).render()); held.clear(); for (Document doc : docs) result.append(okResponse(doc).render()); + return new ByteArrayInputStream(result.toString().getBytes(StandardCharsets.UTF_8)); } - return new ByteArrayInputStream(result.toString().getBytes(StandardCharsets.UTF_8)); - } - - public void hold(boolean hold) { - this.hold = hold; } @Override - public InputStream poll() { + public InputStream poll() throws IOException { lastPollTime = clock.instant(); return write(new ArrayList<>()); } @@ -66,7 +74,7 @@ public class DryRunGatewayConnection implements GatewayConnection { public Instant lastPollTime() { return lastPollTime; } @Override - public InputStream drain() { + public InputStream drain() throws IOException { return write(new ArrayList<>()); } @@ -85,14 +93,29 @@ public class DryRunGatewayConnection implements GatewayConnection { } @Override - public void handshake() { } + public void handshake() throws ServerResponseException { + if (throwThisOnHandshake != null) + throw throwThisOnHandshake; + } @Override public void close() { } + public void hold(boolean hold) { + this.hold = hold; + } + /** Returns the document currently held in this */ public List<Document> held() { return Collections.unmodifiableList(held); } + public void throwOnWrite(IOException throwThisOnWrite) { + this.throwThisOnWrite = throwThisOnWrite; + } + + public void throwOnHandshake(ServerResponseException throwThisOnHandshake) { + this.throwThisOnHandshake = throwThisOnHandshake; + } + private OperationStatus okResponse(Document document) { return new OperationStatus("ok", document.getOperationId(), ErrorCode.OK, false, ""); } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java index 1dd8b3bf3ec..a5a37a31665 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java @@ -65,7 +65,6 @@ class EndpointResultQueue { private synchronized void resultReceived(EndpointResult result, int clusterId, boolean duplicateGivesWarning) { operationProcessor.resultReceived(result, clusterId); - TimerFuture timerFuture = futureByOperation.remove(result.getOperationId()); if (timerFuture == null) { if (duplicateGivesWarning) { diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index 8f052a1b3f6..652f65489db 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -34,7 +34,7 @@ import java.util.logging.Logger; * * @author Einar M R Rosenvinge */ -class IOThread implements Runnable, AutoCloseable { +public class IOThread implements Runnable, AutoCloseable { private static final Logger log = Logger.getLogger(IOThread.class.getName()); @@ -51,7 +51,6 @@ class IOThread implements Runnable, AutoCloseable { private final int maxChunkSizeBytes; private final int maxInFlightRequests; private final Duration localQueueTimeOut; - private final Duration maxOldConnectionPollInterval; private final GatewayThrottler gatewayThrottler; private final Duration connectionTimeToLive; private final long pollIntervalUS; @@ -107,9 +106,6 @@ class IOThread implements Runnable, AutoCloseable { this.pollIntervalUS = Math.max(1, (long)(1000000.0/Math.max(0.1, idlePollFrequency))); // ensure range [1us, 10s] this.clock = clock; this.localQueueTimeOut = localQueueTimeOut; - this.maxOldConnectionPollInterval = localQueueTimeOut.dividedBy(10).toMillis() > pollIntervalUS / 1000 - ? localQueueTimeOut.dividedBy(10) - : Duration.ofMillis(pollIntervalUS / 1000); if (runThreads) { this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint); thread.setDaemon(true); @@ -231,9 +227,10 @@ class IOThread implements Runnable, AutoCloseable { private void markDocumentAsFailed(List<Document> docs, ServerResponseException servletException) { for (Document doc : docs) { - resultQueue.failOperation( - EndPointResultFactory.createTransientError( - endpoint, doc.getOperationId(), servletException), clusterId); + resultQueue.failOperation(EndPointResultFactory.createTransientError(endpoint, + doc.getOperationId(), + servletException), + clusterId); } } @@ -327,8 +324,8 @@ class IOThread implements Runnable, AutoCloseable { } catch (Throwable throwable1) { drainFirstDocumentsInQueueIfOld(); - log.log(Level.INFO, "Failed connecting to endpoint: '" + endpoint - + "'. Will re-try connecting. Failed with '" + Exceptions.toMessageString(throwable1) + "'",throwable1); + log.log(Level.INFO, "Failed connecting to endpoint: '" + endpoint + "'. Will re-try connecting.", + throwable1); executeProblemsCounter.incrementAndGet(); return ConnectionState.DISCONNECTED; } @@ -341,8 +338,9 @@ class IOThread implements Runnable, AutoCloseable { } catch (ServerResponseException ser) { executeProblemsCounter.incrementAndGet(); - log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + endpoint - + "' failed. Will re-try handshake. Failed with '" + Exceptions.toMessageString(ser) + "'",ser); + log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + endpoint + + "' failed. Will re-try handshake.", + ser); drainFirstDocumentsInQueueIfOld(); resultQueue.onEndpointError(new FeedProtocolException(ser.getResponseCode(), ser.getResponseString(), ser, endpoint)); @@ -350,8 +348,9 @@ class IOThread implements Runnable, AutoCloseable { } catch (Throwable throwable) { // This cover IOException as well executeProblemsCounter.incrementAndGet(); resultQueue.onEndpointError(new FeedConnectException(throwable, endpoint)); - log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + endpoint - + "' failed. Will re-try handshake. Failed with '" + Exceptions.toMessageString(throwable) + "'",throwable); + log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + + endpoint + "' failed. Will re-try handshake.", + throwable); drainFirstDocumentsInQueueIfOld(); currentConnection.close(); return ConnectionState.DISCONNECTED; @@ -366,14 +365,14 @@ class IOThread implements Runnable, AutoCloseable { } catch (ServerResponseException ser) { log.log(Level.INFO, "Problems while handing data over to endpoint '" + endpoint + - "'. Will re-try. Endpoint responded with an unexpected HTTP response code. '" - + Exceptions.toMessageString(ser) + "'",ser); + "'. Will re-try. Endpoint responded with an unexpected HTTP response code.", + ser); return ConnectionState.CONNECTED; } catch (Throwable e) { - log.log(Level.INFO, "Problems while handing data over to endpoint '" + endpoint + - "'. Will re-try. Connection level error. Failed with '" + - Exceptions.toMessageString(e) + "'", e); + log.log(Level.INFO, + "Connection level error handing data over to endpoint '" + endpoint + "'. Will re-try.", + e); currentConnection.close(); return ConnectionState.DISCONNECTED; } @@ -532,4 +531,7 @@ class IOThread implements Runnable, AutoCloseable { /** For testing. Returns a snapshot of the old connections of this. Not thread safe. */ public List<GatewayConnection> oldConnections() { return new ArrayList<>(oldConnections); } + /** For testing */ + public EndpointResultQueue resultQueue() { return resultQueue; } + } diff --git a/vespa-http-client/src/test/java/ExampleUsageFeedClientTest.java b/vespa-http-client/src/test/java/ExampleUsageFeedClientTest.java index 909131a8979..6110c04e85f 100644 --- a/vespa-http-client/src/test/java/ExampleUsageFeedClientTest.java +++ b/vespa-http-client/src/test/java/ExampleUsageFeedClientTest.java @@ -28,13 +28,11 @@ public class ExampleUsageFeedClientTest { Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - exampleCode("localhost", serverA.getPort(), "localhost", serverB.getPort()); serverA.close(); serverB.close(); } - private static CharSequence generateDocument(String docId) { // Just a dummy example of an update document operation. return "{\"update\": \""+ docId + "\"," diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java deleted file mode 100644 index 0813cb36078..00000000000 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java +++ /dev/null @@ -1,382 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client; - -import com.yahoo.vespa.http.client.config.Cluster; -import com.yahoo.vespa.http.client.config.ConnectionParams; -import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.config.FeedParams; -import com.yahoo.vespa.http.client.config.SessionParams; -import com.yahoo.vespa.http.client.handlers.V3MockParsingRequestHandler; -import org.junit.Test; - -import javax.servlet.http.HttpServletResponse; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.time.Clock; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -import static com.yahoo.vespa.http.client.TestUtils.writeDocument; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.not; -import static org.hamcrest.core.IsEqual.equalTo; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; - -/** - * - * @author Einar M R Rosenvinge - */ -@SuppressWarnings("deprecation") -public class QueueBoundsTest { - - public static final List<TestDocument> documents; - - static { - List<TestDocument> docs = new ArrayList<>(); - docs.add(new TestDocument("id:music:music::http://music.yahoo.com/bobdylan/BestOf", - ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/bobdylan/BestOf\">\n" + - " <title>Best of Bob Dylan</title>\n" + - "</document>\n").getBytes(StandardCharsets.UTF_8))); - docs.add(new TestDocument("id:music:music::http://music.yahoo.com/oleivars/BestOf", - ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/oleivars/BestOf\">\n" + - " <title>Best of Ole Ivars</title>\n" + - "</document>\n").getBytes(StandardCharsets.UTF_8))); - docs.add(new TestDocument("id:music:music::http://music.yahoo.com/bjarnefritjofs/BestOf", - ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/bjarnefritjofs/BestOf\">\n" + - " <title>Best of Bjarne Fritjofs</title>\n" + - "</document>\n").getBytes(StandardCharsets.UTF_8))); - docs.add(new TestDocument("id:music:music::http://music.yahoo.com/larryingvars/BestOf", - ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/larryingvars/BestOf\">\n" + - " <title>Best of Larry Ingvars</title>\n" + - "</document>\n").getBytes(StandardCharsets.UTF_8))); - documents = Collections.unmodifiableList(docs); - } - - @Test - public void requireThatFullInputQueueBlocksAndUnblocks() throws Exception { - V3MockParsingRequestHandler mockXmlParsingRequestHandler = new V3MockParsingRequestHandler(); - mockXmlParsingRequestHandler.setScenario(V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION); - try (Server server = new Server(mockXmlParsingRequestHandler, 0); - Session session = - new com.yahoo.vespa.http.client.core.api.SessionImpl( - new SessionParams.Builder() - .addCluster(new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", server.getPort(), false)) - .build()) - .setFeedParams(new FeedParams.Builder() - .setMaxChunkSizeBytes(1) - .setMaxInFlightRequests(1) - .setLocalQueueTimeOut(40000) - .build()) - .setConnectionParams(new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .build()) - .setClientQueueSize(2) - .build(), - SessionFactory.createTimeoutExecutor(), - Clock.systemUTC())) { - FeederThread feeder = new FeederThread(session); - try { - feeder.start(); - assertFeedNotBlocking(feeder, 0); - assertThat(session.results().size(), is(0)); - assertFeedNotBlocking(feeder, 1); - assertThat(session.results().size(), is(0)); - CountDownLatch lastPostFeed = assertFeedBlocking(feeder, 2); - assertThat(session.results().size(), is(0)); - - mockXmlParsingRequestHandler.setScenario(V3MockParsingRequestHandler.Scenario.ALL_OK); - lastPostFeed.await(60, TimeUnit.SECONDS); - assertThat(lastPostFeed.getCount(), equalTo(0L)); - assertResultQueueSize(session, 3, 60, TimeUnit.SECONDS); - } finally { - feeder.stop(); - } - } - } - - @Test - public void requireThatFullIntermediateQueueBlocksAndUnblocks() throws Exception { - V3MockParsingRequestHandler slowHandler = - new V3MockParsingRequestHandler("B", HttpServletResponse.SC_OK, - V3MockParsingRequestHandler.Scenario.DELAYED_RESPONSE); - - try (Server serverA = new Server(new V3MockParsingRequestHandler("A"), 0); - Server serverB = new Server(slowHandler, 0); - com.yahoo.vespa.http.client.core.api.SessionImpl session = new com.yahoo.vespa.http.client.core.api.SessionImpl( - new SessionParams.Builder() - .addCluster(new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster(new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .setFeedParams(new FeedParams.Builder() - .setMaxChunkSizeBytes(1) - .build()) - .setConnectionParams(new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .build()) - .setClientQueueSize(6) //3 per cluster - .build(), SessionFactory.createTimeoutExecutor(), - Clock.systemUTC())) { - - FeederThread feeder = new FeederThread(session); - try { - feeder.start(); - assertFeedNotBlocking(feeder, 0); - assertFeedNotBlocking(feeder, 1); - assertFeedNotBlocking(feeder, 2); - - //A input queue size now: 3 - //B input queue size now: 3 - //feeder thread not blocked - //intermediate result queue size now: 3 - //result queue size now: 0 - assertResultQueueSize(session, 0, 60, TimeUnit.SECONDS); - assertIncompleteResultQueueSize(session, 3, 60, TimeUnit.SECONDS); - - assertResultQueueSize(session, 0, 60, TimeUnit.SECONDS); - assertIncompleteResultQueueSize(session, 3, 60, TimeUnit.SECONDS); - - //server B is slow, server A is fast - - - //A input queue size now: 0 - //B input queue size now: 2, IOThread writing 1 and blocked because of no response yet - //feeder thread still not blocked - //intermediate result queue size now: 3 - //result queue size now: 0 - assertResultQueueSize(session, 0, 60, TimeUnit.SECONDS); - assertIncompleteResultQueueSize(session, 3, 60, TimeUnit.SECONDS); - - CountDownLatch lastPostFeed = assertFeedBlocking(feeder, 3); - - //A input queue size now: 0 - //B input queue size now: 2, IOThread writing 1 and blocked because of no response yet - //feeder thread blocking with 1 op - //intermediate result queue size now: 3 - //result queue size now: 0 - - slowHandler.poke(); - - //A input queue size now: 0 - //B input queue size now: 2, IOThread writing 1 and blocked because of no response again - //feeder thread unblocked - //intermediate result queue size now: 3 - //result queue size now: 1 - - lastPostFeed.await(60, TimeUnit.SECONDS); - assertThat(lastPostFeed.getCount(), equalTo(0L)); - - slowHandler.pokeAllAndUnblockFromNowOn(); - - //A input queue size now: 0 - //B input queue size now: 0, IOThread not blocked - //feeder thread unblocked - //intermediate result queue size now: 0 - //result queue size now: 4 - - assertResultQueueSize(session, 4, 60, TimeUnit.SECONDS); - } finally { - slowHandler.pokeAllAndUnblockFromNowOn(); - feeder.stop(); - } - } - } - - @Test - public void testErrorRecovery() throws Exception { - V3MockParsingRequestHandler mockXmlParsingRequestHandler = new V3MockParsingRequestHandler(); - mockXmlParsingRequestHandler.setScenario(V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION); - try (Server server = new Server(mockXmlParsingRequestHandler, 0); - Session session = - new com.yahoo.vespa.http.client.core.api.SessionImpl( - new SessionParams.Builder() - .addCluster(new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", server.getPort(), false)) - .build()) - .setFeedParams(new FeedParams.Builder() - .setMaxChunkSizeBytes(1) - .setMaxInFlightRequests(1) - .setLocalQueueTimeOut(3000) - .build()) - .setConnectionParams(new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(1) - .build()) - .setClientQueueSize(1) - .build(), - SessionFactory.createTimeoutExecutor(), - Clock.systemUTC())) { - FeederThread feeder = new FeederThread(session); - feeder.start(); - try { - { - System.out.println("We start with failed connection, post a document."); - assertFeedNotBlocking(feeder, 0); - assertEquals(0, session.results().size()); - - CountDownLatch lastPostFeed = assertFeedBlocking(feeder, 1); - System.out.println("No result so far."); - assertEquals(0, session.results().size()); - System.out.println("Make connection ok."); - mockXmlParsingRequestHandler.setScenario(V3MockParsingRequestHandler.Scenario.ALL_OK); - assert(lastPostFeed.await(120, TimeUnit.SECONDS)); - assertEquals(0L, lastPostFeed.getCount()); - assertResultQueueSize(session, 2, 120, TimeUnit.SECONDS); - } - - System.out.println("Take down connection"); - - mockXmlParsingRequestHandler.setScenario(V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION); - { - assertFeedNotBlocking(feeder, 2); - System.out.println("Fed one document, fit in queue."); - assertEquals(2, session.results().size()); - System.out.println("Fed one document more, wait for failure."); - - assertFeedNotBlocking(feeder, 3); - System.out.println("Wait for results for all three documents."); - while (session.results().size() != 3) { - Thread.sleep(1); - } - System.out.println("Back to ok, test feeding again."); - mockXmlParsingRequestHandler.setScenario(V3MockParsingRequestHandler.Scenario.ALL_OK); - assertResultQueueSize(session, 4, 120, TimeUnit.SECONDS); - } - int errors = 0; - for (Result result : session.results()) { - assertEquals(1, result.getDetails().size()); - if (! result.isSuccess()) { - errors++; - } - } - assertEquals(1, errors); - } finally { - feeder.stop(); - } - } - } - - private void assertResultQueueSize(Session session, int size, long timeout, TimeUnit timeUnit) throws InterruptedException { - long timeoutMs = TimeUnit.MILLISECONDS.convert(timeout, timeUnit); - long waitTimeMs = 0; - while (true) { - if (session.results().size() == size) { - break; - } - Thread.sleep(100); - waitTimeMs += 100; - if (waitTimeMs > timeoutMs) { - fail("Queue never reached size " + size + " before timeout of " + timeout + " " + timeUnit + ". Size now: " + session.results().size()); - } - } - } - - private void assertIncompleteResultQueueSize(com.yahoo.vespa.http.client.core.api.SessionImpl session, int size, long timeout, TimeUnit timeUnit) throws InterruptedException { - long timeoutMs = TimeUnit.MILLISECONDS.convert(timeout, timeUnit); - long waitTimeMs = 0; - while (true) { - if (session.getIncompleteResultQueueSize() == size) { - break; - } - Thread.sleep(100); - waitTimeMs += 100; - if (waitTimeMs > timeoutMs) { - fail("Queue of incomplete results never reached size " + size + " before timeout of " + timeout + " " + timeUnit + ". Size now: " + session.getIncompleteResultQueueSize()); - } - } - } - - private CountDownLatch assertFeedBlocking(FeederThread feeder, int idx) throws InterruptedException { - CountDownLatch preFeed = new CountDownLatch(1); - CountDownLatch postFeed = new CountDownLatch(1); - feeder.documents.add(new Triplet<>(preFeed, documents.get(idx), postFeed)); - preFeed.await(60, TimeUnit.SECONDS); - assertThat(preFeed.getCount(), equalTo(0L)); - postFeed.await(2, TimeUnit.SECONDS); - assertThat(feeder.getState(), not(equalTo(Thread.State.RUNNABLE))); - assertThat(postFeed.getCount(), equalTo(1L)); - return postFeed; - } - - private void assertFeedNotBlocking(FeederThread feeder, int idx) throws InterruptedException { - CountDownLatch preFeed = new CountDownLatch(1); - CountDownLatch postFeed = new CountDownLatch(1); - feeder.documents.add(new Triplet<>(preFeed, documents.get(idx), postFeed)); - preFeed.await(60, TimeUnit.SECONDS); - assertThat(preFeed.getCount(), equalTo(0L)); - postFeed.await(60, TimeUnit.SECONDS); - assertThat(postFeed.getCount(), equalTo(0L)); - } - - public static class Triplet<F, S, T> { - public final F first; - public final S second; - public final T third; - - public Triplet(final F first, final S second, final T third) { - this.first = first; - this.second = second; - this.third = third; - } - } - - private class FeederThread implements Runnable { - private final Session session; - private final BlockingQueue<Triplet<CountDownLatch, TestDocument, CountDownLatch>> documents = new LinkedBlockingQueue<>(); - private final Thread thread; - private volatile boolean shouldRun = true; - - FeederThread(Session session) { - this.session = session; - this.thread = new Thread(this); - } - - public void start() { - this.thread.start(); - } - - public void stop() { - shouldRun = false; - thread.interrupt(); - } - - public Thread.State getState() { - return thread.getState(); - } - - @Override - public void run() { - try { - while (shouldRun) { - Triplet<CountDownLatch, TestDocument, CountDownLatch> triplet = documents.poll(200, TimeUnit.MILLISECONDS); - if (triplet == null) { - continue; - } - triplet.first.countDown(); - writeDocument(session, triplet.second); - triplet.third.countDown(); - } - } catch (IOException ioe) { - ioe.printStackTrace(); - } catch (InterruptedException e) { - //ignore - } catch (RuntimeException re) { - if (re.getCause() == null || (!(re.getCause() instanceof InterruptedException))) { - re.printStackTrace(); - } - } - - } - - } -} diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPIMultiClusterTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPIMultiClusterTest.java deleted file mode 100644 index 5fbc367c620..00000000000 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPIMultiClusterTest.java +++ /dev/null @@ -1,1042 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client; - -import com.yahoo.vespa.http.client.config.Cluster; -import com.yahoo.vespa.http.client.config.ConnectionParams; -import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.config.FeedParams; -import com.yahoo.vespa.http.client.config.SessionParams; -import com.yahoo.vespa.http.client.handlers.V3MockParsingRequestHandler; -import org.junit.Test; - -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static com.yahoo.vespa.http.client.TestUtils.getResults; -import static com.yahoo.vespa.http.client.V3HttpAPITest.documents; -import static org.hamcrest.CoreMatchers.*; -import static org.junit.Assert.assertThat; - -/** - * - * @author Einar M R Rosenvinge - */ -@SuppressWarnings("deprecation") -public class V3HttpAPIMultiClusterTest { - - private void writeDocuments(Session session) throws IOException { - TestUtils.writeDocuments(session, documents); - } - - private void writeDocument(Session session) throws IOException { - TestUtils.writeDocuments(session, Collections.<TestDocument>singletonList(documents.get(0))); - } - - private void waitForHandshakesOk(int handshakes, Session session) throws InterruptedException { - int waitedTimeMs = 0; - while (session.getStatsAsJson().split("\"successfullHandshakes\":1").length != 1+ handshakes) { - waitedTimeMs += 100; - assert(waitedTimeMs < 300000); - Thread.sleep(100); - } - } - - @Test - public void testOpenClose() throws Exception { - try (Server serverA = new Server( - new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Session session = SessionFactory.create(new SessionParams.Builder() - .addCluster(new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .build())) { - assertThat(session, notNullValue()); - } - } - - @Test - public void testPriorityAndTraceFlag() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler( - 200, V3MockParsingRequestHandler.Scenario.EXPECT_HIGHEST_PRIORITY_AND_TRACELEVEL_123), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster(new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .setConnectionParams(new ConnectionParams.Builder() - .setTraceLevel(123) - .build()) - .setFeedParams(new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setPriority("HIGHEST") - .build()) - .build())) { - writeDocuments(session); - Map<String, Result> results = getResults(session, documents.size()); - assertThat("Results received: " + results.values(), results.size(), is(documents.size())); - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r.getDetails().toString(), r.isSuccess(), is(true)); - } - } - } - - @Test - public void testRetries() throws Exception { - V3MockParsingRequestHandler unstableServer = - new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK); - V3MockParsingRequestHandler failedServer = - new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK); - try (Server serverA = new Server(failedServer, 0); - Server serverB = new Server(unstableServer, 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build() - ) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build() - ) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(2) - .setMinTimeBetweenRetries(200, TimeUnit.MILLISECONDS) - .build() - ) - .build() - )) { - waitForHandshakesOk(2, session); - // Both servers worked fine so far, handshake established. Now fail transient when trying to send - // data on both. This should cause the OperationProcessor to retry the document. One of the server - // will come up, but not the other. - unstableServer.setScenario(V3MockParsingRequestHandler.Scenario.SERVER_ERROR_TWICE_THEN_OK); - // This will cause retries, but it will not come up. - failedServer.setScenario(V3MockParsingRequestHandler.Scenario.COULD_NOT_FEED); - writeDocuments(session); - Map<String, Result> results = getResults(session, documents.size()); - assertThat("Results received: " + results.values(), results.size(), is(documents.size())); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - assertThat(r.getDetails().size(), is(2)); - assert(r.getDetails().get(0).getResultType() != r.getDetails().get(1).getResultType()); - } - assertThat(results.isEmpty(), is(true)); - } - } - - @Test - public void requireThatFeedingWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .build())) { - - writeDocuments(session); - Map<String, Result> results = getResults(session, documents.size()); - assertThat("Results received: " + results.values(), results.size(), is(documents.size())); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(true)); - assertThat(r.getDetails().size(), is(3)); - assertThat(r.getDetails().get(0).getTraceMessage(), is("Trace message")); - } - assertThat(results.isEmpty(), is(true)); - final String stats = session.getStatsAsJson(); - assertThat(stats, containsString("maxChunkSizeBytes\":51200")); - } - } - - @Test - public void requireThatOneImmediateDisconnectWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DISCONNECT_IMMEDIATELY), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .setLocalQueueTimeOut(1000) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(1) - .build()) - .build())) { - - writeDocuments(session); //cannot fail here, they are just enqueued - - Map<String, Result> results = getResults(session, documents.size()); - assertThat(results.size(), is(documents.size())); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - assertThat(r.getDetails().size(), is(3)); - - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - } - assertThat(results.isEmpty(), is(true)); - } - } - - - - @Test - public void requireThatAllImmediateDisconnectWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DISCONNECT_IMMEDIATELY), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DISCONNECT_IMMEDIATELY), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DISCONNECT_IMMEDIATELY), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .setLocalQueueTimeOut(1000) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(1) - .build()) - .build())) { - - writeDocuments(session); //cannot fail here, they are just enqueued - - Map<String, Result> results = getResults(session, documents.size()); - assertThat(results.size(), is(documents.size())); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - assertThat(r.getDetails().size(), is(3)); - - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - } - assertThat(results.isEmpty(), is(true)); - } - } - - - @Test - public void requireThatOneTimeoutWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.NEVER_RETURN_ANY_RESULTS), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(10000) - .setServerTimeout(2, TimeUnit.SECONDS) - .setClientTimeout(2, TimeUnit.SECONDS) - .setLocalQueueTimeOut(2000) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(1) - .build()) - .build())) { - - waitForHandshakesOk(3, session); - writeDocuments(session); - Map<String, Result> results = getResults(session, documents.size()); - assertThat(results.size(), is(documents.size())); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - //assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - assertThat(r.getDetails().size(), is(3)); - - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - } - assertThat(results.isEmpty(), is(true)); - } - } - - @Test - public void requireThatOneWrongSessionIdWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_WRONG_SESSION_ID), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1000) - .setServerTimeout(2, TimeUnit.SECONDS) - .setClientTimeout(2, TimeUnit.SECONDS) - .setLocalQueueTimeOut(2000) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(0) - .build()) - .build())) { - - waitForHandshakesOk(2, session); - writeDocuments(session); - - Map<String, Result> results = getResults(session, documents.size()); - assertThat(results.size(), is(documents.size())); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - assertThat(r.getDetails().size(), is(3)); - - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - } - assertThat(results.isEmpty(), is(true)); - } - } - - @Test - public void requireThatAllWrongSessionIdWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_WRONG_SESSION_ID), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_WRONG_SESSION_ID), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_WRONG_SESSION_ID), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .setServerTimeout(2, TimeUnit.SECONDS) - .setClientTimeout(2, TimeUnit.SECONDS) - .setLocalQueueTimeOut(2000) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(0) - .build()) - .build())) { - - writeDocuments(session); - Map<String, Result> results = getResults(session, documents.size()); - assertThat(results.size(), is(documents.size())); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - //assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - assertThat(r.getDetails().size(), is(3)); - - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - } - assertThat(results.isEmpty(), is(true)); - } - } - - @Test - public void requireThatOneNonAcceptedVersionWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .setLocalQueueTimeOut(2000) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(0) - .build()) - .build())) { - - writeDocument(session); - - Map<String, Result> results = getResults(session, 1); - assertThat(results.size(), is(1)); - Result r = results.values().iterator().next(); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - Result.Detail failed = details.remove(Endpoint.create("localhost", serverC.getPort(), false)); - assertThat(failed.getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - for (Result.Detail detail : details.values()) { - assertThat(detail.getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - } - } - } - - @Test - public void requireThatAllNonAcceptedVersionWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .setLocalQueueTimeOut(2000) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(1) - .build()) - .build())) { - - writeDocuments(session); //cannot fail here, they are just enqueued - - Map<String, Result> results = getResults(session, documents.size()); - assertThat(results.size(), is(documents.size())); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - //assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - assertThat(r.getDetails().size(), is(3)); - - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - } - assertThat(results.isEmpty(), is(true)); - } - } - - @Test - public void requireThatOneUnexpectedVersionWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_UNEXPECTED_VERSION), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .setLocalQueueTimeOut(2000) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(0) - .build()) - .build())) { - - writeDocuments(session); //cannot fail here, they are just enqueued - - Map<String, Result> results = getResults(session, documents.size()); - assertThat(results.size(), is(documents.size())); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - assertThat(r.getDetails().size(), is(3)); - - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - } - assertThat(results.isEmpty(), is(true)); - } - } - - @Test - public void requireThatAllUnexpectedVersionWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_UNEXPECTED_VERSION), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_UNEXPECTED_VERSION), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_UNEXPECTED_VERSION), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .setLocalQueueTimeOut(2000) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(0) - .build()) - .build())) { - - writeDocument(session); - - Map<String, Result> results = getResults(session, 1); - assertThat(results.size(), is(1)); - Result r = results.values().iterator().next(); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - for (Result.Detail detail : r.getDetails()) { - assertThat(detail.getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - } - } - } - - @Test - public void requireThatOneInternalServerErrorWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.INTERNAL_SERVER_ERROR), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .setLocalQueueTimeOut(2000) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(0) - .build()) - .build())) { - - writeDocument(session); - - Map<String, Result> results = getResults(session, 1); - assertThat(results.size(), is(1)); - Result r = results.values().iterator().next(); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - Result.Detail failed = details.remove(Endpoint.create("localhost", serverC.getPort(), false)); - assertThat(failed.getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - for (Result.Detail detail : details.values()) { - assertThat(detail.getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - } - } - } - - @Test - public void requireThatAllInternalServerErrorWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.INTERNAL_SERVER_ERROR), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.INTERNAL_SERVER_ERROR), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.INTERNAL_SERVER_ERROR), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .setLocalQueueTimeOut(2000) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(0) - .build()) - .build())) { - - writeDocument(session); - - Map<String, Result> results = getResults(session, 1); - assertThat(results.size(), is(1)); - Result r = results.values().iterator().next(); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - for (Result.Detail detail : r.getDetails()) { - assertThat(detail.getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - } - } - } - - @Test - public void requireThatOneCouldNotFeedErrorWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.COULD_NOT_FEED), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(1) - .build()) - .build())) { - - writeDocuments(session); - Map<String, Result> results = getResults(session, documents.size()); - assertThat(results.size(), is(documents.size())); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - assertThat(r.getDetails().size(), is(3)); - - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - } - assertThat(results.isEmpty(), is(true)); - } - } - - @Test - public void requireThatAllCouldNotFeedErrorWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.COULD_NOT_FEED), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.COULD_NOT_FEED), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.COULD_NOT_FEED), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(1) - .build()) - .build())) { - - writeDocuments(session); - Map<String, Result> results = getResults(session, documents.size()); - assertThat(results.size(), is(documents.size())); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - assertThat(r.getDetails().size(), is(3)); - - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - } - assertThat(results.isEmpty(), is(true)); - } - } - - @Test - public void requireThatOneMbusErrorWorks() throws Exception { - final V3MockParsingRequestHandler unstableServer = new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.MBUS_RETURNED_ERROR); - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverC = new Server(unstableServer, 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(100) - .build()) - .build())) { - - writeDocuments(session); - - // Make it fail, but it should still retry since it is a MBUS error that is transitive - // for the client even though fatal for message bus. - Thread.sleep(1000); - assertThat(session.results().size(), is(0)); - unstableServer.setScenario(V3MockParsingRequestHandler.Scenario.ALL_OK); - - Map<String, Result> results = getResults(session, documents.size()); - assertThat(results.size(), is(documents.size())); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(true)); - assertThat(r.getDetails().size(), is(3)); - - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - - } - assertThat(results.isEmpty(), is(true)); - } - } - - @Test - public void requireThatAllMbusErrorWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.MBUS_RETURNED_ERROR), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.MBUS_RETURNED_ERROR), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.MBUS_RETURNED_ERROR), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build() - ) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .build() - ) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(0) - .build() - ) - .build())) { - - writeDocuments(session); //cannot fail here, they are just enqueued - - Map<String, Result> results = getResults(session, documents.size()); - assertThat(results.size(), is(documents.size())); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - assertThat(r.getDetails().size(), is(3)); - - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.FATAL_ERROR)); - assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.FATAL_ERROR)); - assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.FATAL_ERROR)); - } - assertThat(results.isEmpty(), is(true)); - } - } - - @Test - public void requireThatBadVipBehaviorDoesNotFailBadly() throws Exception { - V3MockParsingRequestHandler handlerA = new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.BAD_REQUEST); - V3MockParsingRequestHandler handlerB = new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.BAD_REQUEST); - - try (Server serverA = new Server(handlerA, 0); - Server serverB = new Server(handlerB, 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .setLocalQueueTimeOut(1000) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(1) - .build()) - .build())) { - - //Set A in bad state => A returns bad request. - handlerA.badRequestScenarioShouldReturnBadRequest.set(true); - - //write one document, should fail - writeDocument(session); - Map<String, Result> results = getResults(session, 1); - assertThat(results.size(), is(1)); - Result r = results.values().iterator().next(); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - assertThat(r.getDetails().size(), is(2)); - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - - - //Set B in bad state => B returns bad request. - handlerB.badRequestScenarioShouldReturnBadRequest.set(true); - } //try to close session - - } -} diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java deleted file mode 100644 index 780de3e695c..00000000000 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java +++ /dev/null @@ -1,200 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client; - -import com.yahoo.vespa.http.client.config.Cluster; -import com.yahoo.vespa.http.client.config.ConnectionParams; -import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.config.FeedParams; -import com.yahoo.vespa.http.client.config.SessionParams; -import com.yahoo.vespa.http.client.handlers.V3MockParsingRequestHandler; -import org.junit.Test; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static com.yahoo.vespa.http.client.TestUtils.getResults; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -/** - * - * @author Einar M R Rosenvinge - */ -@SuppressWarnings("deprecation") -public class V3HttpAPITest { - - public static final List<TestDocument> documents; - - static { - List<TestDocument> docs = new ArrayList<>(); - docs.add(new TestDocument("id:music:music::http://music.yahoo.com/bobdylan/BestOf", - ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/bobdylan/BestOf\">\n" + - " <title>Best of Bob Dylan</title>\n" + - "</document>\n").getBytes(StandardCharsets.UTF_8))); - docs.add(new TestDocument("id:music:music::http://music.yahoo.com/oleivars/BestOf", - ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/oleivars/BestOf\">\n" + - " <title>Best of Ole Ivars</title>\n" + - "</document>\n").getBytes(StandardCharsets.UTF_8))); - docs.add(new TestDocument("id:music:music::http://music.yahoo.com/bjarnefritjofs/BestOf", - ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/bjarnefritjofs/BestOf\">\n" + - " <title>Best of Bjarne Fritjofs</title>\n" + - "</document>\n").getBytes(StandardCharsets.UTF_8))); - documents = Collections.unmodifiableList(docs); - } - - private void writeDocuments(Session session) throws IOException { - TestUtils.writeDocuments(session, documents); - } - - private void writeDocument(Session session) throws IOException { - TestUtils.writeDocuments(session, Collections.<TestDocument>singletonList(documents.get(0))); - } - - private void testServerWithMock(V3MockParsingRequestHandler serverMock, boolean failFast, boolean conditionNotMet) throws Exception { - try (Server server = new Server(serverMock, 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(0) - .build()) - .setFeedParams(new FeedParams.Builder() - .setLocalQueueTimeOut(failFast ? 0 : 120000) - .setServerTimeout(120, TimeUnit.SECONDS) - .setClientTimeout(120, TimeUnit.SECONDS) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", server.getPort(), false)) - .build()) - .build())) { - - writeDocument(session); - Map<String, Result> results = getResults(session, 1); - assertEquals(1, results.size()); - - TestDocument document = documents.get(0); - Result r = results.remove(document.getDocumentId()); - assertNotNull(r); - if (conditionNotMet) - assertEquals(Result.ResultType.CONDITION_NOT_MET, r.getDetails().iterator().next().getResultType()); - assertFalse(r.getDetails().toString(), r.isSuccess()); - assertTrue(results.isEmpty()); - } - } - - @Test - public void testSingleDestination() throws Exception { - try (Server server = new Server(new V3MockParsingRequestHandler(), 0); - Session session = SessionFactory.create(Endpoint.create("localhost", server.getPort(), false))) { - - writeDocuments(session); - Map<String, Result> results = getResults(session, documents.size()); - assertEquals(documents.size(), results.size()); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertNotNull(r); - assertTrue(r.getDetails().toString(), r.isSuccess()); - } - assertTrue(results.isEmpty()); - } - } - - @Test - public void requireThatBadResponseCodeFails() throws Exception { - testServerWithMock(new V3MockParsingRequestHandler(401/*Unauthorized*/), true, false); - testServerWithMock(new V3MockParsingRequestHandler(403/*Forbidden*/), true, false); - testServerWithMock(new V3MockParsingRequestHandler(407/*Proxy Authentication Required*/), true, false); - } - - @Test - public void requireThatUnexpectedVersionIsHandledProperly() throws Exception { - testServerWithMock(new V3MockParsingRequestHandler( - 200, V3MockParsingRequestHandler.Scenario.RETURN_UNEXPECTED_VERSION), true, false); - } - - @Test - public void requireThatNonAcceptedVersionIsHandledProperly() throws Exception { - testServerWithMock(new V3MockParsingRequestHandler( - 200, V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION), true, false); - } - - @Test - public void requireThatNon200OkIsHandledProperly() throws Exception { - testServerWithMock(new V3MockParsingRequestHandler( - 200, V3MockParsingRequestHandler.Scenario.INTERNAL_SERVER_ERROR), true, false); - } - - @Test - public void requireThatMbusErrorIsHandledProperly() throws Exception { - testServerWithMock(new V3MockParsingRequestHandler( - 200, V3MockParsingRequestHandler.Scenario.MBUS_RETURNED_ERROR), false, false); - } - - @Test - public void requireThatTimeoutWorks() throws Exception { - try (Server server = new Server(new V3MockParsingRequestHandler( - 200, V3MockParsingRequestHandler.Scenario.NEVER_RETURN_ANY_RESULTS), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .setFeedParams(new FeedParams.Builder() - .setLocalQueueTimeOut(0) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(2) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setServerTimeout(500, TimeUnit.MILLISECONDS) - .setClientTimeout(500, TimeUnit.MILLISECONDS) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", server.getPort(), false)) - .build()) - .build())) { - - writeDocuments(session); - - Map<String, Result> results = getResults(session, documents.size()); - assertEquals(documents.size(), results.size()); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertNotNull(r); - assertFalse(r.getDetails().toString(), r.isSuccess()); - assertEquals(Result.ResultType.TRANSITIVE_ERROR, r.getDetails().iterator().next().getResultType()); - } - assertTrue(results.isEmpty()); - } - } - - @Test - public void requireThatCouldNotFeedErrorIsHandledProperly() throws Exception { - testServerWithMock(new V3MockParsingRequestHandler( - 200, V3MockParsingRequestHandler.Scenario.COULD_NOT_FEED), false, false); - } - - @Test - public void requireThatImmediateDisconnectIsHandledProperly() throws Exception { - testServerWithMock(new V3MockParsingRequestHandler( - 200, V3MockParsingRequestHandler.Scenario.DISCONNECT_IMMEDIATELY), true, false); - } - @Test - public void testConditionNotMet() throws Exception { - testServerWithMock(new V3MockParsingRequestHandler( - 200, V3MockParsingRequestHandler.Scenario.CONDITON_NOT_MET), false, true); - } - -} diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/OperationProcessorTester.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/OperationProcessorTester.java new file mode 100644 index 00000000000..9b563e193d5 --- /dev/null +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/OperationProcessorTester.java @@ -0,0 +1,125 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.http.client.core; + +import com.yahoo.vespa.http.client.FeedClient; +import com.yahoo.vespa.http.client.FeedEndpointException; +import com.yahoo.vespa.http.client.ManualClock; +import com.yahoo.vespa.http.client.Result; +import com.yahoo.vespa.http.client.config.Cluster; +import com.yahoo.vespa.http.client.config.ConnectionParams; +import com.yahoo.vespa.http.client.config.Endpoint; +import com.yahoo.vespa.http.client.config.SessionParams; +import com.yahoo.vespa.http.client.core.communication.ClusterConnection; +import com.yahoo.vespa.http.client.core.communication.IOThread; +import com.yahoo.vespa.http.client.core.communication.IOThreadTest; +import com.yahoo.vespa.http.client.core.operationProcessor.IncompleteResultsThrottler; +import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor; + +import java.time.Instant; +import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static org.junit.Assert.assertEquals; + +/** + * Helper for testing with an operation processor + * + * @author bratseth + */ +public class OperationProcessorTester { + + private final Endpoint endpoint; + private final int clusterId = 0; + private final ManualClock clock; + private final TestResultCallback resultCallback; + private final OperationProcessor operationProcessor; + + public OperationProcessorTester() { + endpoint = Endpoint.create("test-endpoint"); + SessionParams.Builder params = new SessionParams.Builder(); + Cluster.Builder clusterParams = new Cluster.Builder(); + clusterParams.addEndpoint(endpoint); + params.addCluster(clusterParams.build()); + ConnectionParams.Builder connectionParams = new ConnectionParams.Builder(); + connectionParams.setDryRun(true); + connectionParams.setRunThreads(false); + params.setConnectionParams(connectionParams.build()); + + clock = new ManualClock(Instant.ofEpochMilli(0)); + resultCallback = new TestResultCallback(); + operationProcessor = new OperationProcessor(new IncompleteResultsThrottler(1, 100, clock, new ThrottlePolicy()), + resultCallback, + params.build(), + new ScheduledThreadPoolExecutor(1), + clock); + } + + public ManualClock clock() { return clock; } + + /** Asserts that this has but a single IOThread and returns it */ + public IOThread getSingleIOThread() { + assertEquals(1, clusterConnections().size()); + assertEquals(1, clusterConnections().get(0).ioThreads().size()); + return clusterConnections().get(0).ioThreads().get(0); + } + + /** Do n iteration of work in all io threads of this */ + public void tick(int n) { + for (int i = 0; i < n; i++) + for (ClusterConnection cluster : operationProcessor.clusters()) + for (IOThread thread : cluster.ioThreads()) + thread.tick(); + } + + public void send(String documentId) { + operationProcessor.sendDocument(new Document(documentId, documentId, "data of " + documentId, null, clock.instant())); + } + + public int incomplete() { + return operationProcessor.getIncompleteResultQueueSize(); + } + + public int success() { + return resultCallback.successes; + } + + public List<ClusterConnection> clusterConnections() { + return operationProcessor.clusters(); + } + + public int failures() { + return resultCallback.failures; + } + + public int endpointExceptions() { + return resultCallback.endpointExceptions; + } + + public Result lastResult() { + return resultCallback.lastResult; + } + + private static class TestResultCallback implements FeedClient.ResultCallback { + + private int successes = 0; + private int failures = 0; + private int endpointExceptions = 0; + private Result lastResult; + + @Override + public void onCompletion(String docId, Result documentResult) { + this.lastResult = documentResult; + if (documentResult.isSuccess()) + successes++; + else + failures++; + } + + @Override + public void onEndpointException(FeedEndpointException exception) { + endpointExceptions++; + } + + } + +} diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java index 59fb968906f..e684c929fda 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java @@ -1,232 +1,150 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.http.client.core.communication; -import com.yahoo.vespa.http.client.FeedConnectException; +import com.yahoo.vespa.http.client.FeedClient; import com.yahoo.vespa.http.client.FeedEndpointException; -import com.yahoo.vespa.http.client.FeedProtocolException; -import com.yahoo.vespa.http.client.ManualClock; import com.yahoo.vespa.http.client.Result; -import com.yahoo.vespa.http.client.V3HttpAPITest; -import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.core.Document; -import com.yahoo.vespa.http.client.core.EndpointResult; +import com.yahoo.vespa.http.client.core.OperationProcessorTester; import com.yahoo.vespa.http.client.core.ServerResponseException; import org.junit.Test; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.time.Clock; import java.time.Duration; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -// DO NOT ADD TESTS HERE, add to NewIOThreadTest -public class IOThreadTest { - - private static final Endpoint ENDPOINT = Endpoint.create("myhost"); - - final Clock clock = Clock.systemUTC(); - final EndpointResultQueue endpointResultQueue = mock(EndpointResultQueue.class); - final ApacheGatewayConnection apacheGatewayConnection = mock(ApacheGatewayConnection.class); - final String exceptionMessage = "SOME EXCEPTION FOO"; - CountDownLatch latch = new CountDownLatch(1); - String docId1 = V3HttpAPITest.documents.get(0).getDocumentId(); - Document doc1 = new Document(V3HttpAPITest.documents.get(0).getDocumentId(), - V3HttpAPITest.documents.get(0).getContents(), - null, - clock.instant()); - String docId2 = V3HttpAPITest.documents.get(1).getDocumentId(); - Document doc2 = new Document(V3HttpAPITest.documents.get(1).getDocumentId(), - V3HttpAPITest.documents.get(1).getContents(), - null, - clock.instant()); - DocumentQueue documentQueue = new DocumentQueue(4, clock); - - public IOThreadTest() { - when(apacheGatewayConnection.getEndpoint()).thenReturn(ENDPOINT); - } - - /** - * Set up mock so that it can handle both failDocument() and resultReceived(). - * - * @param expectedDocIdFail on failure, this has to be the doc id, or the mock will fail. - * @param expectedDocIdOk on ok, this has to be the doc id, or the mock will fail. - * @param isTransient checked on failure, if different, the mock will fail. - * @param expectedException checked on failure, if exception toString is different, the mock will fail. - */ - void setupEndpointResultQueueMock(String expectedDocIdFail, String expectedDocIdOk, boolean isTransient, String expectedException) { - doAnswer(invocation -> { - EndpointResult endpointResult = (EndpointResult) invocation.getArguments()[0]; - assertThat(endpointResult.getOperationId(), is(expectedDocIdFail)); - assertThat(endpointResult.getDetail().getException().toString(), containsString(expectedException)); - assertThat(endpointResult.getDetail().getResultType(), is(isTransient ? Result.ResultType.TRANSITIVE_ERROR : Result.ResultType.FATAL_ERROR)); - - latch.countDown(); - return null; - }).when(endpointResultQueue).failOperation(any(), eq(0)); - - doAnswer(invocation -> { - EndpointResult endpointResult = (EndpointResult) invocation.getArguments()[0]; - assertThat(endpointResult.getOperationId(), is(expectedDocIdOk)); - assertThat(endpointResult.getDetail().getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - latch.countDown(); - return null; - }).when(endpointResultQueue).resultReceived(any(), eq(0)); - } - private IOThread createIOThread(int maxInFlightRequests, long localQueueTimeOut) { - return new IOThread(null, - ENDPOINT, - endpointResultQueue, - new SingletonGatewayConnectionFactory(apacheGatewayConnection), - 0, - 0, - maxInFlightRequests, - Duration.ofMillis(localQueueTimeOut), - documentQueue, - 0, - Duration.ofSeconds(15), - true, - 10, - clock); - } +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; - @Test - public void singleDocumentSuccess() throws Exception { - when(apacheGatewayConnection.connect()).thenReturn(true); - InputStream serverResponse = new ByteArrayInputStream( - (docId1 + " OK Doc{20}fed").getBytes(StandardCharsets.UTF_8)); - when(apacheGatewayConnection.write(any())).thenReturn(serverResponse); - setupEndpointResultQueueMock( "nope", docId1, true, exceptionMessage); - try (IOThread ioThread = createIOThread(10000, 10000)) { - ioThread.post(doc1); - assert (latch.await(120, TimeUnit.SECONDS)); - } - } +/** + * TODO: Migrate IOThreadTests here. + * + * @author bratseth + */ +public class IOThreadTest { @Test - public void testDocumentWriteError() throws Exception { - when(apacheGatewayConnection.connect()).thenReturn(true); - when(apacheGatewayConnection.write(any())).thenThrow(new IOException(exceptionMessage)); - setupEndpointResultQueueMock(doc1.getOperationId(), "nope", true, exceptionMessage); - try (IOThread ioThread = createIOThread(10000, 10000)) { - ioThread.post(doc1); - assert (latch.await(120, TimeUnit.SECONDS)); - } + public void testSuccessfulWriting() { + OperationProcessorTester tester = new OperationProcessorTester(); + assertEquals(0, tester.incomplete()); + assertEquals(0, tester.success()); + assertEquals(0, tester.failures()); + tester.send("doc1"); + tester.send("doc2"); + tester.send("doc3"); + assertEquals(3, tester.incomplete()); + assertEquals(0, tester.success()); + assertEquals(0, tester.failures()); + tester.tick(1); // connect + assertEquals(3, tester.incomplete()); + tester.tick(1); // sync + assertEquals(3, tester.incomplete()); + tester.tick(1); // process queue + assertEquals(0, tester.incomplete()); + assertEquals(3, tester.success()); + assertEquals(0, tester.failures()); } @Test - public void testTwoDocumentsFirstWriteErrorSecondOk() throws Exception { - when(apacheGatewayConnection.connect()).thenReturn(true); - InputStream serverResponse = new ByteArrayInputStream( - (docId2 + " OK Doc{20}fed").getBytes(StandardCharsets.UTF_8)); - when(apacheGatewayConnection.write(any())) - .thenThrow(new IOException(exceptionMessage)) - .thenReturn(serverResponse); - latch = new CountDownLatch(2); - setupEndpointResultQueueMock(doc1.getOperationId(), doc2.getDocumentId(), true, exceptionMessage); - - try (IOThread ioThread = createIOThread(10000, 10000)) { - ioThread.post(doc1); - ioThread.post(doc2); - assert (latch.await(120, TimeUnit.SECONDS)); - } + public void testExceptionOnConnect() { + OperationProcessorTester tester = new OperationProcessorTester(); + IOThread ioThread = tester.getSingleIOThread(); + DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection(); + firstConnection.throwOnHandshake(new ServerResponseException(403, "Not authorized")); + + tester.send("doc1"); + tester.tick(3); + assertEquals(1, tester.incomplete()); + assertEquals(0, ioThread.resultQueue().getPendingSize()); + assertEquals(0, tester.success()); + assertEquals("Awaiting retry", 0, tester.failures()); } @Test - public void testQueueTimeOutNoNoConnectionToServer() throws Exception { - when(apacheGatewayConnection.connect()).thenReturn(false); - InputStream serverResponse = new ByteArrayInputStream(("").getBytes(StandardCharsets.UTF_8)); - when(apacheGatewayConnection.write(any())).thenReturn(serverResponse); - setupEndpointResultQueueMock(doc1.getOperationId(), "nope", true, - "java.lang.Exception: Not sending document operation, timed out in queue after"); - try (IOThread ioThread = createIOThread(10, 10)) { - ioThread.post(doc1); - assert (latch.await(120, TimeUnit.SECONDS)); - } + public void testExceptionOnHandshake() { + OperationProcessorTester tester = new OperationProcessorTester(); + IOThread ioThread = tester.getSingleIOThread(); + DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection(); + firstConnection.throwOnHandshake(new ServerResponseException(403, "Not authorized")); + + tester.send("doc1"); + tester.tick(3); + assertEquals(1, tester.incomplete()); + assertEquals(0, ioThread.resultQueue().getPendingSize()); + assertEquals(0, tester.success()); + assertEquals("Awaiting retry", 0, tester.failures()); } @Test - public void testEndpointProtocolExceptionPropagation() - throws IOException, ServerResponseException, InterruptedException, TimeoutException, ExecutionException { - when(apacheGatewayConnection.connect()).thenReturn(true); - int errorCode = 403; - String errorMessage = "Not authorized"; - doThrow(new ServerResponseException(errorCode, errorMessage)).when(apacheGatewayConnection).handshake(); - Future<FeedEndpointException> futureException = endpointErrorCapturer(endpointResultQueue); - - try (IOThread ioThread = createIOThread(10, 10)) { - ioThread.post(doc1); - FeedEndpointException reportedException = futureException.get(120, TimeUnit.SECONDS); - assertThat(reportedException, instanceOf(FeedProtocolException.class)); - FeedProtocolException actualException = (FeedProtocolException) reportedException; - assertThat(actualException.getHttpStatusCode(), equalTo(errorCode)); - assertThat(actualException.getHttpResponseMessage(), equalTo(errorMessage)); - assertThat(actualException.getEndpoint(), equalTo(ENDPOINT)); - assertThat(actualException.getMessage(), equalTo("Endpoint 'myhost:4080' returned an error on handshake: 403 - Not authorized")); - } + public void testExceptionOnWrite() { + OperationProcessorTester tester = new OperationProcessorTester(); + IOThread ioThread = tester.getSingleIOThread(); + DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection(); + firstConnection.throwOnWrite(new IOException("Test failure")); + + tester.send("doc1"); + tester.tick(3); + assertEquals(1, tester.incomplete()); + assertEquals(0, ioThread.resultQueue().getPendingSize()); + assertEquals(0, tester.success()); + assertEquals("Awaiting retry since write exceptions is a transient failure", + 0, tester.failures()); } @Test - public void testEndpointConnectExceptionsPropagation() - throws IOException, ServerResponseException, InterruptedException, TimeoutException, ExecutionException { - when(apacheGatewayConnection.connect()).thenReturn(true); - String errorMessage = "generic error message"; - IOException cause = new IOException(errorMessage); - doThrow(cause).when(apacheGatewayConnection).handshake(); - Future<FeedEndpointException> futureException = endpointErrorCapturer(endpointResultQueue); - - try (IOThread ioThread = createIOThread(10, 10)) { - ioThread.post(doc1); - FeedEndpointException reportedException = futureException.get(120, TimeUnit.SECONDS); - assertThat(reportedException, instanceOf(FeedConnectException.class)); - FeedConnectException actualException = (FeedConnectException) reportedException; - assertThat(actualException.getCause(), equalTo(cause)); - assertThat(actualException.getEndpoint(), equalTo(ENDPOINT)); - assertThat(actualException.getMessage(), equalTo("Handshake to endpoint 'myhost:4080' failed: generic error message")); - } - } - - private static Future<FeedEndpointException> endpointErrorCapturer(EndpointResultQueue endpointResultQueue) { - CompletableFuture<FeedEndpointException> futureResult = new CompletableFuture<>(); - doAnswer(invocation -> { - if (futureResult.isDone()) return null; - FeedEndpointException reportedException = (FeedEndpointException) invocation.getArguments()[0]; - futureResult.complete(reportedException); - return null; - }).when(endpointResultQueue).onEndpointError(any()); - return futureResult; + public void testPollingOldConnections() { + OperationProcessorTester tester = new OperationProcessorTester(); + tester.tick(3); + + IOThread ioThread = tester.getSingleIOThread(); + DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection(); + assertEquals(0, ioThread.oldConnections().size()); + + firstConnection.hold(true); + tester.send("doc1"); + tester.tick(1); + + tester.clock().advance(Duration.ofSeconds(16)); // Default connection ttl is 15 + tester.tick(3); + + assertEquals(1, ioThread.oldConnections().size()); + assertEquals(firstConnection, ioThread.oldConnections().get(0)); + assertNotSame(firstConnection, ioThread.currentConnection()); + assertEquals(16, firstConnection.lastPollTime().toEpochMilli() / 1000); + + // Check old connection poll pattern (exponential backoff) + assertLastPollTimeWhenAdvancing(16, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); + + tester.clock().advance(Duration.ofSeconds(200)); + tester.tick(1); + assertEquals("Old connection is eventually removed", 0, ioThread.oldConnections().size()); } - private static final class SingletonGatewayConnectionFactory implements GatewayConnectionFactory { - - private final GatewayConnection singletonConnection; - - SingletonGatewayConnectionFactory(GatewayConnection singletonConnection) { - this.singletonConnection = singletonConnection; - } - - @Override - public GatewayConnection newConnection() { return singletonConnection; } - + private void assertLastPollTimeWhenAdvancing(int lastPollTimeSeconds, + int advanceSeconds, + DryRunGatewayConnection connection, + OperationProcessorTester tester) { + tester.clock().advance(Duration.ofSeconds(advanceSeconds)); + tester.tick(1); + assertEquals(lastPollTimeSeconds, connection.lastPollTime().toEpochMilli() / 1000); } } diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java deleted file mode 100644 index 7bac8f9cd9d..00000000000 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java +++ /dev/null @@ -1,196 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core.communication; - -import com.yahoo.vespa.http.client.FeedClient; -import com.yahoo.vespa.http.client.FeedEndpointException; -import com.yahoo.vespa.http.client.ManualClock; -import com.yahoo.vespa.http.client.Result; -import com.yahoo.vespa.http.client.config.Cluster; -import com.yahoo.vespa.http.client.config.ConnectionParams; -import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.config.SessionParams; -import com.yahoo.vespa.http.client.core.Document; -import com.yahoo.vespa.http.client.core.EndpointResult; -import com.yahoo.vespa.http.client.core.ThrottlePolicy; -import com.yahoo.vespa.http.client.core.operationProcessor.IncompleteResultsThrottler; -import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor; -import org.junit.Test; - -import java.time.Duration; -import java.time.Instant; -import java.util.List; -import java.util.concurrent.ScheduledThreadPoolExecutor; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotSame; - -/** - * TODO: Migrate IOThreadTests here. - * - * @author bratseth - */ -public class NewIOThreadTest { - - @Test - public void testBasics() { - OperationProcessorTester tester = new OperationProcessorTester(); - assertEquals(0, tester.inflight()); - assertEquals(0, tester.success()); - assertEquals(0, tester.failures()); - tester.send("doc1"); - tester.send("doc2"); - tester.send("doc3"); - assertEquals(3, tester.inflight()); - assertEquals(0, tester.success()); - assertEquals(0, tester.failures()); - tester.success("doc1"); - tester.success("doc2"); - tester.success("doc3"); - assertEquals(0, tester.inflight()); - assertEquals(3, tester.success()); - assertEquals(0, tester.failures()); - } - - @Test - public void testPollingOldConnections() { - OperationProcessorTester tester = new OperationProcessorTester(); - tester.tick(3); - - assertEquals(1, tester.clusterConnections().size()); - assertEquals(1, tester.clusterConnections().get(0).ioThreads().size()); - IOThread ioThread = tester.clusterConnections().get(0).ioThreads().get(0); - DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection(); - assertEquals(0, ioThread.oldConnections().size()); - - firstConnection.hold(true); - tester.send("doc1"); - tester.tick(1); - - tester.clock().advance(Duration.ofSeconds(16)); // Default connection ttl is 15 - tester.tick(3); - - assertEquals(1, ioThread.oldConnections().size()); - assertEquals(firstConnection, ioThread.oldConnections().get(0)); - assertNotSame(firstConnection, ioThread.currentConnection()); - assertEquals(16, firstConnection.lastPollTime().toEpochMilli() / 1000); - - // Check old connection poll pattern (exponential backoff) - assertLastPollTimeWhenAdvancing(16, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); - - tester.clock().advance(Duration.ofSeconds(200)); - tester.tick(1); - assertEquals("Old connection is eventually removed", 0, ioThread.oldConnections().size()); - } - - private void assertLastPollTimeWhenAdvancing(int lastPollTimeSeconds, - int advanceSeconds, - DryRunGatewayConnection connection, - OperationProcessorTester tester) { - tester.clock().advance(Duration.ofSeconds(advanceSeconds)); - tester.tick(1); - assertEquals(lastPollTimeSeconds, connection.lastPollTime().toEpochMilli() / 1000); - } - - private static class OperationProcessorTester { - - private final Endpoint endpoint; - private final int clusterId = 0; - private final ManualClock clock; - private final TestResultCallback resultCallback; - private final OperationProcessor operationProcessor; - - public OperationProcessorTester() { - endpoint = Endpoint.create("test-endpoint"); - SessionParams.Builder params = new SessionParams.Builder(); - Cluster.Builder clusterParams = new Cluster.Builder(); - clusterParams.addEndpoint(endpoint); - params.addCluster(clusterParams.build()); - ConnectionParams.Builder connectionParams = new ConnectionParams.Builder(); - connectionParams.setDryRun(true); - connectionParams.setRunThreads(false); - params.setConnectionParams(connectionParams.build()); - - clock = new ManualClock(Instant.ofEpochMilli(0)); - resultCallback = new TestResultCallback(); - operationProcessor = new OperationProcessor(new IncompleteResultsThrottler(1, 100, clock, new ThrottlePolicy()), - resultCallback, - params.build(), - new ScheduledThreadPoolExecutor(1), - clock); - } - - public ManualClock clock() { return clock; } - - /** Do n iteration of work in all io threads of this */ - public void tick(int n) { - for (int i = 0; i < n; i++) - for (ClusterConnection cluster : operationProcessor.clusters()) - for (IOThread thread : cluster.ioThreads()) - thread.tick(); - } - - public void send(String documentId) { - operationProcessor.sendDocument(new Document(documentId, documentId, "data of " + documentId, null, clock.instant())); - } - - public void success(String documentId) { - operationProcessor.resultReceived(new EndpointResult(documentId, new Result.Detail(endpoint)), clusterId); - } - - public int inflight() { - return operationProcessor.getIncompleteResultQueueSize(); - } - - public int success() { - return resultCallback.successes; - } - - public List<ClusterConnection> clusterConnections() { - return operationProcessor.clusters(); - } - - public int failures() { - return resultCallback.failures; - } - - } - - private static class TestResultCallback implements FeedClient.ResultCallback { - - private int successes = 0; - private int failures = 0; - - @Override - public void onCompletion(String docId, Result documentResult) { - successes++; - } - - @Override - public void onEndpointException(FeedEndpointException exception) { - failures++; - } - - - } - -} diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java index ec929d68efb..9ea66b57fb3 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java @@ -12,11 +12,8 @@ import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Random; -import java.util.concurrent.atomic.AtomicLong; -import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.ArgumentMatchers.eq; @@ -28,14 +25,14 @@ public class IncompleteResultsThrottlerTest { @Test public void simpleStaticQueueSizeTest() { IncompleteResultsThrottler incompleteResultsThrottler = new IncompleteResultsThrottler(2, 2, null, null); - assertThat(incompleteResultsThrottler.waitingThreads(), is(0)); + assertEquals(0, incompleteResultsThrottler.waitingThreads()); incompleteResultsThrottler.operationStart(); incompleteResultsThrottler.operationStart(); - assertThat(incompleteResultsThrottler.waitingThreads(), is(2)); + assertEquals(2, incompleteResultsThrottler.waitingThreads()); incompleteResultsThrottler.resultReady(true); - assertThat(incompleteResultsThrottler.waitingThreads(), is(1)); + assertEquals(1, incompleteResultsThrottler.waitingThreads()); incompleteResultsThrottler.resultReady(true); - assertThat(incompleteResultsThrottler.waitingThreads(), is(0)); + assertEquals(0, incompleteResultsThrottler.waitingThreads()); } /** |