summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2021-06-15 14:43:48 +0200
committerBjørn Christian Seime <bjorncs@verizonmedia.com>2021-06-15 15:10:11 +0200
commitc216ff20854a4b51e3ceac615f978688f4d60e3b (patch)
tree841dc25b964410ca87bcdeeeed4711021f4d844c /vespa-feed-client
parentf986f75c5bb3f587bb39b290b78f335e24641e03 (diff)
Add 3 examples
The two more complex examples are copied from vespa-http-client documentation.
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java89
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java146
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java34
3 files changed, 269 insertions, 0 deletions
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java
new file mode 100644
index 00000000000..579adf9048f
--- /dev/null
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java
@@ -0,0 +1,89 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client.examples;
+
+import ai.vespa.feed.client.FeedClient;
+import ai.vespa.feed.client.FeedClientBuilder;
+import ai.vespa.feed.client.JsonFeeder;
+import ai.vespa.feed.client.Result;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Logger;
+
+/**
+ * Sample feeder demonstrating how to programmatically feed to a Vespa cluster.
+ */
+class JsonFileFeederExample implements Closeable {
+
+ private final static Logger log = Logger.getLogger(JsonFileFeederExample.class.getName());
+
+ private final JsonFeeder jsonFeeder;
+ private final URI endpoint;
+
+ static class ResultCallBack implements JsonFeeder.ResultCallback {
+
+ final AtomicInteger resultsReceived = new AtomicInteger(0);
+ final AtomicInteger errorsReceived = new AtomicInteger(0);
+ final long startTimeMillis = System.currentTimeMillis();;
+
+ @Override
+ public void onNextResult(Result result, Throwable error) {
+ resultsReceived.incrementAndGet();
+ if (error != null) {
+ log.warning("Problems with feeding document");
+ errorsReceived.incrementAndGet();
+ } else if (result.type() == Result.Type.failure) {
+ log.warning("Problems with docID " + result.documentId() + ":" + error);
+ errorsReceived.incrementAndGet();
+ }
+ }
+
+ @Override
+ public void onError(Throwable error) {
+ log.severe("Feeding failed: " + error.getMessage());
+ }
+
+ @Override
+ public void onComplete() {
+ log.info("Feeding completed");
+ }
+
+ void dumpStatsToLog() {
+ log.info("Received in total " + resultsReceived.get() + ", " + errorsReceived.get() + " errors.");
+ log.info("Time spent receiving is " + (System.currentTimeMillis() - startTimeMillis) + " ms.");
+ }
+
+ }
+
+ JsonFileFeederExample(URI endpoint) {
+ this.endpoint = endpoint;
+ FeedClient feedClient = FeedClientBuilder.create(endpoint)
+ .build();
+ this.jsonFeeder = JsonFeeder.builder(feedClient)
+ .withTimeout(Duration.ofSeconds(30))
+ .build();
+ }
+
+ /**
+ * Feed all operations from a stream.
+ *
+ * @param stream The input stream to read operations from (JSON array containing one or more document operations).
+ */
+ void batchFeed(InputStream stream, String batchId) {
+ ResultCallBack callback = new ResultCallBack();
+ log.info("Starting feed to " + endpoint + " for batch '" + batchId + "'");
+ CompletableFuture<Void> promise = jsonFeeder.feedMany(stream, callback);
+ promise.join(); // wait for feeding to complete
+ callback.dumpStatsToLog();
+ }
+
+ @Override
+ public void close() throws IOException {
+ jsonFeeder.close();
+ }
+}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java
new file mode 100644
index 00000000000..11ed3ace304
--- /dev/null
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java
@@ -0,0 +1,146 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client.examples;
+
+import ai.vespa.feed.client.DocumentId;
+import ai.vespa.feed.client.FeedClient;
+import ai.vespa.feed.client.FeedClientBuilder;
+import ai.vespa.feed.client.OperationParameters;
+import ai.vespa.feed.client.Result;
+
+import java.net.URI;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Date;
+import java.util.Locale;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Simple Streaming feeder implementation which will send operations to a Vespa endpoint.
+ * Other threads communicate with the feeder by adding new operations on the BlockingQueue
+ */
+
+class JsonStreamFeederExample extends Thread implements AutoCloseable {
+
+ static class Operation {
+ final String type;
+ final String documentId;
+ final String documentFieldsJson;
+
+ Operation(String type, String id, String fields) {
+ this.type = type;
+ this.documentId = id;
+ this.documentFieldsJson = fields;
+ }
+ }
+
+ private final static Logger log = Logger.getLogger(JsonStreamFeederExample.class.getName());
+
+ private final BlockingQueue<Operation> operations;
+ private final FeedClient feedClient;
+ private final AtomicBoolean drain = new AtomicBoolean(false);
+ private final CountDownLatch finishedDraining = new CountDownLatch(1);
+ private final Object monitor = new Object();
+ private final AtomicInteger sentCounter = new AtomicInteger();
+ private final AtomicInteger resultCounter = new AtomicInteger();
+ private final AtomicInteger failureCounter = new AtomicInteger();
+ private int startSampleResultCount = 0;
+ private Instant startSampleInstant = Instant.now();
+
+ /**
+ * Constructor
+ * @param operations The shared blocking queue where other threads can put document operations to.
+ * @param endpoint The endpoint to feed to
+ */
+ JsonStreamFeederExample(BlockingQueue<JsonStreamFeederExample.Operation> operations, URI endpoint) {
+ this.operations = operations;
+ this.feedClient = FeedClientBuilder.create(endpoint).build();
+ }
+
+ /**
+ * Shutdown this feeder, waits until operations on queue is drained
+ */
+ @Override
+ public void close() {
+ log.info("Shutdown initiated, awaiting operations queue to be drained. Queue size is " + operations.size());
+ drain.set(true);
+ try {
+ finishedDraining.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ public void run() {
+ while (!drain.get() || !operations.isEmpty()) {
+ try {
+ JsonStreamFeederExample.Operation op = operations.poll(1, TimeUnit.SECONDS);
+ if(op == null) // no operations available
+ continue;
+ log.info("Put document " + op.documentId);
+ CompletableFuture<Result> promise;
+ DocumentId docId = DocumentId.of(op.documentId);
+ OperationParameters params = OperationParameters.empty();
+ String json = op.documentFieldsJson;
+ switch (op.type) {
+ case "put":
+ promise = feedClient.put(docId, json, params);
+ break;
+ case "remove":
+ promise = feedClient.remove(docId, params);
+ break;
+ case "update":
+ promise = feedClient.update(docId, json, params);
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid operation: " + op.type);
+ }
+ sentCounter.incrementAndGet();
+ promise.whenComplete((result, throwable) -> {
+ if (resultCounter.getAndIncrement() % 10 == 0) {
+ printProgress();
+ }
+ if (throwable != null) {
+ failureCounter.incrementAndGet();
+ System.err.println("Failure: " + throwable);
+ throwable.printStackTrace();
+ } else if (result.type() == Result.Type.failure) {
+ failureCounter.incrementAndGet();
+ System.err.println("Failure: " + result.resultMessage());
+ }
+ });
+ } catch (InterruptedException e) {
+ log.log(Level.SEVERE, "Got interrupt exception.", e);
+ break;
+ }
+ }
+ log.info("Shutting down feeding thread");
+ this.feedClient.close();
+ finishedDraining.countDown();
+ }
+
+ void printProgress() {
+ synchronized (monitor) {
+ Instant now = Instant.now();
+ int resultCounter = this.resultCounter.get();
+ int failureCounter = this.failureCounter.get();
+ int sentCounter = this.sentCounter.get();
+ double docsDelta = resultCounter - failureCounter - startSampleResultCount;
+ Duration duration = Duration.between(startSampleInstant, now);
+ startSampleInstant = now;
+ this.startSampleResultCount = resultCounter - failureCounter;
+ long durationMilliSecs = duration.toMillis() + 1; // +1 to avoid division by zero
+ double rate = 1000. * docsDelta / durationMilliSecs;
+ System.err.println(new Date() + " Result received: " + resultCounter
+ + " (" + failureCounter + " failed so far, " + sentCounter
+ + " sent, success rate " + String.format(Locale.US, "%.2f docs/sec", rate) + ").");
+ }
+ }
+} \ No newline at end of file
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java
new file mode 100644
index 00000000000..5ece9051e41
--- /dev/null
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java
@@ -0,0 +1,34 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client.examples;
+
+import ai.vespa.feed.client.DocumentId;
+import ai.vespa.feed.client.FeedClient;
+import ai.vespa.feed.client.FeedClientBuilder;
+import ai.vespa.feed.client.OperationParameters;
+import ai.vespa.feed.client.Result;
+
+import java.net.URI;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+
+class SimpleExample {
+
+ public static void main(String[] args) {
+ try (FeedClient client = FeedClientBuilder.create(URI.create("https://my-container-endpoint-with-http2:8080/")).build()) {
+ DocumentId id = DocumentId.of("namespace", "documenttype", "1");
+ String json = "{\"fields\": {\"title\": \"hello world\"}}";
+ OperationParameters params = OperationParameters.empty()
+ .timeout(Duration.ofSeconds(5))
+ .route("myvesparoute");
+ CompletableFuture<Result> promise = client.put(id, json, params);
+ promise.whenComplete(((result, throwable) -> {
+ if (throwable != null) {
+ throwable.printStackTrace();
+ } else {
+ System.out.printf("'%s' for document '%s': %s%n", result.type(), result.documentId(), result.resultMessage());
+ }
+ }));
+ }
+ }
+
+}