diff options
author | Jon Bratseth <bratseth@oath.com> | 2018-12-05 16:45:33 -0800 |
---|---|---|
committer | Jon Bratseth <bratseth@oath.com> | 2018-12-05 16:45:33 -0800 |
commit | 65d6c4a70969e0ced79700f300d4633fd5419a87 (patch) | |
tree | 63102721848c6b0aeb02acb705211c44c6793908 /vespa-http-client | |
parent | 3b43551a5da2954643fb7534dbf801d107ff1adc (diff) |
Add simple sync wrapper client
Diffstat (limited to 'vespa-http-client')
9 files changed, 301 insertions, 37 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java index c5af7a49570..e35c0316433 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java @@ -10,8 +10,7 @@ import java.util.concurrent.atomic.AtomicInteger; /** * API for feeding document operations (add, removes or updates) to one or many Vespa clusters. * Use the factory to configure and set up an instance of this API. - * The feedclient does automatically error recovery and reconnects to hosts when - * connections die. + * The feedclient does automatic error recovery and reconnects to hosts when connections die. * * A {@link FeedClientFactory} is provided to instantiate Sessions. * @@ -26,8 +25,8 @@ public interface FeedClient extends AutoCloseable { * Documents might time out before they are sent. Failed documents are not retried. * Don't call stream() after close is called. * - * @param documentId Document id of the document. - * @param documentData The document data as JSON or XML (as specified when using the factory to create the API) + * @param documentId the document id of the document. + * @param documentData the document data as JSON or XML (as specified when using the factory to create the API) */ void stream(String documentId, CharSequence documentData); @@ -36,9 +35,9 @@ public interface FeedClient extends AutoCloseable { * Documents might time out before they are sent. Failed documents are not retried. * Don't call stream() after close is called. * - * @param documentId Document id of the document. - * @param documentData The document data as JSON or XML (as specified when using the factory to create the API) - * @param context Any context, will be accessible in the result of the callback. + * @param documentId the document id of the document. + * @param documentData the document data as JSON or XML (as specified when using the factory to create the API) + * @param context a context object which will be accessible in the result of the callback, or null if none */ void stream(String documentId, CharSequence documentData, Object context); @@ -51,6 +50,7 @@ public interface FeedClient extends AutoCloseable { * There is an example implementation in class SimpleLoggerResultCallback. */ interface ResultCallback { + void onCompletion(String docId, Result documentResult); /** @@ -64,6 +64,7 @@ public interface FeedClient extends AutoCloseable { */ // TODO Vespa 7: Remove empty default implementation default void onEndpointException(FeedEndpointException exception) {} + } /** @@ -83,9 +84,10 @@ public interface FeedClient extends AutoCloseable { /** * Utility function that takes an array of JSON documents and calls the FeedClient for each element. * - * @param inputStream This can be a very large stream. The outer element is an array (of document operations). - * @param feedClient The feedClient that will receive the document operations. - * @param numSent increased per document sent to API (but no waiting for results). + * @param inputStream the stream to feed. This can be a very large stream. + * The outer element must be an array of document operations. + * @param feedClient the feed client that will receive the document operations + * @param numSent increased per document sent to API (but not waiting for results) */ static void feedJson(InputStream inputStream, FeedClient feedClient, AtomicInteger numSent) { JsonReader.read(inputStream, feedClient, numSent); @@ -96,9 +98,10 @@ public interface FeedClient extends AutoCloseable { * The XML document has to be formatted with line space on each line (like "regular" XML, but stricter * than the specifications of XML). * - * @param inputStream This can be a very large stream. - * @param feedClient The feedClient that will receive the document operations. - * @param numSent increased per document sent to API (but no waiting for results). + * @param inputStream the stream to feed. This can be a very large stream. Operations must be enclosed in a + * top-level <vespafeed> tag + * @param feedClient the feed client that will receive the document operations + * @param numSent increased per document sent to API (but not waiting for results) */ static void feedXml(InputStream inputStream, FeedClient feedClient, AtomicInteger numSent) { try { diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SyncFeedClient.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SyncFeedClient.java new file mode 100644 index 00000000000..5375b19afdf --- /dev/null +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SyncFeedClient.java @@ -0,0 +1,177 @@ +package com.yahoo.vespa.http.client; + +import com.google.common.annotations.Beta; +import com.yahoo.vespa.http.client.config.SessionParams; + +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +/** + * A utility wrapper of a FeedClient which feeds a list of documents and blocks until all responses are returned, + * before returning the results. + * + * Not multithread safe: A sync feed client instance can only be used by a single thread + * (but it can and should be reused for multiple subsequent synchronous calls). + * + * @author bratseth + */ +@Beta +public class SyncFeedClient implements AutoCloseable { + + private final FeedClient wrappedClient; + private final Callback callback; + + public SyncFeedClient(SessionParams sessionParams) { + callback = new SyncFeedClient.Callback(); + this.wrappedClient = FeedClientFactory.create(sessionParams, callback); + } + + /** + * Calls FeedClient.stream for each entry in the list, blocks until all results are ready and returns them. + * This will block for at most the time it takes to feed these operations + clientTimeout given in the + * sessions params when creating this. + * + * @param operations the Vespa write operations to stream + * @return the result of feeding all these operations + */ + public SyncResult stream(List<SyncOperation> operations) { + callback.expectResultsOf(operations); + for (SyncOperation operation : operations) + wrappedClient.stream(operation.documentId, operation.documentData, operation.context); + return callback.waitForResults(); + } + + @Override + public void close() { + wrappedClient.close(); + } + + /** Holds the arguments to a single stream operation */ + public static class SyncOperation { + + private final String documentId; + private final CharSequence documentData; + private final Object context; + + public SyncOperation(String documentId, CharSequence documentData) { + this(documentId, documentData, null); + } + + public SyncOperation(String documentId, CharSequence documentData, Object context) { + this.documentId = Objects.requireNonNull(documentId, "documentId"); + this.documentData = Objects.requireNonNull(documentData, "documentData"); + this.context = context; + } + + } + + /** + * The result of a SyncFeedClient.stream call. This always holds exactly one Result per SyncOperation + * attempted, and the results are guaranteed to be returned in the same order as in the List of SyncOperations. + */ + public static class SyncResult { + + private final Exception exception; + private final List<Result> results; + + private SyncResult(List<Result> results, Exception exception) { + this.results = results; + this.exception = exception; + } + + /** + * Returns the results of this. This has the same size and order as the List of SyncOperations that + * created this. The list returned is modifiable and owned by the client. Multiple calls to this returns the + * same list instance. + */ + public List<Result> results() { return results; } + + /** + * Returns the last exception received when attempting the operations this is the result of, or null if none. + * Even if there is an exception, results() will return one Result per operation attempted. + */ + public Exception exception() { return exception; } + + /** Returns true if all Results in this are successful */ + public boolean isSuccess() { + return results.stream().allMatch(Result::isSuccess); + } + + } + + private static class Callback implements FeedClient.ResultCallback { + + private final Object monitor = new Object(); + + // The rest of the state of this is reset each time we call expectResultsOf + + private int resultsReceived; + private Exception exception = null; + + /** + * A map from document ids to their results. This is initially populated with null values to keep track of + * which responses we are waiting for. + */ + private Map<String, Result> results = null; + + void expectResultsOf(List<SyncOperation> operations) { + synchronized (monitor) { + if (results != null) + throw new ConcurrentModificationException("A SyncFeedClient instance is used by multiple threads"); + + resultsReceived = 0; + exception = null; + results = new LinkedHashMap<>(operations.size()); + for (SyncOperation operation : operations) + results.put(operation.documentId, null); + } + } + + SyncResult waitForResults() { + try { + synchronized (monitor) { + while ( ! complete()) + monitor.wait(); + + SyncResult syncResult = new SyncResult(new ArrayList<>(results.values()), exception); + results = null; + return syncResult; + } + } + catch (InterruptedException e) { + throw new RuntimeException("Interrupted while waiting for feeding results", e); + } + } + + @Override + public void onCompletion(String docId, Result documentResult) { + synchronized (monitor) { + if ( ! results.containsKey(docId)) return; // Not expecting this result - ignore + + Result previousValue = results.put(docId, documentResult); + if (previousValue != null) + throw new IllegalStateException("Received duplicate result for " + docId); + + resultsReceived++; + if (complete()) + monitor.notifyAll(); + } + } + + @Override + public void onEndpointException(FeedEndpointException exception) { + this.exception = exception; // We will still receive one onCompletion per stream invocation done + } + + private boolean complete() { + return resultsReceived == results.size(); + } + + } + +} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/JsonReader.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/JsonReader.java index d10480eb183..30cad36e98a 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/JsonReader.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/JsonReader.java @@ -31,6 +31,7 @@ public class JsonReader { /** * Process one inputstream and send all documents to feedclient. + * * @param inputStream source of array of json document. * @param feedClient where data is sent. * @param numSent counter to be incremented for every document streamed. @@ -95,6 +96,7 @@ public class JsonReader { /** * This is for throwing away [ and spaces in front of a json object, and find the position of {. * Not for parsing much text. + * * @return position for { */ public int findNextObjectStart() { @@ -192,6 +194,7 @@ public class JsonReader { /** * Parse one document from the stream and return doc id. + * * @param jParser parser with stream. * @return doc id of document or null if no more docs. * @throws IOException on problems @@ -202,7 +205,7 @@ public class JsonReader { boolean foundObject = false; boolean valueIsDocumentId = false; while (jParser.nextToken() != null) { - final String tokenAsText = jParser.getText(); + String tokenAsText = jParser.getText(); if (valueIsDocumentId) { if (documentId != null) { throw new RuntimeException("Several document ids"); @@ -224,9 +227,9 @@ public class JsonReader { case FIELD_NAME: if (objectLevel == 1 && (tokenAsText.equals("put") - || tokenAsText.endsWith("id") - || tokenAsText.endsWith("update") - || tokenAsText.equals("remove"))) { + || tokenAsText.endsWith("id") + || tokenAsText.endsWith("update") + || tokenAsText.equals("remove"))) { valueIsDocumentId = true; } break; @@ -237,4 +240,5 @@ public class JsonReader { throw new EOFException("No more documents"); return null; } + } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/XmlFeedReader.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/XmlFeedReader.java index 670b30f880d..2c2489cf234 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/XmlFeedReader.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/XmlFeedReader.java @@ -23,7 +23,6 @@ public class XmlFeedReader { private XmlFeedReader() {} public static void read(InputStream inputStream, FeedClient feedClient, AtomicInteger numSent) throws Exception { - SAXParserFactory parserFactory = SAXParserFactory.newInstance(); // XXE prevention: parserFactory.setFeature("http://xml.org/sax/features/external-general-entities", false); diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java index 16ef8a4bbd2..b60b9fa89d3 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java @@ -27,8 +27,9 @@ public class FeedClientImpl implements FeedClient { private final long closeTimeoutMs; private final long sleepTimeMs = 500; - public FeedClientImpl( - SessionParams sessionParams, ResultCallback resultCallback, ScheduledThreadPoolExecutor timeoutExecutor) { + public FeedClientImpl(SessionParams sessionParams, + ResultCallback resultCallback, + ScheduledThreadPoolExecutor timeoutExecutor) { this.closeTimeoutMs = (10 + 3 * sessionParams.getConnectionParams().getMaxRetries()) * ( sessionParams.getFeedParams().getServerTimeout(TimeUnit.MILLISECONDS) + sessionParams.getFeedParams().getClientTimeout(TimeUnit.MILLISECONDS)); diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java index 488fe41d7ff..7ead0c4a37f 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java @@ -211,7 +211,7 @@ public class OperationProcessor { } public void resultReceived(EndpointResult endpointResult, int clusterId) { - final Result result = process(endpointResult, clusterId); + Result result = process(endpointResult, clusterId); if (result != null) { incompleteResultsThrottler.resultReady(result.isSuccess()); diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/FeedClientTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/FeedClientTest.java index 8d6530d8305..33afc759f59 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/FeedClientTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/FeedClientTest.java @@ -4,7 +4,6 @@ package com.yahoo.vespa.http.client; import com.yahoo.vespa.http.client.config.Cluster; import com.yahoo.vespa.http.client.config.ConnectionParams; import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.config.FeedParams; import com.yahoo.vespa.http.client.config.SessionParams; import com.yahoo.vespa.http.client.core.api.FeedClientImpl; import org.junit.Test; @@ -12,14 +11,14 @@ import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.nio.charset.StandardCharsets; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * Tests for the API, using dryrun option to mock gateway. + * * @author dybis */ public class FeedClientTest { @@ -36,18 +35,18 @@ public class FeedClientTest { .build(); final AtomicInteger resultsReceived = new AtomicInteger(0); FeedClient.ResultCallback resultCallback = (docId, documentResult) -> { - assert(documentResult.isSuccess()); - assertThat(docId, is(DOCID)); + assertTrue(documentResult.isSuccess()); + assertEquals(DOCID, docId); resultsReceived.incrementAndGet(); }; FeedClient feedClient = new FeedClientImpl(sessionParams, resultCallback, SessionFactory.createTimeoutExecutor()); @Test - public void testStreamAndClose() throws Exception { + public void testStreamAndClose() { feedClient.stream(DOCID, "blob"); feedClient.close(); - assertThat(resultsReceived.get(), is(1)); + assertEquals(1, resultsReceived.get()); } @Test @@ -60,25 +59,27 @@ public class FeedClientTest { } @Test - public void testFeedJson() throws Exception { + public void testFeedJson() { InputStream stream = new ByteArrayInputStream((String.format("[{\"remove\": \"%s\"}]", DOCID) .getBytes(StandardCharsets.UTF_8))); AtomicInteger docCounter = new AtomicInteger(0); FeedClient.feedJson(stream, feedClient, docCounter); - assertThat(docCounter.get(), is(1)); + assertEquals(1, docCounter.get()); feedClient.close(); - assertThat(resultsReceived.get(), is(1)); + assertEquals(1, resultsReceived.get()); } @Test - public void testFeedXml() throws Exception { + public void testFeedXml() { InputStream stream = new ByteArrayInputStream((String.format( "<document documenttype=\"music\" documentid=\"%s\">\n</document>\n", DOCID) .getBytes(StandardCharsets.UTF_8))); AtomicInteger docCounter = new AtomicInteger(0); FeedClient.feedXml(stream, feedClient, docCounter); - assertThat(docCounter.get(), is(1)); + assertEquals(1, docCounter.get()); feedClient.close(); - assertThat(resultsReceived.get(), is(1)); + assertEquals(1, resultsReceived.get()); } + } + diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SyncFeedClientTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SyncFeedClientTest.java new file mode 100644 index 00000000000..c7787b55049 --- /dev/null +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SyncFeedClientTest.java @@ -0,0 +1,79 @@ +package com.yahoo.vespa.http.client; + +import com.yahoo.vespa.http.client.config.Cluster; +import com.yahoo.vespa.http.client.config.ConnectionParams; +import com.yahoo.vespa.http.client.config.Endpoint; +import com.yahoo.vespa.http.client.config.SessionParams; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import com.yahoo.vespa.http.client.SyncFeedClient.SyncOperation; +import com.yahoo.vespa.http.client.SyncFeedClient.SyncResult; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNull; + +/** + * Tests the sync wrapper to the feed client + * + * @author bratseth + */ +public class SyncFeedClientTest { + + @Test + public void testFeedJson() { + SessionParams sessionParams = new SessionParams.Builder() + .addCluster(new Cluster.Builder() + .addEndpoint(Endpoint.create("hostname")) + .build()) + .setConnectionParams(new ConnectionParams.Builder() + .setDryRun(true) + .build()) + .build(); + SyncFeedClient feedClient = new SyncFeedClient(sessionParams); + + + assertFeedSuccessful(feedClient); + assertFeedSuccessful(feedClient); // ensure the client can be reused + feedClient.close(); + } + + private void assertFeedSuccessful(SyncFeedClient feedClient) { + List<SyncOperation> operations = new ArrayList<>(); + + operations.add(new SyncOperation("id::test::1", + "{" + + " \"put\": \"id::test::1\"," + + " \"fields\": {" + + " \"title\": \"Title 1\"" + + " }" + + "}")); + operations.add(new SyncOperation("id::test::2", + "{" + + " \"put\": \"id::test::2\"," + + " \"fields\": {" + + " \"title\": \"Title 2\"" + + " }" + + "}")); + operations.add(new SyncOperation("id::test::3", + "{" + + " \"put\": \"id::test::3\"," + + " \"fields\": {" + + " \"title\": \"Title 3\"" + + " }" + + "}")); + + SyncResult result = feedClient.stream(operations); + + assertTrue(result.isSuccess()); + assertEquals(3, result.results().size()); + assertNull(result.exception()); + assertEquals("id::test::1", result.results().get(0).getDocumentId()); + assertEquals("id::test::2", result.results().get(1).getDocumentId()); + assertEquals("id::test::3", result.results().get(2).getDocumentId()); + } + +} diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java index 09e8af5e113..594a49e7d91 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java @@ -25,8 +25,8 @@ import static org.junit.Assert.assertThat; /** * Only runs on screwdriver to save time! - * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> - * @since 5.1.27 + * + * @author Einar M R Rosenvinge */ public class V3HttpAPITest extends TestOnCiBuildingSystemOnly { |