From 5e956429169d3a733114e5f76f051167f291c786 Mon Sep 17 00:00:00 2001 From: Morten Tokle Date: Tue, 7 Dec 2021 12:52:42 +0100 Subject: Extract vespa-feed-client-api module from vespa-feed-client --- .../java/ai/vespa/feed/client/JsonFeederTest.java | 240 +++++++++++++++++++++ .../client/examples/JsonFileFeederExample.java | 90 ++++++++ .../client/examples/JsonStreamFeederExample.java | 115 ++++++++++ .../vespa/feed/client/examples/SimpleExample.java | 34 +++ 4 files changed, 479 insertions(+) create mode 100644 vespa-feed-client-api/src/test/java/ai/vespa/feed/client/JsonFeederTest.java create mode 100644 vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java create mode 100644 vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java create mode 100644 vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java (limited to 'vespa-feed-client-api/src/test') diff --git a/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/JsonFeederTest.java b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/JsonFeederTest.java new file mode 100644 index 00000000000..d795678db39 --- /dev/null +++ b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/JsonFeederTest.java @@ -0,0 +1,240 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+package ai.vespa.feed.client; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.stream.Collectors.joining; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class JsonFeederTest { + + @Test + void test() throws IOException { + int docs = 1 << 14; + String json = "[\n" + + + IntStream.range(0, docs).mapToObj(i -> + " {\n" + + " \"id\": \"id:ns:type::abc" + i + "\",\n" + + " \"fields\": {\n" + + " \"lul\":\"lal\"\n" + + " }\n" + + " },\n" + ).collect(joining()) + + + " {\n" + + " \"id\": \"id:ns:type::abc" + docs + "\",\n" + + " \"fields\": {\n" + + " \"lul\":\"lal\"\n" + + " }\n" + + " }\n" + + "]"; + AtomicReference exceptionThrow = new AtomicReference<>(); + Path tmpFile = Files.createTempFile(null, null); + Files.write(tmpFile, json.getBytes(UTF_8)); + try (InputStream in = Files.newInputStream(tmpFile, StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE)) { + AtomicInteger resultsReceived = new AtomicInteger(); + 
AtomicBoolean completedSuccessfully = new AtomicBoolean(); + long startNanos = System.nanoTime(); + MockClient feedClient = new MockClient(); + JsonFeeder.builder(feedClient).build() + .feedMany(in, 1 << 10, + new JsonFeeder.ResultCallback() { + @Override + public void onNextResult(Result result, FeedException error) { resultsReceived.incrementAndGet(); } + + @Override + public void onError(FeedException error) { exceptionThrow.set(error); } + + @Override + public void onComplete() { completedSuccessfully.set(true); } + }) + .join(); + + System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds"); + assertEquals(docs + 1, feedClient.putOperations.size()); + assertEquals(docs + 1, resultsReceived.get()); + assertTrue(completedSuccessfully.get()); + assertNull(exceptionThrow.get()); + } + } + + @Test + public void multipleJsonArrayOperationsAreDispatchedToFeedClient() throws IOException, ExecutionException, InterruptedException { + MockClient client = new MockClient(); + try (JsonFeeder feeder = JsonFeeder.builder(client).build()) { + String json = "[{" + + " \"put\": \"id:ns:type::abc1\",\n" + + " \"fields\": {\n" + + " \"lul\":\"lal\"\n" + + " }\n" + + "},\n" + + "{" + + " \"put\": \"id:ns:type::abc2\",\n" + + " \"fields\": {\n" + + " \"lul\":\"lal\"\n" + + " }\n" + + "}]\n"; + feeder.feedMany(new ByteArrayInputStream(json.getBytes(UTF_8))).get(); + client.assertPutDocumentIds("abc1", "abc2"); + client.assertPutOperation("abc1", "{\"fields\":{\n \"lul\":\"lal\"\n }}"); + client.assertPutOperation("abc2", "{\"fields\":{\n \"lul\":\"lal\"\n }}"); + } + } + + @Test + public void multipleJsonLOperationsAreDispatchedToFeedClient() throws IOException, ExecutionException, InterruptedException { + MockClient client = new MockClient(); + try (JsonFeeder feeder = JsonFeeder.builder(client).build()) { + String json = "{\n" + + " \"remove\": \"id:ns:type::abc1\"\n" + + "}\n" + + "{\n" + + " \"fields\": {\n" + + " \"lul\": 
{ \"assign\": \"lal\" }\n" + + " },\n" + + " \"update\": \"id:ns:type::abc2\"\n" + + "}\n" + + "{\n" + + " \"put\": \"id:ns:type::abc3\",\n" + + " \"fields\": {\n" + + " \"lul\": \"lal\"\n" + + " }\n" + + "}\n"; + + feeder.feedMany(new ByteArrayInputStream(json.getBytes(UTF_8)), + 3, // Mini-buffer, which needs to expand. + new JsonFeeder.ResultCallback() { }) + .get(); + client.assertRemoveDocumentIds("abc1"); + client.assertUpdateDocumentIds("abc2"); + client.assertUpdateOperation("abc2", "{\"fields\":{\n \"lul\": { \"assign\": \"lal\" }\n }}"); + client.assertPutDocumentIds("abc3"); + client.assertPutOperation("abc3", "{\"fields\":{\n \"lul\": \"lal\"\n }}"); + } + } + + @Test + public void singleJsonOperationIsDispatchedToFeedClient() throws IOException, ExecutionException, InterruptedException { + MockClient client = new MockClient(); + try (JsonFeeder feeder = JsonFeeder.builder(client).build()) { + String json = "{\"put\": \"id:ns:type::abc1\",\n" + + " \"fields\": {\n" + + " \"lul\":\"lal\"\n" + + " }\n" + + " }\n"; + Result result = feeder.feedSingle(json).get(); + Assertions.assertEquals(DocumentId.of("id:ns:type::abc1"), result.documentId()); + assertEquals(Result.Type.success, result.type()); + assertEquals("success", result.resultMessage().get()); + client.assertPutOperation("abc1", "{\"fields\":{\n \"lul\":\"lal\"\n }}"); + } + } + + private static class MockClient implements FeedClient { + final Map putOperations = new LinkedHashMap<>(); + final Map updateOperations = new LinkedHashMap<>(); + final Map removeOperations = new LinkedHashMap<>(); + + @Override + public CompletableFuture put(DocumentId documentId, String documentJson, OperationParameters params) { + putOperations.put(documentId, documentJson); + return createSuccessResult(documentId); + } + + @Override + public CompletableFuture update(DocumentId documentId, String updateJson, OperationParameters params) { + updateOperations.put(documentId, updateJson); + return 
createSuccessResult(documentId); + } + + @Override + public CompletableFuture remove(DocumentId documentId, OperationParameters params) { + removeOperations.put(documentId, null); + return createSuccessResult(documentId); + } + + @Override + public OperationStats stats() { return null; } + + @Override + public CircuitBreaker.State circuitBreakerState() { return null; } + + @Override + public void close(boolean graceful) { } + + private CompletableFuture createSuccessResult(DocumentId documentId) { + return CompletableFuture.completedFuture(new Result(){ + @Override public Type type() { return Type.success; } + @Override public DocumentId documentId() { return documentId; } + @Override public Optional resultMessage() { return Optional.of("success"); } + @Override public Optional traceMessage() { return Optional.empty(); } + }); + } + + void assertDocumentIds(Collection keys, String... expectedUserSpecificIds) { + List expected = Arrays.stream(expectedUserSpecificIds) + .map(userSpecific -> "id:ns:type::" + userSpecific) + .sorted() + .collect(Collectors.toList()); + List actual = keys.stream() + .map(DocumentId::toString).sorted() + .collect(Collectors.toList()); + assertEquals(expected, actual, "Document ids must match"); + } + + void assertPutDocumentIds(String... expectedUserSpecificIds) { + assertDocumentIds(putOperations.keySet(), expectedUserSpecificIds); + } + + void assertUpdateDocumentIds(String... expectedUserSpecificIds) { + assertDocumentIds(updateOperations.keySet(), expectedUserSpecificIds); + } + + void assertRemoveDocumentIds(String... 
expectedUserSpecificIds) { + assertDocumentIds(removeOperations.keySet(), expectedUserSpecificIds); + } + + void assertPutOperation(String userSpecificId, String expectedJson) { + DocumentId docId = DocumentId.of("id:ns:type::" + userSpecificId); + String json = putOperations.get(docId); + assertNotNull(json); + assertEquals(expectedJson.trim(), json.trim()); + } + + void assertUpdateOperation(String userSpecificId, String expectedJson) { + DocumentId docId = DocumentId.of("id:ns:type::" + userSpecificId); + String json = updateOperations.get(docId); + assertNotNull(json); + assertEquals(expectedJson.trim(), json.trim()); + } + + } + +} diff --git a/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java new file mode 100644 index 00000000000..b951fb62fb5 --- /dev/null +++ b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java @@ -0,0 +1,90 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client.examples; + +import ai.vespa.feed.client.DocumentId; +import ai.vespa.feed.client.FeedClient; +import ai.vespa.feed.client.FeedClientBuilder; +import ai.vespa.feed.client.FeedException; +import ai.vespa.feed.client.JsonFeeder; +import ai.vespa.feed.client.Result; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Logger; + +/** + * Sample feeder demonstrating how to programmatically feed to a Vespa cluster. 
+ */ +class JsonFileFeederExample implements Closeable { + + private final static Logger log = Logger.getLogger(JsonFileFeederExample.class.getName()); + + private final JsonFeeder jsonFeeder; + private final URI endpoint; + + static class ResultCallBack implements JsonFeeder.ResultCallback { + + final AtomicInteger resultsReceived = new AtomicInteger(0); + final AtomicInteger errorsReceived = new AtomicInteger(0); + final long startTimeMillis = System.currentTimeMillis();; + + @Override + public void onNextResult(Result result, FeedException error) { + resultsReceived.incrementAndGet(); + if (error != null) { + log.warning("Problems with feeding document " + + error.documentId().map(DocumentId::toString).orElse("") + + ": " + error); + errorsReceived.incrementAndGet(); + } + } + + @Override + public void onError(FeedException error) { + log.severe("Feeding failed fatally: " + error.getMessage()); + } + + @Override + public void onComplete() { + log.info("Feeding completed"); + } + + void dumpStatsToLog() { + log.info("Received in total " + resultsReceived.get() + ", " + errorsReceived.get() + " errors."); + log.info("Time spent receiving is " + (System.currentTimeMillis() - startTimeMillis) + " ms."); + } + + } + + JsonFileFeederExample(URI endpoint) { + this.endpoint = endpoint; + FeedClient feedClient = FeedClientBuilder.create(endpoint) + .build(); + this.jsonFeeder = JsonFeeder.builder(feedClient) + .withTimeout(Duration.ofSeconds(30)) + .build(); + } + + /** + * Feed all operations from a stream. + * + * @param stream The input stream to read operations from (JSON array containing one or more document operations). 
+ */ + void batchFeed(InputStream stream, String batchId) { + ResultCallBack callback = new ResultCallBack(); + log.info("Starting feed to " + endpoint + " for batch '" + batchId + "'"); + CompletableFuture promise = jsonFeeder.feedMany(stream, callback); + promise.join(); // wait for feeding to complete + callback.dumpStatsToLog(); + } + + @Override + public void close() throws IOException { + jsonFeeder.close(); + } +} diff --git a/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java new file mode 100644 index 00000000000..3d4ce150fcf --- /dev/null +++ b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java @@ -0,0 +1,115 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client.examples; + +import ai.vespa.feed.client.DocumentId; +import ai.vespa.feed.client.FeedClient; +import ai.vespa.feed.client.FeedClientBuilder; +import ai.vespa.feed.client.OperationParameters; +import ai.vespa.feed.client.Result; + +import java.net.URI; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Simple Streaming feeder implementation which will send operations to a Vespa endpoint. 
+ * Other threads communicate with the feeder by adding new operations on the BlockingQueue + */ + +class JsonStreamFeederExample extends Thread implements AutoCloseable { + + static class Operation { + final String type; + final String documentId; + final String documentFieldsJson; + + Operation(String type, String id, String fields) { + this.type = type; + this.documentId = id; + this.documentFieldsJson = fields; + } + } + + private final static Logger log = Logger.getLogger(JsonStreamFeederExample.class.getName()); + + private final BlockingQueue operations; + private final FeedClient feedClient; + private final AtomicBoolean drain = new AtomicBoolean(false); + private final CountDownLatch finishedDraining = new CountDownLatch(1); + private final AtomicInteger resultCounter = new AtomicInteger(); + + /** + * Constructor + * @param operations The shared blocking queue where other threads can put document operations to. + * @param endpoint The endpoint to feed to + */ + JsonStreamFeederExample(BlockingQueue operations, URI endpoint) { + this.operations = operations; + this.feedClient = FeedClientBuilder.create(endpoint).build(); + } + + /** + * Shutdown this feeder, waits until operations on queue is drained + */ + @Override + public void close() { + log.info("Shutdown initiated, awaiting operations queue to be drained. 
Queue size is " + operations.size()); + drain.set(true); + try { + finishedDraining.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + @Override + public void run() { + while (!drain.get() || !operations.isEmpty()) { + try { + JsonStreamFeederExample.Operation op = operations.poll(1, TimeUnit.SECONDS); + if(op == null) // no operations available + continue; + log.info("Put document " + op.documentId); + CompletableFuture promise; + DocumentId docId = DocumentId.of(op.documentId); + OperationParameters params = OperationParameters.empty(); + String json = op.documentFieldsJson; + switch (op.type) { + case "put": + promise = feedClient.put(docId, json, params); + break; + case "remove": + promise = feedClient.remove(docId, params); + break; + case "update": + promise = feedClient.update(docId, json, params); + break; + default: + throw new IllegalArgumentException("Invalid operation: " + op.type); + } + promise.whenComplete((result, throwable) -> { + if (resultCounter.getAndIncrement() % 10 == 0) { + System.err.println(feedClient.stats()); + } + if (throwable != null) { + System.err.printf("Failure for '%s': %s", docId, throwable); + throwable.printStackTrace(); + } + }); + } catch (InterruptedException e) { + log.log(Level.SEVERE, "Got interrupt exception.", e); + break; + } + } + log.info("Shutting down feeding thread"); + this.feedClient.close(); + finishedDraining.countDown(); + } + +} \ No newline at end of file diff --git a/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java new file mode 100644 index 00000000000..4e6473a6568 --- /dev/null +++ b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java @@ -0,0 +1,34 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
package ai.vespa.feed.client.examples;

import ai.vespa.feed.client.DocumentId;
import ai.vespa.feed.client.FeedClient;
import ai.vespa.feed.client.FeedClientBuilder;
import ai.vespa.feed.client.OperationParameters;
import ai.vespa.feed.client.Result;

import java.net.URI;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

/**
 * Minimal example which puts a single document through a {@link FeedClient}
 * and prints the outcome once the operation completes.
 */
class SimpleExample {

    public static void main(String[] args) {
        try (FeedClient client = FeedClientBuilder.create(URI.create("https://my-container-endpoint-with-http2:8080/")).build()) {
            DocumentId id = DocumentId.of("namespace", "documenttype", "1");
            String json = "{\"fields\": {\"title\": \"hello world\"}}";
            OperationParameters params = OperationParameters.empty()
                    .timeout(Duration.ofSeconds(5))
                    .route("myvesparoute");
            CompletableFuture<Result> promise = client.put(id, json, params);
            promise.whenComplete((result, throwable) -> {
                if (throwable != null) {
                    throwable.printStackTrace();
                } else {
                    // Print the message text itself rather than the "Optional[...]" wrapper.
                    System.out.printf("'%s' for document '%s': %s%n",
                                      result.type(), result.documentId(), result.resultMessage().orElse(""));
                }
            });
        }
    }

}
-- cgit v1.2.3