summaryrefslogtreecommitdiffstats
path: root/vespa-http-client
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@oath.com>2018-12-05 16:45:33 -0800
committerJon Bratseth <bratseth@oath.com>2018-12-05 16:45:33 -0800
commit65d6c4a70969e0ced79700f300d4633fd5419a87 (patch)
tree63102721848c6b0aeb02acb705211c44c6793908 /vespa-http-client
parent3b43551a5da2954643fb7534dbf801d107ff1adc (diff)
Add simple sync wrapper client
Diffstat (limited to 'vespa-http-client')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java29
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SyncFeedClient.java177
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/JsonReader.java12
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/XmlFeedReader.java1
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java5
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java2
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/FeedClientTest.java29
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SyncFeedClientTest.java79
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java4
9 files changed, 301 insertions, 37 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java
index c5af7a49570..e35c0316433 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java
@@ -10,8 +10,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* API for feeding document operations (add, removes or updates) to one or many Vespa clusters.
* Use the factory to configure and set up an instance of this API.
- * The feedclient does automatically error recovery and reconnects to hosts when
- * connections die.
+ * The feedclient does automatic error recovery and reconnects to hosts when connections die.
*
* A {@link FeedClientFactory} is provided to instantiate Sessions.
*
@@ -26,8 +25,8 @@ public interface FeedClient extends AutoCloseable {
* Documents might time out before they are sent. Failed documents are not retried.
* Don't call stream() after close is called.
*
- * @param documentId Document id of the document.
- * @param documentData The document data as JSON or XML (as specified when using the factory to create the API)
+ * @param documentId the document id of the document.
+ * @param documentData the document data as JSON or XML (as specified when using the factory to create the API)
*/
void stream(String documentId, CharSequence documentData);
@@ -36,9 +35,9 @@ public interface FeedClient extends AutoCloseable {
* Documents might time out before they are sent. Failed documents are not retried.
* Don't call stream() after close is called.
*
- * @param documentId Document id of the document.
- * @param documentData The document data as JSON or XML (as specified when using the factory to create the API)
- * @param context Any context, will be accessible in the result of the callback.
+ * @param documentId the document id of the document.
+ * @param documentData the document data as JSON or XML (as specified when using the factory to create the API)
+ * @param context a context object which will be accessible in the result of the callback, or null if none
*/
void stream(String documentId, CharSequence documentData, Object context);
@@ -51,6 +50,7 @@ public interface FeedClient extends AutoCloseable {
* There is an example implementation in class SimpleLoggerResultCallback.
*/
interface ResultCallback {
+
void onCompletion(String docId, Result documentResult);
/**
@@ -64,6 +64,7 @@ public interface FeedClient extends AutoCloseable {
*/
// TODO Vespa 7: Remove empty default implementation
default void onEndpointException(FeedEndpointException exception) {}
+
}
/**
@@ -83,9 +84,10 @@ public interface FeedClient extends AutoCloseable {
/**
* Utility function that takes an array of JSON documents and calls the FeedClient for each element.
*
- * @param inputStream This can be a very large stream. The outer element is an array (of document operations).
- * @param feedClient The feedClient that will receive the document operations.
- * @param numSent increased per document sent to API (but no waiting for results).
+ * @param inputStream the stream to feed. This can be a very large stream.
+ * The outer element must be an array of document operations.
+ * @param feedClient the feed client that will receive the document operations
+ * @param numSent increased per document sent to API (but not waiting for results)
*/
static void feedJson(InputStream inputStream, FeedClient feedClient, AtomicInteger numSent) {
JsonReader.read(inputStream, feedClient, numSent);
@@ -96,9 +98,10 @@ public interface FeedClient extends AutoCloseable {
* The XML document has to be formatted with line space on each line (like "regular" XML, but stricter
* than the specifications of XML).
*
- * @param inputStream This can be a very large stream.
- * @param feedClient The feedClient that will receive the document operations.
- * @param numSent increased per document sent to API (but no waiting for results).
+ * @param inputStream the stream to feed. This can be a very large stream. Operations must be enclosed in a
+ * top-level &lt;vespafeed&gt; tag
+ * @param feedClient the feed client that will receive the document operations
+ * @param numSent increased per document sent to API (but not waiting for results)
*/
static void feedXml(InputStream inputStream, FeedClient feedClient, AtomicInteger numSent) {
try {
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SyncFeedClient.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SyncFeedClient.java
new file mode 100644
index 00000000000..5375b19afdf
--- /dev/null
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SyncFeedClient.java
@@ -0,0 +1,177 @@
+package com.yahoo.vespa.http.client;
+
+import com.google.common.annotations.Beta;
+import com.yahoo.vespa.http.client.config.SessionParams;
+
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A utility wrapper of a FeedClient which feeds a list of documents and blocks until all responses are returned,
+ * before returning the results.
+ *
+ * Not multithread safe: A sync feed client instance can only be used by a single thread
+ * (but it can and should be reused for multiple subsequent synchronous calls).
+ *
+ * @author bratseth
+ */
+@Beta
+public class SyncFeedClient implements AutoCloseable {
+
+ private final FeedClient wrappedClient;
+ private final Callback callback;
+
+ public SyncFeedClient(SessionParams sessionParams) {
+ callback = new SyncFeedClient.Callback();
+ this.wrappedClient = FeedClientFactory.create(sessionParams, callback);
+ }
+
+ /**
+ * Calls FeedClient.stream for each entry in the list, blocks until all results are ready and returns them.
+ * This will block for at most the time it takes to feed these operations + clientTimeout given in the
+ * sessions params when creating this.
+ *
+ * @param operations the Vespa write operations to stream
+ * @return the result of feeding all these operations
+ */
+ public SyncResult stream(List<SyncOperation> operations) {
+ callback.expectResultsOf(operations);
+ for (SyncOperation operation : operations)
+ wrappedClient.stream(operation.documentId, operation.documentData, operation.context);
+ return callback.waitForResults();
+ }
+
+ @Override
+ public void close() {
+ wrappedClient.close();
+ }
+
+ /** Holds the arguments to a single stream operation */
+ public static class SyncOperation {
+
+ private final String documentId;
+ private final CharSequence documentData;
+ private final Object context;
+
+ public SyncOperation(String documentId, CharSequence documentData) {
+ this(documentId, documentData, null);
+ }
+
+ public SyncOperation(String documentId, CharSequence documentData, Object context) {
+ this.documentId = Objects.requireNonNull(documentId, "documentId");
+ this.documentData = Objects.requireNonNull(documentData, "documentData");
+ this.context = context;
+ }
+
+ }
+
+ /**
+ * The result of a SyncFeedClient.stream call. This always holds exactly one Result per SyncOperation
+ * attempted, and the results are guaranteed to be returned in the same order as in the List of SyncOperations.
+ */
+ public static class SyncResult {
+
+ private final Exception exception;
+ private final List<Result> results;
+
+ private SyncResult(List<Result> results, Exception exception) {
+ this.results = results;
+ this.exception = exception;
+ }
+
+ /**
+ * Returns the results of this. This has the same size and order as the List of SyncOperations that
+ * created this. The list returned is modifiable and owned by the client. Multiple calls to this returns the
+ * same list instance.
+ */
+ public List<Result> results() { return results; }
+
+ /**
+ * Returns the last exception received when attempting the operations this is the result of, or null if none.
+ * Even if there is an exception, results() will return one Result per operation attempted.
+ */
+ public Exception exception() { return exception; }
+
+ /** Returns true if all Results in this are successful */
+ public boolean isSuccess() {
+ return results.stream().allMatch(Result::isSuccess);
+ }
+
+ }
+
+ private static class Callback implements FeedClient.ResultCallback {
+
+ private final Object monitor = new Object();
+
+ // The rest of the state of this is reset each time we call expectResultsOf
+
+ private int resultsReceived;
+ private Exception exception = null;
+
+ /**
+ * A map from document ids to their results. This is initially populated with null values to keep track of
+ * which responses we are waiting for.
+ */
+ private Map<String, Result> results = null;
+
+ void expectResultsOf(List<SyncOperation> operations) {
+ synchronized (monitor) {
+ if (results != null)
+ throw new ConcurrentModificationException("A SyncFeedClient instance is used by multiple threads");
+
+ resultsReceived = 0;
+ exception = null;
+ results = new LinkedHashMap<>(operations.size());
+ for (SyncOperation operation : operations)
+ results.put(operation.documentId, null);
+ }
+ }
+
+ SyncResult waitForResults() {
+ try {
+ synchronized (monitor) {
+ while ( ! complete())
+ monitor.wait();
+
+ SyncResult syncResult = new SyncResult(new ArrayList<>(results.values()), exception);
+ results = null;
+ return syncResult;
+ }
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted while waiting for feeding results", e);
+ }
+ }
+
+ @Override
+ public void onCompletion(String docId, Result documentResult) {
+ synchronized (monitor) {
+ if ( ! results.containsKey(docId)) return; // Not expecting this result - ignore
+
+ Result previousValue = results.put(docId, documentResult);
+ if (previousValue != null)
+ throw new IllegalStateException("Received duplicate result for " + docId);
+
+ resultsReceived++;
+ if (complete())
+ monitor.notifyAll();
+ }
+ }
+
+ @Override
+ public void onEndpointException(FeedEndpointException exception) {
+ this.exception = exception; // We will still receive one onCompletion per stream invocation done
+ }
+
+ private boolean complete() {
+ return resultsReceived == results.size();
+ }
+
+ }
+
+}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/JsonReader.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/JsonReader.java
index d10480eb183..30cad36e98a 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/JsonReader.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/JsonReader.java
@@ -31,6 +31,7 @@ public class JsonReader {
/**
* Process one inputstream and send all documents to feedclient.
+ *
* @param inputStream source of array of json document.
* @param feedClient where data is sent.
* @param numSent counter to be incremented for every document streamed.
@@ -95,6 +96,7 @@ public class JsonReader {
/**
* This is for throwing away [ and spaces in front of a json object, and find the position of {.
* Not for parsing much text.
+ *
* @return position for {
*/
public int findNextObjectStart() {
@@ -192,6 +194,7 @@ public class JsonReader {
/**
* Parse one document from the stream and return doc id.
+ *
* @param jParser parser with stream.
* @return doc id of document or null if no more docs.
* @throws IOException on problems
@@ -202,7 +205,7 @@ public class JsonReader {
boolean foundObject = false;
boolean valueIsDocumentId = false;
while (jParser.nextToken() != null) {
- final String tokenAsText = jParser.getText();
+ String tokenAsText = jParser.getText();
if (valueIsDocumentId) {
if (documentId != null) {
throw new RuntimeException("Several document ids");
@@ -224,9 +227,9 @@ public class JsonReader {
case FIELD_NAME:
if (objectLevel == 1 &&
(tokenAsText.equals("put")
- || tokenAsText.endsWith("id")
- || tokenAsText.endsWith("update")
- || tokenAsText.equals("remove"))) {
+ || tokenAsText.endsWith("id")
+ || tokenAsText.endsWith("update")
+ || tokenAsText.equals("remove"))) {
valueIsDocumentId = true;
}
break;
@@ -237,4 +240,5 @@ public class JsonReader {
throw new EOFException("No more documents");
return null;
}
+
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/XmlFeedReader.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/XmlFeedReader.java
index 670b30f880d..2c2489cf234 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/XmlFeedReader.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/XmlFeedReader.java
@@ -23,7 +23,6 @@ public class XmlFeedReader {
private XmlFeedReader() {}
public static void read(InputStream inputStream, FeedClient feedClient, AtomicInteger numSent) throws Exception {
-
SAXParserFactory parserFactory = SAXParserFactory.newInstance();
// XXE prevention:
parserFactory.setFeature("http://xml.org/sax/features/external-general-entities", false);
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java
index 16ef8a4bbd2..b60b9fa89d3 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java
@@ -27,8 +27,9 @@ public class FeedClientImpl implements FeedClient {
private final long closeTimeoutMs;
private final long sleepTimeMs = 500;
- public FeedClientImpl(
- SessionParams sessionParams, ResultCallback resultCallback, ScheduledThreadPoolExecutor timeoutExecutor) {
+ public FeedClientImpl(SessionParams sessionParams,
+ ResultCallback resultCallback,
+ ScheduledThreadPoolExecutor timeoutExecutor) {
this.closeTimeoutMs = (10 + 3 * sessionParams.getConnectionParams().getMaxRetries()) * (
sessionParams.getFeedParams().getServerTimeout(TimeUnit.MILLISECONDS) +
sessionParams.getFeedParams().getClientTimeout(TimeUnit.MILLISECONDS));
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
index 488fe41d7ff..7ead0c4a37f 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
@@ -211,7 +211,7 @@ public class OperationProcessor {
}
public void resultReceived(EndpointResult endpointResult, int clusterId) {
- final Result result = process(endpointResult, clusterId);
+ Result result = process(endpointResult, clusterId);
if (result != null) {
incompleteResultsThrottler.resultReady(result.isSuccess());
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/FeedClientTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/FeedClientTest.java
index 8d6530d8305..33afc759f59 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/FeedClientTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/FeedClientTest.java
@@ -4,7 +4,6 @@ package com.yahoo.vespa.http.client;
import com.yahoo.vespa.http.client.config.Cluster;
import com.yahoo.vespa.http.client.config.ConnectionParams;
import com.yahoo.vespa.http.client.config.Endpoint;
-import com.yahoo.vespa.http.client.config.FeedParams;
import com.yahoo.vespa.http.client.config.SessionParams;
import com.yahoo.vespa.http.client.core.api.FeedClientImpl;
import org.junit.Test;
@@ -12,14 +11,14 @@ import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
/**
* Tests for the API, using dryrun option to mock gateway.
+ *
* @author dybis
*/
public class FeedClientTest {
@@ -36,18 +35,18 @@ public class FeedClientTest {
.build();
final AtomicInteger resultsReceived = new AtomicInteger(0);
FeedClient.ResultCallback resultCallback = (docId, documentResult) -> {
- assert(documentResult.isSuccess());
- assertThat(docId, is(DOCID));
+ assertTrue(documentResult.isSuccess());
+ assertEquals(DOCID, docId);
resultsReceived.incrementAndGet();
};
FeedClient feedClient = new FeedClientImpl(sessionParams, resultCallback, SessionFactory.createTimeoutExecutor());
@Test
- public void testStreamAndClose() throws Exception {
+ public void testStreamAndClose() {
feedClient.stream(DOCID, "blob");
feedClient.close();
- assertThat(resultsReceived.get(), is(1));
+ assertEquals(1, resultsReceived.get());
}
@Test
@@ -60,25 +59,27 @@ public class FeedClientTest {
}
@Test
- public void testFeedJson() throws Exception {
+ public void testFeedJson() {
InputStream stream = new ByteArrayInputStream((String.format("[{\"remove\": \"%s\"}]", DOCID)
.getBytes(StandardCharsets.UTF_8)));
AtomicInteger docCounter = new AtomicInteger(0);
FeedClient.feedJson(stream, feedClient, docCounter);
- assertThat(docCounter.get(), is(1));
+ assertEquals(1, docCounter.get());
feedClient.close();
- assertThat(resultsReceived.get(), is(1));
+ assertEquals(1, resultsReceived.get());
}
@Test
- public void testFeedXml() throws Exception {
+ public void testFeedXml() {
InputStream stream = new ByteArrayInputStream((String.format(
"<document documenttype=\"music\" documentid=\"%s\">\n</document>\n", DOCID)
.getBytes(StandardCharsets.UTF_8)));
AtomicInteger docCounter = new AtomicInteger(0);
FeedClient.feedXml(stream, feedClient, docCounter);
- assertThat(docCounter.get(), is(1));
+ assertEquals(1, docCounter.get());
feedClient.close();
- assertThat(resultsReceived.get(), is(1));
+ assertEquals(1, resultsReceived.get());
}
+
}
+
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SyncFeedClientTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SyncFeedClientTest.java
new file mode 100644
index 00000000000..c7787b55049
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SyncFeedClientTest.java
@@ -0,0 +1,79 @@
+package com.yahoo.vespa.http.client;
+
+import com.yahoo.vespa.http.client.config.Cluster;
+import com.yahoo.vespa.http.client.config.ConnectionParams;
+import com.yahoo.vespa.http.client.config.Endpoint;
+import com.yahoo.vespa.http.client.config.SessionParams;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.yahoo.vespa.http.client.SyncFeedClient.SyncOperation;
+import com.yahoo.vespa.http.client.SyncFeedClient.SyncResult;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Tests the sync wrapper to the feed client
+ *
+ * @author bratseth
+ */
+public class SyncFeedClientTest {
+
+ @Test
+ public void testFeedJson() {
+ SessionParams sessionParams = new SessionParams.Builder()
+ .addCluster(new Cluster.Builder()
+ .addEndpoint(Endpoint.create("hostname"))
+ .build())
+ .setConnectionParams(new ConnectionParams.Builder()
+ .setDryRun(true)
+ .build())
+ .build();
+ SyncFeedClient feedClient = new SyncFeedClient(sessionParams);
+
+
+ assertFeedSuccessful(feedClient);
+ assertFeedSuccessful(feedClient); // ensure the client can be reused
+ feedClient.close();
+ }
+
+ private void assertFeedSuccessful(SyncFeedClient feedClient) {
+ List<SyncOperation> operations = new ArrayList<>();
+
+ operations.add(new SyncOperation("id::test::1",
+ "{" +
+ " \"put\": \"id::test::1\"," +
+ " \"fields\": {" +
+ " \"title\": \"Title 1\"" +
+ " }" +
+ "}"));
+ operations.add(new SyncOperation("id::test::2",
+ "{" +
+ " \"put\": \"id::test::2\"," +
+ " \"fields\": {" +
+ " \"title\": \"Title 2\"" +
+ " }" +
+ "}"));
+ operations.add(new SyncOperation("id::test::3",
+ "{" +
+ " \"put\": \"id::test::3\"," +
+ " \"fields\": {" +
+ " \"title\": \"Title 3\"" +
+ " }" +
+ "}"));
+
+ SyncResult result = feedClient.stream(operations);
+
+ assertTrue(result.isSuccess());
+ assertEquals(3, result.results().size());
+ assertNull(result.exception());
+ assertEquals("id::test::1", result.results().get(0).getDocumentId());
+ assertEquals("id::test::2", result.results().get(1).getDocumentId());
+ assertEquals("id::test::3", result.results().get(2).getDocumentId());
+ }
+
+}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java
index 09e8af5e113..594a49e7d91 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java
@@ -25,8 +25,8 @@ import static org.junit.Assert.assertThat;
/**
* Only runs on screwdriver to save time!
- * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
- * @since 5.1.27
+ *
+ * @author Einar M R Rosenvinge
*/
public class V3HttpAPITest extends TestOnCiBuildingSystemOnly {