summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client-api/src/test
diff options
context:
space:
mode:
authorMorten Tokle <mortent@verizonmedia.com>2021-12-07 12:52:42 +0100
committerMorten Tokle <mortent@verizonmedia.com>2021-12-07 12:52:42 +0100
commit5e956429169d3a733114e5f76f051167f291c786 (patch)
treefa2b9cc664c8c639482397e9a4566149dac3ae29 /vespa-feed-client-api/src/test
parentae09069f544a086af4ae02a092ec66788a3cae9e (diff)
Extract vespa-feed-client-api module from vespa-feed-client
Diffstat (limited to 'vespa-feed-client-api/src/test')
-rw-r--r--vespa-feed-client-api/src/test/java/ai/vespa/feed/client/JsonFeederTest.java240
-rw-r--r--vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java90
-rw-r--r--vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java115
-rw-r--r--vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java34
4 files changed, 479 insertions, 0 deletions
diff --git a/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/JsonFeederTest.java b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/JsonFeederTest.java
new file mode 100644
index 00000000000..d795678db39
--- /dev/null
+++ b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/JsonFeederTest.java
@@ -0,0 +1,240 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.joining;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class JsonFeederTest {
+
+ @Test
+ void test() throws IOException {
+ int docs = 1 << 14;
+ String json = "[\n" +
+
+ IntStream.range(0, docs).mapToObj(i ->
+ " {\n" +
+ " \"id\": \"id:ns:type::abc" + i + "\",\n" +
+ " \"fields\": {\n" +
+ " \"lul\":\"lal\"\n" +
+ " }\n" +
+ " },\n"
+ ).collect(joining()) +
+
+ " {\n" +
+ " \"id\": \"id:ns:type::abc" + docs + "\",\n" +
+ " \"fields\": {\n" +
+ " \"lul\":\"lal\"\n" +
+ " }\n" +
+ " }\n" +
+ "]";
+ AtomicReference<FeedException> exceptionThrow = new AtomicReference<>();
+ Path tmpFile = Files.createTempFile(null, null);
+ Files.write(tmpFile, json.getBytes(UTF_8));
+ try (InputStream in = Files.newInputStream(tmpFile, StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE)) {
+ AtomicInteger resultsReceived = new AtomicInteger();
+ AtomicBoolean completedSuccessfully = new AtomicBoolean();
+ long startNanos = System.nanoTime();
+ MockClient feedClient = new MockClient();
+ JsonFeeder.builder(feedClient).build()
+ .feedMany(in, 1 << 10,
+ new JsonFeeder.ResultCallback() {
+ @Override
+ public void onNextResult(Result result, FeedException error) { resultsReceived.incrementAndGet(); }
+
+ @Override
+ public void onError(FeedException error) { exceptionThrow.set(error); }
+
+ @Override
+ public void onComplete() { completedSuccessfully.set(true); }
+ })
+ .join();
+
+ System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds");
+ assertEquals(docs + 1, feedClient.putOperations.size());
+ assertEquals(docs + 1, resultsReceived.get());
+ assertTrue(completedSuccessfully.get());
+ assertNull(exceptionThrow.get());
+ }
+ }
+
+ @Test
+ public void multipleJsonArrayOperationsAreDispatchedToFeedClient() throws IOException, ExecutionException, InterruptedException {
+ MockClient client = new MockClient();
+ try (JsonFeeder feeder = JsonFeeder.builder(client).build()) {
+ String json = "[{" +
+ " \"put\": \"id:ns:type::abc1\",\n" +
+ " \"fields\": {\n" +
+ " \"lul\":\"lal\"\n" +
+ " }\n" +
+ "},\n" +
+ "{" +
+ " \"put\": \"id:ns:type::abc2\",\n" +
+ " \"fields\": {\n" +
+ " \"lul\":\"lal\"\n" +
+ " }\n" +
+ "}]\n";
+ feeder.feedMany(new ByteArrayInputStream(json.getBytes(UTF_8))).get();
+ client.assertPutDocumentIds("abc1", "abc2");
+ client.assertPutOperation("abc1", "{\"fields\":{\n \"lul\":\"lal\"\n }}");
+ client.assertPutOperation("abc2", "{\"fields\":{\n \"lul\":\"lal\"\n }}");
+ }
+ }
+
+ @Test
+ public void multipleJsonLOperationsAreDispatchedToFeedClient() throws IOException, ExecutionException, InterruptedException {
+ MockClient client = new MockClient();
+ try (JsonFeeder feeder = JsonFeeder.builder(client).build()) {
+ String json = "{\n" +
+ " \"remove\": \"id:ns:type::abc1\"\n" +
+ "}\n" +
+ "{\n" +
+ " \"fields\": {\n" +
+ " \"lul\": { \"assign\": \"lal\" }\n" +
+ " },\n" +
+ " \"update\": \"id:ns:type::abc2\"\n" +
+ "}\n" +
+ "{\n" +
+ " \"put\": \"id:ns:type::abc3\",\n" +
+ " \"fields\": {\n" +
+ " \"lul\": \"lal\"\n" +
+ " }\n" +
+ "}\n";
+
+ feeder.feedMany(new ByteArrayInputStream(json.getBytes(UTF_8)),
+ 3, // Mini-buffer, which needs to expand.
+ new JsonFeeder.ResultCallback() { })
+ .get();
+ client.assertRemoveDocumentIds("abc1");
+ client.assertUpdateDocumentIds("abc2");
+ client.assertUpdateOperation("abc2", "{\"fields\":{\n \"lul\": { \"assign\": \"lal\" }\n }}");
+ client.assertPutDocumentIds("abc3");
+ client.assertPutOperation("abc3", "{\"fields\":{\n \"lul\": \"lal\"\n }}");
+ }
+ }
+
+ @Test
+ public void singleJsonOperationIsDispatchedToFeedClient() throws IOException, ExecutionException, InterruptedException {
+ MockClient client = new MockClient();
+ try (JsonFeeder feeder = JsonFeeder.builder(client).build()) {
+ String json = "{\"put\": \"id:ns:type::abc1\",\n" +
+ " \"fields\": {\n" +
+ " \"lul\":\"lal\"\n" +
+ " }\n" +
+ " }\n";
+ Result result = feeder.feedSingle(json).get();
+ Assertions.assertEquals(DocumentId.of("id:ns:type::abc1"), result.documentId());
+ assertEquals(Result.Type.success, result.type());
+ assertEquals("success", result.resultMessage().get());
+ client.assertPutOperation("abc1", "{\"fields\":{\n \"lul\":\"lal\"\n }}");
+ }
+ }
+
+ private static class MockClient implements FeedClient {
+ final Map<DocumentId, String> putOperations = new LinkedHashMap<>();
+ final Map<DocumentId, String> updateOperations = new LinkedHashMap<>();
+ final Map<DocumentId, String> removeOperations = new LinkedHashMap<>();
+
+ @Override
+ public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) {
+ putOperations.put(documentId, documentJson);
+ return createSuccessResult(documentId);
+ }
+
+ @Override
+ public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) {
+ updateOperations.put(documentId, updateJson);
+ return createSuccessResult(documentId);
+ }
+
+ @Override
+ public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) {
+ removeOperations.put(documentId, null);
+ return createSuccessResult(documentId);
+ }
+
+ @Override
+ public OperationStats stats() { return null; }
+
+ @Override
+ public CircuitBreaker.State circuitBreakerState() { return null; }
+
+ @Override
+ public void close(boolean graceful) { }
+
+ private CompletableFuture<Result> createSuccessResult(DocumentId documentId) {
+ return CompletableFuture.completedFuture(new Result(){
+ @Override public Type type() { return Type.success; }
+ @Override public DocumentId documentId() { return documentId; }
+ @Override public Optional<String> resultMessage() { return Optional.of("success"); }
+ @Override public Optional<String> traceMessage() { return Optional.empty(); }
+ });
+ }
+
+ void assertDocumentIds(Collection<DocumentId> keys, String... expectedUserSpecificIds) {
+ List<String> expected = Arrays.stream(expectedUserSpecificIds)
+ .map(userSpecific -> "id:ns:type::" + userSpecific)
+ .sorted()
+ .collect(Collectors.toList());
+ List<String> actual = keys.stream()
+ .map(DocumentId::toString).sorted()
+ .collect(Collectors.toList());
+ assertEquals(expected, actual, "Document ids must match");
+ }
+
+ void assertPutDocumentIds(String... expectedUserSpecificIds) {
+ assertDocumentIds(putOperations.keySet(), expectedUserSpecificIds);
+ }
+
+ void assertUpdateDocumentIds(String... expectedUserSpecificIds) {
+ assertDocumentIds(updateOperations.keySet(), expectedUserSpecificIds);
+ }
+
+ void assertRemoveDocumentIds(String... expectedUserSpecificIds) {
+ assertDocumentIds(removeOperations.keySet(), expectedUserSpecificIds);
+ }
+
+ void assertPutOperation(String userSpecificId, String expectedJson) {
+ DocumentId docId = DocumentId.of("id:ns:type::" + userSpecificId);
+ String json = putOperations.get(docId);
+ assertNotNull(json);
+ assertEquals(expectedJson.trim(), json.trim());
+ }
+
+ void assertUpdateOperation(String userSpecificId, String expectedJson) {
+ DocumentId docId = DocumentId.of("id:ns:type::" + userSpecificId);
+ String json = updateOperations.get(docId);
+ assertNotNull(json);
+ assertEquals(expectedJson.trim(), json.trim());
+ }
+
+ }
+
+}
diff --git a/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java
new file mode 100644
index 00000000000..b951fb62fb5
--- /dev/null
+++ b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java
@@ -0,0 +1,90 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client.examples;
+
+import ai.vespa.feed.client.DocumentId;
+import ai.vespa.feed.client.FeedClient;
+import ai.vespa.feed.client.FeedClientBuilder;
+import ai.vespa.feed.client.FeedException;
+import ai.vespa.feed.client.JsonFeeder;
+import ai.vespa.feed.client.Result;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Logger;
+
+/**
+ * Sample feeder demonstrating how to programmatically feed to a Vespa cluster.
+ */
+class JsonFileFeederExample implements Closeable {
+
+ private final static Logger log = Logger.getLogger(JsonFileFeederExample.class.getName());
+
+ private final JsonFeeder jsonFeeder;
+ private final URI endpoint;
+
+ static class ResultCallBack implements JsonFeeder.ResultCallback {
+
+ final AtomicInteger resultsReceived = new AtomicInteger(0);
+ final AtomicInteger errorsReceived = new AtomicInteger(0);
+ final long startTimeMillis = System.currentTimeMillis();;
+
+ @Override
+ public void onNextResult(Result result, FeedException error) {
+ resultsReceived.incrementAndGet();
+ if (error != null) {
+ log.warning("Problems with feeding document "
+ + error.documentId().map(DocumentId::toString).orElse("<unknown>")
+ + ": " + error);
+ errorsReceived.incrementAndGet();
+ }
+ }
+
+ @Override
+ public void onError(FeedException error) {
+ log.severe("Feeding failed fatally: " + error.getMessage());
+ }
+
+ @Override
+ public void onComplete() {
+ log.info("Feeding completed");
+ }
+
+ void dumpStatsToLog() {
+ log.info("Received in total " + resultsReceived.get() + ", " + errorsReceived.get() + " errors.");
+ log.info("Time spent receiving is " + (System.currentTimeMillis() - startTimeMillis) + " ms.");
+ }
+
+ }
+
+ JsonFileFeederExample(URI endpoint) {
+ this.endpoint = endpoint;
+ FeedClient feedClient = FeedClientBuilder.create(endpoint)
+ .build();
+ this.jsonFeeder = JsonFeeder.builder(feedClient)
+ .withTimeout(Duration.ofSeconds(30))
+ .build();
+ }
+
+ /**
+ * Feed all operations from a stream.
+ *
+ * @param stream The input stream to read operations from (JSON array containing one or more document operations).
+ */
+ void batchFeed(InputStream stream, String batchId) {
+ ResultCallBack callback = new ResultCallBack();
+ log.info("Starting feed to " + endpoint + " for batch '" + batchId + "'");
+ CompletableFuture<Void> promise = jsonFeeder.feedMany(stream, callback);
+ promise.join(); // wait for feeding to complete
+ callback.dumpStatsToLog();
+ }
+
+ @Override
+ public void close() throws IOException {
+ jsonFeeder.close();
+ }
+}
diff --git a/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java
new file mode 100644
index 00000000000..3d4ce150fcf
--- /dev/null
+++ b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java
@@ -0,0 +1,115 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client.examples;
+
+import ai.vespa.feed.client.DocumentId;
+import ai.vespa.feed.client.FeedClient;
+import ai.vespa.feed.client.FeedClientBuilder;
+import ai.vespa.feed.client.OperationParameters;
+import ai.vespa.feed.client.Result;
+
+import java.net.URI;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Simple Streaming feeder implementation which will send operations to a Vespa endpoint.
+ * Other threads communicate with the feeder by adding new operations on the BlockingQueue
+ */
+
+class JsonStreamFeederExample extends Thread implements AutoCloseable {
+
+ static class Operation {
+ final String type;
+ final String documentId;
+ final String documentFieldsJson;
+
+ Operation(String type, String id, String fields) {
+ this.type = type;
+ this.documentId = id;
+ this.documentFieldsJson = fields;
+ }
+ }
+
+ private final static Logger log = Logger.getLogger(JsonStreamFeederExample.class.getName());
+
+ private final BlockingQueue<Operation> operations;
+ private final FeedClient feedClient;
+ private final AtomicBoolean drain = new AtomicBoolean(false);
+ private final CountDownLatch finishedDraining = new CountDownLatch(1);
+ private final AtomicInteger resultCounter = new AtomicInteger();
+
+ /**
+ * Constructor
+ * @param operations The shared blocking queue where other threads can put document operations to.
+ * @param endpoint The endpoint to feed to
+ */
+ JsonStreamFeederExample(BlockingQueue<JsonStreamFeederExample.Operation> operations, URI endpoint) {
+ this.operations = operations;
+ this.feedClient = FeedClientBuilder.create(endpoint).build();
+ }
+
+ /**
+ * Shutdown this feeder, waits until operations on queue is drained
+ */
+ @Override
+ public void close() {
+ log.info("Shutdown initiated, awaiting operations queue to be drained. Queue size is " + operations.size());
+ drain.set(true);
+ try {
+ finishedDraining.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ public void run() {
+ while (!drain.get() || !operations.isEmpty()) {
+ try {
+ JsonStreamFeederExample.Operation op = operations.poll(1, TimeUnit.SECONDS);
+ if(op == null) // no operations available
+ continue;
+ log.info("Put document " + op.documentId);
+ CompletableFuture<Result> promise;
+ DocumentId docId = DocumentId.of(op.documentId);
+ OperationParameters params = OperationParameters.empty();
+ String json = op.documentFieldsJson;
+ switch (op.type) {
+ case "put":
+ promise = feedClient.put(docId, json, params);
+ break;
+ case "remove":
+ promise = feedClient.remove(docId, params);
+ break;
+ case "update":
+ promise = feedClient.update(docId, json, params);
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid operation: " + op.type);
+ }
+ promise.whenComplete((result, throwable) -> {
+ if (resultCounter.getAndIncrement() % 10 == 0) {
+ System.err.println(feedClient.stats());
+ }
+ if (throwable != null) {
+ System.err.printf("Failure for '%s': %s", docId, throwable);
+ throwable.printStackTrace();
+ }
+ });
+ } catch (InterruptedException e) {
+ log.log(Level.SEVERE, "Got interrupt exception.", e);
+ break;
+ }
+ }
+ log.info("Shutting down feeding thread");
+ this.feedClient.close();
+ finishedDraining.countDown();
+ }
+
+} \ No newline at end of file
diff --git a/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java
new file mode 100644
index 00000000000..4e6473a6568
--- /dev/null
+++ b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java
@@ -0,0 +1,34 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client.examples;
+
+import ai.vespa.feed.client.DocumentId;
+import ai.vespa.feed.client.FeedClient;
+import ai.vespa.feed.client.FeedClientBuilder;
+import ai.vespa.feed.client.OperationParameters;
+import ai.vespa.feed.client.Result;
+
+import java.net.URI;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+
+class SimpleExample {
+
+ public static void main(String[] args) {
+ try (FeedClient client = FeedClientBuilder.create(URI.create("https://my-container-endpoint-with-http2:8080/")).build()) {
+ DocumentId id = DocumentId.of("namespace", "documenttype", "1");
+ String json = "{\"fields\": {\"title\": \"hello world\"}}";
+ OperationParameters params = OperationParameters.empty()
+ .timeout(Duration.ofSeconds(5))
+ .route("myvesparoute");
+ CompletableFuture<Result> promise = client.put(id, json, params);
+ promise.whenComplete(((result, throwable) -> {
+ if (throwable != null) {
+ throwable.printStackTrace();
+ } else {
+ System.out.printf("'%s' for document '%s': %s%n", result.type(), result.documentId(), result.resultMessage());
+ }
+ }));
+ }
+ }
+
+}