summaryrefslogtreecommitdiffstats
path: root/vespa-http-client/src/test
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
commit72231250ed81e10d66bfe70701e64fa5fe50f712 (patch)
tree2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /vespa-http-client/src/test
Publish
Diffstat (limited to 'vespa-http-client/src/test')
-rw-r--r--vespa-http-client/src/test/java/ExampleUsageFeedClientTest.java86
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/FeedClientTest.java84
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java377
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/Server.java40
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SimpleLoggerResultCallbackTest.java84
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/TestDocument.java27
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/TestOnCiBuildingSystemOnly.java14
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/TestUtils.java60
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPIMultiClusterTest.java1052
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java190
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/config/ClusterTest.java38
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/config/ConnectionParamsTest.java60
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/config/EndpointTest.java51
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/config/FeedParamsTest.java54
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/DocumentTest.java38
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/EncoderTestCase.java240
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/ThrottlePolicyTest.java76
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/XmlFeedReaderTest.java259
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java318
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ByteBufferInputStreamTest.java134
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/CloseableQTestCase.java38
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java89
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/GatewayThrottlerTest.java56
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java126
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/ConcurrentDocumentOperationBlockerTest.java63
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java269
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java370
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/handlers/V3MockParsingRequestHandler.java391
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/runner/CommandLineArgumentsTest.java144
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/runner/JsonReaderTest.java272
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/runner/RunnerTest.java35
-rw-r--r--vespa-http-client/src/test/resources/vespacorpfeed-prod-sample.xml187
-rw-r--r--vespa-http-client/src/test/resources/xml-challenge.xml6
-rw-r--r--vespa-http-client/src/test/resources/xml-challenge2.xml5
-rw-r--r--vespa-http-client/src/test/resources/xml-challenge3.xml4
35 files changed, 5337 insertions, 0 deletions
diff --git a/vespa-http-client/src/test/java/ExampleUsageFeedClientTest.java b/vespa-http-client/src/test/java/ExampleUsageFeedClientTest.java
new file mode 100644
index 00000000000..3af5e1d2435
--- /dev/null
+++ b/vespa-http-client/src/test/java/ExampleUsageFeedClientTest.java
@@ -0,0 +1,86 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+import com.yahoo.vespa.http.client.FeedClient;
+import com.yahoo.vespa.http.client.FeedClientFactory;
+import com.yahoo.vespa.http.client.Result;
+import com.yahoo.vespa.http.client.Server;
+import com.yahoo.vespa.http.client.config.Cluster;
+import com.yahoo.vespa.http.client.config.Endpoint;
+import com.yahoo.vespa.http.client.config.FeedParams;
+import com.yahoo.vespa.http.client.config.SessionParams;
+import com.yahoo.vespa.http.client.handlers.V3MockParsingRequestHandler;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Unit test that test documentation code.
+ * @author dybdahl
+ */
+public class ExampleUsageFeedClientTest {
+
+ @Test
+ public void testExampleCode() {
+ Server serverA =
+ new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
+ Server serverB =
+ new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
+
+
+ exampleCode("localhost", serverA.getPort(), "localhost", serverB.getPort());
+ serverA.close();
+ serverB.close();
+ }
+
+
+ private static CharSequence generateDocument(String docId) {
+ // Just a dummy example of an update document operation.
+ return "{\"update\": \""+ docId + "\","
+ + " \"fields\": { \"actualMapStringToArrayOfInt\": {"
+ + " \"assign\": ["
+ + "{ \"key\": \"fooKey\", \"value\": [ 2,1, 3] }"
+ + "]}}}";
+ }
+
+ // Example usage of FeedClient
+ public static void exampleCode(String hostNameA, int portServerA, String hostNameB, int portServerB) {
+ final boolean useSsl = false;
+ final SessionParams sessionParams = new SessionParams.Builder()
+ .addCluster(new Cluster.Builder().addEndpoint(Endpoint.create(hostNameA, portServerA, useSsl)).build())
+ .addCluster(new Cluster.Builder().addEndpoint(Endpoint.create(hostNameB, portServerB, useSsl)).build())
+ .setFeedParams(new FeedParams.Builder()
+ .setDataFormat(FeedParams.DataFormat.JSON_UTF8)
+ .build())
+ .build();
+
+ final AtomicInteger resultsReceived = new AtomicInteger(0);
+ final AtomicInteger errorsReceived = new AtomicInteger(0);
+
+ FeedClient feedClient = FeedClientFactory.create(sessionParams, new FeedClient.ResultCallback() {
+ @Override
+ public void onCompletion(String docId, Result documentResult) {
+ resultsReceived.incrementAndGet();
+ if (! documentResult.getContext().equals(docId)) {
+ System.err.println("Context does not work as expected.");
+ errorsReceived.incrementAndGet();
+ }
+ if (!documentResult.isSuccess()) {
+ System.err.println("Problems with docID " + docId + ":" + documentResult.toString());
+ errorsReceived.incrementAndGet();
+ }
+ }
+ });
+ int sentCounter = 0;
+ final List<String> docIds = Arrays.asList("1", "2", "3", "4");
+ for (final String docId : docIds) {
+ CharSequence docData = generateDocument(docId);
+ feedClient.stream(docId, docData, docId);
+ sentCounter++;
+ System.out.println("Sent " + sentCounter + " received results from " + resultsReceived.get());
+ }
+ feedClient.close();
+ System.out.println("Finished, got " + errorsReceived.get()
+ + " errors from " + resultsReceived.get() + " results, sent " + sentCounter + " documents.");
+ }
+}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/FeedClientTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/FeedClientTest.java
new file mode 100644
index 00000000000..872bc4192b2
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/FeedClientTest.java
@@ -0,0 +1,84 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client;
+
+import com.yahoo.vespa.http.client.config.Cluster;
+import com.yahoo.vespa.http.client.config.ConnectionParams;
+import com.yahoo.vespa.http.client.config.Endpoint;
+import com.yahoo.vespa.http.client.config.FeedParams;
+import com.yahoo.vespa.http.client.config.SessionParams;
+import com.yahoo.vespa.http.client.core.api.FeedClientImpl;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the API, using dryrun option to mock gateway.
+ * @author dybdahl
+ */
+public class FeedClientTest {
+
+ private final static String DOCID = "doc_id";
+
+ SessionParams sessionParams = new SessionParams.Builder()
+ .addCluster(new Cluster.Builder()
+ .addEndpoint(Endpoint.create("hostname"))
+ .build())
+ .setConnectionParams(new ConnectionParams.Builder()
+ .setDryRun(true)
+ .build())
+ .build();
+ final AtomicInteger resultsReceived = new AtomicInteger(0);
+ FeedClient.ResultCallback resultCallback = (docId, documentResult) -> {
+ assert(documentResult.isSuccess());
+ assertThat(docId, is(DOCID));
+ resultsReceived.incrementAndGet();
+ };
+
+ FeedClient feedClient = new FeedClientImpl(sessionParams, resultCallback, SessionFactory.createTimeoutExecutor());
+
+ @Test
+ public void testStreamAndClose() throws Exception {
+ feedClient.stream(DOCID, "blob");
+ feedClient.close();
+ assertThat(resultsReceived.get(), is(1));
+ }
+
+ @Test
+ public void testGetStatsAsJson() throws Exception {
+ feedClient.stream(DOCID, "blob");
+ while (resultsReceived.get() == 0) {Thread.sleep(3); }
+ String stats = feedClient.getStatsAsJson();
+ assertTrue(stats.contains("\"dryRun\":true"));
+ feedClient.close();
+ }
+
+ @Test
+ public void testFeedJson() throws Exception {
+ InputStream stream = new ByteArrayInputStream((String.format("[{\"remove\": \"%s\"}]", DOCID)
+ .getBytes(StandardCharsets.UTF_8)));
+ AtomicInteger docCounter = new AtomicInteger(0);
+ FeedClient.feedJson(stream, feedClient, docCounter);
+ assertThat(docCounter.get(), is(1));
+ feedClient.close();
+ assertThat(resultsReceived.get(), is(1));
+ }
+
+ @Test
+ public void testFeedXml() throws Exception {
+ InputStream stream = new ByteArrayInputStream((String.format(
+ "<document documenttype=\"music\" documentid=\"%s\">\n</document>\n", DOCID)
+ .getBytes(StandardCharsets.UTF_8)));
+ AtomicInteger docCounter = new AtomicInteger(0);
+ FeedClient.feedXml(stream, feedClient, docCounter);
+ assertThat(docCounter.get(), is(1));
+ feedClient.close();
+ assertThat(resultsReceived.get(), is(1));
+ }
+} \ No newline at end of file
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java
new file mode 100644
index 00000000000..270483c2fc7
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java
@@ -0,0 +1,377 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client;
+
+import com.yahoo.vespa.http.client.config.Cluster;
+import com.yahoo.vespa.http.client.config.ConnectionParams;
+import com.yahoo.vespa.http.client.config.Endpoint;
+import com.yahoo.vespa.http.client.config.FeedParams;
+import com.yahoo.vespa.http.client.config.SessionParams;
+import com.yahoo.vespa.http.client.core.api.SessionImpl;
+import com.yahoo.vespa.http.client.handlers.V3MockParsingRequestHandler;
+import org.junit.Test;
+
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static com.yahoo.vespa.http.client.TestUtils.writeDocument;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Only runs on screwdriver to save time!
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ * @since 5.1.29
+ */
+public class QueueBoundsTest extends TestOnCiBuildingSystemOnly {
+
+ public static final List<TestDocument> documents;
+ static {
+ List<TestDocument> docs = new ArrayList<>();
+ docs.add(new TestDocument("id:music:music::http://music.yahoo.com/bobdylan/BestOf",
+ ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/bobdylan/BestOf\">\n" +
+ " <title>Best of Bob Dylan</title>\n" +
+ "</document>\n").getBytes(StandardCharsets.UTF_8)));
+ docs.add(new TestDocument("id:music:music::http://music.yahoo.com/oleivars/BestOf",
+ ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/oleivars/BestOf\">\n" +
+ " <title>Best of Ole Ivars</title>\n" +
+ "</document>\n").getBytes(StandardCharsets.UTF_8)));
+ docs.add(new TestDocument("id:music:music::http://music.yahoo.com/bjarnefritjofs/BestOf",
+ ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/bjarnefritjofs/BestOf\">\n" +
+ " <title>Best of Bjarne Fritjofs</title>\n" +
+ "</document>\n").getBytes(StandardCharsets.UTF_8)));
+ docs.add(new TestDocument("id:music:music::http://music.yahoo.com/larryingvars/BestOf",
+ ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/larryingvars/BestOf\">\n" +
+ " <title>Best of Larry Ingvars</title>\n" +
+ "</document>\n").getBytes(StandardCharsets.UTF_8)));
+ documents = Collections.unmodifiableList(docs);
+ }
+
+ @Test
+ public void requireThatFullInputQueueBlocksAndUnblocks() throws Exception {
+ V3MockParsingRequestHandler mockXmlParsingRequestHandler = new V3MockParsingRequestHandler();
+ mockXmlParsingRequestHandler.setScenario(V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION);
+ try (Server server = new Server(mockXmlParsingRequestHandler, 0);
+ Session session =
+ new SessionImpl(
+ new SessionParams.Builder()
+ .addCluster(new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", server.getPort(), false))
+ .build())
+ .setFeedParams(new FeedParams.Builder()
+ .setMaxChunkSizeBytes(1)
+ .setMaxInFlightRequests(1)
+ .setLocalQueueTimeOut(40000)
+ .build())
+ .setConnectionParams(new ConnectionParams.Builder()
+ .setNumPersistentConnectionsPerEndpoint(1)
+ .build())
+ .setClientQueueSize(2)
+ .build(),
+ SessionFactory.createTimeoutExecutor())) {
+ FeederThread feeder = new FeederThread(session);
+ try {
+ feeder.start();
+ assertFeedNotBlocking(feeder, 0);
+ assertThat(session.results().size(), is(0));
+ assertFeedNotBlocking(feeder, 1);
+ assertThat(session.results().size(), is(0));
+ CountDownLatch lastPostFeed = assertFeedBlocking(feeder, 2);
+ assertThat(session.results().size(), is(0));
+
+ mockXmlParsingRequestHandler.setScenario(V3MockParsingRequestHandler.Scenario.ALL_OK);
+ lastPostFeed.await(60, TimeUnit.SECONDS);
+ assertThat(lastPostFeed.getCount(), equalTo(0L));
+ assertResultQueueSize(session, 3, 60, TimeUnit.SECONDS);
+ } finally {
+ feeder.stop();
+ }
+ }
+ }
+
+ @Test
+ public void requireThatFullIntermediateQueueBlocksAndUnblocks() throws Exception {
+ V3MockParsingRequestHandler slowHandler =
+ new V3MockParsingRequestHandler("B", HttpServletResponse.SC_OK,
+ V3MockParsingRequestHandler.Scenario.DELAYED_RESPONSE);
+
+ try (Server serverA = new Server(new V3MockParsingRequestHandler("A"), 0);
+ Server serverB = new Server(slowHandler, 0);
+ SessionImpl session = new SessionImpl(
+ new SessionParams.Builder()
+ .addCluster(new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
+ .build())
+ .addCluster(new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
+ .build())
+ .setFeedParams(new FeedParams.Builder()
+ .setMaxChunkSizeBytes(1)
+ .build())
+ .setConnectionParams(new ConnectionParams.Builder()
+ .setNumPersistentConnectionsPerEndpoint(1)
+ .build())
+ .setClientQueueSize(6) //3 per cluster
+ .build(), SessionFactory.createTimeoutExecutor())) {
+
+ FeederThread feeder = new FeederThread(session);
+ try {
+ feeder.start();
+ assertFeedNotBlocking(feeder, 0);
+ assertFeedNotBlocking(feeder, 1);
+ assertFeedNotBlocking(feeder, 2);
+
+ //A input queue size now: 3
+ //B input queue size now: 3
+ //feeder thread not blocked
+ //intermediate result queue size now: 3
+ //result queue size now: 0
+ assertResultQueueSize(session, 0, 60, TimeUnit.SECONDS);
+ assertIncompleteResultQueueSize(session, 3, 60, TimeUnit.SECONDS);
+
+ assertResultQueueSize(session, 0, 60, TimeUnit.SECONDS);
+ assertIncompleteResultQueueSize(session, 3, 60, TimeUnit.SECONDS);
+
+ //server B is slow, server A is fast
+
+
+ //A input queue size now: 0
+ //B input queue size now: 2, IOThread writing 1 and blocked because of no response yet
+ //feeder thread still not blocked
+ //intermediate result queue size now: 3
+ //result queue size now: 0
+ assertResultQueueSize(session, 0, 60, TimeUnit.SECONDS);
+ assertIncompleteResultQueueSize(session, 3, 60, TimeUnit.SECONDS);
+
+ CountDownLatch lastPostFeed = assertFeedBlocking(feeder, 3);
+
+ //A input queue size now: 0
+ //B input queue size now: 2, IOThread writing 1 and blocked because of no response yet
+ //feeder thread blocking with 1 op
+ //intermediate result queue size now: 3
+ //result queue size now: 0
+
+ slowHandler.poke();
+
+ //A input queue size now: 0
+ //B input queue size now: 2, IOThread writing 1 and blocked because of no response again
+ //feeder thread unblocked
+ //intermediate result queue size now: 3
+ //result queue size now: 1
+
+ lastPostFeed.await(60, TimeUnit.SECONDS);
+ assertThat(lastPostFeed.getCount(), equalTo(0L));
+
+ slowHandler.pokeAllAndUnblockFromNowOn();
+
+ //A input queue size now: 0
+ //B input queue size now: 0, IOThread not blocked
+ //feeder thread unblocked
+ //intermediate result queue size now: 0
+ //result queue size now: 4
+
+ assertResultQueueSize(session, 4, 60, TimeUnit.SECONDS);
+ } finally {
+ slowHandler.pokeAllAndUnblockFromNowOn();
+ feeder.stop();
+ }
+ }
+ }
+
+ @Test
+ public void testErrorRecovery() throws Exception {
+ V3MockParsingRequestHandler mockXmlParsingRequestHandler = new V3MockParsingRequestHandler();
+ mockXmlParsingRequestHandler.setScenario(V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION);
+ try (Server server = new Server(mockXmlParsingRequestHandler, 0);
+ Session session =
+ new SessionImpl(
+ new SessionParams.Builder()
+ .addCluster(new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", server.getPort(), false))
+ .build())
+ .setFeedParams(new FeedParams.Builder()
+ .setMaxChunkSizeBytes(1)
+ .setMaxInFlightRequests(1)
+ .setLocalQueueTimeOut(3000)
+ .build())
+ .setConnectionParams(new ConnectionParams.Builder()
+ .setNumPersistentConnectionsPerEndpoint(1)
+ .setMaxRetries(1)
+ .build())
+ .setClientQueueSize(1)
+ .build(),
+ SessionFactory.createTimeoutExecutor())) {
+ FeederThread feeder = new FeederThread(session);
+ feeder.start();
+ try {
+ {
+ System.out.println("We start with failed connection, post a document.");
+ assertFeedNotBlocking(feeder, 0);
+ assertThat(session.results().size(), is(0));
+
+ CountDownLatch lastPostFeed = assertFeedBlocking(feeder, 1);
+ System.out.println("No result so far.");
+ assertThat(session.results().size(), is(0));
+ System.out.println("Make connection ok.");
+ mockXmlParsingRequestHandler.setScenario(V3MockParsingRequestHandler.Scenario.ALL_OK);
+ assert(lastPostFeed.await(120, TimeUnit.SECONDS));
+ assertThat(lastPostFeed.getCount(), equalTo(0L));
+ assertResultQueueSize(session, 2, 120, TimeUnit.SECONDS);
+ }
+
+ System.out.println("Take down connection");
+
+ mockXmlParsingRequestHandler.setScenario(V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION);
+ {
+ assertFeedNotBlocking(feeder, 2);
+ System.out.println("Fed one document, fit in queue.");
+ assertThat(session.results().size(), is(2));
+ System.out.println("Fed one document more, wait for failure.");
+
+ assertFeedNotBlocking(feeder, 3);
+ System.out.println("Wait for results for all three documents.");
+ while (session.results().size() != 3) {
+ Thread.sleep(1);
+ }
+ System.out.println("Back to ok, test feeding again.");
+ mockXmlParsingRequestHandler.setScenario(V3MockParsingRequestHandler.Scenario.ALL_OK);
+ assertResultQueueSize(session, 4, 120, TimeUnit.SECONDS);
+ }
+ int errors = 0;
+ for (Result result : session.results()) {
+ assertThat(result.getDetails().size(), is(1));
+ if (! result.isSuccess()) {
+ errors++;
+ }
+ }
+ assertThat(errors, is(1));
+ } finally {
+ feeder.stop();
+ }
+ }
+ }
+
+ private void assertResultQueueSize(Session session, int size, long timeout, TimeUnit timeUnit) throws InterruptedException {
+ long timeoutMs = TimeUnit.MILLISECONDS.convert(timeout, timeUnit);
+ long waitTimeMs = 0;
+ while (true) {
+ if (session.results().size() == size) {
+ break;
+ }
+ Thread.sleep(100);
+ waitTimeMs += 100;
+ if (waitTimeMs > timeoutMs) {
+ fail("Queue never reached size " + size + " before timeout of " + timeout + " " + timeUnit + ". Size now: " + session.results().size());
+ }
+ }
+ }
+
+ private void assertIncompleteResultQueueSize(SessionImpl session, int size, long timeout, TimeUnit timeUnit) throws InterruptedException {
+ long timeoutMs = TimeUnit.MILLISECONDS.convert(timeout, timeUnit);
+ long waitTimeMs = 0;
+ while (true) {
+ if (session.getIncompleteResultQueueSize() == size) {
+ break;
+ }
+ Thread.sleep(100);
+ waitTimeMs += 100;
+ if (waitTimeMs > timeoutMs) {
+ fail("Queue of incomplete results never reached size " + size + " before timeout of " + timeout + " " + timeUnit + ". Size now: " + session.getIncompleteResultQueueSize());
+ }
+ }
+ }
+
+ private CountDownLatch assertFeedBlocking(FeederThread feeder, int idx) throws InterruptedException {
+ CountDownLatch preFeed = new CountDownLatch(1);
+ CountDownLatch postFeed = new CountDownLatch(1);
+ feeder.documents.add(new Triplet<>(preFeed, documents.get(idx), postFeed));
+ preFeed.await(60, TimeUnit.SECONDS);
+ assertThat(preFeed.getCount(), equalTo(0L));
+ postFeed.await(2, TimeUnit.SECONDS);
+ assertThat(feeder.getState(), not(equalTo(Thread.State.RUNNABLE)));
+ assertThat(postFeed.getCount(), equalTo(1L));
+ return postFeed;
+ }
+
+ private void assertFeedNotBlocking(FeederThread feeder, int idx) throws InterruptedException {
+ CountDownLatch preFeed = new CountDownLatch(1);
+ CountDownLatch postFeed = new CountDownLatch(1);
+ feeder.documents.add(new Triplet<>(preFeed, documents.get(idx), postFeed));
+ preFeed.await(60, TimeUnit.SECONDS);
+ assertThat(preFeed.getCount(), equalTo(0L));
+ postFeed.await(60, TimeUnit.SECONDS);
+ assertThat(postFeed.getCount(), equalTo(0L));
+ }
+
+ public static class Triplet<F, S, T> {
+ public final F first;
+ public final S second;
+ public final T third;
+
+ public Triplet(final F first, final S second, final T third) {
+ this.first = first;
+ this.second = second;
+ this.third = third;
+ }
+ }
+
+ private class FeederThread implements Runnable {
+ private final Session session;
+ private final BlockingQueue<Triplet<CountDownLatch, TestDocument, CountDownLatch>> documents = new LinkedBlockingQueue<>();
+ private final Thread thread;
+ private volatile boolean shouldRun = true;
+
+ FeederThread(Session session) {
+ this.session = session;
+ this.thread = new Thread(this);
+ }
+
+ public void start() {
+ this.thread.start();
+ }
+
+ public void stop() {
+ shouldRun = false;
+ thread.interrupt();
+ }
+
+ public Thread.State getState() {
+ return thread.getState();
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (shouldRun) {
+ Triplet<CountDownLatch, TestDocument, CountDownLatch> triplet = documents.poll(200, TimeUnit.MILLISECONDS);
+ if (triplet == null) {
+ continue;
+ }
+ triplet.first.countDown();
+ writeDocument(session, triplet.second);
+ triplet.third.countDown();
+ }
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ } catch (InterruptedException e) {
+ //ignore
+ } catch (RuntimeException re) {
+ if (re.getCause() == null || (!(re.getCause() instanceof InterruptedException))) {
+ re.printStackTrace();
+ }
+ }
+
+ }
+
+ }
+}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/Server.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/Server.java
new file mode 100644
index 00000000000..5faf5c8a029
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/Server.java
@@ -0,0 +1,40 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client;
+
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ * @since 5.1.20
+ */
+public final class Server implements AutoCloseable {
+
+ private final org.eclipse.jetty.server.Server server;
+
+ public Server(AbstractHandler handler, int port) {
+ this.server = new org.eclipse.jetty.server.Server(port);
+ server.setHandler(handler);
+ try {
+ server.start();
+ assert(server.isStarted());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() throws RuntimeException {
+ try {
+ server.stop();
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException("jetty server.stop() failed", e);
+ }
+ }
+
+ public int getPort() {
+ return ((ServerConnector)server.getConnectors()[0]).getLocalPort();
+ }
+}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SimpleLoggerResultCallbackTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SimpleLoggerResultCallbackTest.java
new file mode 100644
index 00000000000..e2145497589
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SimpleLoggerResultCallbackTest.java
@@ -0,0 +1,84 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client;
+
+import org.junit.Test;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class SimpleLoggerResultCallbackTest {
+ @Test
+ public void testAverageCalculation() {
+ SimpleLoggerResultCallback logger = new SimpleLoggerResultCallback(new AtomicInteger(3), 0);
+ Instant now = Instant.now();
+ logger.newSamplingPeriod(now);
+ Result result = mock(Result.class);
+ when(result.isSuccess()).thenReturn(true);
+ // 3 documents in 0.2 secs --> 15 docs/sec
+ logger.onCompletion("1", result);
+ logger.onCompletion("1", result);
+ logger.onCompletion("1", result);
+ double rate = logger.newSamplingPeriod(now.plusMillis(200)).rate;
+ assertEquals(rate, 15., 0.1 /* delta */);
+ }
+
+ @Test
+ public void testAverageCalculationExteremeValues() {
+ SimpleLoggerResultCallback logger = new SimpleLoggerResultCallback(new AtomicInteger(3), 0);
+ Instant now = Instant.now();
+ logger.newSamplingPeriod(now);
+ // 0 duration, 0 documents
+ double rate = logger.newSamplingPeriod(now).rate;
+ assertEquals(rate, 0, 0.1 /* delta */);
+ }
+
+ @Test
+ public void testOutput() {
+ SimpleLoggerResultCallback logger = new SimpleLoggerResultCallback(new AtomicInteger(3), 0);
+ Instant now = Instant.now();
+ logger.newSamplingPeriod(now);
+ Result result = mock(Result.class);
+ when(result.isSuccess()).thenReturn(true);
+ // 3 documents in 0.2 secs --> 15 docs/sec
+ logger.onCompletion("1", result);
+ logger.onCompletion("1", result);
+ logger.onCompletion("1", result);
+ double rate = logger.newSamplingPeriod(now.plusMillis(200)).rate;
+ assertEquals(rate, 15., 0.1 /* delta */);
+ }
+
+ @Test
+ public void testPrintout() {
+ ArrayList<String> outputList = new ArrayList<>();
+
+ SimpleLoggerResultCallback logger = new SimpleLoggerResultCallback(new AtomicInteger(30), 0) {
+ @Override
+ protected void println(String output) {
+ outputList.add(output);
+ }
+ @Override
+ protected DocumentRate newSamplingPeriod(Instant now) {
+ return new DocumentRate(19999999.2342342366664);
+ }
+ };
+ // 2 success, 1 failure
+ Result result = mock(Result.class);
+ when(result.isSuccess()).thenReturn(true);
+ logger.onCompletion("1", result);
+ logger.onCompletion("1", result);
+ when(result.isSuccess()).thenReturn(false);
+ when(result.toString()).thenReturn("fooError");
+ logger.onCompletion("1", result);
+ logger.printProgress();
+ assertThat(outputList.toString(),
+ containsString("Result received: 3 (1 failed so far, 30 sent, success rate 19999999.23 docs/sec)."));
+ assertThat(outputList.toString(), containsString("Failure: fooError"));
+ }
+
+} \ No newline at end of file
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/TestDocument.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/TestDocument.java
new file mode 100644
index 00000000000..adeba926d35
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/TestDocument.java
@@ -0,0 +1,27 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client;
+
+import net.jcip.annotations.Immutable;
+
+/**
+* @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+* @since 5.1.20
+*/
+@Immutable
+public class TestDocument {
+ private final String documentId;
+ private final byte[] contents;
+
+ TestDocument(String documentId, byte[] contents) {
+ this.documentId = documentId;
+ this.contents = contents;
+ }
+
+ public String getDocumentId() {
+ return documentId;
+ }
+
+ public byte[] getContents() {
+ return contents;
+ }
+}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/TestOnCiBuildingSystemOnly.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/TestOnCiBuildingSystemOnly.java
new file mode 100644
index 00000000000..dcd18179de9
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/TestOnCiBuildingSystemOnly.java
@@ -0,0 +1,14 @@
+package com.yahoo.vespa.http.client;
+
+import org.junit.Before;
+
+
+public class TestOnCiBuildingSystemOnly {
+ private static final String CI_BUILD_SYSTEN_ENV = "CI";
+ private static final String CI_BUILD_SYSTEM_ENABLED_VALUE = "true";
+
+ @Before
+ public void beforeMethod() {
+ org.junit.Assume.assumeTrue(CI_BUILD_SYSTEM_ENABLED_VALUE.equals(System.getenv(CI_BUILD_SYSTEN_ENV)));
+ }
+}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/TestUtils.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/TestUtils.java
new file mode 100644
index 00000000000..9e7c53e8133
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/TestUtils.java
@@ -0,0 +1,60 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPInputStream;
+
+import static org.junit.Assert.assertNull;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ * @since 5.1.20
+ */
+public class TestUtils {
+ public static void writeDocuments(Session session, List<TestDocument> documents) throws IOException {
+ for (TestDocument document : documents) {
+ writeDocument(session, document);
+ }
+ }
+
+ public static void writeDocument(Session session, TestDocument document) throws IOException {
+ OutputStream operation = session.stream(document.getDocumentId());
+ operation.write(document.getContents());
+ operation.close();
+ }
+
+ public static Map<String, Result> getResults(Session session, int num) throws InterruptedException {
+ Map<String, Result> results = new HashMap<>();
+ for (int i = 0; i < num; i++) {
+ Result r = session.results().poll(120, TimeUnit.SECONDS);
+ if (r == null) {
+ String extraInfo = "";
+ extraInfo = "stats=" + session.getStatsAsJson();
+ throw new IllegalStateException("Did not receive result within timeout. (" + extraInfo + ") " +
+ "Results received: " + results.values());
+ }
+ results.put(r.getDocumentId(), r);
+ }
+ assertNull(session.results().poll(100, TimeUnit.MILLISECONDS));
+ return results;
+ }
+
+ public static String zipStreamToString(InputStream inputStream) throws IOException {
+ GZIPInputStream gzipInputStream = new GZIPInputStream(inputStream);
+ final StringBuilder rawContent = new StringBuilder();
+ while (true) {
+ int x = gzipInputStream.read();
+ if (x < 0) {
+ break;
+ }
+ rawContent.append((char) x);
+ }
+ return rawContent.toString();
+ }
+}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPIMultiClusterTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPIMultiClusterTest.java
new file mode 100644
index 00000000000..e401fe5cfe6
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPIMultiClusterTest.java
@@ -0,0 +1,1052 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client;
+
+import com.yahoo.vespa.http.client.config.Cluster;
+import com.yahoo.vespa.http.client.config.ConnectionParams;
+import com.yahoo.vespa.http.client.config.Endpoint;
+import com.yahoo.vespa.http.client.config.FeedParams;
+import com.yahoo.vespa.http.client.config.SessionParams;
+import com.yahoo.vespa.http.client.handlers.V3MockParsingRequestHandler;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static com.yahoo.vespa.http.client.TestUtils.getResults;
+import static com.yahoo.vespa.http.client.V3HttpAPITest.documents;
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Only runs on screwdriver to save time!
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ * @since 5.1.27
+ */
+public class V3HttpAPIMultiClusterTest extends TestOnCiBuildingSystemOnly {
+
+ private void writeDocuments(Session session) throws IOException {
+ TestUtils.writeDocuments(session, documents);
+ }
+
+ private void writeDocument(Session session) throws IOException {
+ TestUtils.writeDocuments(session, Collections.<TestDocument>singletonList(documents.get(0)));
+ }
+
+ private void waitForHandshakesOk(int handshakes, Session session) throws InterruptedException {
+ int waitedTimeMs = 0;
+ while (session.getStatsAsJson().split("\"successfullHandshakes\":1").length != 1+ handshakes) {
+ waitedTimeMs += 100;
+ assert(waitedTimeMs < 300000);
+ Thread.sleep(100);
+ }
+ }
+
+ @Test
+ public void testOpenClose() throws Exception {
+ try (Server serverA = new Server(
+ new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
+ Session session = SessionFactory.create(new SessionParams.Builder()
+ .addCluster(new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
+ .build())
+ .build())) {
+ assertThat(session, notNullValue());
+ }
+ }
+
+ @Test
+ public void testPriorityAndTraceFlag() throws Exception {
+ try (Server serverA = new Server(new V3MockParsingRequestHandler(
+ 200, V3MockParsingRequestHandler.Scenario.EXPECT_HIGHEST_PRIORITY_AND_TRACELEVEL_123), 0);
+ Session session = SessionFactory.create(
+ new SessionParams.Builder()
+ .addCluster(new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
+ .build())
+ .setConnectionParams(new ConnectionParams.Builder()
+ .setTraceLevel(123)
+ .build())
+ .setFeedParams(new FeedParams.Builder()
+ .setMaxSleepTimeMs(0)
+ .setPriority("HIGHEST")
+ .build())
+ .build())) {
+ writeDocuments(session);
+ Map<String, Result> results = getResults(session, documents.size());
+ assertThat("Results received: " + results.values(), results.size(), is(documents.size()));
+ for (TestDocument document : documents) {
+ Result r = results.remove(document.getDocumentId());
+ assertThat(r.getDetails().toString(), r.isSuccess(), is(true));
+ }
+ }
+ }
+
+ @Test
+ public void testRetries() throws Exception {
+ V3MockParsingRequestHandler unstableServer =
+ new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK);
+ V3MockParsingRequestHandler failedServer =
+ new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK);
+ try (Server serverA = new Server(failedServer, 0);
+ Server serverB = new Server(unstableServer, 0);
+ Session session = SessionFactory.create(
+ new SessionParams.Builder()
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
+ .build()
+ )
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
+ .build()
+ )
+ .setConnectionParams(
+ new ConnectionParams.Builder()
+ .setNumPersistentConnectionsPerEndpoint(1)
+ .setMaxRetries(2)
+ .setMinTimeBetweenRetries(200, TimeUnit.MILLISECONDS)
+ .build()
+ )
+ .build()
+ )) {
+ waitForHandshakesOk(2, session);
+ // Both servers worked fine so far, handshake established. Now fail transient when trying to send
+ // data on both. This should cause the OperationProcessor to retry the document. One of the server
+ // will come up, but not the other.
+ unstableServer.setScenario(V3MockParsingRequestHandler.Scenario.SERVER_ERROR_TWICE_THEN_OK);
+ // This will cause retries, but it will not come up.
+ failedServer.setScenario(V3MockParsingRequestHandler.Scenario.COULD_NOT_FEED);
+ writeDocuments(session);
+ Map<String, Result> results = getResults(session, documents.size());
+ assertThat("Results received: " + results.values(), results.size(), is(documents.size()));
+
+ for (TestDocument document : documents) {
+ Result r = results.remove(document.getDocumentId());
+ assertThat(r, not(nullValue()));
+ assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
+ assertThat(r.getDetails().size(), is(2));
+ // One of the details should be true and one false.
+ assertThat(
+ r.getDetails().toString(),
+ r.getDetails().get(0).isSuccess() ^ r.getDetails().get(1).isSuccess(),
+ is(true));
+ }
+ assertThat(results.isEmpty(), is(true));
+ }
+ }
+
+ @Test
+ public void requireThatFeedingWorks() throws Exception {
+ try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
+ Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
+ Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
+ Session session = SessionFactory.create(
+ new SessionParams.Builder()
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
+ .build())
+ .build())) {
+
+ writeDocuments(session);
+ Map<String, Result> results = getResults(session, documents.size());
+ assertThat("Results received: " + results.values(), results.size(), is(documents.size()));
+
+ for (TestDocument document : documents) {
+ Result r = results.remove(document.getDocumentId());
+ assertThat(r, not(nullValue()));
+ assertThat(r.getDetails().toString(), r.isSuccess(), is(true));
+ assertThat(r.getDetails().size(), is(3));
+ assertThat(r.getDetails().get(0).getTraceMessage(), is("Trace message"));
+ }
+ assertThat(results.isEmpty(), is(true));
+ final String stats = session.getStatsAsJson();
+ assertThat(stats, containsString("maxChunkSizeBytes\":51200"));
+ }
+ }
+
+ @Test
+ public void requireThatOneImmediateDisconnectWorks() throws Exception {
+ try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
+ Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
+ Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DISCONNECT_IMMEDIATELY), 0);
+ Session session = SessionFactory.create(
+ new SessionParams.Builder()
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
+ .build())
+ .setFeedParams(
+ new FeedParams.Builder()
+ .setMaxSleepTimeMs(0)
+ .setMaxChunkSizeBytes(1)
+ .setLocalQueueTimeOut(1000)
+ .build())
+ .setConnectionParams(
+ new ConnectionParams.Builder()
+ .setNumPersistentConnectionsPerEndpoint(1)
+ .setMaxRetries(1)
+ .build())
+ .build())) {
+
+ writeDocuments(session); //cannot fail here, they are just enqueued
+
+ Map<String, Result> results = getResults(session, documents.size());
+ assertThat(results.size(), is(documents.size()));
+
+ for (TestDocument document : documents) {
+ Result r = results.remove(document.getDocumentId());
+ assertThat(r, not(nullValue()));
+ assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
+ assertThat(r.getDetails().size(), is(3));
+
+ Map<Endpoint, Result.Detail> details = new HashMap<>();
+ for (Result.Detail detail : r.getDetails()) {
+ details.put(detail.getEndpoint(), detail);
+ }
+ assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).isSuccess(), is(true));
+ assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).isSuccess(), is(true));
+ assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).isSuccess(), is(false));
+ }
+ assertThat(results.isEmpty(), is(true));
+ }
+ }
+
+
+
+ @Test
+ public void requireThatAllImmediateDisconnectWorks() throws Exception {
+ try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DISCONNECT_IMMEDIATELY), 0);
+ Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DISCONNECT_IMMEDIATELY), 0);
+ Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DISCONNECT_IMMEDIATELY), 0);
+ Session session = SessionFactory.create(
+ new SessionParams.Builder()
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
+ .build())
+ .setFeedParams(
+ new FeedParams.Builder()
+ .setMaxSleepTimeMs(0)
+ .setMaxChunkSizeBytes(1)
+ .setLocalQueueTimeOut(1000)
+ .build())
+ .setConnectionParams(
+ new ConnectionParams.Builder()
+ .setNumPersistentConnectionsPerEndpoint(1)
+ .setMaxRetries(1)
+ .build())
+ .build())) {
+
+ writeDocuments(session); //cannot fail here, they are just enqueued
+
+ Map<String, Result> results = getResults(session, documents.size());
+ assertThat(results.size(), is(documents.size()));
+
+ for (TestDocument document : documents) {
+ Result r = results.remove(document.getDocumentId());
+ assertThat(r, not(nullValue()));
+ assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
+ assertThat(r.getDetails().size(), is(3));
+
+ Map<Endpoint, Result.Detail> details = new HashMap<>();
+ for (Result.Detail detail : r.getDetails()) {
+ details.put(detail.getEndpoint(), detail);
+ }
+ assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).isSuccess(), is(false));
+ assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).isSuccess(), is(false));
+ assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).isSuccess(), is(false));
+ }
+ assertThat(results.isEmpty(), is(true));
+ }
+ }
+
+
+ @Test
+ public void requireThatOneTimeoutWorks() throws Exception {
+ try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
+ Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
+ Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.NEVER_RETURN_ANY_RESULTS), 0);
+ Session session = SessionFactory.create(
+ new SessionParams.Builder()
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
+ .build())
+ .setFeedParams(
+ new FeedParams.Builder()
+ .setMaxSleepTimeMs(0)
+ .setMaxChunkSizeBytes(10000)
+ .setServerTimeout(2, TimeUnit.SECONDS)
+ .setClientTimeout(2, TimeUnit.SECONDS)
+ .setLocalQueueTimeOut(2000)
+ .build())
+ .setConnectionParams(
+ new ConnectionParams.Builder()
+ .setNumPersistentConnectionsPerEndpoint(1)
+ .setMaxRetries(1)
+ .build())
+ .build())) {
+
+ waitForHandshakesOk(3, session);
+ writeDocuments(session);
+ Map<String, Result> results = getResults(session, documents.size());
+ assertThat(results.size(), is(documents.size()));
+
+ for (TestDocument document : documents) {
+ Result r = results.remove(document.getDocumentId());
+ assertThat(r, not(nullValue()));
+ //assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
+ assertThat(r.getDetails().size(), is(3));
+
+ Map<Endpoint, Result.Detail> details = new HashMap<>();
+ for (Result.Detail detail : r.getDetails()) {
+ details.put(detail.getEndpoint(), detail);
+ }
+ assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).isSuccess(), is(true));
+ assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).isSuccess(), is(true));
+ assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).isSuccess(), is(false));
+ assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).isTransient(), is(true));
+ }
+ assertThat(results.isEmpty(), is(true));
+ }
+ }
+
+ @Test
+ public void requireThatOneWrongSessionIdWorks() throws Exception {
+ try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
+ Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
+ Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_WRONG_SESSION_ID), 0);
+ Session session = SessionFactory.create(
+ new SessionParams.Builder()
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
+ .build())
+ .setFeedParams(
+ new FeedParams.Builder()
+ .setMaxSleepTimeMs(0)
+ .setMaxChunkSizeBytes(1000)
+ .setServerTimeout(2, TimeUnit.SECONDS)
+ .setClientTimeout(2, TimeUnit.SECONDS)
+ .setLocalQueueTimeOut(2000)
+ .build())
+ .setConnectionParams(
+ new ConnectionParams.Builder()
+ .setNumPersistentConnectionsPerEndpoint(1)
+ .setMaxRetries(0)
+ .build())
+ .build())) {
+
+ waitForHandshakesOk(2, session);
+ writeDocuments(session);
+
+ Map<String, Result> results = getResults(session, documents.size());
+ assertThat(results.size(), is(documents.size()));
+
+ for (TestDocument document : documents) {
+ Result r = results.remove(document.getDocumentId());
+ assertThat(r, not(nullValue()));
+ assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
+ assertThat(r.getDetails().size(), is(3));
+
+ Map<Endpoint, Result.Detail> details = new HashMap<>();
+ for (Result.Detail detail : r.getDetails()) {
+ details.put(detail.getEndpoint(), detail);
+ }
+ assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).isSuccess(), is(true));
+ assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).isSuccess(), is(true));
+ assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).isSuccess(), is(false));
+ }
+ assertThat(results.isEmpty(), is(true));
+ }
+ }
+
+ @Test
+ public void requireThatAllWrongSessionIdWorks() throws Exception {
+ try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_WRONG_SESSION_ID), 0);
+ Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_WRONG_SESSION_ID), 0);
+ Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_WRONG_SESSION_ID), 0);
+ Session session = SessionFactory.create(
+ new SessionParams.Builder()
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
+ .build())
+ .setFeedParams(
+ new FeedParams.Builder()
+ .setMaxSleepTimeMs(0)
+ .setMaxChunkSizeBytes(1)
+ .setServerTimeout(2, TimeUnit.SECONDS)
+ .setClientTimeout(2, TimeUnit.SECONDS)
+ .setLocalQueueTimeOut(2000)
+ .build())
+ .setConnectionParams(
+ new ConnectionParams.Builder()
+ .setNumPersistentConnectionsPerEndpoint(1)
+ .setMaxRetries(0)
+ .build())
+ .build())) {
+
+ writeDocuments(session);
+ Map<String, Result> results = getResults(session, documents.size());
+ assertThat(results.size(), is(documents.size()));
+
+ for (TestDocument document : documents) {
+ Result r = results.remove(document.getDocumentId());
+ assertThat(r, not(nullValue()));
+ //assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
+ assertThat(r.getDetails().size(), is(3));
+
+ Map<Endpoint, Result.Detail> details = new HashMap<>();
+ for (Result.Detail detail : r.getDetails()) {
+ details.put(detail.getEndpoint(), detail);
+ }
+ assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).isSuccess(), is(false));
+ assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).isSuccess(), is(false));
+ assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).isSuccess(), is(false));
+ }
+ assertThat(results.isEmpty(), is(true));
+ }
+ }
+
+ @Test
+ public void requireThatOneNonAcceptedVersionWorks() throws Exception {
+ try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
+ Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
+ Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION), 0);
+ Session session = SessionFactory.create(
+ new SessionParams.Builder()
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
+ .build())
+ .setFeedParams(
+ new FeedParams.Builder()
+ .setMaxSleepTimeMs(0)
+ .setMaxChunkSizeBytes(1)
+ .setLocalQueueTimeOut(2000)
+ .build())
+ .setConnectionParams(
+ new ConnectionParams.Builder()
+ .setNumPersistentConnectionsPerEndpoint(1)
+ .setMaxRetries(0)
+ .build())
+ .build())) {
+
+ writeDocument(session);
+
+ Map<String, Result> results = getResults(session, 1);
+ assertThat(results.size(), is(1));
+ Result r = results.values().iterator().next();
+ assertThat(r, not(nullValue()));
+ assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
+ Map<Endpoint, Result.Detail> details = new HashMap<>();
+ for (Result.Detail detail : r.getDetails()) {
+ details.put(detail.getEndpoint(), detail);
+ }
+ Result.Detail failed = details.remove(Endpoint.create("localhost", serverC.getPort(), false));
+ assertThat(failed.toString(), failed.isSuccess(), is(false));
+ assertThat(failed.toString(), failed.isTransient(), is(true));
+ for (Result.Detail detail : details.values()) {
+ assertThat(detail.toString(), detail.isSuccess(), is(true));
+ assertThat(detail.toString(), detail.isTransient(), is(true));
+ }
+ }
+ }
+
+ @Test
+ public void requireThatAllNonAcceptedVersionWorks() throws Exception {
+ try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION), 0);
+ Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION), 0);
+ Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION), 0);
+ Session session = SessionFactory.create(
+ new SessionParams.Builder()
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
+ .build())
+ .setFeedParams(
+ new FeedParams.Builder()
+ .setMaxSleepTimeMs(0)
+ .setMaxChunkSizeBytes(1)
+ .setLocalQueueTimeOut(2000)
+ .build())
+ .setConnectionParams(
+ new ConnectionParams.Builder()
+ .setNumPersistentConnectionsPerEndpoint(1)
+ .setMaxRetries(1)
+ .build())
+ .build())) {
+
+ writeDocuments(session); //cannot fail here, they are just enqueued
+
+ Map<String, Result> results = getResults(session, documents.size());
+ assertThat(results.size(), is(documents.size()));
+
+ for (TestDocument document : documents) {
+ Result r = results.remove(document.getDocumentId());
+ assertThat(r, not(nullValue()));
+ //assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
+ assertThat(r.getDetails().size(), is(3));
+
+ Map<Endpoint, Result.Detail> details = new HashMap<>();
+ for (Result.Detail detail : r.getDetails()) {
+ details.put(detail.getEndpoint(), detail);
+ }
+ assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).isSuccess(), is(false));
+ assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).isSuccess(), is(false));
+ assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).isSuccess(), is(false));
+ }
+ assertThat(results.isEmpty(), is(true));
+ }
+ }
+
+ @Test
+ public void requireThatOneUnexpectedVersionWorks() throws Exception {
+ try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
+ Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
+ Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_UNEXPECTED_VERSION), 0);
+ Session session = SessionFactory.create(
+ new SessionParams.Builder()
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
+ .build())
+ .setFeedParams(
+ new FeedParams.Builder()
+ .setMaxSleepTimeMs(0)
+ .setMaxChunkSizeBytes(1)
+ .setLocalQueueTimeOut(2000)
+ .build())
+ .setConnectionParams(
+ new ConnectionParams.Builder()
+ .setNumPersistentConnectionsPerEndpoint(1)
+ .setMaxRetries(0)
+ .build())
+ .build())) {
+
+ writeDocuments(session); //cannot fail here, they are just enqueued
+
+ Map<String, Result> results = getResults(session, documents.size());
+ assertThat(results.size(), is(documents.size()));
+
+ for (TestDocument document : documents) {
+ Result r = results.remove(document.getDocumentId());
+ assertThat(r, not(nullValue()));
+ assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
+ assertThat(r.getDetails().size(), is(3));
+
+ Map<Endpoint, Result.Detail> details = new HashMap<>();
+ for (Result.Detail detail : r.getDetails()) {
+ details.put(detail.getEndpoint(), detail);
+ }
+ assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).isSuccess(), is(true));
+ assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).isSuccess(), is(true));
+ assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).isSuccess(), is(false));
+ }
+ assertThat(results.isEmpty(), is(true));
+ }
+ }
+
+ @Test
+ public void requireThatAllUnexpectedVersionWorks() throws Exception {
+ try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_UNEXPECTED_VERSION), 0);
+ Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_UNEXPECTED_VERSION), 0);
+ Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.RETURN_UNEXPECTED_VERSION), 0);
+ Session session = SessionFactory.create(
+ new SessionParams.Builder()
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
+ .build())
+ .setFeedParams(
+ new FeedParams.Builder()
+ .setMaxSleepTimeMs(0)
+ .setMaxChunkSizeBytes(1)
+ .setLocalQueueTimeOut(2000)
+ .build())
+ .setConnectionParams(
+ new ConnectionParams.Builder()
+ .setNumPersistentConnectionsPerEndpoint(1)
+ .setMaxRetries(0)
+ .build())
+ .build())) {
+
+ writeDocument(session);
+
+ Map<String, Result> results = getResults(session, 1);
+ assertThat(results.size(), is(1));
+ Result r = results.values().iterator().next();
+ assertThat(r, not(nullValue()));
+ assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
+ for (Result.Detail detail : r.getDetails()) {
+ assertThat(detail.toString(), detail.isSuccess(), is(false));
+ assertThat(detail.toString(), detail.isTransient(), is(true));
+ }
+ }
+ }
+
+ @Test
+ public void requireThatOneInternalServerErrorWorks() throws Exception {
+ try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
+ Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
+ Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.INTERNAL_SERVER_ERROR), 0);
+ Session session = SessionFactory.create(
+ new SessionParams.Builder()
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
+ .build())
+ .setFeedParams(
+ new FeedParams.Builder()
+ .setMaxSleepTimeMs(0)
+ .setMaxChunkSizeBytes(1)
+ .setLocalQueueTimeOut(2000)
+ .build())
+ .setConnectionParams(
+ new ConnectionParams.Builder()
+ .setNumPersistentConnectionsPerEndpoint(1)
+ .setMaxRetries(0)
+ .build())
+ .build())) {
+
+ writeDocument(session);
+
+ Map<String, Result> results = getResults(session, 1);
+ assertThat(results.size(), is(1));
+ Result r = results.values().iterator().next();
+ assertThat(r, not(nullValue()));
+ assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
+ Map<Endpoint, Result.Detail> details = new HashMap<>();
+ for (Result.Detail detail : r.getDetails()) {
+ details.put(detail.getEndpoint(), detail);
+ }
+ Result.Detail failed = details.remove(Endpoint.create("localhost", serverC.getPort(), false));
+ assertThat(failed.toString(), failed.isSuccess(), is(false));
+ assertThat(failed.toString(), failed.isTransient(), is(true));
+ for (Result.Detail detail : details.values()) {
+ assertThat(detail.toString(), detail.isSuccess(), is(true));
+ assertThat(detail.toString(), detail.isTransient(), is(true));
+ }
+ }
+ }
+
+ @Test
+ public void requireThatAllInternalServerErrorWorks() throws Exception {
+ try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.INTERNAL_SERVER_ERROR), 0);
+ Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.INTERNAL_SERVER_ERROR), 0);
+ Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.INTERNAL_SERVER_ERROR), 0);
+ Session session = SessionFactory.create(
+ new SessionParams.Builder()
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
+ .build())
+ .setFeedParams(
+ new FeedParams.Builder()
+ .setMaxSleepTimeMs(0)
+ .setMaxChunkSizeBytes(1)
+ .setLocalQueueTimeOut(2000)
+ .build())
+ .setConnectionParams(
+ new ConnectionParams.Builder()
+ .setNumPersistentConnectionsPerEndpoint(1)
+ .setMaxRetries(0)
+ .build())
+ .build())) {
+
+ writeDocument(session);
+
+ Map<String, Result> results = getResults(session, 1);
+ assertThat(results.size(), is(1));
+ Result r = results.values().iterator().next();
+ assertThat(r, not(nullValue()));
+ assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
+ for (Result.Detail detail : r.getDetails()) {
+ assertThat(detail.toString(), detail.isSuccess(), is(false));
+ assertThat(detail.toString(), detail.isTransient(), is(true));
+ }
+ }
+ }
+
+ @Test
+ public void requireThatOneCouldNotFeedErrorWorks() throws Exception {
+ try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
+ Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
+ Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.COULD_NOT_FEED), 0);
+ Session session = SessionFactory.create(
+ new SessionParams.Builder()
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
+ .build())
+ .setFeedParams(
+ new FeedParams.Builder()
+ .setMaxSleepTimeMs(0)
+ .setMaxChunkSizeBytes(1)
+ .build())
+ .setConnectionParams(
+ new ConnectionParams.Builder()
+ .setNumPersistentConnectionsPerEndpoint(1)
+ .setMaxRetries(1)
+ .build())
+ .build())) {
+
+ writeDocuments(session);
+ Map<String, Result> results = getResults(session, documents.size());
+ assertThat(results.size(), is(documents.size()));
+
+ for (TestDocument document : documents) {
+ Result r = results.remove(document.getDocumentId());
+ assertThat(r, not(nullValue()));
+ assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
+ assertThat(r.getDetails().size(), is(3));
+
+ Map<Endpoint, Result.Detail> details = new HashMap<>();
+ for (Result.Detail detail : r.getDetails()) {
+ details.put(detail.getEndpoint(), detail);
+ }
+ assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).isSuccess(), is(true));
+ assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).isSuccess(), is(true));
+ assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).isSuccess(), is(false));
+ }
+ assertThat(results.isEmpty(), is(true));
+ }
+ }
+
+ @Test
+ public void requireThatAllCouldNotFeedErrorWorks() throws Exception {
+ try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.COULD_NOT_FEED), 0);
+ Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.COULD_NOT_FEED), 0);
+ Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.COULD_NOT_FEED), 0);
+ Session session = SessionFactory.create(
+ new SessionParams.Builder()
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
+ .build())
+ .setFeedParams(
+ new FeedParams.Builder()
+ .setMaxSleepTimeMs(0)
+ .setMaxChunkSizeBytes(1)
+ .build())
+ .setConnectionParams(
+ new ConnectionParams.Builder()
+ .setNumPersistentConnectionsPerEndpoint(1)
+ .setMaxRetries(1)
+ .build())
+ .build())) {
+
+ writeDocuments(session);
+ Map<String, Result> results = getResults(session, documents.size());
+ assertThat(results.size(), is(documents.size()));
+
+ for (TestDocument document : documents) {
+ Result r = results.remove(document.getDocumentId());
+ assertThat(r, not(nullValue()));
+ assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
+ assertThat(r.getDetails().size(), is(3));
+
+ Map<Endpoint, Result.Detail> details = new HashMap<>();
+ for (Result.Detail detail : r.getDetails()) {
+ details.put(detail.getEndpoint(), detail);
+ }
+ assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).isSuccess(), is(false));
+ assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).isSuccess(), is(false));
+ assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).isSuccess(), is(false));
+ }
+ assertThat(results.isEmpty(), is(true));
+ }
+ }
+
+ @Test
+ public void requireThatOneMbusErrorWorks() throws Exception {
+ final V3MockParsingRequestHandler unstableServer = new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.MBUS_RETURNED_ERROR);
+ try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
+ Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.ALL_OK), 0);
+ Server serverC = new Server(unstableServer, 0);
+ Session session = SessionFactory.create(
+ new SessionParams.Builder()
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
+ .build())
+ .setFeedParams(
+ new FeedParams.Builder()
+ .setMaxSleepTimeMs(0)
+ .setMaxChunkSizeBytes(1)
+ .build())
+ .setConnectionParams(
+ new ConnectionParams.Builder()
+ .setNumPersistentConnectionsPerEndpoint(1)
+ .setMaxRetries(100)
+ .build())
+ .build())) {
+
+ writeDocuments(session);
+
+ // Make it fail, but it should still retry since it is a MBUS error that is transitive
+ // for the client even though fatal for message bus.
+ Thread.sleep(1000);
+ assertThat(session.results().size(), is(0));
+ unstableServer.setScenario(V3MockParsingRequestHandler.Scenario.ALL_OK);
+
+ Map<String, Result> results = getResults(session, documents.size());
+ assertThat(results.size(), is(documents.size()));
+
+ for (TestDocument document : documents) {
+ Result r = results.remove(document.getDocumentId());
+ assertThat(r, not(nullValue()));
+ assertThat(r.getDetails().toString(), r.isSuccess(), is(true));
+ assertThat(r.getDetails().size(), is(3));
+
+ Map<Endpoint, Result.Detail> details = new HashMap<>();
+ for (Result.Detail detail : r.getDetails()) {
+ details.put(detail.getEndpoint(), detail);
+ }
+ assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).isSuccess(), is(true));
+ assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).isSuccess(), is(true));
+ assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).isSuccess(), is(true));
+
+ }
+ assertThat(results.isEmpty(), is(true));
+ }
+ }
+
+ @Test
+ public void requireThatAllMbusErrorWorks() throws Exception {
+ try (Server serverA = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.MBUS_RETURNED_ERROR), 0);
+ Server serverB = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.MBUS_RETURNED_ERROR), 0);
+ Server serverC = new Server(new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.MBUS_RETURNED_ERROR), 0);
+ Session session = SessionFactory.create(
+ new SessionParams.Builder()
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
+ .build()
+ )
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverC.getPort(), false))
+ .build())
+ .setFeedParams(
+ new FeedParams.Builder()
+ .setMaxSleepTimeMs(0)
+ .setMaxChunkSizeBytes(1)
+ .build()
+ )
+ .setConnectionParams(
+ new ConnectionParams.Builder()
+ .setNumPersistentConnectionsPerEndpoint(1)
+ .setMaxRetries(0)
+ .build()
+ )
+ .build())) {
+
+ writeDocuments(session); //cannot fail here, they are just enqueued
+
+ Map<String, Result> results = getResults(session, documents.size());
+ assertThat(results.size(), is(documents.size()));
+
+ for (TestDocument document : documents) {
+ Result r = results.remove(document.getDocumentId());
+ assertThat(r, not(nullValue()));
+ assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
+ assertThat(r.getDetails().size(), is(3));
+
+ Map<Endpoint, Result.Detail> details = new HashMap<>();
+ for (Result.Detail detail : r.getDetails()) {
+ details.put(detail.getEndpoint(), detail);
+ }
+ assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).isSuccess(), is(false));
+ assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).isSuccess(), is(false));
+ assertThat(details.get(Endpoint.create("localhost", serverC.getPort(), false)).isSuccess(), is(false));
+ }
+ assertThat(results.isEmpty(), is(true));
+ }
+ }
+
+ @Test
+ public void requireThatBadVipBehaviorDoesNotFailBadly() throws Exception {
+ V3MockParsingRequestHandler handlerA = new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.BAD_REQUEST);
+ V3MockParsingRequestHandler handlerB = new V3MockParsingRequestHandler(200, V3MockParsingRequestHandler.Scenario.BAD_REQUEST);
+
+ try (Server serverA = new Server(handlerA, 0);
+ Server serverB = new Server(handlerB, 0);
+ Session session = SessionFactory.create(
+ new SessionParams.Builder()
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverA.getPort(), false))
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", serverB.getPort(), false))
+ .build())
+ .setFeedParams(
+ new FeedParams.Builder()
+ .setMaxSleepTimeMs(0)
+ .setMaxChunkSizeBytes(1)
+ .setLocalQueueTimeOut(1000)
+ .build())
+ .setConnectionParams(
+ new ConnectionParams.Builder()
+ .setNumPersistentConnectionsPerEndpoint(1)
+ .setMaxRetries(1)
+ .build())
+ .build())) {
+
+ //Set A in bad state => A returns bad request.
+ handlerA.badRequestScenarioShouldReturnBadRequest.set(true);
+
+ //write one document, should fail
+ writeDocument(session);
+ Map<String, Result> results = getResults(session, 1);
+ assertThat(results.size(), is(1));
+ Result r = results.values().iterator().next();
+ assertThat(r, not(nullValue()));
+ assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
+ assertThat(r.getDetails().size(), is(2));
+ Map<Endpoint, Result.Detail> details = new HashMap<>();
+ for (Result.Detail detail : r.getDetails()) {
+ details.put(detail.getEndpoint(), detail);
+ }
+ assertThat(details.get(Endpoint.create("localhost", serverA.getPort(), false)).isSuccess(), is(false));
+ assertThat(details.get(Endpoint.create("localhost", serverB.getPort(), false)).isSuccess(), is(true));
+
+
+ //Set B in bad state => B returns bad request.
+ handlerB.badRequestScenarioShouldReturnBadRequest.set(true);
+ } //try to close session
+
+ }
+}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java
new file mode 100644
index 00000000000..bd894e84ed4
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java
@@ -0,0 +1,190 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client;
+
+import com.yahoo.vespa.http.client.config.Cluster;
+import com.yahoo.vespa.http.client.config.ConnectionParams;
+import com.yahoo.vespa.http.client.config.Endpoint;
+import com.yahoo.vespa.http.client.config.FeedParams;
+import com.yahoo.vespa.http.client.config.SessionParams;
+import com.yahoo.vespa.http.client.handlers.V3MockParsingRequestHandler;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static com.yahoo.vespa.http.client.TestUtils.getResults;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Only runs on screwdriver to save time!
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ * @since 5.1.27
+ */
+public class V3HttpAPITest extends TestOnCiBuildingSystemOnly {
+
+ public static final List<TestDocument> documents;
+
+ static {
+ List<TestDocument> docs = new ArrayList<>();
+ docs.add(new TestDocument("id:music:music::http://music.yahoo.com/bobdylan/BestOf",
+ ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/bobdylan/BestOf\">\n" +
+ " <title>Best of Bob Dylan</title>\n" +
+ "</document>\n").getBytes(StandardCharsets.UTF_8)));
+ docs.add(new TestDocument("id:music:music::http://music.yahoo.com/oleivars/BestOf",
+ ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/oleivars/BestOf\">\n" +
+ " <title>Best of Ole Ivars</title>\n" +
+ "</document>\n").getBytes(StandardCharsets.UTF_8)));
+ docs.add(new TestDocument("id:music:music::http://music.yahoo.com/bjarnefritjofs/BestOf",
+ ("<document documenttype=\"music\" documentid=\"id:music:music::http://music.yahoo.com/bjarnefritjofs/BestOf\">\n" +
+ " <title>Best of Bjarne Fritjofs</title>\n" +
+ "</document>\n").getBytes(StandardCharsets.UTF_8)));
+ documents = Collections.unmodifiableList(docs);
+ }
+
+ private void writeDocuments(Session session) throws IOException {
+ TestUtils.writeDocuments(session, documents);
+ }
+
+ private void writeDocument(Session session) throws IOException {
+ TestUtils.writeDocuments(session, Collections.<TestDocument>singletonList(documents.get(0)));
+ }
+
+ private void testServerWithMock(V3MockParsingRequestHandler serverMock, boolean failFast) throws Exception {
+ try (Server server = new Server(serverMock, 0);
+ Session session = SessionFactory.create(
+ new SessionParams.Builder()
+ .setConnectionParams(
+ new ConnectionParams.Builder()
+ .setNumPersistentConnectionsPerEndpoint(1)
+ .setMaxRetries(0)
+ .build())
+ .setFeedParams(new FeedParams.Builder()
+ .setLocalQueueTimeOut(failFast ? 0 : 120000)
+ .setServerTimeout(120, TimeUnit.SECONDS)
+ .setClientTimeout(120, TimeUnit.SECONDS)
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", server.getPort(), false))
+ .build())
+ .build())) {
+
+ writeDocument(session);
+ Map<String, Result> results = getResults(session, 1);
+ assertThat(results.size(), is(1));
+
+ TestDocument document = documents.get(0);
+ Result r = results.remove(document.getDocumentId());
+ assertThat(r, not(nullValue()));
+ assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
+ assertThat(results.isEmpty(), is(true));
+ }
+ }
+
+ @Test
+ public void requireThatSingleDestinationWorks() throws Exception {
+ try (Server server = new Server(new V3MockParsingRequestHandler(), 0);
+ Session session = SessionFactory.create(Endpoint.create("localhost", server.getPort(), false))) {
+
+ writeDocuments(session);
+ Map<String, Result> results = getResults(session, documents.size());
+ assertThat(results.size(), is(documents.size()));
+
+ for (TestDocument document : documents) {
+ Result r = results.remove(document.getDocumentId());
+ assertThat(r, not(nullValue()));
+ assertThat(r.getDetails().toString(), r.isSuccess(), is(true));
+ }
+ assertThat(results.isEmpty(), is(true));
+ }
+ }
+
+ @Test
+ public void requireThatBadResponseCodeFails() throws Exception {
+ testServerWithMock(new V3MockParsingRequestHandler(407), true);
+ }
+
+ @Test
+ public void requireThatUnexpectedVersionIsHandledProperly() throws Exception {
+ testServerWithMock(new V3MockParsingRequestHandler(
+ 200, V3MockParsingRequestHandler.Scenario.RETURN_UNEXPECTED_VERSION), true);
+ }
+
+ @Test
+ public void requireThatNonAcceptedVersionIsHandledProperly() throws Exception {
+ testServerWithMock(new V3MockParsingRequestHandler(
+ 200, V3MockParsingRequestHandler.Scenario.DONT_ACCEPT_VERSION), true);
+ }
+
+ @Test
+ public void requireThatNon200OkIsHandledProperly() throws Exception {
+ testServerWithMock(new V3MockParsingRequestHandler(
+ 200, V3MockParsingRequestHandler.Scenario.INTERNAL_SERVER_ERROR), true);
+ }
+
+ @Test
+ public void requireThatMbusErrorIsHandledProperly() throws Exception {
+ testServerWithMock(new V3MockParsingRequestHandler(
+ 200, V3MockParsingRequestHandler.Scenario.MBUS_RETURNED_ERROR), false);
+ }
+
+ @Test
+ public void requireThatTimeoutWorks() throws Exception {
+ try (Server server = new Server(new V3MockParsingRequestHandler(
+ 200, V3MockParsingRequestHandler.Scenario.NEVER_RETURN_ANY_RESULTS), 0);
+ Session session = SessionFactory.create(
+ new SessionParams.Builder()
+ .setFeedParams(new FeedParams.Builder()
+ .setLocalQueueTimeOut(0)
+ .build())
+ .setConnectionParams(
+ new ConnectionParams.Builder()
+ .setNumPersistentConnectionsPerEndpoint(1)
+ .setMaxRetries(2)
+ .build())
+ .setFeedParams(
+ new FeedParams.Builder()
+ .setServerTimeout(500, TimeUnit.MILLISECONDS)
+ .setClientTimeout(500, TimeUnit.MILLISECONDS)
+ .build())
+ .addCluster(
+ new Cluster.Builder()
+ .addEndpoint(Endpoint.create("localhost", server.getPort(), false))
+ .build())
+ .build())) {
+
+ writeDocuments(session);
+
+ Map<String, Result> results = getResults(session, documents.size());
+ assertThat(results.size(), is(documents.size()));
+
+ for (TestDocument document : documents) {
+ Result r = results.remove(document.getDocumentId());
+ assertThat(r, not(nullValue()));
+ assertThat(r.getDetails().toString(), r.isSuccess(), is(false));
+ assertThat(r.isTransient(), is(true));
+ }
+ assertThat(results.isEmpty(), is(true));
+ }
+ }
+
+ @Test
+ public void requireThatCouldNotFeedErrorIsHandledProperly() throws Exception {
+ testServerWithMock(new V3MockParsingRequestHandler(
+ 200, V3MockParsingRequestHandler.Scenario.COULD_NOT_FEED), false);
+ }
+
+ @Test
+ public void requireThatImmediateDisconnectIsHandledProperly() throws Exception {
+ testServerWithMock(new V3MockParsingRequestHandler(
+ 200, V3MockParsingRequestHandler.Scenario.DISCONNECT_IMMEDIATELY), true);
+ }
+}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/config/ClusterTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/config/ClusterTest.java
new file mode 100644
index 00000000000..f80f8db33a3
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/config/ClusterTest.java
@@ -0,0 +1,38 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.config;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ * @since 5.1.22
+ */
+public class ClusterTest {
+
+ @Test
+ public void testSimple() {
+ Cluster cluster = new Cluster.Builder().build();
+
+ assertThat(cluster.getEndpoints().size(), is(0));
+ assertThat(cluster.getRoute(), nullValue());
+ }
+
+ @Test
+ public void testConfig() {
+ Cluster cluster = new Cluster.Builder()
+ .addEndpoint(Endpoint.create("a"))
+ .addEndpoint(Endpoint.create("b"))
+ .setRoute("blah")
+ .build();
+
+ assertThat(cluster.getEndpoints().size(), is(2));
+ assertThat(cluster.getEndpoints().get(0).getHostname(), equalTo("a"));
+ assertThat(cluster.getEndpoints().get(1).getHostname(), equalTo("b"));
+ assertThat(cluster.getRoute(), equalTo("blah"));
+ }
+}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/config/ConnectionParamsTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/config/ConnectionParamsTest.java
new file mode 100644
index 00000000000..49ffabbf1d0
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/config/ConnectionParamsTest.java
@@ -0,0 +1,60 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.config;
+
+import org.junit.Test;
+
+import javax.net.ssl.SSLContext;
+import java.security.NoSuchAlgorithmException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNull.nullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ * @since 5.1.22
+ */
+public class ConnectionParamsTest {
+
+ @Test
+ public void testDefaults() {
+ ConnectionParams params = new ConnectionParams.Builder().build();
+
+ assertThat(params.getHeaders().isEmpty(), is(true));
+ assertThat(params.getNumPersistentConnectionsPerEndpoint(), is(8));
+ assertThat(params.getSslContext(), nullValue());
+ }
+
+ @Test
+ public void testSetters() throws NoSuchAlgorithmException {
+ ConnectionParams params = new ConnectionParams.Builder()
+ .addHeader("Foo", "Bar")
+ .addHeader("Foo", "Baz")
+ .addHeader("Banana", "Apple")
+ .setNumPersistentConnectionsPerEndpoint(2)
+ .setSslContext(SSLContext.getDefault())
+ .build();
+
+ assertThat(params.getNumPersistentConnectionsPerEndpoint(), is(2));
+
+ assertThat(params.getHeaders().isEmpty(), is(false));
+ assertThat(params.getHeaders().size(), is(3));
+ //Iteration order seems stable, let's keep it like this for now
+ Iterator<Map.Entry<String, String>> headers = params.getHeaders().iterator();
+ Map.Entry<String, String> header1 = headers.next();
+ assertThat(header1.getKey(), equalTo("Foo"));
+ assertThat(header1.getValue(), equalTo("Bar"));
+ Map.Entry<String, String> header2 = headers.next();
+ assertThat(header2.getKey(), equalTo("Foo"));
+ assertThat(header2.getValue(), equalTo("Baz"));
+ Map.Entry<String, String> header3 = headers.next();
+ assertThat(header3.getKey(), equalTo("Banana"));
+ assertThat(header3.getValue(), equalTo("Apple"));
+ }
+
+}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/config/EndpointTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/config/EndpointTest.java
new file mode 100644
index 00000000000..e5718a4ba20
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/config/EndpointTest.java
@@ -0,0 +1,51 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.config;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ * @since 5.1.22
+ */
+public class EndpointTest {
+
+ @Test
+ public void testBasic() {
+ Endpoint endpoint = Endpoint.create("foo");
+
+ assertThat(endpoint.getHostname(), equalTo("foo"));
+ assertThat(endpoint.getPort(), equalTo(4080));
+ assertThat(endpoint.isUseSsl(), is(false));
+ }
+
+ @Test
+ public void testAdvanced() {
+ Endpoint endpoint = Endpoint.create("bar", 1234, true);
+
+ assertThat(endpoint.getHostname(), equalTo("bar"));
+ assertThat(endpoint.getPort(), equalTo(1234));
+ assertThat(endpoint.isUseSsl(), is(true));
+ }
+
+ @Test
+ public void testMethods() {
+ Endpoint a = Endpoint.create("a");
+ Endpoint b = Endpoint.create("b");
+
+ assertThat(a, not(equalTo(b)));
+ assertThat(a.hashCode(), not(equalTo(b.hashCode())));
+
+ Endpoint a2 = Endpoint.create("a");
+
+ assertThat(a, equalTo(a2));
+ assertThat(a.hashCode(), equalTo(a2.hashCode()));
+
+ a.toString();
+ }
+
+}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/config/FeedParamsTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/config/FeedParamsTest.java
new file mode 100644
index 00000000000..2865935b10f
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/config/FeedParamsTest.java
@@ -0,0 +1,54 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.config;
+
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ * @since 5.1.22
+ */
+public class FeedParamsTest {
+
+ @Test
+ public void testDefaults() {
+ FeedParams params = new FeedParams.Builder().build();
+
+ assertThat(params.getDataFormat(), equalTo(FeedParams.DataFormat.XML_UTF8));
+ assertThat(params.getMaxChunkSizeBytes(), is(50 * 1024));
+ assertThat(params.getRoute(), nullValue());
+ assertThat(params.getServerTimeout(TimeUnit.SECONDS), is(180L));
+ assertThat(params.getClientTimeout(TimeUnit.SECONDS), is(20L));
+ }
+
+ @Test
+ public void testConfig() {
+ FeedParams params = new FeedParams.Builder()
+ .setDataFormat(FeedParams.DataFormat.XML_UTF8)
+ .setMaxChunkSizeBytes(123)
+ .setRoute("abc")
+ .setClientTimeout(321, TimeUnit.SECONDS)
+ .build();
+
+ assertThat(params.getDataFormat(), equalTo(FeedParams.DataFormat.XML_UTF8));
+ assertThat(params.getMaxChunkSizeBytes(), is(123));
+ assertThat(params.getRoute(), equalTo("abc"));
+ assertThat(params.getServerTimeout(TimeUnit.SECONDS), is(180L));
+ assertThat(params.getClientTimeout(TimeUnit.SECONDS), is(321L));
+
+ params = new FeedParams.Builder()
+ .setServerTimeout(333L, TimeUnit.SECONDS)
+ .setClientTimeout(222L, TimeUnit.SECONDS)
+ .build();
+
+ assertThat(params.getServerTimeout(TimeUnit.SECONDS), is(333L));
+ assertThat(params.getClientTimeout(TimeUnit.SECONDS), is(222L));
+ }
+
+}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/DocumentTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/DocumentTest.java
new file mode 100644
index 00000000000..598bf6d7a4b
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/DocumentTest.java
@@ -0,0 +1,38 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.core;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.ReadOnlyBufferException;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class DocumentTest {
+ @Test
+ public void simpleCaseOk() throws Document.DocumentException {
+ String docId = "doc id";
+ String docContent = "foo";
+ Document document = new Document(docId, docContent.getBytes(), null /* context */);
+ assertThat(document.getDocumentId(), is(docId));
+ assertThat(document.getData(), is(ByteBuffer.wrap(docContent.getBytes())));
+ assertThat(document.getDataAsString().toString(), is(docContent));
+ // Make sure that data is not modified on retrieval.
+ assertThat(document.getDataAsString().toString(), is(docContent));
+ assertThat(document.getData(), is(ByteBuffer.wrap(docContent.getBytes())));
+ assertThat(document.getDocumentId(), is(docId));
+ }
+
+ @Test(expected = ReadOnlyBufferException.class)
+ public void notMutablePutTest() {
+ Document document = new Document("id", "data", null /* context */);
+ document.getData().put("a".getBytes());
+ }
+
+ @Test(expected = ReadOnlyBufferException.class)
+ public void notMutableCompactTest() {
+ Document document = new Document("id", "data", null /* context */);
+ document.getData().compact();
+ }
+} \ No newline at end of file
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/EncoderTestCase.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/EncoderTestCase.java
new file mode 100644
index 00000000000..85507d99d47
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/EncoderTestCase.java
@@ -0,0 +1,240 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.core;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Functional tests for encoding Encoder, i.e. encoding scheme only producing
+ * ASCII and never containing white space or control characters.
+ *
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ */
+public class EncoderTestCase {
+ private final String basic = "abc123";
+ private final String basic2 = "abc{20}123";
+ private final String quotedIsLast = "abc{20}";
+ private final String quotedIsLastDecoded = "abc ";
+ private final String basic2Decoded = "abc 123";
+ private final String unterminated = "abc{33";
+ private final String unterminated2 = "abc{";
+ private final String emptyQuoted = "abc{}123";
+ private final String outsideUnicode = "abc{7fffffff}";
+ private final String noise = "abc{7fff{||\\ffff}";
+ private final String fullAsciiEncoded = "{0}{1}{2}{3}{4}{5}{6}{7}{8}{9}"
+ + "{a}{b}{c}{d}{e}{f}{10}{11}{12}{13}{14}{15}{16}{17}"
+ + "{18}{19}{1a}{1b}{1c}{1d}{1e}{1f}{20}"
+ + "!\"#$%&'()*+,-./0123456789:;<=>?@"
+ + "ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`"
+ + "abcdefghijklmnopqrstuvwxyz{7b}|{7d}~{7f}";
+ private final int[] testCodepoints = { 0x0, '\n', ' ', 'a', '{', '}', 0x7f, 0x80,
+ 0x7ff, 0x800, 0xd7ff, 0xe000, 0xffff, 0x10000, 0x10ffff, 0x34,
+ 0x355, 0x2567, 0xfff, 0xe987, 0x100abc };
+ private final String semiNastyEncoded = "{0}{a}{20}a{7b}{7d}{7f}{80}"
+ + "{7ff}{800}{d7ff}{e000}{ffff}{10000}{10ffff}4"
+ + "{355}{2567}{fff}{e987}{100abc}";
+ private final String invalidUnicode = "abc\ud812";
+ private final String invalidUnicodeEncoded = "abc{d812}";
+
+ StringBuilder s;
+
+ @Before
+ public void setUp() {
+ s = new StringBuilder();
+ }
+
+ @After
+ public void tearDown() {
+ s = null;
+ }
+
+ @Test
+ public final void testBasic() {
+ Encoder.encode(basic, s);
+ assertEquals(basic, s.toString());
+ }
+
+ @Test
+ public final void testBasic2() {
+ Encoder.encode(basic2Decoded, s);
+ assertEquals(basic2, s.toString());
+ }
+
+ @Test
+ public final void testEncodeAscii() {
+ Encoder.encode(fullAscii(), s);
+ assertEquals(fullAsciiEncoded, s.toString());
+ }
+
+ @Test
+ public final void testEncodeMixed() {
+ Encoder.encode(semiNasty(), s);
+ assertEquals(semiNastyEncoded, s.toString());
+ }
+
+ @Test
+ public final void testEncodeQuotedIsLast() {
+ Encoder.encode(quotedIsLastDecoded, s);
+ assertEquals(quotedIsLast, s.toString());
+ }
+
+ @Test
+ public final void testInvalidUnicode() {
+ Encoder.encode(invalidUnicode, s);
+ assertEquals(invalidUnicodeEncoded, s.toString());
+ }
+
+
+ @Test
+ public final void testDecodeBasic() {
+ Encoder.decode(basic, s);
+ assertEquals(basic, s.toString());
+ }
+
+ @Test
+ public final void testDecodeBasic2() {
+ Encoder.decode(basic2, s);
+ assertEquals(basic2Decoded, s.toString());
+ }
+
+ @Test
+ public final void testDecodeAscii() {
+ Encoder.decode(fullAsciiEncoded, s);
+ assertEquals(fullAscii(), s.toString());
+ }
+
+ @Test
+ public final void testDecodeMixed() {
+ Encoder.decode(semiNastyEncoded, s);
+ assertEquals(semiNasty(), s.toString());
+ }
+
+
+
+ @Test
+ public final void testDecodeQuotedIsLast() {
+ Encoder.decode(quotedIsLast, s);
+ assertEquals(quotedIsLastDecoded, s.toString());
+ }
+
+
+ @Test
+ public final void testDecodeUnterminated() {
+ try {
+ Encoder.decode(unterminated, s);
+ } catch (IllegalArgumentException e) {
+ return;
+ }
+ fail("Expected IllegalArgumentException");
+ }
+
+ @Test
+ public final void testDecodeUnterminated2() {
+ try {
+ Encoder.decode(unterminated2, s);
+ } catch (IllegalArgumentException e) {
+ return;
+ }
+ fail("Expected IllegalArgumentException");
+
+ }
+
+ @Test
+ public final void testEmptyQuoted() {
+ try {
+ Encoder.decode(emptyQuoted, s);
+ } catch (IllegalArgumentException e) {
+ return;
+ }
+ fail("Expected IllegalArgumentException");
+ }
+
+ @Test
+ public final void testOutsideUnicode() {
+ try {
+ Encoder.decode(outsideUnicode, s);
+ } catch (IllegalArgumentException e) {
+ return;
+ }
+ fail("Expected IllegalArgumentException");
+ }
+
+
+ @Test
+ public final void testNoise() {
+ try {
+ Encoder.decode(noise, s);
+ } catch (IllegalArgumentException e) {
+ return;
+ }
+ fail("Expected IllegalArgumentException");
+ }
+
+ @Test
+ public final void testIllegalInputCharacter() {
+ try {
+ Encoder.decode("abc\u00e5", s);
+ } catch (IllegalArgumentException e) {
+ return;
+ }
+ fail("Expected IllegalArgumentException");
+ }
+
+
+ @Test
+ public final void testNoIllegalCharactersInOutputForAscii() {
+ Encoder.encode(fullAscii(), s);
+ checkNoNonAscii(s.toString());
+ }
+
+ @Test
+ public final void testNoIllegalCharactersInOutputForMixedInput() {
+ Encoder.encode(semiNasty(), s);
+ checkNoNonAscii(s.toString());
+ }
+
+ @Test
+ public final void testSymmetryAscii() {
+ StringBuilder forDecoding = new StringBuilder();
+ Encoder.encode(fullAscii(), s);
+ Encoder.decode(s.toString(), forDecoding);
+ assertEquals(fullAscii(), forDecoding.toString());
+ }
+
+ @Test
+ public final void testSymmetryMixed() {
+ StringBuilder forDecoding = new StringBuilder();
+ Encoder.encode(semiNasty(), s);
+ Encoder.decode(s.toString(), forDecoding);
+ assertEquals(semiNasty(), forDecoding.toString());
+ }
+
+
+ private void checkNoNonAscii(String input) {
+ for (int i = 0; i < input.length(); ++i) {
+ char c = input.charAt(i);
+ if (c > '~' || c <= ' ') {
+ fail("Encoded data contained character ordinal " + Integer.toHexString(c));
+ }
+ }
+ }
+
+ private String fullAscii() {
+ StringBuilder s = new StringBuilder();
+ for (int i = 0; i <= 0x7f; ++i) {
+ s.append((char) i);
+ }
+ return s.toString();
+ }
+
+ private String semiNasty() {
+ StringBuilder s = new StringBuilder();
+ for (int i = 0; i < testCodepoints.length; ++i) {
+ s.append(Character.toChars(testCodepoints[i]));
+ }
+ return s.toString();
+ }
+}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/ThrottlePolicyTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/ThrottlePolicyTest.java
new file mode 100644
index 00000000000..c1ca6f44c04
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/ThrottlePolicyTest.java
@@ -0,0 +1,76 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.core;
+
+import org.junit.Test;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.*;
+
+public class ThrottlePolicyTest {
+
+ private final ThrottlePolicy throttlePolicy = new ThrottlePolicy();
+ // Default values for tests.
+ private int numOk = 1000;
+ private int prevOk = 1000;
+ private int prevMax = 100;
+ private int max = 100;
+ private boolean queued = true;
+ private double dynamicFactor = 0.1;
+
+ @Test
+ public void samePerformanceShouldTuneDown() {
+ assertThat(throttlePolicy.calcNewMaxInFlight(dynamicFactor, numOk, prevOk, prevMax, max, queued), is(95));
+ }
+
+ @Test
+ public void improvedPerformanceSameSizeShouldTuneDown() {
+ numOk += 200;
+ assertThat(throttlePolicy.calcNewMaxInFlight(dynamicFactor, numOk, prevOk, prevMax, max, queued), is(89));
+ }
+
+ @Test
+ public void improvedPerformanceSmallerSizeTuneDownFurther() {
+ numOk += 200;
+ max = 70;
+ assertThat(throttlePolicy.calcNewMaxInFlight(dynamicFactor, numOk, prevOk, prevMax, max, queued), is(63));
+ }
+
+ @Test
+ public void improvedPerformanceLargerSizeIncrease() {
+ numOk += 200;
+ max = 130;
+ assertThat(throttlePolicy.calcNewMaxInFlight(dynamicFactor, numOk, prevOk, prevMax, max, queued), is(143));
+ dynamicFactor = 100;
+ assertThat(throttlePolicy.calcNewMaxInFlight(dynamicFactor, numOk, prevOk, prevMax, max, queued), is(156));
+ }
+
+ @Test
+ public void improvedPerformanceLargerSizeButQueuedFalse() {
+ numOk += 200;
+ max = 130;
+ queued = false;
+ assertThat(throttlePolicy.calcNewMaxInFlight(dynamicFactor, numOk, prevOk, prevMax, max, queued), is(128));
+ }
+
+ @Test
+ public void lowerPerformanceSameSizeShouldIncrease() {
+ numOk -= 200;
+ assertThat(throttlePolicy.calcNewMaxInFlight(dynamicFactor, numOk, prevOk, prevMax, max, queued), is(110));
+ }
+
+ @Test
+ public void lowerPerformanceSmallerSizeShouldIncreaseSize() {
+ numOk -= 200;
+ max = 30;
+ assertThat(throttlePolicy.calcNewMaxInFlight(dynamicFactor, numOk, prevOk, prevMax, max, queued), is(33));
+ dynamicFactor = 100;
+ assertThat(throttlePolicy.calcNewMaxInFlight(dynamicFactor, numOk, prevOk, prevMax, max, queued), is(36));
+ }
+
+ @Test
+ public void lowerPerformanceLargerSizeTuneDownFurther() {
+ numOk -= 200;
+ max = 130;
+ assertThat(throttlePolicy.calcNewMaxInFlight(dynamicFactor, numOk, prevOk, prevMax, max, queued), is(116));
+ }
+} \ No newline at end of file
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/XmlFeedReaderTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/XmlFeedReaderTest.java
new file mode 100644
index 00000000000..b263580df9c
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/XmlFeedReaderTest.java
@@ -0,0 +1,259 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.core;
+
+import com.yahoo.vespa.http.client.FeedClient;
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.hamcrest.CoreMatchers;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.xml.sax.SAXParseException;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+public class XmlFeedReaderTest {
+ private final static String feedResource = "/vespacorpfeed-prod-sample.xml";
+
+ private final static String feedResource2 = "/xml-challenge.xml";
+ private final static String feedResource3 = "/xml-challenge2.xml";
+ private final static String feedResource4 = "/xml-challenge3.xml";
+
+ private final String updateDocUpdate =
+ "<?xml version=\"1.0\"?>\n" +
+ "<vespafeed>\n" +
+ "<update documentid=\"id:banana:banana::complex\" documenttype=\"banana\">\n" +
+ " <add fieldpath=\"structarr\">\n" +
+ " <item>\n" +
+ " <bytearr>\n" +
+ " <item>30</item>\n" +
+ " <item>55</item>\n" +
+ " </bytearr>\n" +
+ " </item>\n" +
+ " </add>\n" +
+ "</update>\n" +
+ "</vespafeed>\n";
+
+ @Test
+ public void testReadUpdate() throws Exception {
+ InputStream stream = new ByteArrayInputStream(updateDocUpdate.getBytes(StandardCharsets.UTF_8));
+ AtomicInteger numSent = new AtomicInteger(0);
+ FeedClient feedClient = mock(FeedClient.class);
+ XmlFeedReader.read(stream, feedClient, numSent);
+ assertThat(numSent.get(), is(1));
+ }
+
+ private final String updateDocRemove =
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
+ "\n" +
+ "<vespafeed>\n" +
+ " <remove documentid=\"id:music:music::http://music.yahoo.com/Bob0/BestOf\" />\n" +
+ " <remove documentid=\"id:music:music::http://music.yahoo.com/Bob9/BestOf\" />\n" +
+ "</vespafeed>";
+
+ @Test
+ public void testReadRemove() throws Exception {
+ InputStream stream = new ByteArrayInputStream(updateDocRemove.getBytes(StandardCharsets.UTF_8));
+ AtomicInteger numSent = new AtomicInteger(0);
+ FeedClient feedClient = mock(FeedClient.class);
+ XmlFeedReader.read(stream, feedClient, numSent);
+ assertThat(numSent.get(), is(2));
+ }
+
+ private final String insertDocOperation = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n"+
+ "<vespafeed>\n"+
+ "\n"+
+ " <document type=\"music\" documentid=\"id:music:music::http://music.yahoo.com/bobdylan/BestOf\">\n"+
+ " <title>Best of Bob Dylan</title>\n"+
+ " </document>\n"+
+ "\n"+
+ " <document type=\"music\" documentid=\"id:music:music::http://music.yahoo.com/metallica/BestOf\">\n"+
+ " <title>Best of Metallica</title>\n"+
+ " </document>\n"+
+ "</vespafeed>";
+
+ @Test
+ public void testInsert() throws Exception {
+ InputStream stream = new ByteArrayInputStream(insertDocOperation.getBytes(StandardCharsets.UTF_8));
+ AtomicInteger numSent = new AtomicInteger(0);
+ FeedClient feedClient = mock(FeedClient.class);
+ XmlFeedReader.read(stream, feedClient, numSent);
+ assertThat(numSent.get(), is(2));
+ }
+
+ private final String badperation = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n"+
+ "<vespafeed>\n"+
+ " <badtag type=\"music\" documentid=\"id:music:music::http://music.yahoo.com/bobdylan/BestOf\">\n"+
+ " <title>Best of Bob Dylan</title>\n"+
+ " </badtag>\n"+
+ "</vespafeed>";
+
+ @Test
+ public void testNonDocument() throws Exception {
+ InputStream stream = new ByteArrayInputStream(badperation.getBytes(StandardCharsets.UTF_8));
+ AtomicInteger numSent = new AtomicInteger(0);
+ FeedClient feedClient = mock(FeedClient.class);
+ XmlFeedReader.read(stream, feedClient, numSent);
+ assertThat(numSent.get(), is(0));
+ }
+
+ @Test(expected=SAXParseException.class)
+ public void testGarbage() throws Exception {
+ InputStream stream = new ByteArrayInputStream("eehh".getBytes(StandardCharsets.UTF_8));
+ AtomicInteger numSent = new AtomicInteger(0);
+ FeedClient feedClient = mock(FeedClient.class);
+ XmlFeedReader.read(stream, feedClient, numSent);
+ }
+
+ @Test
+ public void testEncoding() throws Exception {
+ InputStream stream = new ByteArrayInputStream("<?xml version=\"1.0\" encoding=\"utf8\"?><vespafeed><remove documentid=\"id:&amp;\"/></vespafeed>"
+ .getBytes(StandardCharsets.UTF_8));
+ AtomicInteger numSent = new AtomicInteger(0);
+ FeedClient feedClient = mock(FeedClient.class);
+
+ doAnswer(new Answer<Object>() {
+ public Object answer(InvocationOnMock invocation) {
+ Object[] args = invocation.getArguments();
+ String docId = (String) args[0];
+ CharSequence value = (CharSequence)args[1];
+ assertThat(value.toString(), is("<remove documentid=\"id:&amp;\"></remove>"));
+ assertThat(docId, is("id:&"));
+ return null;
+ }
+ }).when(feedClient).stream(anyString(), anyObject());
+ XmlFeedReader.read(stream, feedClient, numSent);
+ assertThat(numSent.get(), is(1));
+ }
+
+ private final String characterDocs = "" +
+ "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n" +
+ "<!-- GENERATED VESPA-XML BY YSSTOXML -->\n" +
+ "<!-- USE ONLY FOR BATCH INDEXING -->\n" +
+ "<vespafeed>\n" +
+ " <document documenttype=\"simple\" documentid=\"id:test::&amp;http://www.e.no/matprat\">\n" +
+ " <language><![CDATA[ja]]></language>\n" +
+ " <title><![CDATA[test document1]]></title>\n" +
+ " <description><![CDATA[Bjørnen' blåbær på øy nærheten.]]></description>\n" +
+ " <date>1091356845</date>\n" +
+ " <surl><![CDATA[http://www.eventyr.no/matprat]]></surl>\n" +
+ " </document>\n" +
+ "\n" +
+ "</vespafeed>\n";
+
+ @Test
+ public void testCharacterEndcoding() throws Exception {
+ InputStream stream = new ByteArrayInputStream(characterDocs.getBytes(StandardCharsets.UTF_8));
+ AtomicInteger numSent = new AtomicInteger(0);
+ FeedClient feedClient = mock(FeedClient.class);
+ final AtomicBoolean success = new AtomicBoolean(false);
+ doAnswer(new Answer<Object>() {
+ public Object answer(InvocationOnMock invocation) {
+ Object[] args = invocation.getArguments();
+ String docId = (String) args[0];
+ CharSequence value = (CharSequence)args[1];
+ assertThat(value.toString(), is(
+ "<document documenttype=\"simple\" documentid=\"id:test::&amp;http://www.e.no/matprat\">\n" +
+ " <language><![CDATA[ja]]></language>\n" +
+ " <title><![CDATA[test document1]]></title>\n" +
+ " <description><![CDATA[Bjørnen' blåbær på øy nærheten.]]></description>\n" +
+ " <date>1091356845</date>\n" +
+ " <surl><![CDATA[http://www.eventyr.no/matprat]]></surl>\n" +
+ " </document>"));
+ success.set(true);
+ return null;
+ }
+ }).when(feedClient).stream(anyString(), anyObject());
+ XmlFeedReader.read(stream, feedClient, numSent);
+ assertThat(numSent.get(), is(1));
+ assert(success.get());
+ }
+
+ @Test
+ public void testRealData() throws Exception {
+ InputStream inputStream = XmlFeedReaderTest.class.getResourceAsStream(feedResource);
+ BufferedInputStream bis = new BufferedInputStream(inputStream);
+ AtomicInteger numSent = new AtomicInteger(0);
+ FeedClient feedClient = mock(FeedClient.class);
+
+ XmlFeedReader.read(bis, feedClient, numSent);
+ assertThat(numSent.get(), is(6));
+ }
+
+ private static class XmlTestFeedClient implements FeedClient {
+
+ public List<String> documentIds = new ArrayList<>();
+ public List<CharSequence> datas = new ArrayList<>();
+ public List<Object> contexts = new ArrayList<>();
+
+ @Override
+ public void stream(String documentId, CharSequence documentData) {
+ stream(documentId, documentData, null);
+ }
+
+ @Override
+ public void stream(String documentId, CharSequence documentData, Object context) {
+ documentIds.add(documentId.toString());
+ datas.add(documentData);
+ contexts.add(context);
+ }
+
+
+ @Override
+ public void close() { }
+
+ @Override
+ public String getStatsAsJson() { return null; }
+ }
+
+ // Only for xml with single doc.
+ private void verifyNoTransformationOfXml(String filename) throws Exception {
+ InputStream inputStream = XmlFeedReaderTest.class.getResourceAsStream(filename);
+ BufferedInputStream bis = new BufferedInputStream(inputStream);
+ AtomicInteger numSent = new AtomicInteger(0);
+ XmlTestFeedClient feedClient = new XmlTestFeedClient();
+ XmlFeedReader.read(bis, feedClient, numSent);
+ assertThat(numSent.get(), is(1));
+ String document = feedClient.datas.get(0).toString();
+
+ InputStream inputStream2 = XmlFeedReaderTest.class.getResourceAsStream(filename);
+ String rawXML = new java.util.Scanner(inputStream2, "UTF-8").useDelimiter("\\A").next();
+
+ String rawDoc = rawXML.toString().split("<document")[1].split("</document>")[0];
+ assertThat(rawDoc.length() > 30, is(true));
+
+ String decodedRawXml = StringEscapeUtils.unescapeXml(rawDoc);
+ String decodedDoc = StringEscapeUtils.unescapeXml(document);
+
+ assertThat(decodedDoc, containsString(decodedRawXml));
+ }
+
+ @Test public void testCData() throws Exception {
+ verifyNoTransformationOfXml(feedResource2);
+ }
+
+ @Test public void testPCData() throws Exception {
+ verifyNoTransformationOfXml(feedResource3);
+ }
+
+ @Test public void testAposData() throws Exception {
+ verifyNoTransformationOfXml(feedResource4);
+ }
+} \ No newline at end of file
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java
new file mode 100644
index 00000000000..fa4ad8fa175
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java
@@ -0,0 +1,318 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.core.communication;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.yahoo.vespa.http.client.core.Headers;
+import com.yahoo.vespa.http.client.TestUtils;
+import com.yahoo.vespa.http.client.core.Document;
+import org.apache.http.Header;
+import org.apache.http.HeaderElement;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.ParseException;
+import org.apache.http.StatusLine;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.InputStreamEntity;
+import org.junit.Test;
+import com.yahoo.vespa.http.client.config.ConnectionParams;
+import com.yahoo.vespa.http.client.config.Endpoint;
+import com.yahoo.vespa.http.client.config.FeedParams;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.*;
+
+
+public class ApacheGatewayConnectionTest {
+
+ @Test
+ public void testProtocolV3() throws Exception {
+ final Endpoint endpoint = Endpoint.create("hostname", 666, false);
+ final FeedParams feedParams = new FeedParams.Builder().setDataFormat(FeedParams.DataFormat.JSON_UTF8).build();
+ final String clusterSpecificRoute = "";
+ final ConnectionParams connectionParams = new ConnectionParams.Builder()
+ .setEnableV3Protocol(true)
+ .build();
+ final List<Document> documents = new ArrayList<>();
+
+ final ApacheGatewayConnection.HttpClientFactory mockFactory =
+ mock(ApacheGatewayConnection.HttpClientFactory.class);
+ final HttpClient httpClientMock = mock(HttpClient.class);
+ when(mockFactory.createClient()).thenReturn(httpClientMock);
+
+ final CountDownLatch verifyContentSentLatch = new CountDownLatch(1);
+
+ final String vespaDocContent ="Hello, I a JSON doc.";
+ final String docId = "42";
+
+ final AtomicInteger requestsReceived = new AtomicInteger(0);
+
+ // This is the fake server, takes header client ID and uses this as session Id.
+ stub(httpClientMock.execute(any())).toAnswer((Answer) invocation -> {
+ final Object[] args = invocation.getArguments();
+ final HttpPost post = (HttpPost) args[0];
+ final Header clientIdHeader = post.getFirstHeader(Headers.CLIENT_ID);
+ verifyContentSentLatch.countDown();
+ return httpResponse(clientIdHeader.getValue(), "3");
+ });
+
+ ApacheGatewayConnection apacheGatewayConnection =
+ new ApacheGatewayConnection(
+ endpoint,
+ feedParams,
+ clusterSpecificRoute,
+ connectionParams,
+ mockFactory,
+ "clientId");
+ apacheGatewayConnection.connect();
+ apacheGatewayConnection.handshake();
+ documents.add(createDoc(docId, vespaDocContent, true));
+
+ apacheGatewayConnection.writeOperations(documents);
+ assertTrue(verifyContentSentLatch.await(10, TimeUnit.SECONDS));
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testServerReturnsBadSessionInV3() throws Exception {
+ final Endpoint endpoint = Endpoint.create("hostname", 666, false);
+ final FeedParams feedParams = new FeedParams.Builder().setDataFormat(FeedParams.DataFormat.JSON_UTF8).build();
+ final String clusterSpecificRoute = "";
+ final ConnectionParams connectionParams = new ConnectionParams.Builder()
+ .setEnableV3Protocol(true)
+ .build();
+
+ final ApacheGatewayConnection.HttpClientFactory mockFactory =
+ mock(ApacheGatewayConnection.HttpClientFactory.class);
+ final HttpClient httpClientMock = mock(HttpClient.class);
+ when(mockFactory.createClient()).thenReturn(httpClientMock);
+
+ // This is the fake server, returns wrong session Id.
+ stub(httpClientMock.execute(any())).toAnswer(invocation -> {
+ return httpResponse("Wrong Id from server", "3");
+ });
+
+ ApacheGatewayConnection apacheGatewayConnection =
+ new ApacheGatewayConnection(
+ endpoint,
+ feedParams,
+ clusterSpecificRoute,
+ connectionParams,
+ mockFactory,
+ "clientId");
+ apacheGatewayConnection.connect();
+ final List<Document> documents = new ArrayList<>();
+ apacheGatewayConnection.writeOperations(documents);
+ }
+
+ @Test(expected=RuntimeException.class)
+ public void testBadConfigParameters() throws Exception {
+ final Endpoint endpoint = Endpoint.create("hostname", 666, false);
+ final FeedParams feedParams = new FeedParams.Builder().setDataFormat(FeedParams.DataFormat.JSON_UTF8).build();
+ final String clusterSpecificRoute = "";
+ final ConnectionParams connectionParams = new ConnectionParams.Builder()
+ .setEnableV3Protocol(true)
+ .build();
+
+ final ApacheGatewayConnection.HttpClientFactory mockFactory =
+ mock(ApacheGatewayConnection.HttpClientFactory.class);
+
+ new ApacheGatewayConnection(
+ endpoint,
+ feedParams,
+ clusterSpecificRoute,
+ connectionParams,
+ mockFactory,
+ null);
+ }
+
+ @Test
+ public void testJsonDocumentHeader() throws Exception {
+ final Endpoint endpoint = Endpoint.create("hostname", 666, false);
+ final FeedParams feedParams = new FeedParams.Builder().setDataFormat(FeedParams.DataFormat.JSON_UTF8).build();
+ final String clusterSpecificRoute = "";
+ final ConnectionParams connectionParams = new ConnectionParams.Builder()
+ .setUseCompression(true)
+ .build();
+ final List<Document> documents = new ArrayList<>();
+
+ final ApacheGatewayConnection.HttpClientFactory mockFactory =
+ mock(ApacheGatewayConnection.HttpClientFactory.class);
+ final HttpClient httpClientMock = mock(HttpClient.class);
+ when(mockFactory.createClient()).thenReturn(httpClientMock);
+
+ final CountDownLatch verifyContentSentLatch = new CountDownLatch(1);
+
+ final String vespaDocContent ="Hello, I a JSON doc.";
+ final String docId = "42";
+
+ final AtomicInteger requestsReceived = new AtomicInteger(0);
+
+ // This is the fake server, checks that DATA_FORMAT header is set properly.
+ stub(httpClientMock.execute(any())).toAnswer((Answer) invocation -> {
+ final Object[] args = invocation.getArguments();
+ final HttpPost post = (HttpPost) args[0];
+ final Header header = post.getFirstHeader(Headers.DATA_FORMAT);
+ if (requestsReceived.incrementAndGet() == 1) {
+ // This is handshake, it is not json.
+ assert(header == null);
+ return httpResponse("clientId", "3");
+ }
+ assertNotNull(header);
+ assertThat(header.getValue(), is(FeedParams.DataFormat.JSON_UTF8.name()));
+ // Test is done.
+ verifyContentSentLatch.countDown();
+ return httpResponse("clientId", "3");
+ });
+
+ ApacheGatewayConnection apacheGatewayConnection =
+ new ApacheGatewayConnection(
+ endpoint,
+ feedParams,
+ clusterSpecificRoute,
+ connectionParams,
+ mockFactory,
+ "clientId");
+ apacheGatewayConnection.connect();
+ apacheGatewayConnection.handshake();
+
+ documents.add(createDoc(docId, vespaDocContent, true));
+
+ apacheGatewayConnection.writeOperations(documents);
+ assertTrue(verifyContentSentLatch.await(10, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testZipAndCreateEntity() throws IOException {
+ final String testString = "Hello world";
+ InputStream stream = new ByteArrayInputStream(testString.getBytes(StandardCharsets.UTF_8));
+ // Send in test data to method.
+ InputStreamEntity inputStreamEntity = ApacheGatewayConnection.zipAndCreateEntity(stream);
+ // Verify zipped data by comparing unzipped data with test data.
+ final String rawContent = TestUtils.zipStreamToString(inputStreamEntity.getContent());
+ assert(testString.equals(rawContent));
+ }
+
+ /**
+ * Mocks the HttpClient, and verifies that the compressed data is sent.
+ */
+ @Test
+ public void testCompressedWriteOperations() throws Exception {
+ final Endpoint endpoint = Endpoint.create("hostname", 666, false);
+ final FeedParams feedParams = new FeedParams.Builder().build();
+ final String clusterSpecificRoute = "";
+ final ConnectionParams connectionParams = new ConnectionParams.Builder()
+ .setUseCompression(true)
+ .build();
+ final List<Document> documents = new ArrayList<>();
+
+ final ApacheGatewayConnection.HttpClientFactory mockFactory =
+ mock(ApacheGatewayConnection.HttpClientFactory.class);
+ final HttpClient httpClientMock = mock(HttpClient.class);
+ when(mockFactory.createClient()).thenReturn(httpClientMock);
+
+ final CountDownLatch verifyContentSentLatch = new CountDownLatch(1);
+
+ final String vespaDocContent ="Hello, I am the document data.";
+ final String docId = "42";
+
+ final Document doc = createDoc(docId, vespaDocContent, false);
+
+ // When sending data on http client, check if it is compressed. If compressed, unzip, check result,
+ // and count down latch.
+ stub(httpClientMock.execute(any())).toAnswer((Answer) invocation -> {
+ final Object[] args = invocation.getArguments();
+ final HttpPost post = (HttpPost) args[0];
+ final Header header = post.getFirstHeader("Content-Encoding");
+ if (header != null && header.getValue().equals("gzip")) {
+ final String rawContent = TestUtils.zipStreamToString(post.getEntity().getContent());
+ final String vespaHeaderText = "<vespafeed>\n";
+ final String vespaFooterText = "</vespafeed>\n";
+
+ assertThat(rawContent, is(
+ doc.getOperationId() + " 38\n" + vespaHeaderText + vespaDocContent + "\n"
+ + vespaFooterText));
+ verifyContentSentLatch.countDown();
+
+ }
+ return httpResponse("clientId", "3");
+ });
+ StatusLine statusLineMock = mock(StatusLine.class);
+ when(statusLineMock.getStatusCode()).thenReturn(200);
+
+ ApacheGatewayConnection apacheGatewayConnection =
+ new ApacheGatewayConnection(
+ endpoint,
+ feedParams,
+ clusterSpecificRoute,
+ connectionParams,
+ mockFactory,
+ "clientId");
+ apacheGatewayConnection.connect();
+ apacheGatewayConnection.handshake();
+
+ documents.add(doc);
+
+ apacheGatewayConnection.writeOperations(documents);
+ assertTrue(verifyContentSentLatch.await(10, TimeUnit.SECONDS));
+ }
+
+ private Document createDoc(final String docId, final String content, boolean useJson) throws IOException {
+ return new Document(docId, content.getBytes(), null /* context */);
+ }
+
+ private void addMockedHeader(
+ final HttpResponse httpResponseMock,
+ final String name,
+ final String value,
+ HeaderElement[] elements) {
+ final Header header = new Header() {
+ @Override
+ public String getName() {
+ return name;
+ }
+ @Override
+ public String getValue() {
+ return value;
+ }
+ @Override
+ public HeaderElement[] getElements() throws ParseException {
+ return elements;
+ }
+ };
+ when(httpResponseMock.getFirstHeader(name)).thenReturn(header);
+ }
+
+ private HttpResponse httpResponse(String sessionIdInResult, String version) throws IOException {
+ final HttpResponse httpResponseMock = mock(HttpResponse.class);
+
+ StatusLine statusLineMock = mock(StatusLine.class);
+ when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock);
+ when(statusLineMock.getStatusCode()).thenReturn(200);
+
+ addMockedHeader(httpResponseMock, Headers.SESSION_ID, sessionIdInResult, null);
+ addMockedHeader(httpResponseMock, Headers.VERSION, version, null);
+ HeaderElement[] headerElements = new HeaderElement[1];
+ headerElements[0] = mock(HeaderElement.class);
+
+ final HttpEntity httpEntityMock = mock(HttpEntity.class);
+ when(httpResponseMock.getEntity()).thenReturn(httpEntityMock);
+
+ final InputStream inputs = new ByteArrayInputStream("fake response data".getBytes());
+
+ when(httpEntityMock.getContent()).thenReturn(inputs);
+ return httpResponseMock;
+ }
+} \ No newline at end of file
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ByteBufferInputStreamTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ByteBufferInputStreamTest.java
new file mode 100644
index 00000000000..1c0b29a9cfe
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ByteBufferInputStreamTest.java
@@ -0,0 +1,134 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.core.communication;
+
+import com.yahoo.vespa.http.client.core.communication.ByteBufferInputStream;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ * @since 5.1.20
+ */
+public class ByteBufferInputStreamTest {
+
+ private static ByteBuffer[] getAbcde() {
+ ByteBuffer[] buffers = new ByteBuffer[5];
+ buffers[0] = ByteBuffer.wrap("a".getBytes(StandardCharsets.UTF_8));
+ buffers[1] = ByteBuffer.wrap("b".getBytes(StandardCharsets.UTF_8));
+ buffers[2] = ByteBuffer.wrap("c".getBytes(StandardCharsets.UTF_8));
+ buffers[3] = ByteBuffer.wrap("d".getBytes(StandardCharsets.UTF_8));
+ buffers[4] = ByteBuffer.wrap("e".getBytes(StandardCharsets.UTF_8));
+ return buffers;
+ }
+
+ @Test
+ public void requireThatExhaustedBufferWorks() throws IOException {
+ ByteBuffer[] buffers = getAbcde();
+ buffers[2].get();
+ ByteBufferInputStream in = new ByteBufferInputStream(buffers);
+
+ byte[] out = new byte[100];
+ int pos = 0;
+
+ final int GUARD = 1000;
+ int i;
+ for (i = 0; i < GUARD; i++) {
+ int r = in.read();
+ if (r == -1) {
+ break;
+ }
+ out[pos] = (byte) (0xFF & r);
+ ++pos;
+ }
+ assertTrue(i < GUARD);
+ assertThat(pos, is(4));
+
+ String outString = new String(out, 0, pos, StandardCharsets.UTF_8);
+ assertThat(outString, equalTo("abde"));
+
+ }
+
+ @Test
+ public void requireThatBulkReadWorks() throws IOException {
+ ByteBuffer[] buffers = getAbcde();
+ ByteBufferInputStream in = new ByteBufferInputStream(buffers);
+
+ byte[] out = new byte[100];
+ int pos = 0;
+
+ final int GUARD = 1000;
+ int i;
+ for (i = 0; i < GUARD; i++) {
+ int numReadNow;
+ if (i == 0) {
+ numReadNow = in.read(out);
+ } else {
+ numReadNow = in.read(out, pos, (out.length - pos));
+ }
+ if (numReadNow == -1) {
+ break;
+ }
+ pos += numReadNow;
+ }
+ assertTrue(i < GUARD);
+ assertThat(pos, is(5));
+
+ String outString = new String(out, 0, pos, StandardCharsets.UTF_8);
+ assertThat(outString, equalTo("abcde"));
+ }
+
+ @Test
+ public void requireThatSingleByteReadWorks() throws IOException {
+ ByteBuffer[] buffers = getAbcde();
+ ByteBufferInputStream in = new ByteBufferInputStream(buffers);
+
+ byte[] out = new byte[100];
+ int pos = 0;
+
+ final int GUARD = 1000;
+ int i;
+ for (i = 0; i < GUARD; i++) {
+ int r = in.read();
+ if (r == -1) {
+ break;
+ }
+ out[pos] = (byte) (0xFF & r);
+ ++pos;
+ }
+ assertTrue(i < GUARD);
+ assertThat(pos, is(5));
+
+ String outString = new String(out, 0, pos, StandardCharsets.UTF_8);
+ assertThat(outString, equalTo("abcde"));
+ }
+
+ @Test
+ public void requireThatMarkIsNotSupported() throws IOException {
+ ByteBuffer[] buffers = getAbcde();
+ ByteBufferInputStream in = new ByteBufferInputStream(buffers);
+ assertThat(in.markSupported(), is(false));
+ in.mark(0); //a no-op
+ }
+
+ @Test(expected = IOException.class)
+ public void requireThatResetFails() throws IOException {
+ ByteBuffer[] buffers = getAbcde();
+ ByteBufferInputStream in = new ByteBufferInputStream(buffers);
+ in.reset();
+ }
+
+ @Test(expected = IOException.class)
+ public void requireThatSkipFails() throws IOException {
+ ByteBuffer[] buffers = getAbcde();
+ ByteBufferInputStream in = new ByteBufferInputStream(buffers);
+ in.skip(1L);
+ }
+}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/CloseableQTestCase.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/CloseableQTestCase.java
new file mode 100644
index 00000000000..c234e524774
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/CloseableQTestCase.java
@@ -0,0 +1,38 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.core.communication;
+
+import com.yahoo.vespa.http.client.core.Document;
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+public class CloseableQTestCase {
+ @Test
+ public void requestThatPutIsInterruptedOnClose() throws InterruptedException {
+ final DocumentQueue q = new DocumentQueue(1);
+ q.put(new Document("id", "data", null /* context */));
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+
+ }
+ q.close();
+ q.clear();
+ }
+ });
+ t.start();
+ try {
+ q.put(new Document("id2", "data2", null /* context */));
+ fail("This shouldn't have worked.");
+ } catch (IllegalStateException ise) {
+ // ok!
+ }
+ try {
+ t.join();
+ } catch (InterruptedException e) {
+ }
+ }
+}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java
new file mode 100644
index 00000000000..711933c9595
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java
@@ -0,0 +1,89 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.core.communication;
+
+import com.yahoo.vespa.http.client.Result;
+import com.yahoo.vespa.http.client.config.Endpoint;
+import com.yahoo.vespa.http.client.core.EndpointResult;
+import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ * @since 5.1.22
+ */
+public class EndpointResultQueueTest {
+
+ @Test
+ public void testBasics() {
+ Endpoint endpoint = Endpoint.create("a");
+
+ OperationProcessor mockAggregator = mock(OperationProcessor.class);
+ final AtomicInteger resultCount = new AtomicInteger(0);
+
+ doAnswer(invocationOnMock -> {
+ resultCount.getAndIncrement();
+ return null;
+ }).when(mockAggregator).resultReceived(anyObject(), eq(0));
+
+ EndpointResultQueue q = new EndpointResultQueue(
+ mockAggregator, endpoint, 0, new ScheduledThreadPoolExecutor(1), 100L * 1000L);
+
+ q.operationSent("op1");
+ assertThat(q.getPendingSize(), is(1));
+ q.operationSent("op2");
+ assertThat(q.getPendingSize(), is(2));
+ q.operationSent("op3");
+ assertThat(q.getPendingSize(), is(3));
+ q.resultReceived(new EndpointResult("op1", new Result.Detail(endpoint)), 0);
+ assertThat(q.getPendingSize(), is(2));
+ q.resultReceived(new EndpointResult("op2", new Result.Detail(endpoint)), 0);
+ assertThat(q.getPendingSize(), is(1));
+ q.resultReceived(new EndpointResult("op3", new Result.Detail(endpoint)), 0);
+ assertThat(q.getPendingSize(), is(0));
+ q.resultReceived(new EndpointResult("op1", new Result.Detail(endpoint)), 0);
+ assertThat(q.getPendingSize(), is(0));
+ q.resultReceived(new EndpointResult("abc", new Result.Detail(endpoint)), 0);
+ assertThat(q.getPendingSize(), is(0));
+
+ assertThat(resultCount.get(), is(5));
+
+ q.operationSent("op4");
+ assertThat(q.getPendingSize(), is(1));
+ q.operationSent("op5");
+ assertThat(q.getPendingSize(), is(2));
+
+ q.failPending(new RuntimeException());
+
+ assertThat(resultCount.get(), is(7));
+ }
+
+
+ @Test
+ public void testTimeout() throws InterruptedException {
+ Endpoint endpoint = Endpoint.create("a");
+
+ OperationProcessor mockAggregator = mock(OperationProcessor.class);
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(invocationOnMock -> {
+ latch.countDown();
+ return null;
+ }).when(mockAggregator).resultReceived(anyObject(), eq(0));
+ EndpointResultQueue q = new EndpointResultQueue(
+ mockAggregator, endpoint, 0, new ScheduledThreadPoolExecutor(1), 100L);
+ q.operationSent("1234");
+ assert(latch.await(120, TimeUnit.SECONDS));
+ }
+
+}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/GatewayThrottlerTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/GatewayThrottlerTest.java
new file mode 100644
index 00000000000..d1ef6c0d6af
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/GatewayThrottlerTest.java
@@ -0,0 +1,56 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.core.communication;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.*;
+
+
+public class GatewayThrottlerTest {
+
+ GatewayThrottler gatewayThrottler;
+ long lastSleepValue = 0;
+
+ @Before
+ public void before() {
+ gatewayThrottler = new GatewayThrottler(900) {
+ @Override
+ protected void sleepMs(long sleepTime) {
+ lastSleepValue = sleepTime;
+ }
+ };
+ }
+
+ @Test
+ public void noSleepOnNormalCase() {
+ gatewayThrottler.handleCall(0);
+ gatewayThrottler.handleCall(0);
+ assertThat(lastSleepValue, is(0L));
+ }
+
+ @Test
+ public void increastingSleepTime() {
+ gatewayThrottler.handleCall(1);
+ long sleepTime1 = lastSleepValue;
+ gatewayThrottler.handleCall(1);
+ long sleepTime2 = lastSleepValue;
+ assertTrue(sleepTime1 > 0);
+ assertTrue(sleepTime2 > sleepTime1);
+ int x;
+ // Check for max value of sleep time.
+ for (x = 0 ; x < 10000; x++) {
+ long prevSleepTime = lastSleepValue;
+ gatewayThrottler.handleCall(1);
+ if (prevSleepTime == lastSleepValue) break;
+ }
+ assertTrue(x < 5000);
+ // Check that it goes down back to zero when no errors.
+ for (x = 0 ; x < 10000; x++) {
+ gatewayThrottler.handleCall(0);
+ if (lastSleepValue == 0) break;
+ }
+ assertTrue(x < 5000);
+ }
+} \ No newline at end of file
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
new file mode 100644
index 00000000000..61965fa68b1
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
@@ -0,0 +1,126 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.core.communication;
+
+import com.yahoo.vespa.http.client.V3HttpAPITest;
+import com.yahoo.vespa.http.client.core.Document;
+import com.yahoo.vespa.http.client.core.EndpointResult;
+
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
+public class IOThreadTest {
+ final EndpointResultQueue endpointResultQueue = mock(EndpointResultQueue.class);
+ final ApacheGatewayConnection apacheGatewayConnection = mock(ApacheGatewayConnection.class);
+ final String exceptionMessage = "SOME EXCEPTION FOO";
+ CountDownLatch latch = new CountDownLatch(1);
+ String docId1 = V3HttpAPITest.documents.get(0).getDocumentId();
+ Document doc1 = new Document(V3HttpAPITest.documents.get(0).getDocumentId(),
+ V3HttpAPITest.documents.get(0).getContents(), null /* context */);
+ String docId2 = V3HttpAPITest.documents.get(1).getDocumentId();
+ Document doc2 = new Document(V3HttpAPITest.documents.get(1).getDocumentId(),
+ V3HttpAPITest.documents.get(1).getContents(), null /* context */);
+ DocumentQueue documentQueue = new DocumentQueue(4);
+
+ /**
+ * Set up mock so that it can handle both failDocument() and resultReceived().
+ * @param expectedDocIdFail on failure, this has to be the doc id, or the mock will fail.
+ * @param expectedDocIdOk on ok, this has to be the doc id, or the mock will fail.
+ * @param isTransient checked on failure, if different, the mock will fail.
+ * @param expectedException checked on failure, if exception toString is different, the mock will fail.
+ */
+ void setupEndpointResultQueueMock(String expectedDocIdFail, String expectedDocIdOk,boolean isTransient, String expectedException) {
+
+ doAnswer(invocation -> {
+ EndpointResult endpointResult = (EndpointResult) invocation.getArguments()[0];
+ assertThat(endpointResult.getOperationId(), is(expectedDocIdFail));
+ assertThat(endpointResult.getDetail().isSuccess(), is(false));
+ assertThat(endpointResult.getDetail().getException().toString(),
+ containsString(expectedException));
+ assertThat(endpointResult.getDetail().isTransient(), is(isTransient));
+ latch.countDown();
+ return null;
+ }).when(endpointResultQueue).failOperation(anyObject(), eq(0));
+
+ doAnswer(invocation -> {
+ EndpointResult endpointResult = (EndpointResult) invocation.getArguments()[0];
+ assertThat(endpointResult.getOperationId(), is(expectedDocIdOk));
+ assertThat(endpointResult.getDetail().isSuccess(), is(true));
+ assertThat(endpointResult.getDetail().isTransient(), is(isTransient));
+ latch.countDown();
+ return null;
+ }).when(endpointResultQueue).resultReceived(anyObject(), eq(0));
+ }
+
+ @Test
+ public void singleDocumentSuccess() throws Exception {
+ when(apacheGatewayConnection.connect()).thenReturn(true);
+ InputStream serverResponse = new ByteArrayInputStream(
+ (docId1 + " OK Doc{20}fed").getBytes(StandardCharsets.UTF_8));
+ when(apacheGatewayConnection.writeOperations(anyObject())).thenReturn(serverResponse);
+ setupEndpointResultQueueMock( "nope", docId1, true, exceptionMessage);
+ try (IOThread ioThread = new IOThread(
+ endpointResultQueue, apacheGatewayConnection, 0, 0, 10000, 10000L, documentQueue, 0)) {
+ ioThread.post(doc1);
+ assert (latch.await(120, TimeUnit.SECONDS));
+ }
+ }
+
+ @Test
+ public void requireThatSingleDocumentWriteErrorIsHandledProperly() throws Exception {
+ when(apacheGatewayConnection.connect()).thenReturn(true);
+ when(apacheGatewayConnection.writeOperations(anyObject())).thenThrow(new IOException(exceptionMessage));
+ setupEndpointResultQueueMock(doc1.getOperationId(), "nope", true, exceptionMessage);
+ try (IOThread ioThread = new IOThread(
+ endpointResultQueue, apacheGatewayConnection, 0, 0, 10000, 10000L, documentQueue, 0)) {
+ ioThread.post(doc1);
+ assert (latch.await(120, TimeUnit.SECONDS));
+ }
+ }
+
+ @Test
+ public void requireThatTwoDocumentsFirstWriteErrorSecondOkIsHandledProperly() throws Exception {
+ when(apacheGatewayConnection.connect()).thenReturn(true);
+ InputStream serverResponse = new ByteArrayInputStream(
+ (docId2 + " OK Doc{20}fed").getBytes(StandardCharsets.UTF_8));
+ when(apacheGatewayConnection.writeOperations(anyObject()))
+ .thenThrow(new IOException(exceptionMessage))
+ .thenReturn(serverResponse);
+ latch = new CountDownLatch(2);
+ setupEndpointResultQueueMock(doc1.getOperationId(), doc2.getDocumentId(), true, exceptionMessage);
+
+ try (IOThread ioThread = new IOThread(
+ endpointResultQueue, apacheGatewayConnection, 0, 0, 10000, 10000L, documentQueue, 0)) {
+ ioThread.post(doc1);
+ ioThread.post(doc2);
+ assert (latch.await(120, TimeUnit.SECONDS));
+ }
+ }
+
+ @Test
+ public void testQueueTimeOutNoNoConnectionToServer() throws Exception {
+ when(apacheGatewayConnection.connect()).thenReturn(false);
+ InputStream serverResponse = new ByteArrayInputStream(
+ ("").getBytes(StandardCharsets.UTF_8));
+ when(apacheGatewayConnection.writeOperations(anyObject()))
+ .thenReturn(serverResponse);
+ setupEndpointResultQueueMock(doc1.getOperationId(), "nope", true,
+ "java.lang.Exception: Not sending document operation, timed out in queue after");
+ try (IOThread ioThread = new IOThread(
+ endpointResultQueue, apacheGatewayConnection, 0, 0, 10, 10L, documentQueue, 0)) {
+ ioThread.post(doc1);
+ assert (latch.await(120, TimeUnit.SECONDS));
+ }
+ }
+} \ No newline at end of file
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/ConcurrentDocumentOperationBlockerTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/ConcurrentDocumentOperationBlockerTest.java
new file mode 100644
index 00000000000..12d258b2478
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/ConcurrentDocumentOperationBlockerTest.java
@@ -0,0 +1,63 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.core.operationProcessor;
+
+import com.yahoo.vespa.http.client.core.operationProcessor.ConcurrentDocumentOperationBlocker;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.*;
+
+public class ConcurrentDocumentOperationBlockerTest {
+
+ final ConcurrentDocumentOperationBlocker blocker = new ConcurrentDocumentOperationBlocker();
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ @Before
+ public void setup() throws InterruptedException {
+ blocker.setMaxConcurrency(2);
+ blocker.startOperation();
+ assertThat(blocker.availablePermits(), is(1));
+ blocker.startOperation();
+ }
+
+ private void spawnThreadPushOperationThenCountDown() {
+ new Thread(() -> {
+ try {
+ blocker.startOperation();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ latch.countDown();
+ }).start();
+ }
+
+ @Test
+ public void testBasics() throws InterruptedException {
+ spawnThreadPushOperationThenCountDown();
+ assertFalse(latch.await(10, TimeUnit.MILLISECONDS));
+ blocker.operationDone();
+ assertTrue(latch.await(120, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testResizeLarger() throws InterruptedException {
+ spawnThreadPushOperationThenCountDown();
+ assertFalse(latch.await(10, TimeUnit.MILLISECONDS));
+ blocker.setMaxConcurrency(3);
+ assertTrue(latch.await(120, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testResizeSmaller() throws InterruptedException {
+ spawnThreadPushOperationThenCountDown();
+ blocker.setMaxConcurrency(1);
+ blocker.operationDone();
+ assertFalse(latch.await(10, TimeUnit.MILLISECONDS));
+ blocker.operationDone();
+ assertTrue(latch.await(120, TimeUnit.SECONDS));
+ }
+} \ No newline at end of file
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java
new file mode 100644
index 00000000000..f63971262e0
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java
@@ -0,0 +1,269 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.core.operationProcessor;
+
+import com.yahoo.collections.Tuple2;
+import com.yahoo.vespa.http.client.core.ThrottlePolicy;
+import com.yahoo.vespa.http.client.core.operationProcessor.IncompleteResultsThrottler;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class IncompleteResultsThrottlerTest {
+ @Test
+ public void simpleStaticQueueSizeTest() {
+ IncompleteResultsThrottler incompleteResultsThrottler = new IncompleteResultsThrottler(2, 2, null, null);
+ assertThat(incompleteResultsThrottler.waitingThreads(), is(0));
+ incompleteResultsThrottler.operationStart();
+ incompleteResultsThrottler.operationStart();
+ assertThat(incompleteResultsThrottler.waitingThreads(), is(2));
+ incompleteResultsThrottler.resultReady(true);
+ assertThat(incompleteResultsThrottler.waitingThreads(), is(1));
+ incompleteResultsThrottler.resultReady(true);
+ assertThat(incompleteResultsThrottler.waitingThreads(), is(0));
+ }
+
+ /**
+ * A mock 'gateway' this is slower with more requests in-flight. It starts to become really much slower at
+ * 'breakPoint' number of parallel requests.
+ */
+ class MockServer {
+ final LinkedList<Tuple2<Long, IncompleteResultsThrottler> > messageDoneByTime = new LinkedList<>();
+ final int breakPoint;
+ final Random random = new Random();
+ long time = 0;
+
+ MockServer(int breakPoint) {
+ this.breakPoint = breakPoint;
+ }
+
+ /**
+ * Figures out when next processed data will be ready.
+ * @return time in ms for next request to be finished.
+ */
+ long nextRequestFinished() {
+ if (messageDoneByTime.isEmpty()) {
+ return Integer.MAX_VALUE;
+ }
+ return messageDoneByTime.peek().first;
+ }
+
+ /**
+ * Advance simulation time and call finished on any requests.
+ * @param time to move to
+ */
+ void moveTime(long time) {
+ this.time = time;
+ while (!messageDoneByTime.isEmpty() && messageDoneByTime.peek().first <= time) {
+ messageDoneByTime.pop().second.resultReady(true);
+ }
+ }
+
+ /**
+ * New request.
+ * @param blocker do callback on blocker when request is done.
+ */
+ void newRequest(IncompleteResultsThrottler blocker) {
+ long nextTime = (long)(20 + 0.1 * messageDoneByTime.size());
+
+ if (messageDoneByTime.size() > breakPoint) {
+ nextTime += (long) (40 + (random.nextDouble()) * 0.01 * messageDoneByTime.size()* messageDoneByTime.size());
+ }
+ nextTime += time + random.nextInt()%4;
+ messageDoneByTime.push(new Tuple2<>(nextTime, blocker));
+ }
+ }
+
+ /**
+ * Simulate running requests.
+ * @param clientCount number of parallel clients.
+ * @param breakPoint how many requests the server should handle in parallel before it gets slower.
+ * @param simulationTimeMs how many ms to simulate.
+ * @return median queue length.
+ */
+ int getAverageQueue(int clientCount, int breakPoint, int simulationTimeMs) {
+ final AtomicLong timeMs = new AtomicLong(0);
+
+ ArrayList<IncompleteResultsThrottler> incompleteResultsThrottlers = new ArrayList<>();
+
+ MockServer mockServer = new MockServer(breakPoint);
+ for (int x = 0; x < clientCount; x++) {
+ IncompleteResultsThrottler incompleteResultsThrottler =
+ new IncompleteResultsThrottler(10, 50000, () -> timeMs.get(), new ThrottlePolicy());
+ incompleteResultsThrottlers.add(incompleteResultsThrottler);
+ }
+ long sum = 0;
+ long samples = 0;
+
+ for (long time = 0; time < simulationTimeMs; time++) {
+ // Fast forward, if we can. If all clients are blocked, we can move to the time when the server has a
+ // request that is finished.
+ boolean fastForward = true;
+ for (int x = 0; x < clientCount; x++) {
+ if (incompleteResultsThrottlers.get(x).availableCapacity() > 0 ) {
+ fastForward = false;
+ break;
+ }
+ }
+ if (fastForward) {
+ time = mockServer.nextRequestFinished();
+ }
+ timeMs.set(time);
+ mockServer.moveTime(timeMs.get());
+ for (int y = 0; y < clientCount; y++) {
+ // Fill up, but don't block as that would stop the simulation.
+ while (incompleteResultsThrottlers.get(y).availableCapacity() > 0) {
+ incompleteResultsThrottlers.get(y).operationStart();
+ mockServer.newRequest(incompleteResultsThrottlers.get(y));
+ }
+ }
+ // Don't take the first iterations into account as the system is eagerly learning.
+ if (time > 60*1000) {
+ sum += mockServer.messageDoneByTime.size();
+ samples ++;
+ }
+ }
+ return (int)(sum/samples);
+ }
+
+ private void testAndPrintVariousClientSizes(int breakPoint) {
+ final int sampleRuns = 6;
+ final int maxParallelClients = 4;
+ final int minParallelClients = 1;
+ final int simulationTimeMs = 400000;
+ System.out.print("\nBreakpoint is " + breakPoint + ", average queue on server:");
+ int[][] resultQueuesAverage = new int[maxParallelClients][sampleRuns];
+ for (int clientNo = minParallelClients; clientNo <= maxParallelClients; clientNo++) {
+ System.out.print("\nNow with " + clientNo + " parallel clients:");
+ long sum = 0;
+ for (int x = 0; x < sampleRuns; x++) {
+ resultQueuesAverage[clientNo-minParallelClients][x] = getAverageQueue(1 + x, breakPoint, simulationTimeMs);
+ System.out.print(" " + resultQueuesAverage[clientNo-minParallelClients][x]);
+ sum += resultQueuesAverage[clientNo-minParallelClients][x];
+ }
+ System.out.print(" average is " + sum/sampleRuns);
+ Arrays.sort(resultQueuesAverage[clientNo - minParallelClients]);
+ int median = resultQueuesAverage[clientNo - minParallelClients][sampleRuns/2];
+ System.out.print(" median is " + median);
+ System.out.print(" min " + resultQueuesAverage[clientNo - minParallelClients][0]);
+ System.out.print(" max " + resultQueuesAverage[clientNo - minParallelClients][sampleRuns - 1]);
+ assertTrue(median < 2 * breakPoint + 200);
+ assertTrue(median > breakPoint / 10);
+ }
+ }
+
+ @Test
+ public void testVariousBreakpoints() {
+ testAndPrintVariousClientSizes(200);
+ testAndPrintVariousClientSizes(1000);
+ }
+
+ List<Thread> threads = new ArrayList<>();
+
+ private void postOperations(int count, final IncompleteResultsThrottler throttler) {
+ for (int i = 0; i < count; i++) {
+ Thread thread = new Thread(()->throttler.operationStart());
+ thread.start();
+ threads.add(thread);
+ }
+ }
+
+ private void waitForThreads() throws InterruptedException {
+ while(!threads.isEmpty()) {
+ threads.remove(0).join();
+ }
+ }
+
+ private void postSuccesses(int count, final IncompleteResultsThrottler throttler) {
+ for (int i = 0; i < count; i++) {
+ throttler.resultReady(true);
+ }
+ }
+
+ private void moveToNextCycle(final IncompleteResultsThrottler throttler, AtomicLong timeMs)
+ throws InterruptedException {
+ waitForThreads();
+ // Enter an adaption phase, we don't care about this phase.
+ timeMs.addAndGet(throttler.phaseSizeMs);
+ throttler.operationStart();
+ throttler.resultReady(false);
+ // Now enter the real next phase.
+ timeMs.addAndGet(throttler.phaseSizeMs);
+ throttler.operationStart();
+ throttler.resultReady(false);
+ }
+
+ @Test
+ public void testInteractionWithPolicyByMockingPolicy() throws InterruptedException {
+ final int MAX_SIZE = 1000;
+ final int MORE_THAN_MAX_SIZE = MAX_SIZE + 20;
+ final int SIZE_AFTER_CYCLE_FIRST = 30;
+ final int SIZE_AFTER_CYCLE_SECOND = 5000;
+ ThrottlePolicy policy = mock(ThrottlePolicy.class);
+ final AtomicLong timeMs = new AtomicLong(0);
+ IncompleteResultsThrottler incompleteResultsThrottler =
+ new IncompleteResultsThrottler(2, MAX_SIZE, ()->timeMs.get(), policy);
+ long bucketSizeMs = incompleteResultsThrottler.phaseSizeMs;
+
+ // Cycle 1 - Algorithm has fixed value for max-in-flight: INITIAL_MAX_IN_FLIGHT_VALUE.
+ // We post a few operations, not all finishing in this cycle. We explicitly do not fill the window
+ // size to test the argument about any requests blocked.
+ assertThat(incompleteResultsThrottler.availableCapacity(),
+ is(IncompleteResultsThrottler.INITIAL_MAX_IN_FLIGHT_VALUE));
+ postOperations(20, incompleteResultsThrottler);
+ postSuccesses(15, incompleteResultsThrottler);
+ moveToNextCycle(incompleteResultsThrottler, timeMs);
+
+
+ // Cycle 2 - Algorithm has fixed value also for second iteration: SECOND_MAX_IN_FLIGHT_VALUE.
+ // Test verifies that this value is used, and insert a value to be used for next phase SIZE_AFTER_CYCLE_FIRST.
+ assertThat(incompleteResultsThrottler.availableCapacity(),
+ is(IncompleteResultsThrottler.SECOND_MAX_IN_FLIGHT_VALUE - 5)); // 5 slots already taken earlier
+ postSuccesses(5, incompleteResultsThrottler);
+ when(policy.calcNewMaxInFlight(
+ anyDouble(), // Max performance change
+ eq(5), //numOk
+ eq(15), // previousNumOk
+ eq(IncompleteResultsThrottler.INITIAL_MAX_IN_FLIGHT_VALUE), // previous size
+ eq(IncompleteResultsThrottler.SECOND_MAX_IN_FLIGHT_VALUE), // current size
+ eq(false))) // is any request blocked, should be false since we only posted 20 docs.
+ .thenReturn(SIZE_AFTER_CYCLE_FIRST);
+ moveToNextCycle(incompleteResultsThrottler, timeMs);
+
+ // Cycle 3 - Test that value set in previous phase is used. Now return a very large number.
+ // However, this number should be cropped by the system (tested in next cycle).
+ assertThat(incompleteResultsThrottler.availableCapacity(),
+ is(SIZE_AFTER_CYCLE_FIRST));
+ postOperations(MORE_THAN_MAX_SIZE, incompleteResultsThrottler);
+ postSuccesses(MORE_THAN_MAX_SIZE, incompleteResultsThrottler);
+ when(policy.calcNewMaxInFlight(
+ anyDouble(), // Max performance change
+ eq(MORE_THAN_MAX_SIZE), //numOk
+ eq(5), // previousNumOk
+ eq(IncompleteResultsThrottler.SECOND_MAX_IN_FLIGHT_VALUE), // previous size
+ eq(SIZE_AFTER_CYCLE_FIRST),// current size
+ eq(true))) // is any request blocked, should be true since we posted MORE_THAN_MAX_SIZE docs.
+ .thenReturn(SIZE_AFTER_CYCLE_SECOND);
+ moveToNextCycle(incompleteResultsThrottler, timeMs);
+
+ // Cycle 4 - Test that the large number from previous cycle is cropped and that max value is used instead.
+ assertThat(incompleteResultsThrottler.availableCapacity(),
+ is(MAX_SIZE));
+ }
+
+ private long inversesU(int size, int sweetSpot) {
+ // Peak performance at sweetSPot.
+ int distance = Math.abs(sweetSpot - size);
+ return 1 + 20 * distance;
+ }
+} \ No newline at end of file
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java
new file mode 100644
index 00000000000..c87385ec2ce
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java
@@ -0,0 +1,370 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.core.operationProcessor;
+
+import com.yahoo.vespa.http.client.Result;
+import com.yahoo.vespa.http.client.config.Cluster;
+import com.yahoo.vespa.http.client.config.ConnectionParams;
+import com.yahoo.vespa.http.client.config.Endpoint;
+import com.yahoo.vespa.http.client.config.SessionParams;
+import com.yahoo.vespa.http.client.core.Document;
+import com.yahoo.vespa.http.client.core.EndpointResult;
+import org.junit.Test;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ * @since 5.1.20
+ */
+public class OperationProcessorTest {
+
+ final Queue<Result> queue = new ArrayDeque<>();
+ final Document doc1 = new Document("doc:a:b", "data doc 1", null /* context */);
+ final Document doc1b = new Document("doc:a:b", "data doc 1b", null /* context */);
+ final Document doc2 = new Document("doc:a:b2", "data doc 2", null /* context */);
+ final Document doc3 = new Document("doc:a:b3", "data doc 3", null /* context */);
+
+ @Test
+ public void testBasic() {
+ SessionParams sessionParams = new SessionParams.Builder()
+ .addCluster(new Cluster.Builder().addEndpoint(Endpoint.create("host")).build())
+ .addCluster(new Cluster.Builder().addEndpoint(Endpoint.create("host")).build())
+ .addCluster(new Cluster.Builder().addEndpoint(Endpoint.create("host")).build())
+ .addCluster(new Cluster.Builder().addEndpoint(Endpoint.create("host")).build())
+ .build();
+
+ OperationProcessor q = new OperationProcessor(
+ new IncompleteResultsThrottler(1000, 1000, null, null),
+ (docId, documentResult) -> queue.add(documentResult),
+ sessionParams, null);
+
+
+ q.resultReceived(new EndpointResult("foo", new Result.Detail(null)), 0);
+ assertThat(queue.size(), is(0));
+
+
+ q.sendDocument(doc1);
+ assertThat(queue.size(), is(0));
+
+ q.resultReceived(new EndpointResult(doc1.getOperationId(), new Result.Detail(Endpoint.create("a"))), 0);
+ assertThat(queue.size(), is(0));
+
+ q.resultReceived(new EndpointResult(doc1.getOperationId(), new Result.Detail(Endpoint.create("b"))), 1);
+ assertThat(queue.size(), is(0));
+
+ q.resultReceived(new EndpointResult(doc1.getOperationId(), new Result.Detail(Endpoint.create("c"))), 2);
+ assertThat(queue.size(), is(0));
+
+ q.resultReceived(new EndpointResult(doc1.getOperationId(), new Result.Detail(Endpoint.create("d"))), 3);
+ assertThat(queue.size(), is(1));
+
+ q.resultReceived(new EndpointResult(doc1.getOperationId(), new Result.Detail(Endpoint.create("e"))), 0);
+ assertThat(queue.size(), is(1));
+
+ //check a, b, c, d
+ Result aggregated = queue.poll();
+ assertThat(aggregated.getDocumentId(), equalTo("doc:a:b"));
+ assertThat(aggregated.getDetails().size(), is(4));
+ assertThat(aggregated.getDetails().get(0).getEndpoint().getHostname(), equalTo("a"));
+ assertThat(aggregated.getDetails().get(1).getEndpoint().getHostname(), equalTo("b"));
+ assertThat(aggregated.getDetails().get(2).getEndpoint().getHostname(), equalTo("c"));
+ assertThat(aggregated.getDetails().get(3).getEndpoint().getHostname(), equalTo("d"));
+ assertThat(aggregated.getDocumentDataAsCharSequence().toString(), is("data doc 1"));
+
+ assertThat(queue.size(), is(0));
+
+
+ q.sendDocument(doc2);
+ assertThat(queue.size(), is(0));
+
+ q.resultReceived(new EndpointResult(doc2.getOperationId(), new Result.Detail(Endpoint.create("a"))), 0);
+ assertThat(queue.size(), is(0));
+
+ q.resultReceived(new EndpointResult(doc2.getOperationId(), new Result.Detail(Endpoint.create("b"))), 1);
+ assertThat(queue.size(), is(0));
+
+ q.resultReceived(new EndpointResult(doc2.getOperationId(), new Result.Detail(Endpoint.create("c"))), 2);
+ assertThat(queue.size(), is(0));
+
+ q.resultReceived(new EndpointResult(doc2.getOperationId(), new Result.Detail(Endpoint.create("d"))), 3);
+ assertThat(queue.size(), is(1));
+
+ q.resultReceived(new EndpointResult(doc2.getOperationId(), new Result.Detail(Endpoint.create("e"))), 0);
+ assertThat(queue.size(), is(1));
+
+ //check a, b, c, d
+ aggregated = queue.poll();
+ assertThat(aggregated.getDocumentId(), equalTo("doc:a:b2"));
+ assertThat(aggregated.getDetails().size(), is(4));
+ assertThat(aggregated.getDetails().get(0).getEndpoint().getHostname(), equalTo("a"));
+ assertThat(aggregated.getDetails().get(1).getEndpoint().getHostname(), equalTo("b"));
+ assertThat(aggregated.getDetails().get(2).getEndpoint().getHostname(), equalTo("c"));
+ assertThat(aggregated.getDetails().get(3).getEndpoint().getHostname(), equalTo("d"));
+ assertThat(aggregated.getDocumentDataAsCharSequence().toString(), is("data doc 2"));
+
+ assertThat(queue.size(), is(0));
+ }
+
+ @Test
+ public void testBlockingOfOperationsTwoEndpoints() {
+ SessionParams sessionParams = new SessionParams.Builder()
+ .addCluster(new Cluster.Builder().addEndpoint(Endpoint.create("host")).build())
+ .addCluster(new Cluster.Builder().addEndpoint(Endpoint.create("host")).build())
+ .setConnectionParams(new ConnectionParams.Builder().setEnableV3Protocol(true).build())
+ .build();
+ OperationProcessor operationProcessor = new OperationProcessor(
+ new IncompleteResultsThrottler(1000, 1000, null, null),
+ (docId, documentResult) -> queue.add(documentResult),
+ sessionParams, null);
+
+ operationProcessor.sendDocument(doc1);
+ operationProcessor.sendDocument(doc1b);
+
+ assertThat(queue.size(), is(0));
+ // Only one operations should be in flight.
+ assertThat(operationProcessor.getIncompleteResultQueueSize(), is(1));
+ operationProcessor.resultReceived(new EndpointResult(doc1.getOperationId(), new Result.Detail(Endpoint.create("host"))), 0);
+ assertThat(queue.size(), is(0));
+ assertThat(operationProcessor.getIncompleteResultQueueSize(), is(1));
+ operationProcessor.resultReceived(new EndpointResult(doc1.getOperationId(), new Result.Detail(Endpoint.create("host"))), 1);
+ assertThat(queue.size(), is(1));
+ assertThat(operationProcessor.getIncompleteResultQueueSize(), is(1));
+ operationProcessor.resultReceived(new EndpointResult(doc1b.getOperationId(), new Result.Detail(Endpoint.create("host"))), 0);
+ assertThat(queue.size(), is(1));
+ assertThat(operationProcessor.getIncompleteResultQueueSize(), is(1));
+ operationProcessor.resultReceived(new EndpointResult(doc1b.getOperationId(), new Result.Detail(Endpoint.create("host"))), 1);
+ assertThat(queue.size(), is(2));
+ assertThat(operationProcessor.getIncompleteResultQueueSize(), is(0));
+ // This should have no effect.
+ operationProcessor.resultReceived(new EndpointResult(doc1.getOperationId(), new Result.Detail(Endpoint.create("host"))), 0);
+ operationProcessor.resultReceived(new EndpointResult(doc1b.getOperationId(), new Result.Detail(Endpoint.create("host"))), 0);
+ operationProcessor.resultReceived(new EndpointResult(doc1.getOperationId(), new Result.Detail(Endpoint.create("host"))), 1);
+ operationProcessor.resultReceived(new EndpointResult(doc1b.getOperationId(), new Result.Detail(Endpoint.create("host"))), 1);
+ assertThat(queue.size(), is(2));
+ }
+
+ @Test
+ public void testBlockingOfOperationsToSameDocIdWithTwoOperations() {
+ SessionParams sessionParams = new SessionParams.Builder()
+ .addCluster(new Cluster.Builder().addEndpoint(Endpoint.create("host")).build())
+ .setConnectionParams(new ConnectionParams.Builder().setEnableV3Protocol(true).build())
+ .build();
+
+ OperationProcessor operationProcessor = new OperationProcessor(
+ new IncompleteResultsThrottler(1000, 1000, null, null),
+ (docId, documentResult) -> queue.add(documentResult),
+ sessionParams, null);
+
+ operationProcessor.sendDocument(doc1);
+ operationProcessor.sendDocument(doc1b);
+
+ assertThat(queue.size(), is(0));
+ // Only one operations should be in flight.
+ assertThat(operationProcessor.getIncompleteResultQueueSize(), is(1));
+ operationProcessor.resultReceived(new EndpointResult(doc1.getOperationId(), new Result.Detail(Endpoint.create("host"))), 0);
+ assertThat(queue.size(), is(1));
+ assertThat(operationProcessor.getIncompleteResultQueueSize(), is(1));
+ operationProcessor.resultReceived(new EndpointResult(doc1b.getOperationId(), new Result.Detail(Endpoint.create("host"))), 0);
+ assertThat(queue.size(), is(2));
+ assertThat(operationProcessor.getIncompleteResultQueueSize(), is(0));
+ // This should have no effect.
+ operationProcessor.resultReceived(new EndpointResult(doc1.getOperationId(), new Result.Detail(Endpoint.create("host"))), 0);
+ operationProcessor.resultReceived(new EndpointResult(doc1b.getOperationId(), new Result.Detail(Endpoint.create("host"))), 0);
+ assertThat(queue.size(), is(2));
+ }
+
+ @Test
+ public void testBlockingOfOperationsToSameDocIdMany() {
+ SessionParams sessionParams = new SessionParams.Builder()
+ .addCluster(new Cluster.Builder().addEndpoint(Endpoint.create("host")).build())
+ .setConnectionParams(new ConnectionParams.Builder().setEnableV3Protocol(true).build())
+ .build();
+
+ OperationProcessor operationProcessor = new OperationProcessor(
+ new IncompleteResultsThrottler(1000, 1000, null, null),
+ (docId, documentResult) -> queue.add(documentResult),
+ sessionParams, null);
+
+ Queue<Document> documentQueue = new ArrayDeque<>();
+ for (int x = 0; x < 100; x++) {
+ Document document = new Document("doc:a:b", String.valueOf(x), null /* context */);
+ operationProcessor.sendDocument(document);
+ documentQueue.add(document);
+ }
+
+ for (int x = 0; x < 100; x++) {
+ assertThat(queue.size(), is(x));
+ // Only one operations should be in flight.
+ assertThat(operationProcessor.getIncompleteResultQueueSize(), is(1));
+ Document document = documentQueue.poll();
+ operationProcessor.resultReceived(new EndpointResult(document.getOperationId(), new Result.Detail(Endpoint.create("host"))), 0);
+ assertThat(queue.size(), is(x + 1));
+ if (x < 99) {
+ assertThat(operationProcessor.getIncompleteResultQueueSize(), is(1));
+ } else {
+ assertThat(operationProcessor.getIncompleteResultQueueSize(), is(0));
+ }
+ }
+ }
+
+ @Test
+ public void testMixOfBlockingAndNonBlocking() {
+ Endpoint endpoint = Endpoint.create("host");
+ SessionParams sessionParams = new SessionParams.Builder()
+ .addCluster(new Cluster.Builder().addEndpoint(endpoint).build())
+ .setConnectionParams(new ConnectionParams.Builder().setEnableV3Protocol(true).build())
+ .build();
+
+ OperationProcessor operationProcessor = new OperationProcessor(
+ new IncompleteResultsThrottler(1000, 1000, null, null),
+ (docId, documentResult) -> queue.add(documentResult),
+ sessionParams, null);
+
+ operationProcessor.sendDocument(doc1);
+ operationProcessor.sendDocument(doc1b); // Blocked
+ operationProcessor.sendDocument(doc2);
+ operationProcessor.sendDocument(doc3);
+
+ assertThat(queue.size(), is(0));
+ assertThat(operationProcessor.getIncompleteResultQueueSize(), is(3));
+ // This should have no effect since it should not be sent.
+ operationProcessor.resultReceived(new EndpointResult(doc1b.getOperationId(), new Result.Detail(endpoint)), 0);
+ assertThat(operationProcessor.getIncompleteResultQueueSize(), is(3));
+
+ operationProcessor.resultReceived(new EndpointResult(doc3.getOperationId(), new Result.Detail(endpoint)), 0);
+ assertThat(operationProcessor.getIncompleteResultQueueSize(), is(2));
+ operationProcessor.resultReceived(new EndpointResult(doc2.getOperationId(), new Result.Detail(endpoint)), 0);
+ assertThat(operationProcessor.getIncompleteResultQueueSize(), is(1));
+ operationProcessor.resultReceived(new EndpointResult(doc1.getOperationId(), new Result.Detail(endpoint)), 0);
+ assertThat(operationProcessor.getIncompleteResultQueueSize(), is(1));
+ operationProcessor.resultReceived(new EndpointResult(doc1b.getOperationId(), new Result.Detail(endpoint)), 0);
+ assertThat(operationProcessor.getIncompleteResultQueueSize(), is(0));
+ }
+
+ @Test
+ public void assertThatDuplicateResultsFromOneClusterWorks() {
+ SessionParams sessionParams = new SessionParams.Builder()
+ .addCluster(new Cluster.Builder().addEndpoint(Endpoint.create("host")).build())
+ .addCluster(new Cluster.Builder().addEndpoint(Endpoint.create("host")).build())
+ .addCluster(new Cluster.Builder().addEndpoint(Endpoint.create("host")).build())
+ .build();
+
+ OperationProcessor q = new OperationProcessor(
+ new IncompleteResultsThrottler(1000, 1000, null, null),
+ (docId, documentResult) -> queue.add(documentResult),
+ sessionParams, null);
+
+ q.sendDocument(doc1);
+ assertThat(queue.size(), is(0));
+
+ q.resultReceived(new EndpointResult(doc1.getOperationId(), new Result.Detail(Endpoint.create("a"))), 0);
+ assertThat(queue.size(), is(0));
+
+ q.resultReceived(new EndpointResult(doc1.getOperationId(), new Result.Detail(Endpoint.create("b"))), 0);
+ assertThat(queue.size(), is(0));
+
+ q.resultReceived(new EndpointResult(doc1.getOperationId(), new Result.Detail(Endpoint.create("c"))), 0);
+ assertThat(queue.size(), is(0));
+ }
+
+ @Test
+ public void testMultipleDuplicateDocIds() {
+ SessionParams sessionParams = new SessionParams.Builder()
+ .addCluster(new Cluster.Builder().addEndpoint(Endpoint.create("host")).build())
+ .addCluster(new Cluster.Builder().addEndpoint(Endpoint.create("host")).build())
+ .addCluster(new Cluster.Builder().addEndpoint(Endpoint.create("host")).build())
+ .build();
+
+ OperationProcessor q = new OperationProcessor(
+ new IncompleteResultsThrottler(1000, 1000, null, null),
+ (docId, documentResult) -> queue.add(documentResult),
+ sessionParams, null);
+
+ q.sendDocument(doc1);
+ assertThat(queue.size(), is(0));
+ q.sendDocument(doc2);
+ assertThat(queue.size(), is(0));
+ q.sendDocument(doc3);
+
+ assertThat(queue.size(), is(0));
+
+ q.resultReceived(new EndpointResult(doc1.getOperationId(), new Result.Detail(Endpoint.create("a"))), 0);
+ assertThat(queue.size(), is(0));
+
+ q.resultReceived(new EndpointResult(doc1.getOperationId(), new Result.Detail(Endpoint.create("a"))), 0);
+ assertThat(queue.size(), is(0));
+
+ q.resultReceived(new EndpointResult(doc1.getOperationId(), new Result.Detail(Endpoint.create("b"))), 1);
+ assertThat(queue.size(), is(0));
+
+ q.resultReceived(new EndpointResult(doc1.getOperationId(), new Result.Detail(Endpoint.create("c"))), 2);
+ assertThat(queue.size(), is(1));
+
+ q.resultReceived(new EndpointResult(doc3.getOperationId(), new Result.Detail(Endpoint.create("a"))), 0);
+ assertThat(queue.size(), is(1));
+
+ q.resultReceived(new EndpointResult(doc2.getOperationId(), new Result.Detail(Endpoint.create("a"))), 0);
+ assertThat(queue.size(), is(1));
+
+ q.resultReceived(new EndpointResult(doc2.getOperationId(), new Result.Detail(Endpoint.create("b"))), 1);
+ assertThat(queue.size(), is(1));
+
+ q.resultReceived(new EndpointResult(doc2.getOperationId(), new Result.Detail(Endpoint.create("c"))), 2);
+ assertThat(queue.size(), is(2));
+
+ q.resultReceived(new EndpointResult(doc3.getOperationId(), new Result.Detail(Endpoint.create("c"))), 2);
+ assertThat(queue.size(), is(2));
+
+ q.resultReceived(new EndpointResult(doc3.getOperationId(), new Result.Detail(Endpoint.create("c"))), 2);
+ assertThat(queue.size(), is(2));
+
+ q.resultReceived(new EndpointResult(doc3.getOperationId(), new Result.Detail(Endpoint.create("b"))), 1);
+ assertThat(queue.size(), is(3));
+
+ q.resultReceived(new EndpointResult(doc1.getOperationId(), new Result.Detail(Endpoint.create("b"))), 1);
+ assertThat(queue.size(), is(3));
+ assertThat(queue.remove().getDocumentDataAsCharSequence().toString(), is("data doc 1"));
+ assertThat(queue.remove().getDocumentDataAsCharSequence().toString(), is("data doc 2"));
+ assertThat(queue.remove().getDocumentDataAsCharSequence().toString(), is("data doc 3"));
+
+ }
+
+ @Test
+ public void testWaitBlocks() throws InterruptedException {
+ SessionParams sessionParams = new SessionParams.Builder()
+ .addCluster(new Cluster.Builder().addEndpoint(Endpoint.create("host")).build())
+ .build();
+
+ OperationProcessor operationProcessor = new OperationProcessor(
+ new IncompleteResultsThrottler(1, 1, null, null),
+ (docId, documentResult) -> {},
+ sessionParams, null);
+
+ operationProcessor.sendDocument(doc1);
+
+ final CountDownLatch started = new CountDownLatch(1);
+ final CountDownLatch done = new CountDownLatch(1);
+
+ Thread shouldWait = new Thread(()-> {
+ started.countDown();
+ operationProcessor.sendDocument(doc2);
+ done.countDown();
+ });
+ shouldWait.start();
+ started.await();
+ // We want the test to pass fast so we only wait 40mS to see that it is blocking. This might lead to
+ // some false positives, but that is ok.
+ assertThat(done.await(40, TimeUnit.MILLISECONDS), is(false));
+ operationProcessor.resultReceived(
+ new EndpointResult(doc1.getOperationId(), new Result.Detail(Endpoint.create("d"))), 0);
+ assertThat(done.await(120, TimeUnit.SECONDS), is(true));
+
+ }
+}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/handlers/V3MockParsingRequestHandler.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/handlers/V3MockParsingRequestHandler.java
new file mode 100644
index 00000000000..c43feef088b
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/handlers/V3MockParsingRequestHandler.java
@@ -0,0 +1,391 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.handlers;
+
+import com.yahoo.vespa.http.client.core.Encoder;
+import com.yahoo.vespa.http.client.core.ErrorCode;
+import com.yahoo.vespa.http.client.core.Headers;
+import com.yahoo.vespa.http.client.core.OperationStatus;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ * @since 5.1.27
+ */
+public class V3MockParsingRequestHandler extends AbstractHandler {
+ private final int responseCode;
+ private volatile Scenario scenario;
+ private final BlockingQueue<CountDownLatch> delayedRequests = new LinkedBlockingQueue<>();
+ private final AtomicBoolean delayedResponseShouldBlock = new AtomicBoolean(true);
+ public final AtomicBoolean badRequestScenarioShouldReturnBadRequest = new AtomicBoolean(false);
+ private final String name;
+ private static final AtomicInteger sessionIdGenerator = new AtomicInteger(0);
+ private AtomicInteger internalCounter = new AtomicInteger(0);
+
+ public enum Scenario {
+ ALL_OK, RETURN_WRONG_SESSION_ID,
+ DISCONNECT_IMMEDIATELY, DONT_ACCEPT_VERSION, RETURN_UNEXPECTED_VERSION,
+ INTERNAL_SERVER_ERROR, COULD_NOT_FEED, MBUS_RETURNED_ERROR,
+ NEVER_RETURN_ANY_RESULTS, DELAYED_RESPONSE, BAD_REQUEST, SERVER_ERROR_TWICE_THEN_OK,
+ EXPECT_HIGHEST_PRIORITY_AND_TRACELEVEL_123
+ }
+
+ public V3MockParsingRequestHandler() {
+ this("", HttpServletResponse.SC_OK, Scenario.ALL_OK);
+ }
+
+ public V3MockParsingRequestHandler(String name) {
+ this(name, HttpServletResponse.SC_OK, Scenario.ALL_OK);
+ }
+
+ public V3MockParsingRequestHandler(int responseCode) {
+ this("", responseCode, Scenario.ALL_OK);
+ }
+
+ public V3MockParsingRequestHandler(int responseCode, Scenario scenario) {
+ this("", responseCode, scenario);
+ }
+
+ public V3MockParsingRequestHandler(String name, int responseCode, Scenario scenario) {
+ this.name = name;
+ this.responseCode = responseCode;
+ this.scenario = scenario;
+ }
+
+ public void setScenario(Scenario scenario) {
+ this.scenario = scenario;
+ }
+
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
+ System.err.println("Server " + name + " got request from: " + request.getHeader(Headers.SESSION_ID));
+ switch (scenario) {
+ case ALL_OK:
+ allOk(baseRequest, request, response);
+ break;
+ case RETURN_WRONG_SESSION_ID:
+ wrongSessionId(baseRequest, request, response);
+ break;
+ case DISCONNECT_IMMEDIATELY:
+ disconnect(baseRequest, response);
+ break;
+ case DONT_ACCEPT_VERSION:
+ dontAcceptVersion(baseRequest, request, response);
+ break;
+ case RETURN_UNEXPECTED_VERSION:
+ unexpectedVersion(baseRequest, request, response);
+ break;
+ case INTERNAL_SERVER_ERROR:
+ internalServerError(baseRequest, request, response);
+ break;
+ case COULD_NOT_FEED:
+ couldNotFeed(baseRequest, request, response);
+ break;
+ case MBUS_RETURNED_ERROR:
+ mbusReturnedError(baseRequest, request, response);
+ break;
+ case NEVER_RETURN_ANY_RESULTS:
+ neverReturnAnyResults(baseRequest, request, response);
+ break;
+ case DELAYED_RESPONSE:
+ delayedResponse(baseRequest, request, response);
+ break;
+ case BAD_REQUEST:
+ badRequest(baseRequest, request, response);
+ break;
+ case SERVER_ERROR_TWICE_THEN_OK:
+ int state = internalCounter.getAndIncrement();
+ if (state >= 2) {
+ allOk(baseRequest, request, response);
+ } else {
+ couldNotFeed(baseRequest, request, response);
+ }
+ break;
+ case EXPECT_HIGHEST_PRIORITY_AND_TRACELEVEL_123:
+ checkIfSessionThenHighPriorityAndTraceLevel123(request);
+ allOk(baseRequest, request, response);
+ break;
+ default:
+ throw new IllegalArgumentException("Test scenario " + scenario + " not supported.");
+ }
+ }
+
+ private void checkIfSessionThenHighPriorityAndTraceLevel123(HttpServletRequest request) {
+ if (request.getHeader(Headers.SESSION_ID) != null) {
+ assert (request.getHeader(Headers.PRIORITY).equals("HIGHEST"));
+ assert (request.getHeader(Headers.TRACE_LEVEL).equals("123"));
+ }
+ }
+
+ private void badRequest(Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException {
+ if (badRequestScenarioShouldReturnBadRequest.get()) {
+ response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+ baseRequest.setHandled(true);
+ PrintWriter responseWriter = response.getWriter();
+ BufferedReader reader = new BufferedReader(new InputStreamReader(request.getInputStream()));
+ while (reader.readLine() != null) {
+ //consume input, not really needed?
+ }
+ reader.close();
+ closeChannel(responseWriter);
+ } else {
+ allOk(baseRequest, request, response);
+ }
+ }
+
+ private void delayedResponse(Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException {
+ if (delayedResponseShouldBlock.get()) {
+ CountDownLatch latch = new CountDownLatch(1);
+ delayedRequests.add(latch);
+ try {
+ latch.await(120, TimeUnit.SECONDS); //wait "forever"
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ if (latch.getCount() != 0L) {
+ throw new RuntimeException("Delayed request handler did not get poke()d.");
+ }
+ } else {
+ }
+ allOk(baseRequest, request, response);
+ }
+
+ public void poke() throws InterruptedException {
+ CountDownLatch latch = delayedRequests.poll(10, TimeUnit.SECONDS);
+ latch.countDown();
+ }
+
+ public void pokeAllAndUnblockFromNowOn() {
+ delayedResponseShouldBlock.set(false);
+ while (!delayedRequests.isEmpty()) {
+ CountDownLatch latch = delayedRequests.remove();
+ latch.countDown();
+ }
+ }
+
+ private void allOk(Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException {
+ String sessionId = getSessionId(request);
+ setHeaders(response, sessionId);
+ response.setStatus(responseCode);
+ baseRequest.setHandled(true);
+ PrintWriter responseWriter = response.getWriter();
+ String operationId;
+ while ((operationId = readOperationId(request.getInputStream())) != null) {
+ long lengthToSkip = readByteLength(request.getInputStream());
+ while (lengthToSkip > 0) {
+ long skipped = request.getInputStream().skip(lengthToSkip);
+ lengthToSkip -= skipped;
+ }
+ respondOK(responseWriter, operationId);
+ }
+ closeChannel(responseWriter);
+ }
+
+ private void wrongSessionId(Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException {
+ String sessionId = generateMockSessionId();
+ setHeaders(response, sessionId);
+ response.setStatus(responseCode);
+ baseRequest.setHandled(true);
+ PrintWriter responseWriter = response.getWriter();
+ String operationId;
+ while ((operationId = readOperationId(request.getInputStream())) != null) {
+ long lengthToSkip = readByteLength(request.getInputStream());
+ while (lengthToSkip > 0) {
+ long skipped = request.getInputStream().skip(lengthToSkip);
+ lengthToSkip -= skipped;
+ }
+ respondOK(responseWriter, operationId);
+ }
+ closeChannel(responseWriter);
+ }
+
+ private void disconnect(Request baseRequest, HttpServletResponse response) throws IOException {
+ baseRequest.setHandled(true);
+ PrintWriter responseWriter = response.getWriter();
+ closeChannel(responseWriter);
+ }
+
+ private void dontAcceptVersion(Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException {
+ String sessionId = getSessionId(request);
+ setHeaders(response, sessionId);
+ response.setStatus(Headers.HTTP_NOT_ACCEPTABLE);
+ baseRequest.setHandled(true);
+ PrintWriter responseWriter = response.getWriter();
+ responseWriter.write("Go away, no such version.");
+ responseWriter.flush();
+ closeChannel(responseWriter);
+ }
+
+ private void unexpectedVersion(Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException {
+ String sessionId = getSessionId(request);
+ response.setHeader(Headers.SESSION_ID, sessionId);
+ response.setHeader(Headers.VERSION, "12345678");
+ response.setStatus(responseCode);
+ baseRequest.setHandled(true);
+ PrintWriter responseWriter = response.getWriter();
+ String operationId;
+ while ((operationId = readOperationId(request.getInputStream())) != null) {
+ long lengthToSkip = readByteLength(request.getInputStream());
+ while (lengthToSkip > 0) {
+ long skipped = request.getInputStream().skip(lengthToSkip);
+ lengthToSkip -= skipped;
+ }
+ respondOK(responseWriter, operationId);
+ }
+ closeChannel(responseWriter);
+ }
+
+ private void internalServerError(Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException {
+ String sessionId = getSessionId(request);
+ setHeaders(response, sessionId);
+ response.setStatus(500);
+ baseRequest.setHandled(true);
+ PrintWriter responseWriter = response.getWriter();
+ responseWriter.write("boom");
+ responseWriter.flush();
+ closeChannel(responseWriter);
+ }
+
+ private void couldNotFeed(Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException {
+ String sessionId = getSessionId(request);
+ setHeaders(response, sessionId);
+ response.setStatus(responseCode);
+ baseRequest.setHandled(true);
+ PrintWriter responseWriter = response.getWriter();
+ String operationId;
+ while ((operationId = readOperationId(request.getInputStream())) != null) {
+ long lengthToSkip = readByteLength(request.getInputStream());
+ while (lengthToSkip > 0) {
+ long skipped = request.getInputStream().skip(lengthToSkip);
+ lengthToSkip -= skipped;
+ }
+ respondTransientFailed(responseWriter, operationId);
+ }
+ closeChannel(responseWriter);
+ }
+
+ private void mbusReturnedError(Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException {
+ String sessionId = getSessionId(request);
+ setHeaders(response, sessionId);
+ response.setStatus(responseCode);
+ baseRequest.setHandled(true);
+ PrintWriter responseWriter = response.getWriter();
+ String operationId;
+ while ((operationId = readOperationId(request.getInputStream())) != null) {
+ long lengthToSkip = readByteLength(request.getInputStream());
+ while (lengthToSkip > 0) {
+ long skipped = request.getInputStream().skip(lengthToSkip);
+ lengthToSkip -= skipped;
+ }
+ respondFailedWithTransitiveErrorSeenFromClient(responseWriter, operationId);
+ }
+ closeChannel(responseWriter);
+ }
+
+ private void neverReturnAnyResults(Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException {
+ String sessionId = getSessionId(request);
+ setHeaders(response, sessionId);
+ response.setStatus(responseCode);
+ baseRequest.setHandled(true);
+ PrintWriter responseWriter = response.getWriter();
+ BufferedReader reader = new BufferedReader(new InputStreamReader(request.getInputStream()));
+ while (reader.readLine() != null) {
+ //consume input, not really needed?
+ }
+ reader.close();
+ closeChannel(responseWriter);
+ }
+
+ void closeChannel(PrintWriter responseWriter) {
+ System.err.println("Mock server " + name + " closing channel.");
+ responseWriter.close();
+ }
+
+ private String readOperationId(InputStream requestInputStream) throws IOException {
+ StringBuilder idBuf = new StringBuilder(100);
+ int c;
+ while ((c = requestInputStream.read()) != -1) {
+ if (c == 32) {
+ break;
+ }
+ idBuf.append((char) c); //it's ASCII
+ }
+ if (c == -1) {
+ return null;
+ }
+ return Encoder.decode(idBuf.toString(), new StringBuilder(idBuf.length())).toString();
+ }
+
+ private int readByteLength(InputStream requestInputStream) throws IOException {
+ StringBuilder lenBuf = new StringBuilder(8);
+ int c;
+ while ((c = requestInputStream.read()) != -1) {
+ if (c == 10) {
+ break;
+ }
+ lenBuf.append((char) c); //it's ASCII
+ }
+ if (lenBuf.length() == 0) {
+ throw new IllegalStateException("Operation length missing.");
+ }
+ return Integer.valueOf(lenBuf.toString(), 16);
+ }
+
+ private static void setHeaders(HttpServletResponse response, String sessionId) {
+ response.setHeader(Headers.SESSION_ID, sessionId);
+ response.setHeader(Headers.VERSION, "3");
+ }
+
+ private void respondFailed(PrintWriter responseWriter, String docId) {
+ final OperationStatus operationStatus =
+ new OperationStatus("mbus returned boom", docId, ErrorCode.ERROR, "trace");
+ writeResponse(responseWriter, operationStatus);
+ }
+
+ private void respondTransientFailed(PrintWriter responseWriter, String docId) {
+ final OperationStatus operationStatus = new OperationStatus(
+ "Could not put", docId, ErrorCode.TRANSIENT_ERROR, "");
+ writeResponse(responseWriter, operationStatus);
+ }
+
+ private void respondFailedWithTransitiveErrorSeenFromClient(PrintWriter responseWriter, String docId) {
+ final OperationStatus operationStatus =
+ new OperationStatus("NETWORK_ERROR", docId, ErrorCode.ERROR, "trace");
+ writeResponse(responseWriter, operationStatus);
+ }
+
+ private void respondOK(PrintWriter responseWriter, String docId) {
+ final OperationStatus operationStatus = new OperationStatus("Doc fed", docId, ErrorCode.OK, "Trace message");
+ writeResponse(responseWriter, operationStatus);
+ }
+
+ private void writeResponse(PrintWriter responseWriter,
+ final OperationStatus operationStatus) {
+ responseWriter.print(operationStatus.render());
+ responseWriter.flush();
+ System.err.println("Mock " + name + " server wrote: " + operationStatus.render());
+ }
+
+ private String getSessionId(HttpServletRequest request) {
+ return request.getHeader(Headers.CLIENT_ID);
+ }
+
+ private String generateMockSessionId() {
+ return String.valueOf(sessionIdGenerator.getAndIncrement());
+ }
+}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/runner/CommandLineArgumentsTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/runner/CommandLineArgumentsTest.java
new file mode 100644
index 00000000000..fef1249e1a0
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/runner/CommandLineArgumentsTest.java
@@ -0,0 +1,144 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.runner;
+
+import com.yahoo.vespa.http.client.config.Cluster;
+import com.yahoo.vespa.http.client.config.FeedParams;
+import com.yahoo.vespa.http.client.config.SessionParams;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNot.not;
+import static org.junit.Assert.*;
+
+public class CommandLineArgumentsTest {
+
+ private String[] asArray() {
+ String[] array = new String[args.size()];
+ args.toArray(array);
+ return array;
+ }
+
+ private void add(String key, String value) {
+ args.add("--" + key);
+ args.add(value);
+ }
+
+ private void addMinimum() {
+ add("host", "hostValue");
+ add("file", "fileValue");
+ }
+ private ArrayList<String> args = new ArrayList<>();
+
+ @Test
+ public void testRequiredFlags() {
+ assertThat(CommandLineArguments.build(asArray()), is(nullValue()));
+ add("host", "hostValue");
+ assertThat(CommandLineArguments.build(asArray()), is(nullValue()));
+ args.clear();
+ add("file", "fileValue");
+ assertThat(CommandLineArguments.build(asArray()), is(nullValue()));
+ args.clear();
+ addMinimum();
+ assertThat(CommandLineArguments.build(asArray()), is(not(nullValue())));
+ }
+
+ @Test
+ public void testStreaming() {
+ add("host", "hostValue");
+ add("file", null); // Not yet implemented support for streaming
+ assertThat(CommandLineArguments.build(asArray()), is(nullValue()));
+ }
+
+ @Test
+ public void testBrokenFlags() {
+ addMinimum();
+ args.add("FOO");
+ assertThat(CommandLineArguments.build(asArray()), is(nullValue()));
+ }
+
+ @Test
+ public void testBadPriority() {
+ addMinimum();
+ add("priority", "non existing");
+ assertThat(CommandLineArguments.build(asArray()), is(nullValue()));
+ }
+
+ @Test
+ public void testOkPriority() {
+ addMinimum();
+ add("priority", "HIGHEST");
+ assertThat(CommandLineArguments.build(asArray()).createSessionParams(false).getFeedParams().getPriority(),
+ is("HIGHEST"));
+ }
+
+ @Test
+ public void testDefaults() {
+ addMinimum();
+ CommandLineArguments arguments = CommandLineArguments.build(asArray());
+ SessionParams params = arguments.createSessionParams(false /* use json */);
+ assertThat(params.getClientQueueSize(), is(10000));
+ assertThat(params.getThrottlerMinSize(), is(0));
+ assertThat(params.getClusters().size(), is(1));
+ assertThat(params.getClusters().get(0).getEndpoints().size(), is(1));
+ assertThat(params.getClusters().get(0).getEndpoints().get(0).getHostname(), is("hostValue"));
+ assertThat(params.getClusters().get(0).getEndpoints().get(0).getPort(), is(4080));
+ assertThat(params.getConnectionParams().getUseCompression(), is(false));
+ assertThat(params.getConnectionParams().getNumPersistentConnectionsPerEndpoint(), is(16));
+ assertThat(params.getFeedParams().getRoute(), is("default"));
+ assertThat(params.getFeedParams().getDataFormat(), is(FeedParams.DataFormat.XML_UTF8));
+ assertThat(params.getFeedParams().getLocalQueueTimeOut(), is(180000L));
+ assertThat(params.getFeedParams().getMaxInFlightRequests(), is(10000));
+ assertThat(params.getFeedParams().getClientTimeout(TimeUnit.MILLISECONDS), is(180000L));
+ }
+
+ @Test
+ public void testAllImplementedFlags() {
+ add("file", "fileValue.json");
+ add("route", "routeValue");
+ add("host", "hostValue");
+ add("port", "1234");
+ add("timeout", "2345");
+ args.add("--useCompression");
+ args.add("--useDynamicThrottling");
+ add("maxpending", "3456");
+ add("debugport", "7890");
+ args.add("--verbose");
+ CommandLineArguments arguments = CommandLineArguments.build(asArray());
+ SessionParams params = arguments.createSessionParams(true /* use json */);
+ assertThat(params.getClientQueueSize(), is(3456));
+ assertThat(params.getThrottlerMinSize(), is(10));
+ assertThat(params.getClusters().get(0).getEndpoints().get(0).getPort(), is(1234));
+ assertThat(params.getConnectionParams().getUseCompression(), is(true));
+ assertThat(params.getFeedParams().getRoute(), is("routeValue"));
+ assertThat(params.getFeedParams().getDataFormat(), is(FeedParams.DataFormat.JSON_UTF8));
+ assertThat(params.getFeedParams().getLocalQueueTimeOut(), is(2345000L));
+ assertThat(params.getFeedParams().getMaxInFlightRequests(), is(3456));
+ assertThat(params.getFeedParams().getClientTimeout(TimeUnit.MILLISECONDS), is(2345000L));
+ }
+
+ @Test
+ public void testMultiHost() {
+ add("file", "fileValue.json");
+ add("port", "1234");
+ add("host", "hostValue1,hostValue2, hostValue3");
+ CommandLineArguments arguments = CommandLineArguments.build(asArray());
+ SessionParams params = arguments.createSessionParams(true /* use json */);
+ assertThat(params.getClusters().size(), is(3));
+ final Set<String> hosts = new HashSet<>();
+ for (Cluster cluster : params.getClusters()) {
+ assertThat(cluster.getEndpoints().size(), is(1));
+ hosts.add(cluster.getEndpoints().get(0).getHostname());
+ assertThat(cluster.getEndpoints().get(0).getPort(), is(1234));
+ }
+ assertThat(hosts, hasItem("hostValue1"));
+ assertThat(hosts, hasItem("hostValue2"));
+ assertThat(hosts, hasItem("hostValue3"));
+ }
+} \ No newline at end of file
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/runner/JsonReaderTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/runner/JsonReaderTest.java
new file mode 100644
index 00000000000..ab7dca0d5fb
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/runner/JsonReaderTest.java
@@ -0,0 +1,272 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.runner;
+
+import com.google.common.base.Joiner;
+import com.yahoo.vespa.http.client.FeedClient;
+import com.yahoo.vespa.http.client.core.JsonReader;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.*;
+
+public class JsonReaderTest {
+
+ private static String doc1_id = "id:unittest:testMapStringToArrayOfInt::whee";
+
+ private static String doc1 = inputJson(
+ "{",
+ " 'update': '"+ doc1_id + "',",
+ " 'fields': {",
+ " 'actualMapStringToArrayOfInt': {",
+ " 'assign': [",
+ " { 'key': 'bamse', 'value': [ 2, 1, 3] }",
+ " ]",
+ " }",
+ " }",
+ "}");
+
+ private static String doc2_id = "id:unittest:smoke::whee";
+
+ private static String doc2 = inputJson(
+ "{",
+ " 'put': '" + doc2_id + "',",
+ " 'fields': {",
+ " 'something': 'smoketest',",
+ " 'nalle': 'bamse'",
+ " }",
+ "}");
+
+ private static String doc3 = inputJson(
+ "{",
+ " 'update': 'id:unittest:testarray::whee',",
+ " 'fields': {",
+ " 'actualarray': {",
+ " 'add': [",
+ " 'person naïve',",
+ " 'another person'",
+ " ]",
+ " }",
+ " }",
+ "}");
+
+ private static String doc4 = inputJson(
+ "{",
+ " 'remove': '" + doc2_id + "'",
+ "}");
+
+ private static String doc5_id = "id:unittest:smoking::wheels";
+
+ private static String doc5 = inputJson(
+ "{",
+ " 'id': '" + doc5_id + "',",
+ " 'fields': {",
+ " 'something': 'smoketest',",
+ " 'nalle': 'bamse'",
+ " }",
+ "}");
+
+ private static class TestFeedClient implements FeedClient {
+
+ public List<String> documentIds = new ArrayList<>();
+ public List<CharSequence> datas = new ArrayList<>();
+ public List<Object> contexts = new ArrayList<>();
+
+ @Override
+ public void stream(String documentId, CharSequence documentData) {
+ stream(documentId, documentData, null);
+ }
+
+ @Override
+ public void stream(String documentId, CharSequence documentData, Object context) {
+ documentIds.add(documentId.toString());
+ datas.add(documentData);
+ contexts.add(context);
+ }
+
+ @Override
+ public void close() { }
+
+ @Override
+ public String getStatsAsJson() { return null; }
+ }
+
+ final TestFeedClient session = new TestFeedClient();
+ final AtomicInteger numSent = new AtomicInteger(0);
+
+ @Test
+ public void testReadNoocument() throws Exception {
+ InputStream inputStream = new ByteArrayInputStream(
+ (" " ).getBytes(StandardCharsets.UTF_8));
+ JsonReader.read(inputStream, session, numSent);
+ inputStream.close();
+ assertThat(session.documentIds.size(), is(0));
+ }
+
+ @Test
+ public void testReadOneDocument() throws Exception {
+ InputStream inputStream = new ByteArrayInputStream(
+ ("["+ doc1 + "]" ).getBytes(StandardCharsets.UTF_8));
+ JsonReader.read(inputStream, session, numSent);
+ inputStream.close();
+ assertThat(session.documentIds.size(), is(1));
+ assertThat(session.documentIds.get(0), is(doc1_id));
+ assertThat(session.datas.size(), is(1));
+ assertThat(session.datas.get(0), is(doc1));
+ }
+
+ @Test
+ public void testReadFourDocuments() throws Exception {
+ InputStream inputStream = new ByteArrayInputStream(
+ (" [ "+ doc1 + " , " + doc2 + ", " + doc3 + "," + doc4 + " ] ").getBytes(StandardCharsets.UTF_8));
+ JsonReader.read(inputStream, session, numSent);
+ inputStream.close();
+ assertThat(session.documentIds.size(), is(4));
+ assertThat(session.documentIds.get(0), is(doc1_id));
+ assertThat(session.documentIds.get(1), is(doc2_id));
+ assertThat(session.datas.size(), is(4));
+ assertThat(session.datas.get(0), is(doc1));
+ assertThat(session.datas.get(1).toString(), is(doc2));
+ assertThat(session.datas.get(2).toString(), is(doc3));
+ assertThat(session.datas.get(3).toString(), is(doc4));
+ }
+
+ @Test
+ public void testDocWithIdAndNotPut() throws Exception {
+ InputStream inputStream = new ByteArrayInputStream(
+ (" [ "+ doc5 + " ] ").getBytes(StandardCharsets.UTF_8));
+ JsonReader.read(inputStream, session, numSent);
+ inputStream.close();
+ assertThat(session.documentIds.size(), is(1));
+ assertThat(session.documentIds.get(0), is(doc5_id));
+ }
+
+ @Test
+ public void simpleMicroBenchmarkTest() throws Exception {
+ StringBuilder stream = new StringBuilder();
+ stream.append("[");
+ int docsInStream = 15000;
+ for (int x = 0; x < docsInStream -1; x++) {
+ if (x % 10 == 0) {
+ stream.append(doc1 + ", ");
+ } else {
+ // Add some randomness to the layout to trigger potential bugs in parsing.
+ stream.append("{\"remove\": \"id:unittest:smoke::whee");
+ for (int y = 0 ; y < x % 277 ; y++) {
+ stream.append("X");
+ }
+ stream.append("\"}, ");
+ }
+ }
+ stream.append(doc3);
+ stream.append("]");
+
+ InputStream inputStream = new ByteArrayInputStream(stream.toString().getBytes(StandardCharsets.UTF_8));
+ long startTime = System.currentTimeMillis();
+ JsonReader.read(inputStream, session, numSent);
+ // At time of writing, it took about 200 ms on my mac.
+ System.err.println("Run time is " + (System.currentTimeMillis() - startTime) + " ms");
+ inputStream.close();
+
+ // Verify that content is not rubbish.
+ for (int x = 0; x < docsInStream - 1; x++) {
+ if (x % 10 == 0) {
+ assertThat(session.datas.get(x).toString(), is(doc1));
+ assertThat(session.documentIds.get(x), is(doc1_id));
+ }
+ }
+ assertThat(session.datas.get(docsInStream-1).toString(), is(doc3));
+ assertThat(numSent.get(), is(docsInStream));
+ }
+
+ @Test(expected=RuntimeException.class)
+ public void testBadJsonCommaAfterLastElement() throws Exception {
+ InputStream inputStream = new ByteArrayInputStream(
+ ("["+ doc1 + ",]" ).getBytes(StandardCharsets.UTF_8));
+ JsonReader.read(inputStream, session, numSent);
+ }
+
+ @Test(expected=RuntimeException.class)
+ public void testTotalGarbage() throws Exception {
+ InputStream inputStream = new ByteArrayInputStream(("garbage" ).getBytes(StandardCharsets.UTF_8));
+ JsonReader.read(inputStream, session, numSent);
+ }
+
+ @Test(expected=RuntimeException.class)
+ public void testTwoDocIds() throws Exception {
+ InputStream inputStream = new ByteArrayInputStream(("[{\"remove\": \"id\", \"update\": \"id:\"}]"
+ .getBytes(StandardCharsets.UTF_8)));
+ JsonReader.read(inputStream, session, numSent);
+ }
+
+ @Test
+ public void testFullDocument() throws Exception {
+ InputStream inputStream = new ByteArrayInputStream((
+ "[{\n" +
+ " \"update\": \"id:foo:music:doc:foo:bar\",\n" +
+ " \n" +
+ " \"fields\": {\n" +
+ " \"artist\": {\n" +
+ " \"assign\": null" +
+ " },\n" +
+ " \n" +
+ " \"albums\": {\n" +
+ " \"assign\": [\n" +
+ " \"Kramgoda laatar 4\",\n" +
+ " \"Kramgoda laatar 5\",\n" +
+ " \"Kramgoda laatar 6\"\n" +
+ " ],\n" +
+ " \"add\": [\n" +
+ " \"Kramgoda laatar 7\",\n" +
+ " \"Kramgoda laatar 8\"\n" +
+ " ]\n" +
+ " },\n" +
+ " \"inceptionYear\": {\n" +
+ " \"increment\": 4\n" +
+ " },\n" +
+ " \"concerts\": {\n" +
+ " \"assign\": {\n" +
+ " \"Torsby 1993\": 1000,\n" +
+ " \"Uddevalla 2000\": 34\n" +
+ " },\n" +
+ " \"match\": {\n" +
+ " \"element\": \"Sundsvall 1980\",\n" +
+ " \"increment\": 5392\n" +
+ " },\n" +
+ " \"add\": {\n" +
+ " \"Kiruna 1999\": 200,\n" +
+ " \"Oslo 1998\": 2000\n" +
+ " }\n" +
+ " },\n" +
+ " \"scores\": {\n" +
+ " \"match\": {\n" +
+ " \"element\": \"Sven Ingvars\",\n" +
+ " \"match\": {\n" +
+ " \"element\": 0,\n" +
+ " \"increment\": 78\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ "}]\n").getBytes(StandardCharsets.UTF_8));
+ JsonReader.read(inputStream, session, numSent);
+ inputStream.close();
+ assertThat(session.documentIds.size(), is(1));
+ assertThat(session.documentIds.get(0), is("id:foo:music:doc:foo:bar"));
+ }
+
+ /**
+ * Convenience method to input JSON without escaping double quotes and newlines
+ * Each parameter represents a line of JSON encoded data
+ * The lines are joined with newline and single quotes are replaced with double quotes
+ */
+ static String inputJson(String... lines) {
+ return Joiner.on("\n").join(lines).replaceAll("'", "\"");
+ }
+} \ No newline at end of file
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/runner/RunnerTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/runner/RunnerTest.java
new file mode 100644
index 00000000000..3d8edfbb141
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/runner/RunnerTest.java
@@ -0,0 +1,35 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.runner;
+
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.*;
+
+
+public class RunnerTest {
+
+ @Test
+ public void testAddFeedTag() throws IOException {
+ InputStream stream = new ByteArrayInputStream("foo".getBytes(StandardCharsets.UTF_8));
+ InputStream streamProcessed = Runner.addVespafeedTag(stream);
+ assertThat(convertStreamToString(streamProcessed), is("<vespafeed>foo</vespafeed>"));
+ }
+
+ private static String convertStreamToString(java.io.InputStream inputStream) throws IOException {
+ StringBuilder builder = new StringBuilder();
+ while (true) {
+ int character = inputStream.read();
+ if (character == -1) {
+ inputStream.close();
+ return builder.toString();
+ }
+ builder.append((char)character);
+ }
+ }
+} \ No newline at end of file
diff --git a/vespa-http-client/src/test/resources/vespacorpfeed-prod-sample.xml b/vespa-http-client/src/test/resources/vespacorpfeed-prod-sample.xml
new file mode 100644
index 00000000000..d67c6df1c53
--- /dev/null
+++ b/vespa-http-client/src/test/resources/vespacorpfeed-prod-sample.xml
@@ -0,0 +1,187 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -->
+<vespafeed>
+<document documenttype="vespacorp" documentid="id:vespacorp:vespacorp::7138bc793a096a78b86a6501ae0c6e7b">
+ <threadId>f5078d76c7541ab15387ab62fef22a01</threadId>
+ <contentsHash>186e631cb3b09ac33e5c124a79e20915</contentsHash>
+ <authors>
+ <item>bratseth</item>
+ </authors>
+ <urlPath>/vespa-users/msg/4498622c3f131e1067c4e7bc18ac96db</urlPath>
+ <contents>This was done to make it simpler for people to get started with jars loaded through OSGi on the assumption that most wouldn't need to import anything I'm not sure it actually gives a net saving though for the reason you point out We'll probably make the existence of global packages optional somehow later Jon Den 21 mai 2011 kl 03.21 skrev LG Just out of curiosity why motivated the choice of exposing them as global packages instead of exported packages I had personally a hard time figuring out which packages I had to import and to not import javax com.yahoo.vespa etc and I'm wondering if it means I need to repackage some 3rdparty component if they decide to import some of these global packages like javax.foo lg geoinformatics software engineer mail@host.com direct y!im 701 first avenue sunnyvale ca 94089-0703 us phone fax Le 5/10/11 12:10 AM Jon S Bratseth a écrit The packages in the Vespa public api is always available global That's the ones included here http://vespa/javadoc/5.0/all In addition the packages belonging to the Java SE API javax etc are global The exported list is not quite as well defined though You can always just try to import when some package is not in the global category above if it cannot be resolved then we do not export it Or check the source as Francois mentions </contents>
+ <parentId>8bce4afebf9dca1aab84651db62d4269</parentId>
+ <title>How to load JSON library in vespa 5.0.8</title>
+ <my_contents>This was done to make it simpler for people to get started with jars loaded through OSGi on the assumption that most wouldn't need to import anything I'm not sure it actually gives a net saving though for the reason you point out We'll probably make the existence of global packages optional somehow later Jon Den 21 mai 2011 kl 03.21 skrev LG Just out of curiosity why motivated the choice of exposing them as global packages instead of exported packages I had personally a hard time figuring out which packages I had to import and to not import javax com.yahoo.vespa etc and I'm wondering if it means I need to repackage some 3rdparty component if they decide to import some of these global packages like javax.foo laurent goujon geoinformatics software engineer mail@host.com direct y!im 701 first avenue sunnyvale ca 94089-0703 us phone fax Le 5/10/11 12:10 AM Jon S Bratseth a écrit The packages in the Vespa public api is always available global That's the ones included here http://vespa/javadoc/5.0/all In addition the packages belonging to the Java SE API javax etc are global The exported list is not quite as well defined though You can always just try to import when some package is not in the global category above if it cannot be resolved then we do not export it Or check the source as Francois mentions </my_contents>
+ <lastUpdate>1306231886</lastUpdate>
+ <pagerank>0</pagerank>
+ <date>1306228138</date>
+ <headers></headers>
+ <articleType>email</articleType>
+ <emailProcessedContents binaryencoding="base64"></emailProcessedContents>
+ <attachments></attachments>
+ <documentAbstract></documentAbstract>
+ <audience>(NULL)</audience>
+ <docDirId>(NULL)</docDirId>
+ <visibility>1</visibility>
+ <headings>
+ <item></item>
+ </headings>
+ <my_title>How to load JSON library in vespa 5.0.8</my_title>
+ <messageId>4498622c3f131e1067c4e7bc18ac96db</messageId>
+ <origContents binaryencoding="base64">VGhpcyB3YXMgZG9uZSB0byBtYWtlIGl0IHNpbXBsZXIgZm9yIHBlb3BsZSB0byBnZXQgc3RhcnRlZCB3aXRoIGphcnMgbG9hZGVkIHRocm91Z2ggT1NHaSwgb24gdGhlIGFzc3VtcHRpb24gdGhhdCBtb3N0IHdvdWxkbid0IG5lZWQgdG8gaW1wb3J0IGFueXRoaW5nLgpJJ20gbm90IHN1cmUgaXQgYWN0dWFsbHkgZ2l2ZXMgYSBuZXQgc2F2aW5nIHRob3VnaCwgZm9yIHRoZSByZWFzb24geW91IHBvaW50IG91dC4gV2UnbGwgcHJvYmFibHkgbWFrZSB0aGUgZXhpc3RlbmNlIG9mIGdsb2JhbCBwYWNrYWdlcyBvcHRpb25hbCBzb21laG93IGxhdGVyLgoKLS0KSm9uCgpEZW4gMjEuIG1haSAyMDExIGtsLiAwMy4yMSBza3JldiBMYXVyZW50IEdvdWpvbjoKCkp1c3Qgb3V0IG9mIGN1cmlvc2l0eSwgd2h5IG1vdGl2YXRlZCB0aGUgY2hvaWNlIG9mIGV4cG9zaW5nIHRoZW0gYXMgZ2xvYmFsIHBhY2thZ2VzIGluc3RlYWQgb2YgZXhwb3J0ZWQgcGFja2FnZXM/CgpJIGhhZCBwZXJzb25hbGx5IGEgaGFyZCB0aW1lIGZpZ3VyaW5nIG91dCB3aGljaCBwYWNrYWdlcyBJIGhhZCB0byBpbXBvcnQgYW5kIHRvIG5vdCBpbXBvcnQgKGphdmF4LiosIGNvbS55YWhvby52ZXNwYSwgZXRjLi4uKSBhbmQgSSdtIHdvbmRlcmluZyBpZiBpdCBtZWFucyBJIG5lZWQgdG8gcmVwYWNrYWdlIHNvbWUgM3JkcGFydHkgY29tcG9uZW50IGlmIHRoZXkgZGVjaWRlIHRvIGltcG9ydCBzb21lIG9mIHRoZXNlIGdsb2JhbCBwYWNrYWdlcyBsaWtlIGphdmF4LmZvby4KCmxhdXJlbnQKZ291am9uCgpnZW9pbmZvcm1hdGljcyBzb2Z0d2FyZSBlbmdpbmVlcgoKZ291am9ubEB5YWhvby1pbmMuY29tPG1haWx0bzpnb3Vqb25sQHlhaG9vLWluYy5jb20+CmRpcmVjdCAoNDA4KSAzNDkgOTMwMgp5IWltIHlsYXVyZW50Z28KCjcwMSBmaXJzdCBhdmVudWUsIHN1bm55dmFsZSwgY2EsIDk0MDg5LTA3MDMsIHVzCnBob25lICg0MDgpIDM0OSAzMzAwICAgIGZheCAoNDA4KSAzNDkgMzMwMQoKPHlhaG9vLmdpZj4KCgpMZSA1LzEwLzExIDEyOjEwIEFNLCBKb24gUyBCcmF0c2V0aCBhIMOpY3JpdCA6CgpUaGUgcGFja2FnZXMgaW4gdGhlIFZlc3BhIHB1YmxpYyBhcGkgaXMgYWx3YXlzIGF2YWlsYWJsZSAoZ2xvYmFsKS4KVGhhdCdzIHRoZSBvbmVzIGluY2x1ZGVkIGhlcmU6IGh0dHA6Ly92ZXNwYS5jb3JwLnlhaG9vLmNvbS9qYXZhZG9jLzUuMC9hbGwvCkluIGFkZGl0aW9uLCB0aGUgcGFja2FnZXMgYmVsb25naW5nIHRvIHRoZSBKYXZhIFNFIEFQSSAoamF2YXggZXRjLikgYXJlIGdsb2JhbC4KClRoZSAiZXhwb3J0ZWQiIGxpc3QgaXMgbm90IHF1aXRlIGFzIHdlbGwgZGVmaW5lZCB0aG91Z2guIFlvdSBjYW4gYWx3YXlzIGp1c3QgdHJ5IHRvIGltcG9ydCB3aGVuIHNvbWUgcGFja2FnZSBpcyBub3QgaW4gdGhlICJnbG9iYWwiIGNhdGVnb3J5IGFib3ZlLCBpZiBpdCBjYW5ub3QgYmUgcmVzb2x2ZWQgdGhlbiB3ZSBkbyBub3QgZXhwb3J0IGl0LiBPciBjaGVjayB0aGUgc291cmNlIGFzIEZyYW5jb2lzIG1lbnRpb25zLg==</origContents>
+ <threadUrl>/2011/05/09/how_to_load_json_library_in_vespa_5_0_8</threadUrl>
+ <level>(NULL)</level>
+</document>
+<document documenttype="vespacorp" documentid="id:vespacorp:vespacorp::11b6e7f86d650b995289dcadaaf902fc">
+ <threadId>f4467ca341409984b9c3ab4c431f0a7c</threadId>
+ <contentsHash>c0d00a07cc1486404f5e23a2cc3cc5ed</contentsHash>
+ <authors>
+ <item>bratseth</item>
+ </authors>
+ <urlPath>/vespa-users/msg/2538973d3c06d9d5163bf9bd069d0bd6</urlPath>
+ <contents>The unit test and doc I pointed to had some examples were you looking for something else The unit tests are using processors from this set http://vespa/view/vespa/trunk/container/processing/src/main/java/com/yahoo/processing/test/ProcessorLibrary.java?view=markup From this you might be interested in Federator forks the execution to multiple chains executed in parallel FutureDataSource creates and returns response containing future data BlockingSplitter An example of waiting for some future data to complete before using them for some processing of course don t do this if you want full async for some reason AsyncDataProcessingInitiator An example of registering future processing when a list of data is completed i.e process the data without blocking StreamProcessingInitiator An example of registering future processing on every additional piece of data entering a list None of this shows how to create a real async data source which gets its data from the network the FutureDataSource above is of course just a mock I have an example of that somewhere which I can dig up if you need it Jon On 6 feb 2014 at 02:40 GOs wrote Is there an example which uses the futures We have async working but it's not sending all of the data until the request is marked as completed Best GGO Tech Yahoo Software Systems Development Engineer M 701 First Avenue Sunnyvale CA 94089 http://forgood.zenfs.com/logos/yahoo.png On Wednesday February 5 2014 2:02 AM Jon Bratseth wrote See the processing framework in particular http://vespa/5/documentation/jdisc/processing.html#asynchronous-processing and the example AsyncDataProducer towards the end of the page In short the Processing framework supports this use case out of the box just return a Response with data futures and the renderer will render as much as possible at once and then the future data whenever it becomes available See http://vespa/view/vespa/trunk/container/core/src/test/java/com/yahoo/processing/handler/ProcessingHandlerTestCase.java?view=markup for some complete examples the tests named something with async Also note that by default the renderer will preserve the order of the future placeholders in the response such that if a Response containing future data list A and B it will wait for A before rendering further even if B is available If you want to return data as soon as possible in any order have your DataList instances implement com.yahoo.processing.response.Ordered and return false from isOrdered There s an example of that as well in the test above Jon On 5 feb 2014 at 10:43 Kristian Aune wrote vespa-users is the best list K On 5 feb 2014 at 01:24 GO wrote Hey Kristian Not sure what the ilist is for JDisc user support so if you know just reply and I'll forward this mail that way Basically here's the scenario I'm looking at We get a request from client and receive it in JDisc Then we do a bunch of processing and serve responses as they come this way the client can immediately render finished responses while still waiting for pending responses What is the easiest way to do this in a JDisc container I was looking at websockets but not sure how much they are supported Best GE desde mi iPhone Vespa Information http://vespa </contents>
+ <parentId>1c809fe04d3807dc72be71cf1aa559d6</parentId>
+ <title>JDisc streaming / websockets</title>
+ <my_contents>The unit test and doc I pointed to had some examples were you looking for something else The unit tests are using processors from this set http://vespa/view/vespa/trunk/container/processing/src/main/java/com/yahoo/processing/test/ProcessorLibrary.java?view=markup From this you might be interested in Federator forks the execution to multiple chains executed in parallel FutureDataSource creates and returns response containing future data BlockingSplitter An example of waiting for some future data to complete before using them for some processing of course don t do this if you want full async for some reason AsyncDataProcessingInitiator An example of registering future processing when a list of data is completed i.e process the data without blocking StreamProcessingInitiator An example of registering future processing on every additional piece of data entering a list None of this shows how to create a real async data source which gets its data from the network the FutureDataSource above is of course just a mock I have an example of that somewhere which I can dig up if you need it Jon On 6 feb 2014 at 02:40 GO wrote Is there an example which uses the futures We have async working but it's not sending all of the data until the request is marked as completed Best GGO Tech Yahoo Software Systems Development Engineer MFirst Avenue Sunnyvale CA 94089 http://forgood.zenfs.com/logos/yahoo.png On Wednesday February 5 2014 2:02 AM Jon Bratseth wrote See the processing framework in particular http://vespa/5/documentation/jdisc/processing.html#asynchronous-processing and the example AsyncDataProducer towards the end of the page In short the Processing framework supports this use case out of the box just return a Response with data futures and the renderer will render as much as possible at once and then the future data whenever it becomes available See http://vespa/view/vespa/trunk/container/core/src/test/java/com/yahoo/processing/handler/ProcessingHandlerTestCase.java?view=markup for some complete examples the tests named something with async Also note that by default the renderer will preserve the order of the future placeholders in the response such that if a Response containing future data list A and B it will wait for A before rendering further even if B is available If you want to return data as soon as possible in any order have your DataList instances implement com.yahoo.processing.response.Ordered and return false from isOrdered There s an example of that as well in the test above Jon On 5 feb 2014 at 10:43 Kristian Aune wrote vespa-users is the best list K On 5 feb 2014 at 01:24 GO wrote Hey Kristian Not sure what the ilist is for JDisc user support so if you know just reply and I'll forward this mail that way Basically here's the scenario I'm looking at We get a request from client and receive it in JDisc Then we do a bunch of processing and serve responses as they come this way the client can immediately render finished responses while still waiting for pending responses What is the easiest way to do this in a JDisc container I was looking at websockets but not sure how much they are supported Best GE desde mi iPhone Vespa Information http://vespa</my_contents>
+ <lastUpdate>1391683805</lastUpdate>
+ <pagerank>0</pagerank>
+ <date>1391680103</date>
+ <headers></headers>
+ <articleType>email</articleType>
+ <emailProcessedContents binaryencoding="base64"></emailProcessedContents>
+ <attachments></attachments>
+ <documentAbstract></documentAbstract>
+ <audience>(NULL)</audience>
+ <docDirId>(NULL)</docDirId>
+ <visibility>1</visibility>
+ <headings>
+ <item></item>
+ </headings>
+ <my_title>JDisc streaming / websockets</my_title>
+ <messageId>2538973d3c06d9d5163bf9bd069d0bd6</messageId>
+ <origContents binaryencoding="base64">VGhlIHVuaXQgdGVzdCBhbmQgZG9jIEkgcG9pbnRlZCB0byBoYWQgc29tZSBleGFtcGxlcywgd2VyZSB5b3UgbG9va2luZyBmb3Igc29tZXRoaW5nIGVsc2U/CgpUaGUgdW5pdCB0ZXN0cyBhcmUgdXNpbmcgcHJvY2Vzc29ycyBmcm9tIHRoaXMgc2V0OgpodHRwOi8vc3ZuLnRyb25kaGVpbS5jb3JwLnlhaG9vLmNvbS92aWV3L3Zlc3BhL3RydW5rL2NvbnRhaW5lci9wcm9jZXNzaW5nL3NyYy9tYWluL2phdmEvY29tL3lhaG9vL3Byb2Nlc3NpbmcvdGVzdC9Qcm9jZXNzb3JMaWJyYXJ5LmphdmE/dmlldz1tYXJrdXAKRnJvbSB0aGlzLiB5b3UgbWlnaHQgYmUgaW50ZXJlc3RlZCBpbgpGZWRlcmF0b3IgLSBmb3JrcyB0aGUgZXhlY3V0aW9uIHRvIG11bHRpcGxlIGNoYWlucyBleGVjdXRlZCBpbiBwYXJhbGxlbC4KRnV0dXJlRGF0YVNvdXJjZSAtIGNyZWF0ZXMgYW5kIHJldHVybnMgcmVzcG9uc2UgY29udGFpbmluZyBmdXR1cmUgZGF0YS4KQmxvY2tpbmdTcGxpdHRlciAtIEFuIGV4YW1wbGUgb2Ygd2FpdGluZyBmb3Igc29tZSBmdXR1cmUgZGF0YSB0byBjb21wbGV0ZSBiZWZvcmUgdXNpbmcgdGhlbSBmb3Igc29tZSBwcm9jZXNzaW5nIChvZiwgY291cnNlLCBkb27CknQgZG8gdGhpcyBpZiB5b3Ugd2FudCBmdWxsIGFzeW5jIGZvciBzb21lIHJlYXNvbikKQXN5bmNEYXRhUHJvY2Vzc2luZ0luaXRpYXRvciAtIEFuIGV4YW1wbGUgb2YgcmVnaXN0ZXJpbmcgZnV0dXJlIHByb2Nlc3Npbmcgd2hlbiBhIGxpc3Qgb2YgZGF0YSBpcyBjb21wbGV0ZWQgKGkuZSBwcm9jZXNzIHRoZSBkYXRhIHdpdGhvdXQgYmxvY2tpbmcpClN0cmVhbVByb2Nlc3NpbmdJbml0aWF0b3IgLSBBbiBleGFtcGxlIG9mIHJlZ2lzdGVyaW5nIGZ1dHVyZSBwcm9jZXNzaW5nIG9uIGV2ZXJ5IGFkZGl0aW9uYWwgcGllY2Ugb2YgZGF0YSBlbnRlcmluZyBhIGxpc3QKCk5vbmUgb2YgdGhpcyBzaG93cyBob3cgdG8gY3JlYXRlIGEgKnJlYWwqIGFzeW5jIGRhdGEgc291cmNlIHdoaWNoIGdldHMgaXRzIGRhdGEgZnJvbSB0aGUgbmV0d29yayAodGhlIEZ1dHVyZURhdGFTb3VyY2UgYWJvdmUgaXMgb2YgY291cnNlIGp1c3QgYSBtb2NrKS4KSSBoYXZlIGFuIGV4YW1wbGUgb2YgdGhhdCBzb21ld2hlcmUgd2hpY2ggSSBjYW4gZGlnIHVwIGlmIHlvdSBuZWVkIGl0LgoKwpcKSm9uCgpPbiA2LiBmZWIuIDIwMTQsIGF0IDAyOjQwLCBHYXZpbiBPd2VucyA8Z293ZW5zQHlhaG9vLWluYy5jb208bWFpbHRvOmdvd2Vuc0B5YWhvby1pbmMuY29tPj4gd3JvdGU6CgpJcyB0aGVyZSBhbiBleGFtcGxlIHdoaWNoIHVzZXMgdGhlIGZ1dHVyZXM/CgpXZSBoYXZlIGFzeW5jIHdvcmtpbmcsIGJ1dCBpdCdzIG5vdCBzZW5kaW5nIGFsbCBvZiB0aGUgZGF0YSB1bnRpbCB0aGUgcmVxdWVzdCBpcyBtYXJrZWQgYXMgY29tcGxldGVkLgoKQmVzdCwKR2F2aW4KCgpHYXZpbiBPd2VucwpUZWNoIFlhaG9vLCBTb2Z0d2FyZSBTeXN0ZW1zIERldmVsb3BtZW50IEVuZ2luZWVyCk06ICg0MDgpIDMwNi03NTM2CjcwMSBGaXJzdCBBdmVudWUKU3Vubnl2YWxlIENBIDk0MDg5CgpbaHR0cDovL2Zvcmdvb2QuemVuZnMuY29tL2xvZ29zL3lhaG9vLnBuZ10KCgpPbiBXZWRuZXNkYXksIEZlYnJ1YXJ5IDUsIDIwMTQgMjowMiBBTSwgSm9uIEJyYXRzZXRoIDxicmF0c2V0aEB5YWhvby1pbmMuY29tPG1haWx0bzpicmF0c2V0aEB5YWhvby1pbmMuY29tPj4gd3JvdGU6ClNlZSB0aGUgcHJvY2Vzc2luZyBmcmFtZXdvcmssIGluIHBhcnRpY3VsYXIgaHR0cDovL3Zlc3BhLmNvcnAueWFob28uY29tLzUvZG9jdW1lbnRhdGlvbi9qZGlzYy9wcm9jZXNzaW5nLmh0bWwjYXN5bmNocm9ub3VzLXByb2Nlc3NpbmcKYW5kIHRoZSBleGFtcGxlIEFzeW5jRGF0YVByb2R1Y2VyIHRvd2FyZHMgdGhlIGVuZCBvZiB0aGUgcGFnZS4KCkluIHNob3J0LCB0aGUgUHJvY2Vzc2luZyBmcmFtZXdvcmsgc3VwcG9ydHMgdGhpcyB1c2UgY2FzZSBvdXQgb2YgdGhlIGJveCAtIGp1c3QgcmV0dXJuIGEgUmVzcG9uc2Ugd2l0aCBkYXRhIGZ1dHVyZXMgYW5kIHRoZSByZW5kZXJlciB3aWxsIHJlbmRlciBhcyBtdWNoIGFzIHBvc3NpYmxlIGF0IG9uY2UgYW5kIHRoZW4gdGhlIGZ1dHVyZSBkYXRhIHdoZW5ldmVyIGl0IGJlY29tZXMgYXZhaWxhYmxlLgoKClNlZSBodHRwOi8vc3ZuLnRyb25kaGVpbS5jb3JwLnlhaG9vLmNvbS92aWV3L3Zlc3BhL3RydW5rL2NvbnRhaW5lci9jb3JlL3NyYy90ZXN0L2phdmEvY29tL3lhaG9vL3Byb2Nlc3NpbmcvaGFuZGxlci9Qcm9jZXNzaW5nSGFuZGxlclRlc3RDYXNlLmphdmE/dmlldz1tYXJrdXAKZm9yIHNvbWUgY29tcGxldGUgZXhhbXBsZXMgKHRoZSB0ZXN0cyBuYW1lZCBzb21ldGhpbmcgd2l0aCBhc3luYykuCgoKQWxzbywgbm90ZSB0aGF0IGJ5IGRlZmF1bHQgdGhlIHJlbmRlcmVyIHdpbGwgcHJlc2VydmUgdGhlIG9yZGVyIG9mIHRoZSBmdXR1cmUgcGxhY2Vob2xkZXJzIGluIHRoZSByZXNwb25zZSwgc3VjaCB0aGF0IGlmIGEgUmVzcG9uc2UgY29udGFpbmluZyBmdXR1cmUgZGF0YSBsaXN0IEEgYW5kIEIsIGl0IHdpbGwgd2FpdCBmb3IgQSBiZWZvcmUgcmVuZGVyaW5nIGZ1cnRoZXIgZXZlbiBpZiBCIGlzIGF2YWlsYWJsZS4KSWYgeW91IHdhbnQgdG8gcmV0dXJuIGRhdGEgYXMgc29vbiBhcyBwb3NzaWJsZSBpbiBhbnkgb3JkZXIsIGhhdmUgeW91ciBEYXRhTGlzdCBpbnN0YW5jZXMgaW1wbGVtZW50IGNvbS55YWhvby5wcm9jZXNzaW5nLnJlc3BvbnNlLk9yZGVyZWQgYW5kIHJldHVybiBmYWxzZSBmcm9tIGlzT3JkZXJlZC4gVGhlcmXCknMgYW4gZXhhbXBsZSBvZiB0aGF0IGFzIHdlbGwgaW4gdGhlIHRlc3QgYWJvdmUuCgrClwpKb24KCk9uIDUuIGZlYi4gMjAxNCwgYXQgMTA6NDMsIEtyaXN0aWFuIEF1bmUgPEtyaXN0aWFuLkF1bmVAeWFob28taW5jLmNvbTxtYWlsdG86S3Jpc3RpYW4uQXVuZUB5YWhvby1pbmMuY29tPj4gd3JvdGU6Cgo+IHZlc3BhLXVzZXJzQCBpcyB0aGUgYmVzdCBsaXN0IDstKQo+Cj4gLUsKPgo+IE9uIDUuIGZlYi4gMjAxNCwgYXQgMDE6MjQsIEdhdmluIE93ZW5zIDxnb3dlbnNAeWFob28taW5jLmNvbTxtYWlsdG86Z293ZW5zQHlhaG9vLWluYy5jb20+PiB3cm90ZToKPgo+PiBIZXkgS3Jpc3RpYW4sCj4+Cj4+IE5vdCBzdXJlIHdoYXQgdGhlIGlsaXN0IGlzIGZvciBKRGlzYyB1c2VyIHN1cHBvcnQsIHNvIGlmIHlvdSBrbm93IGp1c3QgcmVwbHkgYW5kIEknbGwgZm9yd2FyZCB0aGlzIG1haWwgdGhhdCB3YXkuCj4+Cj4+IEJhc2ljYWxseSwgaGVyZSdzIHRoZSBzY2VuYXJpbyBJJ20gbG9va2luZyBhdC4uLgo+Pgo+PiBXZSBnZXQgYSByZXF1ZXN0IGZyb20gY2xpZW50IGFuZCByZWNlaXZlIGl0IGluIEpEaXNjLiBUaGVuIHdlIGRvIGEgYnVuY2ggb2YgcHJvY2Vzc2luZyBhbmQgc2VydmUgcmVzcG9uc2VzIGFzIHRoZXkgY29tZSAodGhpcyB3YXkgdGhlIGNsaWVudCBjYW4gaW1tZWRpYXRlbHkgcmVuZGVyIGZpbmlzaGVkIHJlc3BvbnNlcyB3aGlsZSBzdGlsbCB3YWl0aW5nIGZvciBwZW5kaW5nIHJlc3BvbnNlcykuCj4+Cj4+IFdoYXQgaXMgdGhlIGVhc2llc3Qgd2F5IHRvIGRvIHRoaXMgaW4gYSBKRGlzYyBjb250YWluZXI/IEkgd2FzIGxvb2tpbmcgYXQgd2Vic29ja2V0cyBidXQgbm90IHN1cmUgaG93IG11Y2ggdGhleSBhcmUgc3VwcG9ydGVkLgo+Pgo+PiBCZXN0LAo+PiBHYXZpbgo+Pgo+PiBFbnZpYWRvIGRlc2RlIG1pIGlQaG9uZQoKPgo+Cj4KPiBWZXNwYSBJbmZvcm1hdGlvbjoKPiAgICBodHRwOi8vdmVzcGEuY29ycC55YWhvby5jb20vCj4gICAgaHR0cDovL3R3aWtpLmNvcnAueWFob28uY29tL3ZpZXcvVmVzcGEKCj4=</origContents>
+ <threadUrl>/2014/02/05/jdisc_streaming_websockets</threadUrl>
+ <level>(NULL)</level>
+</document>
+<document documenttype="vespacorp" documentid="id:vespacorp:vespacorp::9de81494755bf8ee6940bcdf156081e2">
+ <threadId>39e8f15ad422399d1c72d3f178f956c2</threadId>
+ <contentsHash>d9d95e6364aaac73b3bfb77f377236fc</contentsHash>
+ <urlPath>/vespa-users/msg/4f12250dae5ee19458973a0afaf6967e</urlPath>
+ <contents>I don't get any error while deployment the expression using created_at directly I have filed a support BZ 3455702 for this Thanks Yi On Mar 7 2010 at 10:23 AM Jo Kristian Bergum wrote On Mar 5 2010 at 11:47 PM YZ wrote Hi folks I'm trying to confirm the range of value for nativeRank is between 0 and 1 With this rank profile rank-profile native first-phase expression nativeRank I get the following result Question is relevancy showing the ranking score If so why it's 0 Depends on your query what fields were searched and your search definition I did another test this time using rank-profile created_at first-phase expression created_at This should have failed during deploy correct is expression attribute(created_at You probably have quite a few failed blueprint compilation errors in the vespa.log on the search nodes The result is as follows relevancy is still 0 and I would expect the value of created_at which is 1267569885 1267569885 Thanks Y </contents>
+ <parentId>df5b35d538401b06cfbe803b55856a3e</parentId>
+ <title>Value of nativeRank</title>
+ <my_contents>I don't get any error while deployment the expression using created_at directly I have filed a support BZ 3455702 for this Thanks Yi On Mar 7 2010 at 10:23 AM Jo Kristian Bergum wrote On Mar 5 2010 at 11:47 PM YZ wrote Hi folks I'm trying to confirm the range of value for nativeRank is between 0 and 1 With this rank profile rank-profile native first-phase expression nativeRank I get the following result Question is relevancy showing the ranking score If so why it's 0 Depends on your query what fields were searched and your search definition I did another test this time using rank-profile created_at first-phase expression created_at This should have failed during deploy correct is expression attribute(created_at You probably have quite a few failed blueprint compilation errors in the vespa.log on the search nodes The result is as follows relevancy is still 0 and I would expect the value of created_at which is 1267569885 1267569885 Thanks Y </my_contents>
+ <lastUpdate>1267989987</lastUpdate>
+ <pagerank>0</pagerank>
+ <date>1267986312</date>
+ <headers></headers>
+ <articleType>email</articleType>
+ <emailProcessedContents binaryencoding="base64"></emailProcessedContents>
+ <attachments></attachments>
+ <documentAbstract></documentAbstract>
+ <audience>(NULL)</audience>
+ <docDirId>(NULL)</docDirId>
+ <visibility>1</visibility>
+ <headings>
+ <item></item>
+ </headings>
+ <my_title>Value of nativeRank</my_title>
+ <messageId>4f12250dae5ee19458973a0afaf6967e</messageId>
+ <origContents binaryencoding="base64">SSBkb24ndCBnZXQgYW55IGVycm9yIHdoaWxlIGRlcGxveW1lbnQgdGhlIGV4cHJlc3Npb24gdXNpbmcgY3JlYXRlZF9hdCBkaXJlY3RseS4KSSBoYXZlIGZpbGVkIGEgc3VwcG9ydCBCWiAzNDU1NzAyIGZvciB0aGlzLgoKVGhhbmtzLAotWWkKCk9uIE1hciA3LCAyMDEwLCBhdCAxMDoyMyBBTSwgSm8gS3Jpc3RpYW4gQmVyZ3VtIHdyb3RlOgoKCk9uIE1hciA1LCAyMDEwLCBhdCAxMTo0NyBQTSwgWWkgWmhhbmcgd3JvdGU6Cgo+IEhpIGZvbGtzLAo+Cj4gICBJJ20gdHJ5aW5nIHRvIGNvbmZpcm0gdGhlIHJhbmdlIG9mIHZhbHVlIGZvciBuYXRpdmVSYW5rIGlzIGJldHdlZW4KPiAwIGFuZCAxLgo+Cj4gICBXaXRoIHRoaXMgcmFuayBwcm9maWxlOgo+ICAgcmFuay1wcm9maWxlIG5hdGl2ZXsKPiAgICAgZmlyc3QtcGhhc2UgeyBleHByZXNzaW9uOiBuYXRpdmVSYW5rIH0KPiAgIH0KPiAgIEkgZ2V0IHRoZSBmb2xsb3dpbmcgcmVzdWx0Ogo+Cj4gPHJlc3VsdCB0b3RhbC1oaXQtY291bnQ9IjIiPgo+IDxoaXQgcmVsZXZhbmN5PSIwIiBzb3VyY2U9InNjMC5udW0wIj4uLi4KPgo+ICAgUXVlc3Rpb246IGlzICJyZWxldmFuY3kiIHNob3dpbmcgdGhlIHJhbmtpbmcgc2NvcmU/IElmIHNvLCB3aHkgaXQncwo+IDA/CgpEZXBlbmRzIG9uIHlvdXIgcXVlcnkgKHdoYXQgZmllbGRzIHdlcmUgc2VhcmNoZWQpIGFuZCB5b3VyIHNlYXJjaApkZWZpbml0aW9uLgoKCj4gICBJIGRpZCBhbm90aGVyIHRlc3QsIHRoaXMgdGltZSB1c2luZwo+ICAgcmFuay1wcm9maWxlIGNyZWF0ZWRfYXR7Cj4gICAgIGZpcnN0LXBoYXNlIHsgIGV4cHJlc3Npb246IGNyZWF0ZWRfYXQgfQo+ICAgfQoKVGhpcyBzaG91bGQgaGF2ZSBmYWlsZWQgZHVyaW5nIGRlcGxveSwgY29ycmVjdCBpcwoKZXhwcmVzc2lvbjogYXR0cmlidXRlKGNyZWF0ZWRfYXQpCgpZb3UgcHJvYmFibHkgaGF2ZSBxdWl0ZSBhIGZldyAiZmFpbGVkIGJsdWVwcmludCBjb21waWxhdGlvbiIgZXJyb3JzIGluCnRoZSB2ZXNwYS5sb2cgb24gdGhlIHNlYXJjaCBub2Rlcz8KCgo+ICAgVGhlIHJlc3VsdCBpcyBhcyBmb2xsb3dzLCByZWxldmFuY3kgaXMgc3RpbGwgMCwgYW5kIEkgd291bGQgZXhwZWN0Cj4gdGhlIHZhbHVlIG9mIGNyZWF0ZWRfYXQsIHdoaWNoIGlzIDEyNjc1Njk4ODUuCj4gPHJlc3VsdCB0b3RhbC1oaXQtY291bnQ9IjIiPgo+IDxoaXQgcmVsZXZhbmN5PSIwIiBzb3VyY2U9InNjMC5udW0wIj4KPiA8ZmllbGQgbmFtZT0iY3JlYXRlZF9hdCI+MTI2NzU2OTg4NTwvZmllbGQ+Cj4KPiBUaGFua3MsCj4gLVlpCj4gPG1lc3NhZ2UtZm9vdGVyLnR4dD4=</origContents>
+ <threadUrl>/2010/03/05/value_of_nativerank</threadUrl>
+ <level>(NULL)</level>
+</document>
+<document documenttype="vespacorp" documentid="id:vespacorp:vespacorp::25ec8cc793e7e49cee2b5584917f6006">
+ <threadId>1a60037c3896299b0a90dd5d62034529</threadId>
+ <contentsHash>7d08e2ecdc2a7d243a748059db0e09ee</contentsHash>
+ <authors>
+ <item>bratseth</item>
+ </authors>
+ <urlPath>/vespa-users/msg/3eac9311090c9cf7c34f9d8c897f2c4b</urlPath>
+ <contents> search=incr&amp;restrict=abc RJ skrev Hi Jo We have multiple SDs deployed in one cluster and we normally use search= in our queries How do we use it for differentiating both the cluster name and the sd name For example We have two clusters namely incr and realtime each having SDs abc xyz We need a way to query from incr cluster in abc sd Thanks RJ Jo Kristian Bergum wrote On Mon 2009-08-24 at 16:55 +0530 AS wrote On the serving side how do we indicate the cluster to get results from search= /JKB </contents>
+ <parentId>00b89ea33262247e728f7534c9eafd0d</parentId>
+ <title>Need a real time index along with an Incremental index</title>
+ <my_contents> search=incr&amp;restrict=abc RJ skrev Hi Jo We have multiple SDs deployed in one cluster and we normally use search= in our queries How do we use it for differentiating both the cluster name and the sd name For example We have two clusters namely incr and realtime each having SDs abc xyz We need a way to query from incr cluster in abc sd Thanks RJ Jo Kristian Bergum wrote On Mon 2009-08-24 at 16:55 +0530 AS wrote On the serving side how do we indicate the cluster to get results from search= /JKB </my_contents>
+ <lastUpdate>1265031079</lastUpdate>
+ <pagerank>0</pagerank>
+ <date>1251115178</date>
+ <headers></headers>
+ <articleType>email</articleType>
+ <emailProcessedContents binaryencoding="base64"></emailProcessedContents>
+ <attachments></attachments>
+ <documentAbstract></documentAbstract>
+ <audience>(NULL)</audience>
+ <docDirId>(NULL)</docDirId>
+ <visibility>1</visibility>
+ <headings>
+ <item></item>
+ </headings>
+ <my_title>Need a real time index along with an Incremental index</my_title>
+ <messageId>3eac9311090c9cf7c34f9d8c897f2c4b</messageId>
+ <origContents binaryencoding="base64">JnNlYXJjaD1pbmNyJnJlc3RyaWN0PWFiYw0KDQpSYWphdCBKYWluIHNrcmV2Og0KPiBIaSBKbywNCj4NCj4gV2UgaGF2ZSBtdWx0aXBsZSBTRHMgZGVwbG95ZWQgaW4gb25lIGNsdXN0ZXIgYW5kIHdlIG5vcm1hbGx5IHVzZSANCj4gJnNlYXJjaD08c2RuYW1lPiBpbiBvdXIgcXVlcmllcy4gSG93IGRvIHdlIHVzZSBpdCBmb3IgZGlmZmVyZW50aWF0aW5nIA0KPiBib3RoIHRoZSBjbHVzdGVyIG5hbWUgYW5kIHRoZSBzZCBuYW1lPw0KPiBGb3IgZXhhbXBsZSwNCj4gICAgIFdlIGhhdmUgdHdvIGNsdXN0ZXJzIG5hbWVseSAiaW5jciIgYW5kICJyZWFsdGltZSIsIGVhY2ggaGF2aW5nIA0KPiBTRHMsICJhYmMiICYgInh5eiIuIFdlIG5lZWQgYSB3YXkgdG8gcXVlcnkgZnJvbSBpbmNyIGNsdXN0ZXIgaW4gYWJjIHNkLg0KPg0KPiBUaGFua3MhDQo+IFJhamF0IEphaW4NCj4NCj4gSm8gS3Jpc3RpYW4gQmVyZ3VtIHdyb3RlOg0KPj4gT24gTW9uLCAyMDA5LTA4LTI0IGF0IDE2OjU1ICswNTMwLCBBbWl0IFNpbmhhIHdyb3RlOg0KPj4gICANCj4+PiBPbiB0aGUgc2VydmluZyBzaWRlIGhvdyBkbyB3ZSBpbmRpY2F0ZSB0aGUgY2x1c3RlciB0byBnZXQgcmVzdWx0cw0KPj4+IGZyb20uDQo+Pj4gICAgIA0KPj4gJnNlYXJjaD08Y2x1c3Rlcm5hbWU+IA0KPj4NCj4+IC9KS0INCj4+DQo+Pg==</origContents>
+ <threadUrl>/2009/08/20/partial_update_of_the_entire_document</threadUrl>
+ <level>(NULL)</level>
+</document>
+<document documenttype="vespacorp" documentid="id:vespacorp:vespacorp::c146d4c9f6153c4573f9a22fd5b19eb3">
+ <threadId>5b3a4526d33f45b09f63d43f476f824a</threadId>
+ <contentsHash>3700f8d3da3a8963f34f11816ffda2cc</contentsHash>
+ <authors>
+ <item>bergum</item>
+ </authors>
+ <urlPath>/vespa-users/msg/a0c55f0c7669d097e470182bd7d411cf</urlPath>
+ <contents>On Mon 2009-05-18 at 15:44 +0800 JLB wrote Hi All Am looking for a documentation on how to setup Vespa 1.1.5 I found this link http://vespa/documentation/setup/howtorun.html and it's not available anymore Am just wondering if you have it somewhere If so can you send it to me Thank you Why do you want to use Vespa 1.1.5 It's legacy and not longer supported Please consider using latest stable vespa release http://vespa Best Jo Kristian J plain text document attachment message-footer.txt Vespa Information http://vespa</contents>
+ <parentId>acca854c8876330b43143c61a1e1f32c</parentId>
+ <title>Vespa 1.1.5 Documentation</title>
+ <my_contents>On Mon 2009-05-18 at 15:44 +0800 JLB wrote Hi All Am looking for a documentation on how to setup Vespa 1.1.5 I found this link http://vespa/documentation/setup/howtorun.html and it's not available anymore Am just wondering if you have it somewhere If so can you send it to me Thank you Why do you want to use Vespa 1.1.5 It's legacy and not longer supported Please consider using latest stable vespa release http://vespa Best Jo Kristian J plain text document attachment message-footer.txt Vespa Information http://vesp </my_contents>
+ <lastUpdate>1265031079</lastUpdate>
+ <pagerank>0</pagerank>
+ <date>1242632220</date>
+ <headers></headers>
+ <articleType>email</articleType>
+ <emailProcessedContents binaryencoding="base64"></emailProcessedContents>
+ <attachments></attachments>
+ <documentAbstract></documentAbstract>
+ <audience>(NULL)</audience>
+ <docDirId>(NULL)</docDirId>
+ <visibility>1</visibility>
+ <headings>
+ <item></item>
+ </headings>
+ <my_title>Vespa 1.1.5 Documentation</my_title>
+ <messageId>a0c55f0c7669d097e470182bd7d411cf</messageId>
+ <origContents binaryencoding="base64">T24gTW9uLCAyMDA5LTA1LTE4IGF0IDE1OjQ0ICswODAwLCBKZXJvbWUgTGVtdWVsIEJhc2Egd3JvdGU6Cj4gSGkgQWxsLAo+IAo+IEFtIGxvb2tpbmcgZm9yIGEgZG9jdW1lbnRhdGlvbiBvbiBob3cgdG8gc2V0dXAgVmVzcGEgMS4xLjUsIEkgZm91bmQgdGhpcyAKPiBsaW5rOiAKPiBodHRwOi8vdmVzcGEudHJvbmRoZWltLmNvcnAueWFob28uY29tLzEuMS41L2RvY3VtZW50YXRpb24vc2V0dXAvaG93dG9ydW4uaHRtbCAKPiBhbmQgaXQncyBub3QgYXZhaWxhYmxlIGFueW1vcmUuIEFtIGp1c3Qgd29uZGVyaW5nIGlmIHlvdSBoYXZlIGl0IAo+IHNvbWV3aGVyZS4gSWYgc28sIGNhbiB5b3Ugc2VuZCBpdCB0byBtZS4gVGhhbmsgeW91LgoKV2h5IGRvIHlvdSB3YW50IHRvIHVzZSBWZXNwYSAxLjEuNT8gSXQncyBsZWdhY3kgYW5kIG5vdCBsb25nZXIKc3VwcG9ydGVkLCBQbGVhc2UgY29uc2lkZXIgdXNpbmcgbGF0ZXN0IHN0YWJsZSB2ZXNwYSByZWxlYXNlLiAKaHR0cDovL3Zlc3BhLmNvcnAueWFob28uY29tLwoKQmVzdCwKSm8gS3Jpc3RpYW4gCgoKCj4gCj4gLUplcm9tZQo+IHBsYWluIHRleHQgZG9jdW1lbnQgYXR0YWNobWVudCAobWVzc2FnZS1mb290ZXIudHh0KQo+IFZlc3BhIEluZm9ybWF0aW9uOgo+ICAgICAgaHR0cDovL3Zlc3BhLmNvcnAueWFob28uY29tLwo+ICAgICAgaHR0cDovL3R3aWtpLmNvcnAueWFob28uY29tL3ZpZXcvVmVzcGEKPg==</origContents>
+ <threadUrl>/2009/05/18/vespa_1_1_5_documentation</threadUrl>
+ <level>(NULL)</level>
+</document>
+<document documenttype="vespacorp" documentid="id:vespacorp:vespacorp::0553f1ea3af33d11aa9ab42496d11f78">
+ <threadId>cb34c66221f7188a09e6151062c14e16</threadId>
+ <contentsHash>fba771ebc63a57f191cf783b0a59298f</contentsHash>
+ <authors>
+ <item>peng</item>
+ </authors>
+ <urlPath>/vespa-users/msg/079588cac2dfd1117bef476409da724b</urlPath>
+ <contents>There is no rules to write When you enable libyell_poststemmer if a word is not known by dictionary libyell will try to do plural singular stemming according to its builtin rules Peng Original Message From BK mailto:mail@yhost.com Sent Tuesday July 03 2007 3:10 AM To mail@host.com Subject rule based stemming in vespa Hi I came across this document http://VespaStemming on plural singular stemming I want to try rule based plural singular stemming libyell_poststemmer in vespa Can someone point me to the relevant documents on how to write these rules and use them Thanks B </contents>
+ <parentId>3be4bf063c46c7e3d336dbfb4c58f6e7</parentId>
+ <title>Rule based stemming in vespa</title>
+ <my_contents>There is no rules to write When you enable libyell_poststemmer if a word is not known by dictionary libyell will try to do plural singular stemming according to its builtin rules Peng Original Message From BK mailto:mail@yhost.com Sent Tuesday July 03 2007 3:10 AM To mail@host.com Subject rule based stemming in vespa Hi I came across this document http://LocalVespaStemming on plural singular stemming I want to try rule based plural singular stemming libyell_poststemmer in vespa Can someone point me to the relevant documents on how to write these rules and use them Thanks B </my_contents>
+ <lastUpdate>1265031079</lastUpdate>
+ <pagerank>0</pagerank>
+ <date>1184187769</date>
+ <headers></headers>
+ <articleType>email</articleType>
+ <emailProcessedContents binaryencoding="base64"></emailProcessedContents>
+ <attachments></attachments>
+ <documentAbstract></documentAbstract>
+ <audience>(NULL)</audience>
+ <docDirId>(NULL)</docDirId>
+ <visibility>1</visibility>
+ <headings>
+ <item></item>
+ </headings>
+ <my_title>Rule based stemming in vespa</my_title>
+ <messageId>079588cac2dfd1117bef476409da724b</messageId>
+ <origContents binaryencoding="base64">VGhlcmUgaXMgbm8gcnVsZXMgdG8gd3JpdGUuIFdoZW4geW91IGVuYWJsZSBsaWJ5ZWxsX3Bvc3RzdGVtbWVyLCBpZiBhIHdvcmQKaXMgbm90IGtub3duIGJ5IGRpY3Rpb25hcnksIGxpYnllbGwgd2lsbCB0cnkgdG8gZG8gcGx1cmFsLT5zaW5ndWxhciBzdGVtbWluZwphY2NvcmRpbmcgdG8gaXRzIGJ1aWx0aW4gcnVsZXMuCgpQZW5nCgogCgotLS0tLU9yaWdpbmFsIE1lc3NhZ2UtLS0tLQpGcm9tOiBCYWxhamkgS2FubmFuIFttYWlsdG86a2JhbGFqaUB5YWhvby1pbmMuY29tXSAKU2VudDogVHVlc2RheSwgSnVseSAwMywgMjAwNyAzOjEwIEFNClRvOiB2ZXNwYS11c2Vyc0B5YWhvby1pbmMuY29tClN1YmplY3Q6IHJ1bGUgYmFzZWQgc3RlbW1pbmcgaW4gdmVzcGEgCgpIaSwKICBJIGNhbWUgYWNyb3NzIHRoaXMgZG9jdW1lbnQKaHR0cDovL3R3aWtpLmNvcnAueWFob28uY29tL3ZpZXcvTG9jYWxlbmcvTG9jYWxWZXNwYVN0ZW1taW5nIG9uIApwbHVyYWwtPnNpbmd1bGFyIHN0ZW1taW5nLiAgIEkgd2FudCB0byB0cnkgcnVsZSBiYXNlZCBwbHVyYWwtPnNpbmd1bGFyIApzdGVtbWluZyAobGlieWVsbF9wb3N0c3RlbW1lcikgaW4gdmVzcGEuICBDYW4gc29tZW9uZSBwb2ludCBtZSB0byB0aGUKcmVsZXZhbnQgZG9jdW1lbnRzIG9uIGhvdyB0byB3cml0ZSB0aGVzZSBydWxlcyBhbmQgdXNlIHRoZW0uCgpUaGFua3MKQmFsYWpp</origContents>
+ <threadUrl>/2007/07/03/rule_based_stemming_in_vespa</threadUrl>
+ <level>(NULL)</level>
+</document>
+</vespafeed>
diff --git a/vespa-http-client/src/test/resources/xml-challenge.xml b/vespa-http-client/src/test/resources/xml-challenge.xml
new file mode 100644
index 00000000000..36be77f07e9
--- /dev/null
+++ b/vespa-http-client/src/test/resources/xml-challenge.xml
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!-- Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -->
+<vespafeed>
+ <document documenttype="biz" transformver="5681" documentid="id:lsbe:biz::21336977"><attrlist><![CDATA[<other_urls><n>2</n><l><m><url>http://www.facebook.com/pages/City-of-Sunnyvale-California/132586463442411</url><URLTYPE>facebook</URLTYPE></m><m><url>http://www.twitter.com/CityofSunnyvale</url><URLTYPE>twitter</URLTYPE></m></l></other_urls><toc>19,22,36,42,48,74</toc><website><m><src>GRID</src><url>http://www.sunnyvale.ca.gov/</url></m></website><neighbor>Downtown|Sunnyvale Town Center</neighbor><woeId>Zip:12797147;DMA:24701119;State:2347563</woeId><consumersubmit><addbyuser>0</addbyuser></consumersubmit>]]></attrlist></document>
+
+</vespafeed>
diff --git a/vespa-http-client/src/test/resources/xml-challenge2.xml b/vespa-http-client/src/test/resources/xml-challenge2.xml
new file mode 100644
index 00000000000..ee4b05806b2
--- /dev/null
+++ b/vespa-http-client/src/test/resources/xml-challenge2.xml
@@ -0,0 +1,5 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!-- Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -->
+<vespafeed>
+ <document documenttype="biz" transformver="5681" documentid="id:lsbe:biz::21336977"><version_index>1395987733</version_index><attrlist>&lt;other_urls&gt;&lt;n&gt;2&lt;/n&gt;&lt;l&gt;&lt;m&gt;&lt;url&gt;http://www.facebook.com/pages/City-of-Sunnyvale-California/132586463442411&lt;/url&gt;&lt;URLTYPE&gt;facebook&lt;/URLTYPE&gt;&lt;/m&gt;&lt;m&gt;&lt;url&gt;http://www.twitter.com/CityofSunnyvale&lt;/url&gt;&lt;URLTYPE&gt;twitter&lt;/URLTYPE&gt;&lt;/m&gt;&lt;/l&gt;&lt;/other_urls&gt;&lt;toc&gt;19,22,36,42,48,74&lt;/toc&gt;&lt;website&gt;&lt;m&gt;&lt;src&gt;GRID&lt;/src&gt;&lt;url&gt;http://www.sunnyvale.ca.gov/&lt;/url&gt;&lt;/m&gt;&lt;/website&gt;&lt;neighbor&gt;Downtown|Sunnyvale Town Center&lt;/neighbor&gt;&lt;woeId&gt;Zip:12797147;DMA:24701119;State:2347563&lt;/woeId&gt;&lt;consumersubmit&gt;&lt;addbyuser&gt;0&lt;/addbyuser&gt;&lt;/consumersubmit&gt;</attrlist><dispycat>96929308:City Hall:1|96927047:Government:1</dispycat><ip_hoo5>0|0|0|0|0|0|0|0</ip_hoo5><ip_popularity_features>0.01401179941|0.0780352748154|0.0|0.00020|0.00008</ip_popularity_features><s1>&lt;CTS&gt;96925679 96927047 96929308&lt;/CTS&gt;&lt;S_CNSUMR_SUBMT&gt;0&lt;/S_CNSUMR_SUBMT&gt;&lt;DR_PLUS4&gt;7619&lt;/DR_PLUS4&gt;</s1><ll>-122037613;37371072</ll><stat>1 3 4 6 7 8 11 12 13 14 15 16 33 34 36 41 42 43 44 46 48 49 50 51 57 58 59 60 71 72 73 74 75 76 77 80 81 82 83 85 86 87 131 132 134 137 138 139 140 141 143 144 159 181 183 184 192 193 196 197 198 204</stat><uri>21336977</uri><title>sch | sunnyvalecity | sunnyvalecityhall | hall | city | sunnyvale</title><dtitle>Sunnyvale City Hall</dtitle><ip_hoo2>0|0|0|0|0|0|0|0</ip_hoo2><citystate>Sunnyvale CA</citystate><nhelpreview>0</nhelpreview><ll_long>-122037613</ll_long><ip_custombyte>7|57|103</ip_custombyte><prior_rating>0</prior_rating><ip_hoo4>0|0|0|0|0|0|0|0</ip_hoo4><ip_ycatid2>96929308|96927047</ip_ycatid2><ftitle>20</ftitle><ip_ycat_primary_id>96929308|96927047</ip_ycat_primary_id><ip_eid>NAV=17555658|PSOXSOCIALURL=21336977|INFOUSA=102028107</ip_eid><zip>94086</zip><prior_nreview>0</prior_nreview><ip_customweights>780|33|.09102564102564102|49.732394366197184|121739|3060000|717|1860|716|1860|459|1400|457|1400|114|913|-1|-1|-1|-1|0.0862069|0.12267753</ip_customweights><type>POI</type><enhanced>1</enhanced><ip_normtitle>sunnyvale city hall</ip_normtitle><primary_url>http://www.sunnyvale.ca.gov/</primary_url><ip_hoo1>0|0|0|0|0|0|0|0</ip_hoo1><dma>24701119</dma><city>Sunnyvale</city><ip_catkey>city hall|government</ip_catkey><prior_nrating>0</prior_nrating><lcw_ext><item weight="0">0</item></lcw_ext><pop0>0</pop0><ll_lat>37371072</ll_lat><phone>4087307500</phone><dispfeaturef>17</dispfeaturef><ip_ycat_primary>City Hall|Government</ip_ycat_primary><ip_hoo6>0|0|0|0|0|0|0|0</ip_hoo6><ip_ycat_primary_synonyms>county government|marriage licenses|us government|government-office|government relations firms|government agencies|government relations|marriage license|city government departments|city government|federal government|usgovernment|council of governments|governmentoffices|government offices|government officials|government-contract consultants|governments offices</ip_ycat_primary_synonyms><ip_dcat>City Hall|Government</ip_dcat><listing_status>103</listing_status><lcw_pcat><item weight="7071">96929308</item><item weight="7071">96927047</item></lcw_pcat><pop_keyword_portion><item weight="2">3053931</item><item weight="1">99043162</item><item weight="57">-734328434</item><item weight="6">-1164754482</item><item weight="33">-1164681749</item><item weight="10000">-1491073460</item><item weight="5000">1098856921</item><item weight="5000">90458857</item><item weight="10000">-1956199191</item><item weight="10000">-2015626891</item><item weight="10000">38487508</item><item weight="1428">-1384211533</item><item weight="588">-995864441</item><item weight="6666">-350418064</item><item weight="112">-1493944221</item></pop_keyword_portion><stat2>2 3 4 5 7 8 10 11 21 24 31 42 44 45 46 47 48 50 51 53 54 56 57 69 92 112 121 123 128 129 134 154 195 205 232 238 242</stat2><desc> </desc><ip_rating>0</ip_rating><lcw><item weight="5745">96927047</item><item weight="8186">96929308</item></lcw><pop_keyword_certainty><item weight="5425">3053931</item><item weight="5344">99043162</item><item weight="7105">-734328434</item><item weight="5140">-1164754482</item><item weight="6766">-1164681749</item><item weight="10000">-1491073460</item><item weight="9587">1098856921</item><item weight="9587">90458857</item><item weight="10000">-1956199191</item><item weight="10000">-2015626891</item><item weight="10000">38487508</item><item weight="8842">-1384211533</item><item weight="9120">-995864441</item><item weight="9621">-350418064</item><item weight="7429">-1493944221</item></pop_keyword_certainty><q>9</q><isactive>1</isactive><ip_ycat2gc>96929308|96927047</ip_ycat2gc><ip_hoo0>0|0|0|0|0|0|0|0</ip_hoo0><language>en</language><nreview>0</nreview><ip_pycatnames>Government &amp; Community</ip_pycatnames><ip_catkey_click>marriage license</ip_catkey_click><country_code>us</country_code><state>CA</state><ip_hoo3>0|0|0|0|0|0|0|0</ip_hoo3><version_pub>1346889600</version_pub><ip_ycat_primary_cp_desc>CITY GOVERNMENT-EXECUTIVE OFFICES</ip_ycat_primary_cp_desc><crossst>2|All America Way|Charles St</crossst><ip_dcatkey>us government|government relations firms|marriage license|federal government|council of governments|governmentoffices|government officials|government-contract consultants|governments offices|county government|marriage licenses|government-office|government agencies|government relations|city government departments|city government|usgovernment|government offices</ip_dcatkey><dispambiancef>7</dispambiancef><ip_ycatid1>96925679</ip_ycatid1><paid_listing_status>0</paid_listing_status><lcw_norm>0.5640622</lcw_norm><webkeyword>city sunnyvale twitter jobs news contact us about codes policies charter municipal code council policy general plan maps directions map hall external link library public safety parks golf courses swimming pools tennis center transportation area resources community resource guide frequently requested english spanish budget documents data demographics business economic profile learn new resident information sheet this site privacy what hot topics plastic bag ban amendment ballot measures elections firearms retail study issue review committee medical marijuana dispensary horizon downtown redevelopment sustainability consolidation lute update manager updates meetings agendas next meeting february boards commissions latest publications fall activity winter quarterly report current job openings bid on projects open bids around cited as american top potential upcoming events issues workshop feb offices are closed more event calendar government agenda watch making presentation at arts bicycle pedestrian board trustees building appeals heritage preservation housing human recreation personnel planning other agencies county santa clara state california clerk page campaign ethics departments attorney development environmental nova workforce finance technology works living get card search catalog manage your account getting involved volunteering neighborhood associations classes activities out now playing performing shopping dining in centers columbia senior guides unemployed residents infrastructure traffic trees street maintenance garbage recycling smart station extra tags affordable assistance water supply pollution control plant police fire emergency preparedness alarms enforcement animal crime prevention records recruiting permits special residential non one stop permit checks fees encroachment tree removal doing starting facts figures bidding process shop auto row available commercial properties licenses reports links division newsroom recent releases apple occupy president day holiday closures meet local author francisco jimenez reads muslim door arrest made fatal hit run le jazz copper wire theft hits television broadcast schedule social media follow want apply license block party sign up service call tee times facility reservations compost pay my utility bill fines violation graffiti pothole web dispose old medication waste hazardous trash find department places dine volunteer access file claim help improve list something missing from that would make it even useful let know we welcome suggestions friday share comments tuesday strategic featured telephone scam claims be computer has received complaints apparent phone which caller identifies himself representing victim windows system transmitting bad does not these kind calls have if receive type desk officer gary announced will join ranks high tech businesses located square foot town office mathilda mckinley avenues important step forward said entire benefit see yet another large gain project read planned by pg along caribbean drive remove number mostly eucalyptus measure underground gas pipeline young replacement planted median controls during work or go amp presidents facilities including monday observance all parking regulations enforced except where signs specifically exempt holidays collection continue normal posted edition gives tip leading burglars vargas elementary school partnership marathon club kids led streetlight free healthy toddler workshops electronic commission change please note dates do typical due construction chambers scheduled wednesday artists applications hands festival participate may downloading click here download application acclaimed quartet performs gypsy la reinhardt version internationally recognized san perform valentine weekend concert style theatre saturday silicon valley diverse religions cultures been celebrated each year reading choices continues select provocative relevant theme off air channel broadcasts equipment upgrade allowing compliant standards begin broadcasting again no down time so missed vehicle versus accident occurred intersection sequoia reed avenue january driver black struck benjamin lin did major investigation team developed significant leads case identification seizure warrant was issued popular register november networks google divider final eir sales hazard mitigation subscribe notifications feed icon olive ave ca logo can check stay touch used maintained communications questions fine print terms use</webkeyword><ip_provider>NAV|PSOXSOCIALURL|INFOUSA</ip_provider><nrating>0</nrating><ratingfgc>0</ratingfgc><ip_ycat2>96925679</ip_ycat2><spaid>N</spaid><ip_cat>GOVERNMENTOFFICES|OFFICES|HALL|GOVERNMENT|DEPARTMENTS|GOVERNMENTS|CONSULTANTS|OFFICIALS|RELATIONS|USGOVERNMENT|OFFICE|CONTRACT|COUNCIL|CITY|COMMUNITY|FIRMS|AGENCIES|COUNTY</ip_cat><ip_neighborhood>downtownsunnyvaleca|sunnyvaletowncentersunnyvaleca</ip_neighborhood><addrhash>068B74475DB0D415</addrhash><webtext>City of Sunnyvale: Home * Twitter * | * Jobs * | * News * | * eNotify * | * RSS * | * Contact Us Home About The City Codes and Policies City Charter Municipal Code Council Policy General Plan Maps and Directions Map of Sunnyvale City Hall external link Library Public Safety City Parks Golf Courses Swimming Pools Tennis Center Public Transportation Area Resources Community Resource Guide to Frequently Requested Services: English (pdf) | Spanish (pdf) City Budget Budget Documents Data and Demographics Business Demographics Economic Profile (pdf) Learn about Sunnyvale New Resident Guide City Council Information Sheet About This Site About the Site Site Map Privacy Policy Contact Us Whats New Hot Topics Plastic Bag Ban Charter Amendment Ballot Measures City Council Elections Firearms Retail Study Issue Charter Review Committee Medical Marijuana Dispensary Study Horizon 2035 Downtown Redevelopment Onizuka / BRAC Sustainability General Plan Consolidation LUTE Update City Managers Updates Meetings and Agendas Next Council Meeting: February 7 Council Meetings Boards and Commissions Latest Publications Fall Activity Guide Winter 2012 Quarterly Report 2010 New Resident Guide Jobs Current Job Openings Bid on Projects Current Open Bids Around the City Sunnyvale Cited As American City with Top Economic Potential Upcoming Events February 3 - Council Budget/Study Issues Workshop Feb 20 - City Offices are closed More Events on the Community Event Calendar Government City Council About Council Current Council Agenda Councilmembers Council Meeting Agendas Watch Council Meetings Online Making a Presentation at Council Boards and Commissions About Boards and Commissions Current Openings Arts Bicycle and Pedestrian Board of Library Trustees Building Code Appeals Heritage Preservation Housing and Human Services Parks and Recreation Personnel Planning Sustainability Study Issues 2012 Study Issues 2011 Study Issues 2010 Study Issues 2009 Study Issues Other Agencies County of Santa Clara external link State of California external link Codes and Policies City Charter Municipal Code Council Policy General Plan Elections City Clerk Elections Page Campaign Ethics Guide Departments City Attorney City Manager Community Development Community Services Environmental Services NOVA Workforce Services Finance Human Resources Information Technology Public Safety Public Works Sunnyvale Public Library Living Library Library Home Page Get a Library Card Search the Library Catalog Manage Your Library Account Getting Involved Volunteering Neighborhood Associations Recreation Golf Tennis Parks Classes and Activities Swimming Out and About Now Playing at the Performing Arts Center Shopping and Dining in Sunnyvale Community Event Calendar Community Centers Columbia Neighborhood Center Sunnyvale Community Center Sunnyvale Senior Center Resource Guides Resources for Unemployed Residents Community Resource Guide to Frequently Requested Services: English (pdf) | Spanish (pdf) Services City Infrastructure Traffic and Transportation Trees Street Maintenance Garbage and Recycling SMaRT Station Garbage and Recycling Services Extra Garbage Tags Housing Affordable Housing and Community Assistance Water Water Supply Water Pollution Control Plant (WPCP) Public Safety Police Fire Emergency Preparedness Alarms Code Enforcement Animal Control Crime Prevention Public Safety Records Public Safety Recruiting Permits Special Event Permits Residential Permits Non-residential Permits E-Onestop One-Stop Permit Center Permits, Plan Checks and Fees Encroachment Permits Tree Removal Permits Doing Business Sunnyvale for Business Starting a Business in Sunnyvale Facts and Figures Doing Business in Sunnyvale Economic Development Downtown Development Bid on Sunnyvale Projects The Bidding Process Current Open Bids Shop Sunnyvale Sunnyvale Auto Row Business Resources Available Commercial Properties E-One Stop Permit Center Business Licenses Business News and Reports Business Links Building Division Planning Division Newsroom Recent News Releases Apple to Occupy New Downtown Sunnyvale Offices Presidents Day Holiday Closures Meet Sunnyvale Local Author Francisco Jimenez Sunnyvale Reads the Muslim Next Door Arrest Made in Fatal Sunnyvale Hit-And-Run Le Jazz Hot in Sunnyvale February 11 Copper Wire Theft Hits City of Sunnyvale More News Releases Publications Quarterly Report Activity Guide Senior Activity Guide New Resident Guide Campaign Ethics Guide On Television KSUN Broadcast Schedule Social Media Follow Us on Twitter City of Sunnyvale Facebook Page I Want To . Apply For: Job Openings Boards and Commissions Business License Permits Animal License Special Event Permit Block Party Sign Up For: Garbage Service Recycling Service On-Call Garbage/Recycling Service Recreation Classes Golf Tee Times Facility Reservations Compost Workshop Pay For: My Utility Bill Library Fines Report: Code Violation Graffiti Pothole Public Safety Issue Traffic Issue General Issue Web Site Issue Dispose of: Old Medication E-Waste Other Hazardous Waste Extra Trash Find: Building Permits City Hall external link City Parks Sunnyvale Community Center Columbia Neighborhood Center Sunnyvale Senior Center Sunnyvale Public Library Department of Public Safety Places to Shop and Dine Watch: Council Meetings Online Other: Volunteer in the City Access Public Records File a Claim Help Improve This List Is something missing from this list that would make it an even more useful resource? Let us know! We welcome your suggestions. Contact us online. Friday, February 10, 2012 Share Your Comments Next Council Meeting * Tuesday, February 7, 2012 * Council Agendas * Watch the Meeting Online * Watch the Meeting on KSUN-15 * About Council Upcoming Events * February 10 - Strategic Planning Workshop * February 20 - City Holiday - Offices closed * Community Events Calendar Recent News Releases Apple to Occupy New Downtown Sunnyvale Offices Presidents Day Holiday Closures Meet Sunnyvale Local Author Francisco Jimenez Sunnyvale Reads the Muslim Next Door Arrest Made in Fatal Sunnyvale Hit-And-Run Le Jazz Hot in Sunnyvale February 11 More News Releases Featured City News Telephone Scam Claims to be from Sunnyvale Computer Department The City has received complaints of an apparent phone scam in which the caller identifies himself as representing the Sunnyvale Computer Department. The caller claims the victims computers Windows system is transmitting bad data. City of Sunnyvale does not make these kind of calls to the public and does not have a Computer Department. If you receive this type of scam call, report the call to a Public Safety Desk Officer at (408) 730-7110.Learn More Telephone Scam Claims to be from &#34;Sunnyvale Computer Department&#34; Apple to occupy new downtown Sunnyvale offices Sunnyvale City Manager Gary Luebbers has announced that Apple will join the ranks of high-tech businesses located in downtown Sunnyvale. Apple will occupy the new 156,000 square-foot Town Center Office building at Mathilda and McKinley avenues. ?This is an important step forward,? said Luebbers. Our entire community will benefit as we see yet another large gain in the Town Center redevelopment project.? Read more Learn More Apple to occupy new downtown Sunnyvale offices Tree Removal Planned by PG&amp;E along Caribbean Drive PG&amp;E will remove a number of large trees ? mostly eucalyptus ? along Caribbean Drive, February 6 ? 15. This is a safety measure for the underground gas pipeline. Young replacement trees will planted in the median; watch for traffic controls during this work. For more information, call PG&amp;E at (800) 743-5000 or go to http://pge.com/myhome/customerservice/Learn More Tree Removal Planned by PG&amp;amp;E along Caribbean Drive City Closed for Presidents Day Sunnyvale City offices and facilities, including the Sunnyvale Public Library, Community Center, Senior Center and Columbia Neighborhood Center will be closed Monday, February 20, in observance of the Presidents Day holiday. All traffic and parking regulations will be enforced, except for parking where signs specifically exempt holidays. Trash collection will continue on a normal schedule. Learn More City Closed for Presidents Day Latest City Managers Blog Posted In the latest edition of the City Managers Blog, Gary Luebbers gives an update on the 2012 Study Issues, a public tip leading to the arrest of local burglars, the Vargas Elementary School partnership with Public Safety for a Marathon Club for kids, latest street updates and the LED Streetlight project, free healthy toddler workshops at the Library, a new electronic Job Board from NOVA, and more Learn More Latest City Managers Blog Posted Planning Commission February Meeting Schedule Change PLEASE NOTE: The February Planning Commission meeting dates do not follow the typical schedule due to construction in the Council Chambers. The February Planning Commission Meetings are scheduled for: * MONDAY, FEBRUARY 6, 2012 * WEDNESDAY, FEBRUARY 29, 2012 Learn More Planning Commission February Meeting Schedule Change Artists Applications for 2012 Hands on the Arts Festival Now Posted Applications for artists to participate in the 2012 Hands on the Arts Festival in Sunnyvale on May 19 are now available for downloading. Click here to read more and download the application Learn More Artists Applications for 2012 Hands on the Arts Festival Now Posted Le Jazz Hot in Sunnyvale February 11 Acclaimed quartet performs Gypsy jazz la Django Reinhardt Le Jazz Hot, the quartet version of internationally recognized Le Hot Club of San Francisco, will perform a Valentine?s weekend concert of Django Reinhardt-style Gypsy jazz at Sunnyvale Theatre on Saturday, February 11, at 8 p.m. Read more Learn More Le Jazz Hot in Sunnyvale February 11 Sunnyvale Reads The Muslim Next Door Silicon Valley?s diverse religions and cultures have been celebrated each year by reading choices from Silicon Valley Reads, which continues to select provocative topics relevant to Santa Clara County. The theme for Silicon Valley Reads 2012 is ?Muslim and American.? Read more Learn More Sunnyvale Reads &#34;The Muslim Next Door&#34; KSUN-15 Off Air February 8-27 for Ugrades The Citys public access channel KSUN-15, which broadcasts Council and Planning Commission meetings, will be off-air starting February 8 to get an equipment upgrade, allowing the system to be compliant with current broadcast standards. KSUN will begin broadcasting again on February 27. No meetings are planned during this down time so no broadcasts will be missed. Learn More KSUN-15 Off Air February 8-27 for Ugrades Arrest Made in Fatal Hit-and-Run in Sunnyvale A fatal vehicle versus pedestrian accident occurred at the intersection of Sequoia Drive and Reed Avenue in Sunnyvale on January 5. The driver of a black SUV struck 72-year-old Benjamin Lin and did not stop. The Major Accident Investigation Team (MAIT) developed significant leads in the case which led to the identification of the driver and seizure of the vehicle. On January 19, a $500,000 arrest warrant was issued. Read more Learn More Arrest Made in Fatal Hit-and-Run in Sunnyvale Popular Links * Jobs * Sunnyvale Public Library * Register for Classes and Activities * Recycling and Garbage Information * Volunteer * Maps and Directions * Pay Your Utility Bill Online * November 2011 Library Events Calendar in PDF Pay Your Utility Bill Online Utility Bill Online Pay Doing Business * Bid on City Projects * Economic Development * Business Licenses * Apply for Permits Online Social Networks * +1 us on Google+ * City of Sunnyvale Twitter Account * City of Sunnyvale Facebook Page divider Hot Topics * Tennis Center * Plastic Bag Ban Final EIR * Firearms Sales Study Issue * Sunnyvale Works! * Downtown Development * Sustainability * Onizuka / BRAC * Local Hazard Mitigation Plan (LMPH) Subscribe * e-Notifications * City Managers Blog * RSS Feed * RSS Feed icon City of Sunnyvale (408) 730-7500 * Sunnyvale City Hall * 456 W. Olive Ave. * Sunnyvale, CA 94086 * TDD (408) 730-7501 * Map and Directions * City of Sunnyvale Logo Cant Find It? * Or check out the SITE MAP! Stay In Touch * Contact Us * Follow us on Twitter Frequently-Used Links * Jobs with the City * Library * Garbage and Recycling * Downtown Redevelopment * Economic Development * Register for Classes and Activities About the City * Welcome to Sunnyvale * City Charter and Policies * City Council * City Departments * New Resident Guide About the Web Site The City of Sunnyvale Web Site is maintained by the Sunnyvale Communications Office and the Department of Information Technology. Questions? Contact Us. The Fine Print * Privacy Policy * Terms of Use 2010 City of Sunnyvale</webtext><ip_keyword>[ ca ]</ip_keyword><addr>456 W Olive Ave</addr></document>
+</vespafeed>
diff --git a/vespa-http-client/src/test/resources/xml-challenge3.xml b/vespa-http-client/src/test/resources/xml-challenge3.xml
new file mode 100644
index 00000000000..bec5eb960f7
--- /dev/null
+++ b/vespa-http-client/src/test/resources/xml-challenge3.xml
@@ -0,0 +1,4 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!-- Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -->
+<vespafeed>
+ <document documenttype="biz" transformver="5681" documentid="id:lsbe:biz::21336977"><attrlist>'&apos;</attrlist></document></vespafeed>