diff options
author | Jon Bratseth <bratseth@gmail.com> | 2020-09-01 15:34:41 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@gmail.com> | 2020-09-01 15:34:41 +0200 |
commit | d6cce38048caa708eac01364f06510ba0e4172a1 (patch) | |
tree | b3a3e376240cc9f7d82189767bdaf46ce7f0c161 /vespa-http-client | |
parent | d4a63b0f6f20cfc86ab246029ed72d5f2e1c7ed8 (diff) |
Remove slow tests of deprecated Session
Diffstat (limited to 'vespa-http-client')
5 files changed, 0 insertions, 1627 deletions
diff --git a/vespa-http-client/src/test/java/ExampleUsageFeedClientTest.java b/vespa-http-client/src/test/java/ExampleUsageFeedClientTest.java index 909131a8979..6110c04e85f 100644 --- a/vespa-http-client/src/test/java/ExampleUsageFeedClientTest.java +++ b/vespa-http-client/src/test/java/ExampleUsageFeedClientTest.java @@ -28,13 +28,11 @@ public class ExampleUsageFeedClientTest { Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - exampleCode("localhost", serverA.getPort(), "localhost", serverB.getPort()); serverA.close(); serverB.close(); } - private static CharSequence generateDocument(String docId) { // Just a dummy example of an update document operation. return "{\"update\": \""+ docId + "\"," diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java deleted file mode 100644 index 0813cb36078..00000000000 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java +++ /dev/null @@ -1,382 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client; - -import com.yahoo.vespa.http.client.config.Cluster; -import com.yahoo.vespa.http.client.config.ConnectionParams; -import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.config.FeedParams; -import com.yahoo.vespa.http.client.config.SessionParams; -import com.yahoo.vespa.http.client.handlers.V3MockParsingRequestHandler; -import org.junit.Test; - -import javax.servlet.http.HttpServletResponse; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.time.Clock; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -import static com.yahoo.vespa.http.client.TestUtils.writeDocument; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.not; -import static org.hamcrest.core.IsEqual.equalTo; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; - -/** - * - * @author Einar M R Rosenvinge - */ -@SuppressWarnings("deprecation") -public class QueueBoundsTest { - - public static final List<TestDocument> documents; - - static { - List<TestDocument> docs = new ArrayList<>(); - docs.add(new TestDocument("id:music:music::http://music.yahoo.com/bobdylan/BestOf", - ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/bobdylan/BestOf\">\n" + - " <title>Best of Bob Dylan</title>\n" + - "</document>\n").getBytes(StandardCharsets.UTF_8))); - docs.add(new TestDocument("id:music:music::http://music.yahoo.com/oleivars/BestOf", - ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/oleivars/BestOf\">\n" + - " <title>Best of Ole Ivars</title>\n" + - "</document>\n").getBytes(StandardCharsets.UTF_8))); - docs.add(new TestDocument("id:music:music::http://music.yahoo.com/bjarnefritjofs/BestOf", - ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/bjarnefritjofs/BestOf\">\n" + - " <title>Best of Bjarne Fritjofs</title>\n" + - "</document>\n").getBytes(StandardCharsets.UTF_8))); - docs.add(new TestDocument("id:music:music::http://music.yahoo.com/larryingvars/BestOf", - ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/larryingvars/BestOf\">\n" + - " <title>Best of Larry Ingvars</title>\n" + - "</document>\n").getBytes(StandardCharsets.UTF_8))); - documents = Collections.unmodifiableList(docs); - } - - @Test - public void requireThatFullInputQueueBlocksAndUnblocks() throws Exception { - V3MockParsingRequestHandler mockXmlParsingRequestHandler = new V3MockParsingRequestHandler(); - mockXmlParsingRequestHandler.setScenario(V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION); - try (Server server = new Server(mockXmlParsingRequestHandler, 0); - Session session = - new com.yahoo.vespa.http.client.core.api.SessionImpl( - new SessionParams.Builder() - .addCluster(new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", server.getPort(), false)) - .build()) - .setFeedParams(new FeedParams.Builder() - .setMaxChunkSizeBytes(1) - .setMaxInFlightRequests(1) - .setLocalQueueTimeOut(40000) - .build()) - .setConnectionParams(new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .build()) - .setClientQueueSize(2) - .build(), - SessionFactory.createTimeoutExecutor(), - Clock.systemUTC())) { - FeederThread feeder = new FeederThread(session); - try { - feeder.start(); - assertFeedNotBlocking(feeder, 0); - assertThat(session.results().size(), is(0)); - assertFeedNotBlocking(feeder, 1); - assertThat(session.results().size(), is(0)); - CountDownLatch lastPostFeed = assertFeedBlocking(feeder, 2); - assertThat(session.results().size(), is(0)); - - mockXmlParsingRequestHandler.setScenario(V3MockParsingRequestHandler.Scenario.ALL_OK); - lastPostFeed.await(60, TimeUnit.SECONDS); - assertThat(lastPostFeed.getCount(), equalTo(0L)); - assertResultQueueSize(session, 3, 60, TimeUnit.SECONDS); - } finally { - feeder.stop(); - } - } - } - - @Test - public void requireThatFullIntermediateQueueBlocksAndUnblocks() throws Exception { - V3MockParsingRequestHandler slowHandler = - new V3MockParsingRequestHandler("B", HttpServletResponse.SC_OK, - V3MockParsingRequestHandler.Scenario.DELAYED_RESPONSE); - - try (Server serverA = new Server(new V3MockParsingRequestHandler("A"), 0); - Server serverB = new Server(slowHandler, 0); - com.yahoo.vespa.http.client.core.api.SessionImpl session = new com.yahoo.vespa.http.client.core.api.SessionImpl( - new SessionParams.Builder() - .addCluster(new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster(new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .setFeedParams(new FeedParams.Builder() - .setMaxChunkSizeBytes(1) - .build()) - .setConnectionParams(new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .build()) - .setClientQueueSize(6) //3 per cluster - .build(), SessionFactory.createTimeoutExecutor(), - Clock.systemUTC())) { - - FeederThread feeder = new FeederThread(session); - try { - feeder.start(); - assertFeedNotBlocking(feeder, 0); - assertFeedNotBlocking(feeder, 1); - assertFeedNotBlocking(feeder, 2); - - //A input queue size now: 3 - //B input queue size now: 3 - //feeder thread not blocked - //intermediate result queue size now: 3 - //result queue size now: 0 - assertResultQueueSize(session, 0, 60, TimeUnit.SECONDS); - assertIncompleteResultQueueSize(session, 3, 60, TimeUnit.SECONDS); - - assertResultQueueSize(session, 0, 60, TimeUnit.SECONDS); - assertIncompleteResultQueueSize(session, 3, 60, TimeUnit.SECONDS); - - //server B is slow, server A is fast - - - //A input queue size now: 0 - //B input queue size now: 2, IOThread writing 1 and blocked because of no response yet - //feeder thread still not blocked - //intermediate result queue size now: 3 - //result queue size now: 0 - assertResultQueueSize(session, 0, 60, TimeUnit.SECONDS); - assertIncompleteResultQueueSize(session, 3, 60, TimeUnit.SECONDS); - - CountDownLatch lastPostFeed = assertFeedBlocking(feeder, 3); - - //A input queue size now: 0 - //B input queue size now: 2, IOThread writing 1 and blocked because of no response yet - //feeder thread blocking with 1 op - //intermediate result queue size now: 3 - //result queue size now: 0 - - slowHandler.poke(); - - //A input queue size now: 0 - //B input queue size now: 2, IOThread writing 1 and blocked because of no response again - //feeder thread unblocked - //intermediate result queue size now: 3 - //result queue size now: 1 - - lastPostFeed.await(60, TimeUnit.SECONDS); - assertThat(lastPostFeed.getCount(), equalTo(0L)); - - slowHandler.pokeAllAndUnblockFromNowOn(); - - //A input queue size now: 0 - //B input queue size now: 0, IOThread not blocked - //feeder thread unblocked - //intermediate result queue size now: 0 - //result queue size now: 4 - - assertResultQueueSize(session, 4, 60, TimeUnit.SECONDS); - } finally { - slowHandler.pokeAllAndUnblockFromNowOn(); - feeder.stop(); - } - } - } - - @Test - public void testErrorRecovery() throws Exception { - V3MockParsingRequestHandler mockXmlParsingRequestHandler = new V3MockParsingRequestHandler(); - mockXmlParsingRequestHandler.setScenario(V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION); - try (Server server = new Server(mockXmlParsingRequestHandler, 0); - Session session = - new com.yahoo.vespa.http.client.core.api.SessionImpl( - new SessionParams.Builder() - .addCluster(new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", server.getPort(), false)) - .build()) - .setFeedParams(new FeedParams.Builder() - .setMaxChunkSizeBytes(1) - .setMaxInFlightRequests(1) - .setLocalQueueTimeOut(3000) - .build()) - .setConnectionParams(new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(1) - .build()) - .setClientQueueSize(1) - .build(), - SessionFactory.createTimeoutExecutor(), - Clock.systemUTC())) { - FeederThread feeder = new FeederThread(session); - feeder.start(); - try { - { - System.out.println("We start with failed connection, post a document."); - assertFeedNotBlocking(feeder, 0); - assertEquals(0, session.results().size()); - - CountDownLatch lastPostFeed = assertFeedBlocking(feeder, 1); - System.out.println("No result so far."); - assertEquals(0, session.results().size()); - System.out.println("Make connection ok."); - mockXmlParsingRequestHandler.setScenario(V3MockParsingRequestHandler.Scenario.ALL_OK); - assert(lastPostFeed.await(120, TimeUnit.SECONDS)); - assertEquals(0L, lastPostFeed.getCount()); - assertResultQueueSize(session, 2, 120, TimeUnit.SECONDS); - } - - System.out.println("Take down connection"); - - mockXmlParsingRequestHandler.setScenario(V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION); - { - assertFeedNotBlocking(feeder, 2); - System.out.println("Fed one document, fit in queue."); - assertEquals(2, session.results().size()); - System.out.println("Fed one document more, wait for failure."); - - assertFeedNotBlocking(feeder, 3); - System.out.println("Wait for results for all three documents."); - while (session.results().size() != 3) { - Thread.sleep(1); - } - System.out.println("Back to ok, test feeding again."); - mockXmlParsingRequestHandler.setScenario(V3MockParsingRequestHandler.Scenario.ALL_OK); - assertResultQueueSize(session, 4, 120, TimeUnit.SECONDS); - } - int errors = 0; - for (Result result : session.results()) { - assertEquals(1, result.getDetails().size()); - if (! result.isSuccess()) { - errors++; - } - } - assertEquals(1, errors); - } finally { - feeder.stop(); - } - } - } - - private void assertResultQueueSize(Session session, int size, long timeout, TimeUnit timeUnit) throws InterruptedException { - long timeoutMs = TimeUnit.MILLISECONDS.convert(timeout, timeUnit); - long waitTimeMs = 0; - while (true) { - if (session.results().size() == size) { - break; - } - Thread.sleep(100); - waitTimeMs += 100; - if (waitTimeMs > timeoutMs) { - fail("Queue never reached size " + size + " before timeout of " + timeout + " " + timeUnit + ". Size now: " + session.results().size()); - } - } - } - - private void assertIncompleteResultQueueSize(com.yahoo.vespa.http.client.core.api.SessionImpl session, int size, long timeout, TimeUnit timeUnit) throws InterruptedException { - long timeoutMs = TimeUnit.MILLISECONDS.convert(timeout, timeUnit); - long waitTimeMs = 0; - while (true) { - if (session.getIncompleteResultQueueSize() == size) { - break; - } - Thread.sleep(100); - waitTimeMs += 100; - if (waitTimeMs > timeoutMs) { - fail("Queue of incomplete results never reached size " + size + " before timeout of " + timeout + " " + timeUnit + ". Size now: " + session.getIncompleteResultQueueSize()); - } - } - } - - private CountDownLatch assertFeedBlocking(FeederThread feeder, int idx) throws InterruptedException { - CountDownLatch preFeed = new CountDownLatch(1); - CountDownLatch postFeed = new CountDownLatch(1); - feeder.documents.add(new Triplet<>(preFeed, documents.get(idx), postFeed)); - preFeed.await(60, TimeUnit.SECONDS); - assertThat(preFeed.getCount(), equalTo(0L)); - postFeed.await(2, TimeUnit.SECONDS); - assertThat(feeder.getState(), not(equalTo(Thread.State.RUNNABLE))); - assertThat(postFeed.getCount(), equalTo(1L)); - return postFeed; - } - - private void assertFeedNotBlocking(FeederThread feeder, int idx) throws InterruptedException { - CountDownLatch preFeed = new CountDownLatch(1); - CountDownLatch postFeed = new CountDownLatch(1); - feeder.documents.add(new Triplet<>(preFeed, documents.get(idx), postFeed)); - preFeed.await(60, TimeUnit.SECONDS); - assertThat(preFeed.getCount(), equalTo(0L)); - postFeed.await(60, TimeUnit.SECONDS); - assertThat(postFeed.getCount(), equalTo(0L)); - } - - public static class Triplet<F, S, T> { - public final F first; - public final S second; - public final T third; - - public Triplet(final F first, final S second, final T third) { - this.first = first; - this.second = second; - this.third = third; - } - } - - private class FeederThread implements Runnable { - private final Session session; - private final BlockingQueue<Triplet<CountDownLatch, TestDocument, CountDownLatch>> documents = new LinkedBlockingQueue<>(); - private final Thread thread; - private volatile boolean shouldRun = true; - - FeederThread(Session session) { - this.session = session; - this.thread = new Thread(this); - } - - public void start() { - this.thread.start(); - } - - public void stop() { - shouldRun = false; - thread.interrupt(); - } - - public Thread.State getState() { - return thread.getState(); - } - - @Override - public void run() { - try { - while (shouldRun) { - Triplet<CountDownLatch, TestDocument, CountDownLatch> triplet = documents.poll(200, TimeUnit.MILLISECONDS); - if (triplet == null) { - continue; - } - triplet.first.countDown(); - writeDocument(session, triplet.second); - triplet.third.countDown(); - } - } catch (IOException ioe) { - ioe.printStackTrace(); - } catch (InterruptedException e) { - //ignore - } catch (RuntimeException re) { - if (re.getCause() == null || (!(re.getCause() instanceof InterruptedException))) { - re.printStackTrace(); - } - } - - } - - } -} diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPIMultiClusterTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPIMultiClusterTest.java deleted file mode 100644 index 5fbc367c620..00000000000 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPIMultiClusterTest.java +++ /dev/null @@ -1,1042 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client; - -import com.yahoo.vespa.http.client.config.Cluster; -import com.yahoo.vespa.http.client.config.ConnectionParams; -import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.config.FeedParams; -import com.yahoo.vespa.http.client.config.SessionParams; -import com.yahoo.vespa.http.client.handlers.V3MockParsingRequestHandler; -import org.junit.Test; - -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static com.yahoo.vespa.http.client.TestUtils.getResults; -import static com.yahoo.vespa.http.client.V3HttpAPITest.documents; -import static org.hamcrest.CoreMatchers.*; -import static org.junit.Assert.assertThat; - -/** - * - * @author Einar M R Rosenvinge - */ -@SuppressWarnings("deprecation") -public class V3HttpAPIMultiClusterTest { - - private void writeDocuments(Session session) throws IOException { - TestUtils.writeDocuments(session, documents); - } - - private void writeDocument(Session session) throws IOException { - TestUtils.writeDocuments(session, Collections.<TestDocument>singletonList(documents.get(0))); - } - - private void waitForHandshakesOk(int handshakes, Session session) throws InterruptedException { - int waitedTimeMs = 0; - while (session.getStatsAsJson().split("\"successfullHandshakes\":1").length != 1+ handshakes) { - waitedTimeMs += 100; - assert(waitedTimeMs < 300000); - Thread.sleep(100); - } - } - - @Test - public void testOpenClose() throws Exception { - try (Server serverA = new Server( - new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Session session = SessionFactory.create(new SessionParams.Builder() - .addCluster(new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .build())) { - assertThat(session, notNullValue()); - } - } - - @Test - public void testPriorityAndTraceFlag() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler( - 200, V3MockParsingRequestHandler.Scenario.EXPECT_HIGHEST_PRIORITY_AND_TRACELEVEL_123), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster(new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .setConnectionParams(new ConnectionParams.Builder() - .setTraceLevel(123) - .build()) - .setFeedParams(new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setPriority("HIGHEST") - .build()) - .build())) { - writeDocuments(session); - Map<String, Result> results = getResults(session, documents.size()); - assertThat("Results received: " + results.values(), results.size(), is(documents.size())); - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r.getDetails().toString(), r.isSuccess(), is(true)); - } - } - } - - @Test - public void testRetries() throws Exception { - V3MockParsingRequestHandler unstableServer = - new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK); - V3MockParsingRequestHandler failedServer = - new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK); - try (Server serverA = new Server(failedServer, 0); - Server serverB = new Server(unstableServer, 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build() - ) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build() - ) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(2) - .setMinTimeBetweenRetries(200, TimeUnit.MILLISECONDS) - .build() - ) - .build() - )) { - waitForHandshakesOk(2, session); - // Both servers worked fine so far, handshake established. Now fail transient when trying to send - // data on both. This should cause the OperationProcessor to retry the document. One of the server - // will come up, but not the other. - unstableServer.setScenario(V3MockParsingRequestHandler.Scenario.SERVER_ERROR_TWICE_THEN_OK); - // This will cause retries, but it will not come up. - failedServer.setScenario(V3MockParsingRequestHandler.Scenario.COULD_NOT_FEED); - writeDocuments(session); - Map<String, Result> results = getResults(session, documents.size()); - assertThat("Results received: " + results.values(), results.size(), is(documents.size())); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - assertThat(r.getDetails().size(), is(2)); - assert(r.getDetails().get(0).getResultType() != r.getDetails().get(1).getResultType()); - } - assertThat(results.isEmpty(), is(true)); - } - } - - @Test - public void requireThatFeedingWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .build())) { - - writeDocuments(session); - Map<String, Result> results = getResults(session, documents.size()); - assertThat("Results received: " + results.values(), results.size(), is(documents.size())); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(true)); - assertThat(r.getDetails().size(), is(3)); - assertThat(r.getDetails().get(0).getTraceMessage(), is("Trace message")); - } - assertThat(results.isEmpty(), is(true)); - final String stats = session.getStatsAsJson(); - assertThat(stats, containsString("maxChunkSizeBytes\":51200")); - } - } - - @Test - public void requireThatOneImmediateDisconnectWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DISCONNECT_IMMEDIATELY), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .setLocalQueueTimeOut(1000) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(1) - .build()) - .build())) { - - writeDocuments(session); //cannot fail here, they are just enqueued - - Map<String, Result> results = getResults(session, documents.size()); - assertThat(results.size(), is(documents.size())); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - assertThat(r.getDetails().size(), is(3)); - - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - } - assertThat(results.isEmpty(), is(true)); - } - } - - - - @Test - public void requireThatAllImmediateDisconnectWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DISCONNECT_IMMEDIATELY), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DISCONNECT_IMMEDIATELY), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DISCONNECT_IMMEDIATELY), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .setLocalQueueTimeOut(1000) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(1) - .build()) - .build())) { - - writeDocuments(session); //cannot fail here, they are just enqueued - - Map<String, Result> results = getResults(session, documents.size()); - assertThat(results.size(), is(documents.size())); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - assertThat(r.getDetails().size(), is(3)); - - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - } - assertThat(results.isEmpty(), is(true)); - } - } - - - @Test - public void requireThatOneTimeoutWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.NEVER_RETURN_ANY_RESULTS), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(10000) - .setServerTimeout(2, TimeUnit.SECONDS) - .setClientTimeout(2, TimeUnit.SECONDS) - .setLocalQueueTimeOut(2000) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(1) - .build()) - .build())) { - - waitForHandshakesOk(3, session); - writeDocuments(session); - Map<String, Result> results = getResults(session, documents.size()); - assertThat(results.size(), is(documents.size())); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - //assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - assertThat(r.getDetails().size(), is(3)); - - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - } - assertThat(results.isEmpty(), is(true)); - } - } - - @Test - public void requireThatOneWrongSessionIdWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_WRONG_SESSION_ID), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1000) - .setServerTimeout(2, TimeUnit.SECONDS) - .setClientTimeout(2, TimeUnit.SECONDS) - .setLocalQueueTimeOut(2000) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(0) - .build()) - .build())) { - - waitForHandshakesOk(2, session); - writeDocuments(session); - - Map<String, Result> results = getResults(session, documents.size()); - assertThat(results.size(), is(documents.size())); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - assertThat(r.getDetails().size(), is(3)); - - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - } - assertThat(results.isEmpty(), is(true)); - } - } - - @Test - public void requireThatAllWrongSessionIdWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_WRONG_SESSION_ID), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_WRONG_SESSION_ID), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_WRONG_SESSION_ID), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .setServerTimeout(2, TimeUnit.SECONDS) - .setClientTimeout(2, TimeUnit.SECONDS) - .setLocalQueueTimeOut(2000) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(0) - .build()) - .build())) { - - writeDocuments(session); - Map<String, Result> results = getResults(session, documents.size()); - assertThat(results.size(), is(documents.size())); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - //assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - assertThat(r.getDetails().size(), is(3)); - - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - } - assertThat(results.isEmpty(), is(true)); - } - } - - @Test - public void requireThatOneNonAcceptedVersionWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .setLocalQueueTimeOut(2000) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(0) - .build()) - .build())) { - - writeDocument(session); - - Map<String, Result> results = getResults(session, 1); - assertThat(results.size(), is(1)); - Result r = results.values().iterator().next(); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - Result.Detail failed = details.remove(Endpoint.create("localhost", serverC.getPort(), false)); - assertThat(failed.getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - for (Result.Detail detail : details.values()) { - assertThat(detail.getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - } - } - } - - @Test - public void requireThatAllNonAcceptedVersionWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .setLocalQueueTimeOut(2000) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(1) - .build()) - .build())) { - - writeDocuments(session); //cannot fail here, they are just enqueued - - Map<String, Result> results = getResults(session, documents.size()); - assertThat(results.size(), is(documents.size())); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - //assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - assertThat(r.getDetails().size(), is(3)); - - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - } - assertThat(results.isEmpty(), is(true)); - } - } - - @Test - public void requireThatOneUnexpectedVersionWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_UNEXPECTED_VERSION), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .setLocalQueueTimeOut(2000) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(0) - .build()) - .build())) { - - writeDocuments(session); //cannot fail here, they are just enqueued - - Map<String, Result> results = getResults(session, documents.size()); - assertThat(results.size(), is(documents.size())); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - assertThat(r.getDetails().size(), is(3)); - - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - } - assertThat(results.isEmpty(), is(true)); - } - } - - @Test - public void requireThatAllUnexpectedVersionWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_UNEXPECTED_VERSION), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_UNEXPECTED_VERSION), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_UNEXPECTED_VERSION), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .setLocalQueueTimeOut(2000) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(0) - .build()) - .build())) { - - writeDocument(session); - - Map<String, Result> results = getResults(session, 1); - assertThat(results.size(), is(1)); - Result r = results.values().iterator().next(); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - for (Result.Detail detail : r.getDetails()) { - assertThat(detail.getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - } - } - } - - @Test - public void requireThatOneInternalServerErrorWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.INTERNAL_SERVER_ERROR), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .setLocalQueueTimeOut(2000) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(0) - .build()) - .build())) { - - writeDocument(session); - - Map<String, Result> results = getResults(session, 1); - assertThat(results.size(), is(1)); - Result r = results.values().iterator().next(); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - Result.Detail failed = details.remove(Endpoint.create("localhost", serverC.getPort(), false)); - assertThat(failed.getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - for (Result.Detail detail : details.values()) { - assertThat(detail.getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - } - } - } - - @Test - public void requireThatAllInternalServerErrorWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.INTERNAL_SERVER_ERROR), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.INTERNAL_SERVER_ERROR), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.INTERNAL_SERVER_ERROR), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .setLocalQueueTimeOut(2000) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(0) - .build()) - .build())) { - - writeDocument(session); - - Map<String, Result> results = getResults(session, 1); - assertThat(results.size(), is(1)); - Result r = results.values().iterator().next(); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - for (Result.Detail detail : r.getDetails()) { - assertThat(detail.getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - } - } - } - - @Test - public void requireThatOneCouldNotFeedErrorWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.COULD_NOT_FEED), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(1) - .build()) - .build())) { - - writeDocuments(session); - Map<String, Result> results = getResults(session, documents.size()); - assertThat(results.size(), is(documents.size())); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - assertThat(r.getDetails().size(), is(3)); - - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - } - assertThat(results.isEmpty(), is(true)); - } - } - - @Test - public void requireThatAllCouldNotFeedErrorWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.COULD_NOT_FEED), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.COULD_NOT_FEED), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.COULD_NOT_FEED), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(1) - .build()) - .build())) { - - writeDocuments(session); - Map<String, Result> results = getResults(session, documents.size()); - assertThat(results.size(), is(documents.size())); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - assertThat(r.getDetails().size(), is(3)); - - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - } - assertThat(results.isEmpty(), is(true)); - } - } - - @Test - public void requireThatOneMbusErrorWorks() throws Exception { - final V3MockParsingRequestHandler unstableServer = new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.MBUS_RETURNED_ERROR); - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0); - Server serverC = new Server(unstableServer, 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(100) - .build()) - .build())) { - - writeDocuments(session); - - // Make it fail, but it should still retry since it is a MBUS error that is transitive - // for the client even though fatal for message bus. - Thread.sleep(1000); - assertThat(session.results().size(), is(0)); - unstableServer.setScenario(V3MockParsingRequestHandler.Scenario.ALL_OK); - - Map<String, Result> results = getResults(session, documents.size()); - assertThat(results.size(), is(documents.size())); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(true)); - assertThat(r.getDetails().size(), is(3)); - - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - - } - assertThat(results.isEmpty(), is(true)); - } - } - - @Test - public void requireThatAllMbusErrorWorks() throws Exception { - try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.MBUS_RETURNED_ERROR), 0); - Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.MBUS_RETURNED_ERROR), 0); - Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.MBUS_RETURNED_ERROR), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build() - ) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .build() - ) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(0) - .build() - ) - .build())) { - - writeDocuments(session); //cannot fail here, they are just enqueued - - Map<String, Result> results = getResults(session, documents.size()); - assertThat(results.size(), is(documents.size())); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - assertThat(r.getDetails().size(), is(3)); - - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.FATAL_ERROR)); - assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.FATAL_ERROR)); - assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).getResultType(), is(Result.ResultType.FATAL_ERROR)); - } - assertThat(results.isEmpty(), is(true)); - } - } - - @Test - public void requireThatBadVipBehaviorDoesNotFailBadly() throws Exception { - V3MockParsingRequestHandler handlerA = new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.BAD_REQUEST); - V3MockParsingRequestHandler handlerB = new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.BAD_REQUEST); - - try (Server serverA = new Server(handlerA, 0); - Server serverB = new Server(handlerB, 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false)) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false)) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setMaxSleepTimeMs(0) - .setMaxChunkSizeBytes(1) - .setLocalQueueTimeOut(1000) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(1) - .build()) - .build())) { - - //Set A in bad state => A returns bad request. - handlerA.badRequestScenarioShouldReturnBadRequest.set(true); - - //write one document, should fail - writeDocument(session); - Map<String, Result> results = getResults(session, 1); - assertThat(results.size(), is(1)); - Result r = results.values().iterator().next(); - assertThat(r, not(nullValue())); - assertThat(r.getDetails().toString(), r.isSuccess(), is(false)); - assertThat(r.getDetails().size(), is(2)); - Map<Endpoint, Result.Detail> details = new HashMap<>(); - for (Result.Detail detail : r.getDetails()) { - details.put(detail.getEndpoint(), detail); - } - assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).getResultType(), is(Result.ResultType.TRANSITIVE_ERROR)); - assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - - - //Set B in bad state => B returns bad request. - handlerB.badRequestScenarioShouldReturnBadRequest.set(true); - } //try to close session - - } -} diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java deleted file mode 100644 index 780de3e695c..00000000000 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java +++ /dev/null @@ -1,200 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client; - -import com.yahoo.vespa.http.client.config.Cluster; -import com.yahoo.vespa.http.client.config.ConnectionParams; -import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.config.FeedParams; -import com.yahoo.vespa.http.client.config.SessionParams; -import com.yahoo.vespa.http.client.handlers.V3MockParsingRequestHandler; -import org.junit.Test; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static com.yahoo.vespa.http.client.TestUtils.getResults; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -/** - * - * @author Einar M R Rosenvinge - */ -@SuppressWarnings("deprecation") -public class V3HttpAPITest { - - public static final List<TestDocument> documents; - - static { - List<TestDocument> docs = new ArrayList<>(); - docs.add(new TestDocument("id:music:music::http://music.yahoo.com/bobdylan/BestOf", - ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/bobdylan/BestOf\">\n" + - " <title>Best of Bob Dylan</title>\n" + - "</document>\n").getBytes(StandardCharsets.UTF_8))); - docs.add(new TestDocument("id:music:music::http://music.yahoo.com/oleivars/BestOf", - ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/oleivars/BestOf\">\n" + - " <title>Best of Ole Ivars</title>\n" + - "</document>\n").getBytes(StandardCharsets.UTF_8))); - docs.add(new TestDocument("id:music:music::http://music.yahoo.com/bjarnefritjofs/BestOf", - ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/bjarnefritjofs/BestOf\">\n" + - " <title>Best of Bjarne Fritjofs</title>\n" + - "</document>\n").getBytes(StandardCharsets.UTF_8))); - documents = Collections.unmodifiableList(docs); - } - - private void writeDocuments(Session session) throws IOException { - TestUtils.writeDocuments(session, documents); - } - - private void writeDocument(Session session) throws IOException { - TestUtils.writeDocuments(session, Collections.<TestDocument>singletonList(documents.get(0))); - } - - private void testServerWithMock(V3MockParsingRequestHandler serverMock, boolean failFast, boolean conditionNotMet) throws Exception { - try (Server server = new Server(serverMock, 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(0) - .build()) - .setFeedParams(new FeedParams.Builder() - .setLocalQueueTimeOut(failFast ? 0 : 120000) - .setServerTimeout(120, TimeUnit.SECONDS) - .setClientTimeout(120, TimeUnit.SECONDS) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", server.getPort(), false)) - .build()) - .build())) { - - writeDocument(session); - Map<String, Result> results = getResults(session, 1); - assertEquals(1, results.size()); - - TestDocument document = documents.get(0); - Result r = results.remove(document.getDocumentId()); - assertNotNull(r); - if (conditionNotMet) - assertEquals(Result.ResultType.CONDITION_NOT_MET, r.getDetails().iterator().next().getResultType()); - assertFalse(r.getDetails().toString(), r.isSuccess()); - assertTrue(results.isEmpty()); - } - } - - @Test - public void testSingleDestination() throws Exception { - try (Server server = new Server(new V3MockParsingRequestHandler(), 0); - Session session = SessionFactory.create(Endpoint.create("localhost", server.getPort(), false))) { - - writeDocuments(session); - Map<String, Result> results = getResults(session, documents.size()); - assertEquals(documents.size(), results.size()); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertNotNull(r); - assertTrue(r.getDetails().toString(), r.isSuccess()); - } - assertTrue(results.isEmpty()); - } - } - - @Test - public void requireThatBadResponseCodeFails() throws Exception { - testServerWithMock(new V3MockParsingRequestHandler(401/*Unauthorized*/), true, false); - testServerWithMock(new V3MockParsingRequestHandler(403/*Forbidden*/), true, false); - testServerWithMock(new V3MockParsingRequestHandler(407/*Proxy Authentication Required*/), true, false); - } - - @Test - public void requireThatUnexpectedVersionIsHandledProperly() throws Exception { - testServerWithMock(new V3MockParsingRequestHandler( - 200, V3MockParsingRequestHandler.Scenario.RETURN_UNEXPECTED_VERSION), true, false); - } - - @Test - public void requireThatNonAcceptedVersionIsHandledProperly() throws Exception { - testServerWithMock(new V3MockParsingRequestHandler( - 200, V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION), true, false); - } - - @Test - public void requireThatNon200OkIsHandledProperly() throws Exception { - testServerWithMock(new V3MockParsingRequestHandler( - 200, V3MockParsingRequestHandler.Scenario.INTERNAL_SERVER_ERROR), true, false); - } - - @Test - public void requireThatMbusErrorIsHandledProperly() throws Exception { - testServerWithMock(new V3MockParsingRequestHandler( - 200, V3MockParsingRequestHandler.Scenario.MBUS_RETURNED_ERROR), false, false); - } - - @Test - public void requireThatTimeoutWorks() throws Exception { - try (Server server = new Server(new V3MockParsingRequestHandler( - 200, V3MockParsingRequestHandler.Scenario.NEVER_RETURN_ANY_RESULTS), 0); - Session session = SessionFactory.create( - new SessionParams.Builder() - .setFeedParams(new FeedParams.Builder() - .setLocalQueueTimeOut(0) - .build()) - .setConnectionParams( - new ConnectionParams.Builder() - .setNumPersistentConnectionsPerEndpoint(1) - .setMaxRetries(2) - .build()) - .setFeedParams( - new FeedParams.Builder() - .setServerTimeout(500, TimeUnit.MILLISECONDS) - .setClientTimeout(500, TimeUnit.MILLISECONDS) - .build()) - .addCluster( - new Cluster.Builder() - .addEndpoint(Endpoint.create("localhost", server.getPort(), false)) - .build()) - .build())) { - - writeDocuments(session); - - Map<String, Result> results = getResults(session, documents.size()); - assertEquals(documents.size(), results.size()); - - for (TestDocument document : documents) { - Result r = results.remove(document.getDocumentId()); - assertNotNull(r); - assertFalse(r.getDetails().toString(), r.isSuccess()); - assertEquals(Result.ResultType.TRANSITIVE_ERROR, r.getDetails().iterator().next().getResultType()); - } - assertTrue(results.isEmpty()); - } - } - - @Test - public void requireThatCouldNotFeedErrorIsHandledProperly() throws Exception { - testServerWithMock(new V3MockParsingRequestHandler( - 200, V3MockParsingRequestHandler.Scenario.COULD_NOT_FEED), false, false); - } - - @Test - public void requireThatImmediateDisconnectIsHandledProperly() throws Exception { - testServerWithMock(new V3MockParsingRequestHandler( - 200, V3MockParsingRequestHandler.Scenario.DISCONNECT_IMMEDIATELY), true, false); - } - @Test - public void testConditionNotMet() throws Exception { - testServerWithMock(new V3MockParsingRequestHandler( - 200, V3MockParsingRequestHandler.Scenario.CONDITON_NOT_MET), false, true); - } - -} diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java index ec929d68efb..e4970fa28fe 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java @@ -12,7 +12,6 @@ import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Random; -import java.util.concurrent.atomic.AtomicLong; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; |