From 23bbba389056c7811a4c9812b0ba9341bb85795c Mon Sep 17 00:00:00 2001 From: Jon Bratseth Date: Wed, 16 Jan 2019 22:58:35 +0100 Subject: Handle multiple results per document in a sync batch --- .../com/yahoo/vespa/http/client/FeedClient.java | 19 ++++++++++-- .../java/com/yahoo/vespa/http/client/Result.java | 25 +++++----------- .../yahoo/vespa/http/client/SyncFeedClient.java | 34 ++++++++++++++++++---- .../com/yahoo/vespa/http/client/core/Document.java | 3 +- .../vespa/http/client/core/api/FeedClientImpl.java | 9 ++---- .../core/operationProcessor/DocumentSendInfo.java | 2 +- .../src/test/java/ExampleUsageFeedClientTest.java | 13 ++++----- .../yahoo/vespa/http/client/QueueBoundsTest.java | 5 ++-- .../yahoo/vespa/http/client/core/DocumentTest.java | 10 ++++--- .../vespa/http/client/core/XmlFeedReaderTest.java | 5 ++-- .../core/communication/CloseableQTestCase.java | 10 +++---- .../IncompleteResultsThrottlerTest.java | 1 + .../operationProcessor/OperationProcessorTest.java | 13 ++++----- .../vespa/http/client/runner/JsonReaderTest.java | 4 +-- 14 files changed, 90 insertions(+), 63 deletions(-) (limited to 'vespa-http-client') diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java index e35c0316433..1028100fbcf 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java @@ -28,7 +28,9 @@ public interface FeedClient extends AutoCloseable { * @param documentId the document id of the document. * @param documentData the document data as JSON or XML (as specified when using the factory to create the API) */ - void stream(String documentId, CharSequence documentData); + default void stream(String documentId, CharSequence documentData) { + stream(documentId, documentData, null); + } /** * Streams a document to cluster(s). If the pipeline and buffers are full, this call will be blocking. @@ -39,8 +41,21 @@ public interface FeedClient extends AutoCloseable { * @param documentData the document data as JSON or XML (as specified when using the factory to create the API) * @param context a context object which will be accessible in the result of the callback, or null if none */ - void stream(String documentId, CharSequence documentData, Object context); + default void stream(String documentId, CharSequence documentData, Object context) { + stream(documentId, null, documentData, context); + } + /** + * Streams a document to cluster(s). If the pipeline and buffers are full, this call will be blocking. + * Documents might time out before they are sent. Failed documents are not retried. + * Don't call stream() after close is called. + * + * @param documentId the document id of the document. + * @param operationId the id to use for this operation, or null to let the client decide an operation id + * @param documentData the document data as JSON or XML (as specified when using the factory to create the API) + * @param context a context object which will be accessible in the result of the callback, or null if none + */ + void stream(String documentId, String operationId, CharSequence documentData, Object context); /** * This callback is executed when new results are arriving or an error occur. diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java index efa609a2d59..dbdd6eb23ac 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java @@ -18,7 +18,6 @@ import java.util.List; * * @author Einar M R Rosenvinge */ -// This should be an interface, but in order to be binary compatible during refactoring we made it abstract. public class Result { public enum ResultType { @@ -48,28 +47,20 @@ public class Result { this.localTrace = localTrace == null ? null : localTrace.toString(); } - - /** - * Returns the document ID that this Result is for. - * - * @return the document ID that this Result is for. - */ + /** Returns the document id that this result is for */ public String getDocumentId() { return document.getDocumentId(); } - /** - * Returns the document data. - * @return data as bytebuffer. - */ + /** Returns the id of the operation this is the result of */ + public String getOperationId() { return document.getOperationId(); } + + /** Returns the document data */ public CharSequence getDocumentDataAsCharSequence() { return document.getDataAsString(); } - /** - * Returns the context of the object if any. - * @return context. - */ + /** Returns the context of the object if any */ public Object getContext() { return document.getContext(); } @@ -77,12 +68,11 @@ public class Result { /** * Returns true if the operation(s) was successful. If at least one {@link Detail} * in {@link #getDetails()} is unsuccessful, this will return false. - * - * @return true if the operation was successful. */ public boolean isSuccess() { return success; } + /** * @deprecated use resultType on items getDetails() to check operations. * Returns true if an error is transient, false if it is permanent. Irrelevant @@ -99,6 +89,7 @@ public class Result { /** * Checks if operation has been set up with local tracing. + * * @return true if operation has local trace. */ public boolean hasLocalTrace() { diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SyncFeedClient.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SyncFeedClient.java index c0928cad556..3851fdb40fe 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SyncFeedClient.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SyncFeedClient.java @@ -3,12 +3,14 @@ package com.yahoo.vespa.http.client; import com.google.common.annotations.Beta; import com.yahoo.vespa.http.client.config.SessionParams; +import java.math.BigInteger; import java.util.ArrayList; import java.util.ConcurrentModificationException; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ThreadLocalRandom; /** * A utility wrapper of a FeedClient which feeds a list of documents and blocks until all responses are returned, @@ -41,7 +43,7 @@ public class SyncFeedClient implements AutoCloseable { public SyncResult stream(List operations) { callback.expectResultsOf(operations); for (SyncOperation operation : operations) - wrappedClient.stream(operation.documentId, operation.documentData, operation.context); + wrappedClient.stream(operation.documentId, operation.operationId, operation.documentData, operation.context); return callback.waitForResults(); } @@ -57,6 +59,9 @@ public class SyncFeedClient implements AutoCloseable { private final CharSequence documentData; private final Object context; + /** Operation id passed on to the Document created from this */ + private final String operationId; + public SyncOperation(String documentId, CharSequence documentData) { this(documentId, documentData, null); } @@ -65,6 +70,7 @@ public class SyncFeedClient implements AutoCloseable { this.documentId = Objects.requireNonNull(documentId, "documentId"); this.documentData = Objects.requireNonNull(documentData, "documentData"); this.context = context; + this.operationId = new BigInteger(64, ThreadLocalRandom.current()).toString(32); } } @@ -113,11 +119,28 @@ public class SyncFeedClient implements AutoCloseable { private Exception exception = null; /** - * A map from document ids to their results. This is initially populated with null values to keep track of + * A map from operation ids to their results. This is initially populated with null values to keep track of * which responses we are waiting for. */ private LinkedHashMap results = null; + void resetExpectedResults() { + synchronized (monitor) { + if (results != null) + throw new ConcurrentModificationException("A SyncFeedClient instance is used by multiple threads"); + + resultsReceived = 0; + exception = null; + results = new LinkedHashMap<>(); + } + } + + void addExpectationOfResultFor(String operationId) { + synchronized (monitor) { + results.put(operationId, null); + } + } + void expectResultsOf(List operations) { synchronized (monitor) { if (results != null) @@ -127,10 +150,9 @@ public class SyncFeedClient implements AutoCloseable { exception = null; results = new LinkedHashMap<>(operations.size()); for (SyncOperation operation : operations) - results.put(operation.documentId, null); + results.put(operation.operationId, null); } } - SyncResult waitForResults() { try { synchronized (monitor) { @@ -150,9 +172,9 @@ public class SyncFeedClient implements AutoCloseable { @Override public void onCompletion(String docId, Result documentResult) { synchronized (monitor) { - if ( ! results.containsKey(docId)) return; // Not expecting this result - ignore + if ( ! results.containsKey(documentResult.getOperationId())) return; // Stale result - ignore - Result previousValue = results.put(docId, documentResult); + Result previousValue = results.put(documentResult.getOperationId(), documentResult); if (previousValue != null) throw new IllegalStateException("Received duplicate result for " + docId); diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java index 88d8d702d7c..f4da5d02012 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java @@ -28,8 +28,9 @@ final public class Document { this.data = ByteBuffer.wrap(data); } - public Document(String documentId, CharSequence data, Object context) { + public Document(String documentId, String operationId, CharSequence data, Object context) { this.documentId = documentId; + this.operationId = operationId; this.context = context; try { this.data = StandardCharsets.UTF_8.newEncoder().encode(CharBuffer.wrap(data)); diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java index b60b9fa89d3..7238a0c4ba7 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java @@ -45,17 +45,12 @@ public class FeedClientImpl implements FeedClient { } @Override - public void stream(String documentId, CharSequence documentData) { - stream(documentId, documentData, null); - } - - @Override - public void stream(String documentId, CharSequence documentData, Object context) { + public void stream(String documentId, String operationId, CharSequence documentData, Object context) { CharsetEncoder charsetEncoder = StandardCharsets.UTF_8.newEncoder(); charsetEncoder.onMalformedInput(CodingErrorAction.REPORT); charsetEncoder.onUnmappableCharacter(CodingErrorAction.REPORT); - Document document = new Document(documentId, documentData, context); + Document document = new Document(documentId, operationId, documentData, context); operationProcessor.sendDocument(document); } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/DocumentSendInfo.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/DocumentSendInfo.java index 54eac939b9f..0952837ecc4 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/DocumentSendInfo.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/DocumentSendInfo.java @@ -9,7 +9,7 @@ import java.util.Map; /** * Keeps an overview of what is sent and what is received for an operation. - * This class is NOT thread-safe by design. + * This class is NOT thread-safe. */ class DocumentSendInfo { diff --git a/vespa-http-client/src/test/java/ExampleUsageFeedClientTest.java b/vespa-http-client/src/test/java/ExampleUsageFeedClientTest.java index 238f6a91e53..909131a8979 100644 --- a/vespa-http-client/src/test/java/ExampleUsageFeedClientTest.java +++ b/vespa-http-client/src/test/java/ExampleUsageFeedClientTest.java @@ -16,6 +16,7 @@ import java.util.concurrent.atomic.AtomicInteger; /** * Unit test that test documentation code. + * * @author dybis */ public class ExampleUsageFeedClientTest { @@ -45,7 +46,7 @@ public class ExampleUsageFeedClientTest { // Example usage of FeedClient public static void exampleCode(String hostNameA, int portServerA, String hostNameB, int portServerB) { - final boolean useSsl = false; + boolean useSsl = false; final SessionParams sessionParams = new SessionParams.Builder() .addCluster(new Cluster.Builder().addEndpoint(Endpoint.create(hostNameA, portServerA, useSsl)).build()) .addCluster(new Cluster.Builder().addEndpoint(Endpoint.create(hostNameB, portServerB, useSsl)).build()) @@ -54,8 +55,8 @@ public class ExampleUsageFeedClientTest { .build()) .build(); - final AtomicInteger resultsReceived = new AtomicInteger(0); - final AtomicInteger errorsReceived = new AtomicInteger(0); + AtomicInteger resultsReceived = new AtomicInteger(0); + AtomicInteger errorsReceived = new AtomicInteger(0); FeedClient feedClient = FeedClientFactory.create(sessionParams, new FeedClient.ResultCallback() { @Override @@ -72,15 +73,13 @@ public class ExampleUsageFeedClientTest { } }); int sentCounter = 0; - final List docIds = Arrays.asList("1", "2", "3", "4"); + List docIds = Arrays.asList("1", "2", "3", "4"); for (final String docId : docIds) { CharSequence docData = generateDocument(docId); feedClient.stream(docId, docData, docId); sentCounter++; - System.out.println("Sent " + sentCounter + " received results from " + resultsReceived.get()); } feedClient.close(); - System.out.println("Finished, got " + errorsReceived.get() - + " errors from " + resultsReceived.get() + " results, sent " + sentCounter + " documents."); } + } diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java index 168745cb75b..357296eb789 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java @@ -30,12 +30,13 @@ import static org.junit.Assert.fail; /** * Only runs on screwdriver to save time! - * @author Einar M R Rosenvinge - * @since 5.1.29 + * + * @author Einar M R Rosenvinge */ public class QueueBoundsTest extends TestOnCiBuildingSystemOnly { public static final List documents; + static { List docs = new ArrayList<>(); docs.add(new TestDocument("id:music:music::http://music.yahoo.com/bobdylan/BestOf", diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/DocumentTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/DocumentTest.java index a712828717b..b5c03eade51 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/DocumentTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/DocumentTest.java @@ -10,11 +10,12 @@ import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; public class DocumentTest { + @Test - public void simpleCaseOk() throws Document.DocumentException { + public void simpleCaseOk() { String docId = "doc id"; String docContent = "foo"; - Document document = new Document(docId, docContent.getBytes(), null /* context */); + Document document = new Document(docId, docContent.getBytes(), null); assertThat(document.getDocumentId(), is(docId)); assertThat(document.getData(), is(ByteBuffer.wrap(docContent.getBytes()))); assertThat(document.getDataAsString().toString(), is(docContent)); @@ -26,13 +27,14 @@ public class DocumentTest { @Test(expected = ReadOnlyBufferException.class) public void notMutablePutTest() { - Document document = new Document("id", "data", null /* context */); + Document document = new Document("id", null, "data", null /* context */); document.getData().put("a".getBytes()); } @Test(expected = ReadOnlyBufferException.class) public void notMutableCompactTest() { - Document document = new Document("id", "data", null /* context */); + Document document = new Document("id", null, "data", null /* context */); document.getData().compact(); } + } diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/XmlFeedReaderTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/XmlFeedReaderTest.java index 96569b2985f..e71c54672a9 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/XmlFeedReaderTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/XmlFeedReaderTest.java @@ -209,8 +209,8 @@ public class XmlFeedReaderTest { } @Override - public void stream(String documentId, CharSequence documentData, Object context) { - documentIds.add(documentId.toString()); + public void stream(String documentId, String operationId, CharSequence documentData, Object context) { + documentIds.add(documentId); datas.add(documentData); contexts.add(context); } @@ -256,4 +256,5 @@ public class XmlFeedReaderTest { @Test public void testAposData() throws Exception { verifyNoTransformationOfXml(feedResource4); } + } diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/CloseableQTestCase.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/CloseableQTestCase.java index 82538179ef9..35a06258f86 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/CloseableQTestCase.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/CloseableQTestCase.java @@ -11,7 +11,7 @@ public class CloseableQTestCase { @Test public void requestThatPutIsInterruptedOnClose() throws InterruptedException { final DocumentQueue q = new DocumentQueue(1); - q.put(new Document("id", "data", null /* context */), false); + q.put(new Document("id", null, "data", null), false); Thread t = new Thread(new Runnable() { @Override public void run() { @@ -26,7 +26,7 @@ public class CloseableQTestCase { }); t.start(); try { - q.put(new Document("id2", "data2", null /* context */), false); + q.put(new Document("id2", null, "data2", null), false); fail("This shouldn't have worked."); } catch (IllegalStateException ise) { // ok! @@ -40,9 +40,9 @@ public class CloseableQTestCase { @Test public void requireThatSelfIsUnbounded() throws InterruptedException { DocumentQueue q = new DocumentQueue(1); - q.put(new Document("1", "data", null /* context */), true); - q.put(new Document("2", "data", null /* context */), true); - q.put(new Document("3", "data", null /* context */), true); + q.put(new Document("1", null, "data", null), true); + q.put(new Document("2", null, "data", null), true); + q.put(new Document("3", null, "data", null), true); assertEquals(3, q.size()); } } diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java index a7d44cf597a..47aeb4d3b0f 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class IncompleteResultsThrottlerTest { + @Test public void simpleStaticQueueSizeTest() { IncompleteResultsThrottler incompleteResultsThrottler = new IncompleteResultsThrottler(2, 2, null, null); diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java index 3a335aa5da0..42cbfd19a88 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java @@ -27,16 +27,15 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; /** - * @author Einar M R Rosenvinge - * @since 5.1.20 + * @author Einar M R Rosenvinge */ public class OperationProcessorTest { final Queue queue = new ArrayDeque<>(); - final Document doc1 = new Document("doc:a:b", "data doc 1", null /* context */); - final Document doc1b = new Document("doc:a:b", "data doc 1b", null /* context */); - final Document doc2 = new Document("doc:a:b2", "data doc 2", null /* context */); - final Document doc3 = new Document("doc:a:b3", "data doc 3", null /* context */); + final Document doc1 = new Document("doc:a:b", null, "data doc 1", null); + final Document doc1b = new Document("doc:a:b", null, "data doc 1b", null); + final Document doc2 = new Document("doc:a:b2", null, "data doc 2", null); + final Document doc3 = new Document("doc:a:b3", null, "data doc 3", null); @Test public void testBasic() { @@ -204,7 +203,7 @@ public class OperationProcessorTest { Queue documentQueue = new ArrayDeque<>(); for (int x = 0; x < 100; x++) { - Document document = new Document("doc:a:b", String.valueOf(x), null /* context */); + Document document = new Document("doc:a:b", null, String.valueOf(x), null); operationProcessor.sendDocument(document); documentQueue.add(document); } diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/runner/JsonReaderTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/runner/JsonReaderTest.java index d8dfae00b85..134274ff869 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/runner/JsonReaderTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/runner/JsonReaderTest.java @@ -84,8 +84,8 @@ public class JsonReaderTest { } @Override - public void stream(String documentId, CharSequence documentData, Object context) { - documentIds.add(documentId.toString()); + public void stream(String documentId, String operationId, CharSequence documentData, Object context) { + documentIds.add(documentId); datas.add(documentData); contexts.add(context); } -- cgit v1.2.3