diff options
author | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2021-06-15 14:43:48 +0200 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2021-06-15 15:10:11 +0200 |
commit | c216ff20854a4b51e3ceac615f978688f4d60e3b (patch) | |
tree | 841dc25b964410ca87bcdeeeed4711021f4d844c /vespa-feed-client | |
parent | f986f75c5bb3f587bb39b290b78f335e24641e03 (diff) |
Add 3 examples
The two more complex examples are copied from vespa-http-client documentation.
Diffstat (limited to 'vespa-feed-client')
3 files changed, 269 insertions, 0 deletions
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java new file mode 100644 index 00000000000..579adf9048f --- /dev/null +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java @@ -0,0 +1,89 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client.examples; + +import ai.vespa.feed.client.FeedClient; +import ai.vespa.feed.client.FeedClientBuilder; +import ai.vespa.feed.client.JsonFeeder; +import ai.vespa.feed.client.Result; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Logger; + +/** + * Sample feeder demonstrating how to programmatically feed to a Vespa cluster. + */ +class JsonFileFeederExample implements Closeable { + + private final static Logger log = Logger.getLogger(JsonFileFeederExample.class.getName()); + + private final JsonFeeder jsonFeeder; + private final URI endpoint; + + static class ResultCallBack implements JsonFeeder.ResultCallback { + + final AtomicInteger resultsReceived = new AtomicInteger(0); + final AtomicInteger errorsReceived = new AtomicInteger(0); + final long startTimeMillis = System.currentTimeMillis();; + + @Override + public void onNextResult(Result result, Throwable error) { + resultsReceived.incrementAndGet(); + if (error != null) { + log.warning("Problems with feeding document"); + errorsReceived.incrementAndGet(); + } else if (result.type() == Result.Type.failure) { + log.warning("Problems with docID " + result.documentId() + ":" + error); + errorsReceived.incrementAndGet(); + } + } + + @Override + public void onError(Throwable error) { + log.severe("Feeding failed: " + error.getMessage()); + } + + @Override + public void onComplete() { + log.info("Feeding completed"); + } + + void dumpStatsToLog() { + log.info("Received in total " + resultsReceived.get() + ", " + errorsReceived.get() + " errors."); + log.info("Time spent receiving is " + (System.currentTimeMillis() - startTimeMillis) + " ms."); + } + + } + + JsonFileFeederExample(URI endpoint) { + this.endpoint = endpoint; + FeedClient feedClient = FeedClientBuilder.create(endpoint) + .build(); + this.jsonFeeder = JsonFeeder.builder(feedClient) + .withTimeout(Duration.ofSeconds(30)) + .build(); + } + + /** + * Feed all operations from a stream. + * + * @param stream The input stream to read operations from (JSON array containing one or more document operations). + */ + void batchFeed(InputStream stream, String batchId) { + ResultCallBack callback = new ResultCallBack(); + log.info("Starting feed to " + endpoint + " for batch '" + batchId + "'"); + CompletableFuture<Void> promise = jsonFeeder.feedMany(stream, callback); + promise.join(); // wait for feeding to complete + callback.dumpStatsToLog(); + } + + @Override + public void close() throws IOException { + jsonFeeder.close(); + } +} diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java new file mode 100644 index 00000000000..11ed3ace304 --- /dev/null +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java @@ -0,0 +1,146 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client.examples; + +import ai.vespa.feed.client.DocumentId; +import ai.vespa.feed.client.FeedClient; +import ai.vespa.feed.client.FeedClientBuilder; +import ai.vespa.feed.client.OperationParameters; +import ai.vespa.feed.client.Result; + +import java.net.URI; +import java.time.Duration; +import java.time.Instant; +import java.util.Date; +import java.util.Locale; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Simple Streaming feeder implementation which will send operations to a Vespa endpoint. + * Other threads communicate with the feeder by adding new operations on the BlockingQueue + */ + +class JsonStreamFeederExample extends Thread implements AutoCloseable { + + static class Operation { + final String type; + final String documentId; + final String documentFieldsJson; + + Operation(String type, String id, String fields) { + this.type = type; + this.documentId = id; + this.documentFieldsJson = fields; + } + } + + private final static Logger log = Logger.getLogger(JsonStreamFeederExample.class.getName()); + + private final BlockingQueue<Operation> operations; + private final FeedClient feedClient; + private final AtomicBoolean drain = new AtomicBoolean(false); + private final CountDownLatch finishedDraining = new CountDownLatch(1); + private final Object monitor = new Object(); + private final AtomicInteger sentCounter = new AtomicInteger(); + private final AtomicInteger resultCounter = new AtomicInteger(); + private final AtomicInteger failureCounter = new AtomicInteger(); + private int startSampleResultCount = 0; + private Instant startSampleInstant = Instant.now(); + + /** + * Constructor + * @param operations The shared blocking queue where other threads can put document operations to. + * @param endpoint The endpoint to feed to + */ + JsonStreamFeederExample(BlockingQueue<JsonStreamFeederExample.Operation> operations, URI endpoint) { + this.operations = operations; + this.feedClient = FeedClientBuilder.create(endpoint).build(); + } + + /** + * Shutdown this feeder, waits until operations on queue is drained + */ + @Override + public void close() { + log.info("Shutdown initiated, awaiting operations queue to be drained. Queue size is " + operations.size()); + drain.set(true); + try { + finishedDraining.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + @Override + public void run() { + while (!drain.get() || !operations.isEmpty()) { + try { + JsonStreamFeederExample.Operation op = operations.poll(1, TimeUnit.SECONDS); + if(op == null) // no operations available + continue; + log.info("Put document " + op.documentId); + CompletableFuture<Result> promise; + DocumentId docId = DocumentId.of(op.documentId); + OperationParameters params = OperationParameters.empty(); + String json = op.documentFieldsJson; + switch (op.type) { + case "put": + promise = feedClient.put(docId, json, params); + break; + case "remove": + promise = feedClient.remove(docId, params); + break; + case "update": + promise = feedClient.update(docId, json, params); + break; + default: + throw new IllegalArgumentException("Invalid operation: " + op.type); + } + sentCounter.incrementAndGet(); + promise.whenComplete((result, throwable) -> { + if (resultCounter.getAndIncrement() % 10 == 0) { + printProgress(); + } + if (throwable != null) { + failureCounter.incrementAndGet(); + System.err.println("Failure: " + throwable); + throwable.printStackTrace(); + } else if (result.type() == Result.Type.failure) { + failureCounter.incrementAndGet(); + System.err.println("Failure: " + result.resultMessage()); + } + }); + } catch (InterruptedException e) { + log.log(Level.SEVERE, "Got interrupt exception.", e); + break; + } + } + log.info("Shutting down feeding thread"); + this.feedClient.close(); + finishedDraining.countDown(); + } + + void printProgress() { + synchronized (monitor) { + Instant now = Instant.now(); + int resultCounter = this.resultCounter.get(); + int failureCounter = this.failureCounter.get(); + int sentCounter = this.sentCounter.get(); + double docsDelta = resultCounter - failureCounter - startSampleResultCount; + Duration duration = Duration.between(startSampleInstant, now); + startSampleInstant = now; + this.startSampleResultCount = resultCounter - failureCounter; + long durationMilliSecs = duration.toMillis() + 1; // +1 to avoid division by zero + double rate = 1000. * docsDelta / durationMilliSecs; + System.err.println(new Date() + " Result received: " + resultCounter + + " (" + failureCounter + " failed so far, " + sentCounter + + " sent, success rate " + String.format(Locale.US, "%.2f docs/sec", rate) + ")."); + } + } +}
\ No newline at end of file diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java new file mode 100644 index 00000000000..5ece9051e41 --- /dev/null +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java @@ -0,0 +1,34 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client.examples; + +import ai.vespa.feed.client.DocumentId; +import ai.vespa.feed.client.FeedClient; +import ai.vespa.feed.client.FeedClientBuilder; +import ai.vespa.feed.client.OperationParameters; +import ai.vespa.feed.client.Result; + +import java.net.URI; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; + +class SimpleExample { + + public static void main(String[] args) { + try (FeedClient client = FeedClientBuilder.create(URI.create("https://my-container-endpoint-with-http2:8080/")).build()) { + DocumentId id = DocumentId.of("namespace", "documenttype", "1"); + String json = "{\"fields\": {\"title\": \"hello world\"}}"; + OperationParameters params = OperationParameters.empty() + .timeout(Duration.ofSeconds(5)) + .route("myvesparoute"); + CompletableFuture<Result> promise = client.put(id, json, params); + promise.whenComplete(((result, throwable) -> { + if (throwable != null) { + throwable.printStackTrace(); + } else { + System.out.printf("'%s' for document '%s': %s%n", result.type(), result.documentId(), result.resultMessage()); + } + })); + } + } + +} |