summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2021-06-15 16:49:06 +0200
committerBjørn Christian Seime <bjorncs@verizonmedia.com>2021-06-15 16:49:06 +0200
commit407d673e1de1db281684a0aba1685f7609bbc22d (patch)
tree645cbc49f6d9ac3f7624c92f805c4f3627ef4202 /vespa-feed-client
parentc216ff20854a4b51e3ceac615f978688f4d60e3b (diff)
Simplify JsonStreamFeederExample
Use FeedClient.stats() instead of manual book-keeping. Include document id in error messages.
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java35
1 files changed, 3 insertions, 32 deletions
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java
index 11ed3ace304..5cee776b244 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java
@@ -8,10 +8,6 @@ import ai.vespa.feed.client.OperationParameters;
import ai.vespa.feed.client.Result;
import java.net.URI;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Date;
-import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -46,12 +42,7 @@ class JsonStreamFeederExample extends Thread implements AutoCloseable {
private final FeedClient feedClient;
private final AtomicBoolean drain = new AtomicBoolean(false);
private final CountDownLatch finishedDraining = new CountDownLatch(1);
- private final Object monitor = new Object();
- private final AtomicInteger sentCounter = new AtomicInteger();
private final AtomicInteger resultCounter = new AtomicInteger();
- private final AtomicInteger failureCounter = new AtomicInteger();
- private int startSampleResultCount = 0;
- private Instant startSampleInstant = Instant.now();
/**
* Constructor
@@ -102,18 +93,15 @@ class JsonStreamFeederExample extends Thread implements AutoCloseable {
default:
throw new IllegalArgumentException("Invalid operation: " + op.type);
}
- sentCounter.incrementAndGet();
promise.whenComplete((result, throwable) -> {
if (resultCounter.getAndIncrement() % 10 == 0) {
- printProgress();
+ System.err.println(feedClient.stats());
}
if (throwable != null) {
- failureCounter.incrementAndGet();
- System.err.println("Failure: " + throwable);
+ System.err.printf("Failure for '%s': %s", docId, throwable);
throwable.printStackTrace();
} else if (result.type() == Result.Type.failure) {
- failureCounter.incrementAndGet();
- System.err.println("Failure: " + result.resultMessage());
+ System.err.printf("Failure for '%s': %s", docId, result.resultMessage().orElse("<no messsage>"));
}
});
} catch (InterruptedException e) {
@@ -126,21 +114,4 @@ class JsonStreamFeederExample extends Thread implements AutoCloseable {
finishedDraining.countDown();
}
- void printProgress() {
- synchronized (monitor) {
- Instant now = Instant.now();
- int resultCounter = this.resultCounter.get();
- int failureCounter = this.failureCounter.get();
- int sentCounter = this.sentCounter.get();
- double docsDelta = resultCounter - failureCounter - startSampleResultCount;
- Duration duration = Duration.between(startSampleInstant, now);
- startSampleInstant = now;
- this.startSampleResultCount = resultCounter - failureCounter;
- long durationMilliSecs = duration.toMillis() + 1; // +1 to avoid division by zero
- double rate = 1000. * docsDelta / durationMilliSecs;
- System.err.println(new Date() + " Result received: " + resultCounter
- + " (" + failureCounter + " failed so far, " + sentCounter
- + " sent, success rate " + String.format(Locale.US, "%.2f docs/sec", rate) + ").");
- }
- }
} \ No newline at end of file