diff options
10 files changed, 219 insertions, 34 deletions
diff --git a/document/src/main/java/com/yahoo/document/DocumentPut.java b/document/src/main/java/com/yahoo/document/DocumentPut.java index e5ddc2c67a3..5906a9ca0ba 100644 --- a/document/src/main/java/com/yahoo/document/DocumentPut.java +++ b/document/src/main/java/com/yahoo/document/DocumentPut.java @@ -45,4 +45,9 @@ public class DocumentPut extends DocumentOperation { this.document = newDocument; } + @Override + public String toString() { + return "put of document " + getId(); + } + } diff --git a/document/src/main/java/com/yahoo/document/json/JsonFeedReader.java b/document/src/main/java/com/yahoo/document/json/JsonFeedReader.java index ec0d29a53a6..1b5681e7146 100644 --- a/document/src/main/java/com/yahoo/document/json/JsonFeedReader.java +++ b/document/src/main/java/com/yahoo/document/json/JsonFeedReader.java @@ -23,6 +23,7 @@ import com.yahoo.vespaxmlparser.VespaXMLFeedReader.Operation; * @author steinar */ public class JsonFeedReader implements FeedReader { + private final JsonReader reader; private InputStream stream; private static final JsonFactory jsonFactory = new JsonFactory().disable(JsonFactory.Feature.CANONICALIZE_FIELD_NAMES); diff --git a/document/src/main/java/com/yahoo/document/json/JsonReader.java b/document/src/main/java/com/yahoo/document/json/JsonReader.java index bedfbdc3da5..b7818b06b03 100644 --- a/document/src/main/java/com/yahoo/document/json/JsonReader.java +++ b/document/src/main/java/com/yahoo/document/json/JsonReader.java @@ -28,7 +28,6 @@ import static com.yahoo.document.json.readers.JsonParserHelpers.expectArrayStart * @author Steinar Knutsen * @author dybis */ -@Beta public class JsonReader { public Optional<DocumentParseInfo> parseDocument() throws IOException { @@ -79,6 +78,7 @@ public class JsonReader { return operation; } + /** Returns the next document operation, or null if we have reached the end */ public DocumentOperation next() { switch (state) { case AT_START: diff --git 
a/document/src/main/java/com/yahoo/document/json/TokenBuffer.java b/document/src/main/java/com/yahoo/document/json/TokenBuffer.java index e6fc8171a1a..e20845bfa54 100644 --- a/document/src/main/java/com/yahoo/document/json/TokenBuffer.java +++ b/document/src/main/java/com/yahoo/document/json/TokenBuffer.java @@ -13,9 +13,10 @@ import com.google.common.base.Preconditions; /** * Helper class to enable lookahead in the token stream. * - * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> + * @author Steinar Knutsen */ public class TokenBuffer { + public static final class Token { public final JsonToken token; public final String name; @@ -42,6 +43,9 @@ public class TokenBuffer { } } + /** Returns whether no tokens are available in this buffer */ + public boolean isEmpty() { return size() == 0; } + public JsonToken next() { buffer.removeFirst(); Token t = buffer.peekFirst(); @@ -52,16 +56,25 @@ public class TokenBuffer { return t.token; } + /** Returns the current token without changing position, or null if none */ public JsonToken currentToken() { - return buffer.peekFirst().token; + Token token = buffer.peekFirst(); + if (token == null) return null; + return token.token; } + /** Returns the current token name without changing position, or null if none */ public String currentName() { - return buffer.peekFirst().name; + Token token = buffer.peekFirst(); + if (token == null) return null; + return token.name; } + /** Returns the current token text without changing position, or null if none */ public String currentText() { - return buffer.peekFirst().text; + Token token = buffer.peekFirst(); + if (token == null) return null; + return token.text; } public int size() { diff --git a/document/src/main/java/com/yahoo/document/json/document/DocumentParser.java b/document/src/main/java/com/yahoo/document/json/document/DocumentParser.java index 744ec12bb23..3fc2c941b99 100644 --- a/document/src/main/java/com/yahoo/document/json/document/DocumentParser.java +++ 
b/document/src/main/java/com/yahoo/document/json/document/DocumentParser.java @@ -33,31 +33,43 @@ public class DocumentParser { this.parser = parser; } + /** + * Parses a single document and returns it. + * Returns empty if we have reached the end of the stream. + */ public Optional<DocumentParseInfo> parse(Optional<DocumentId> documentIdArg) throws IOException { indentLevel = 0; DocumentParseInfo documentParseInfo = new DocumentParseInfo(); documentIdArg.ifPresent(documentId -> documentParseInfo.documentId = documentId); + boolean foundItems = false; do { - parseOneItem(documentParseInfo, documentIdArg.isPresent() /* doc id set externally */); + foundItems |= parseOneItem(documentParseInfo, documentIdArg.isPresent() /* doc id set externally */); } while (indentLevel > 0L); - if (documentParseInfo.documentId != null) { - return Optional.of(documentParseInfo); + if (documentParseInfo.documentId == null) { + if (foundItems) + throw new IllegalArgumentException("Missing a document operation ('put', 'update' or 'remove')"); + else + return Optional.empty(); } - return Optional.empty(); + return Optional.of(documentParseInfo); } - private void parseOneItem(DocumentParseInfo documentParseInfo, boolean docIdAndOperationIsSetExternally) throws IOException { + /** + * Parses one item from the stream. 
+ * + * @return whether an item was found + */ + private boolean parseOneItem(DocumentParseInfo documentParseInfo, boolean docIdAndOperationIsSetExternally) throws IOException { parser.nextValue(); processIndent(); - if (parser.getCurrentName() == null) { - return; - } + if (parser.getCurrentName() == null) return false; if (indentLevel == 1L) { handleIdentLevelOne(documentParseInfo, docIdAndOperationIsSetExternally); } else if (indentLevel == 2L) { handleIdentLevelTwo(documentParseInfo); } + return true; } private void processIndent() { diff --git a/document/src/main/java/com/yahoo/document/json/readers/VespaJsonDocumentReader.java b/document/src/main/java/com/yahoo/document/json/readers/VespaJsonDocumentReader.java index 8e381c8e2fe..6189c12c8c9 100644 --- a/document/src/main/java/com/yahoo/document/json/readers/VespaJsonDocumentReader.java +++ b/document/src/main/java/com/yahoo/document/json/readers/VespaJsonDocumentReader.java @@ -74,6 +74,8 @@ public class VespaJsonDocumentReader { // Exposed for unit testing... public void readPut(TokenBuffer buffer, DocumentPut put) { try { + if (buffer.isEmpty()) // no "fields" map + throw new IllegalArgumentException(put + " is missing a 'fields' map"); populateComposite(buffer, put.getDocument()); } catch (JsonReaderException e) { throw JsonReaderException.addDocId(e, put.getId()); @@ -82,6 +84,8 @@ public class VespaJsonDocumentReader { // Exposed for unit testing... 
public void readUpdate(TokenBuffer buffer, DocumentUpdate update) { + if (buffer.isEmpty()) + throw new IllegalArgumentException("update of document " + update.getId() + " is missing a 'fields' map"); expectObjectStart(buffer.currentToken()); int localNesting = buffer.nesting(); diff --git a/document/src/test/java/com/yahoo/document/json/JsonReaderTestCase.java b/document/src/test/java/com/yahoo/document/json/JsonReaderTestCase.java index 07f0a172caf..1fd45cb07c4 100644 --- a/document/src/test/java/com/yahoo/document/json/JsonReaderTestCase.java +++ b/document/src/test/java/com/yahoo/document/json/JsonReaderTestCase.java @@ -1094,6 +1094,67 @@ public class JsonReaderTestCase { new JsonReader(types, jsonToInputStream(jsonData), parserFactory).next(); } + @Test + public void testMissingOperation() { + try { + String jsonData = inputJson( + "[", + " {", + " 'fields': {", + " 'actualarray': {", + " 'add': [", + " 'person',", + " 'another person'", + " ]", + " }", + " }", + " }", + "]"); + + new JsonReader(types, jsonToInputStream(jsonData), parserFactory).next(); + fail("Expected exception"); + } + catch (IllegalArgumentException e) { + assertEquals("Missing a document operation ('put', 'update' or 'remove')", e.getMessage()); + } + } + + @Test + public void testMissingFieldsMapInPut() { + try { + String jsonData = inputJson( + "[", + " {", + " 'put': 'id:unittest:smoke::whee'", + " }", + "]"); + + new JsonReader(types, jsonToInputStream(jsonData), parserFactory).next(); + fail("Expected exception"); + } + catch (IllegalArgumentException e) { + assertEquals("put of document id:unittest:smoke::whee is missing a 'fields' map", e.getMessage()); + } + } + + @Test + public void testMissingFieldsMapInUpdate() { + try { + String jsonData = inputJson( + "[", + " {", + " 'update': 'id:unittest:smoke::whee'", + " }", + "]"); + + new JsonReader(types, jsonToInputStream(jsonData), parserFactory).next(); + fail("Expected exception"); + } + catch (IllegalArgumentException e) { + 
assertEquals("update of document id:unittest:smoke::whee is missing a 'fields' map", e.getMessage()); + } + } + static ByteArrayInputStream jsonToInputStream(String json) { return new ByteArrayInputStream(Utf8.toBytes(json)); } diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java index 835a11bf7c2..5f49dd5ddf8 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java @@ -40,6 +40,8 @@ import static com.yahoo.messagebus.ErrorCode.SEND_QUEUE_FULL; * The implementation is based on the code from V2, but the object model is rewritten to simplify the logic and * avoid using a threadpool that has no effect with all the extra that comes with it. V2 has one instance per thread * on the client, while this is one instance for all threads. + * + * @author dybis */ class ClientFeederV3 { @@ -109,7 +111,7 @@ class ClientFeederV3 { ongoingRequests.incrementAndGet(); try { FeederSettings feederSettings = new FeederSettings(request); - /** + /* * The gateway handles overload from clients in different ways. 
* * If the backend is overloaded, but not the gateway, it will fill the backend, messagebus throttler @@ -132,7 +134,7 @@ class ClientFeederV3 { } InputStream inputStream = StreamReaderV3.unzipStreamIfNeeded(request); - final BlockingQueue<OperationStatus> replies = new LinkedBlockingQueue<>(); + BlockingQueue<OperationStatus> replies = new LinkedBlockingQueue<>(); try { feed(feederSettings, inputStream, replies, threadsAvailableForFeeding); synchronized (monitor) { @@ -148,11 +150,7 @@ class ClientFeederV3 { log.log(LogLevel.WARNING, "Unhandled exception while feeding: " + Exceptions.toMessageString(e), e); } finally { - try { - replies.add(createOperationStatus("-", "-", ErrorCode.END_OF_FEED, false, null)); - } catch (InterruptedException e) { - // NOP, we are already exiting the thread - } + replies.add(createOperationStatus("-", "-", ErrorCode.END_OF_FEED, false, null)); } return new FeedResponse(200, replies, 3 /* protocol version */, clientId, outstandingOperations.get(), hostName); } finally { @@ -171,7 +169,7 @@ class ClientFeederV3 { private Optional<DocumentOperationMessageV3> pullMessageFromRequest( FeederSettings settings, InputStream requestInputStream, BlockingQueue<OperationStatus> repliesFromOldMessages) { while (true) { - final Optional<String> operationId; + Optional<String> operationId; try { operationId = streamReaderV3.getNextOperationId(requestInputStream); } catch (IOException ioe) { @@ -183,7 +181,8 @@ class ClientFeederV3 { if (! 
operationId.isPresent()) { return Optional.empty(); } - final DocumentOperationMessageV3 message; + + DocumentOperationMessageV3 message; try { message = getNextMessage(operationId.get(), requestInputStream, settings); } catch (Exception e) { @@ -236,7 +235,7 @@ class ClientFeederV3 { } setMessageParameters(msg.get(), settings); - final Result result; + Result result; try { result = sendMessage(settings, msg.get(), threadsAvailableForFeeding); @@ -265,8 +264,8 @@ class ClientFeederV3 { } } - private OperationStatus createOperationStatus(String id, String message, ErrorCode code, boolean isConditionNotMet, Message msg) - throws InterruptedException { + private OperationStatus createOperationStatus(String id, String message, + ErrorCode code, boolean isConditionNotMet, Message msg) { String traceMessage = msg != null && msg.getTrace() != null && msg.getTrace().getLevel() > 0 ? msg.getTrace().toString() : ""; diff --git a/vespajlib/src/test/java/com/yahoo/tensor/MatrixDotProductBenchmark.java b/vespajlib/src/test/java/com/yahoo/tensor/MatrixDotProductBenchmark.java new file mode 100644 index 00000000000..439aac5578a --- /dev/null +++ b/vespajlib/src/test/java/com/yahoo/tensor/MatrixDotProductBenchmark.java @@ -0,0 +1,90 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+package com.yahoo.tensor; + +import com.yahoo.tensor.evaluation.MapEvaluationContext; +import com.yahoo.tensor.evaluation.VariableTensor; +import com.yahoo.tensor.functions.ConstantTensor; +import com.yahoo.tensor.functions.Join; +import com.yahoo.tensor.functions.Reduce; +import com.yahoo.tensor.functions.TensorFunction; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.stream.Collectors; + +/** + * Microbenchmark of a "dot product" of two mapped rank 2 tensors + * + * @author bratseth + */ +public class MatrixDotProductBenchmark { + + private final static Random random = new Random(); + + public double benchmark(int iterations, List<Tensor> modelMatrixes, TensorType.Dimension.Type dimensionType) { + Tensor queryMatrix = matrix(1, 20, dimensionType).get(0); + dotProduct(queryMatrix, modelMatrixes, Math.max(iterations/10, 10)); // warmup + System.gc(); + long startTime = System.currentTimeMillis(); + dotProduct(queryMatrix, modelMatrixes, iterations); + long totalTime = System.currentTimeMillis() - startTime; + return (double)totalTime / (double)iterations; + } + + private double dotProduct(Tensor tensor, List<Tensor> tensors, int iterations) { + double result = 0; + for (int i = 0 ; i < iterations; i++) + result = dotProduct(tensor, tensors); + return result; + } + + private double dotProduct(Tensor tensor, List<Tensor> tensors) { + double largest = Double.MIN_VALUE; + TensorFunction dotProductFunction = new Reduce(new Join(new ConstantTensor(tensor), + new VariableTensor("argument"), (a, b) -> a * b), + Reduce.Aggregator.sum).toPrimitive(); + MapEvaluationContext context = new MapEvaluationContext(); + + for (Tensor tensorElement : tensors) { // tensors.size() = 1 for larger tensor + context.put("argument", tensorElement); + double dotProduct = dotProductFunction.evaluate(context).asDouble(); + if (dotProduct > largest) { + largest = dotProduct; + } + } + return largest; + } + + 
private static List<Tensor> matrix(int dimension1Size, int dimension2Size, TensorType.Dimension.Type dimensionType) { + TensorType.Builder typeBuilder = new TensorType.Builder(); + addDimension(typeBuilder, "i", dimensionType, dimension1Size); + addDimension(typeBuilder, "j", dimensionType, dimension2Size); + Tensor.Builder builder = Tensor.Builder.of(typeBuilder.build()); + for (int i = 0; i < dimension1Size; i++) { + for (int j = 0; j < dimension2Size; j++) { + builder.cell() + .label("i", String.valueOf("label" + i)) + .label("j", String.valueOf("label" + j)) + .value(random.nextDouble()); + } + } + return Collections.singletonList(builder.build()); + } + + private static void addDimension(TensorType.Builder builder, String name, TensorType.Dimension.Type type, int size) { + switch (type) { + case mapped: builder.mapped(name); break; + case indexedUnbound: builder.indexed(name); break; + case indexedBound: builder.indexed(name, size); break; + default: throw new IllegalArgumentException("Dimension type " + type + " not supported"); + } + } + + public static void main(String[] args) { + double time = new MatrixDotProductBenchmark().benchmark(10000, matrix(10, 55, TensorType.Dimension.Type.mapped), TensorType.Dimension.Type.mapped); + System.out.printf("Matrixes, 10*55 size matrixes. 
Time per sum(join): %1$8.3f ms\n", time); + } + +} diff --git a/vespajlib/src/test/java/com/yahoo/tensor/TensorFunctionBenchmark.java b/vespajlib/src/test/java/com/yahoo/tensor/TensorFunctionBenchmark.java index abdb3071bf7..7b856dde2d5 100644 --- a/vespajlib/src/test/java/com/yahoo/tensor/TensorFunctionBenchmark.java +++ b/vespajlib/src/test/java/com/yahoo/tensor/TensorFunctionBenchmark.java @@ -107,26 +107,26 @@ public class TensorFunctionBenchmark { double time = 0; // ---------------- Mapped with extra space (sidesteps current special-case optimizations): - // 9.9 ms + // 7.8 ms time = new TensorFunctionBenchmark().benchmark(1000, vectors(100, 300, TensorType.Dimension.Type.mapped), TensorType.Dimension.Type.mapped, true); System.out.printf("Mapped vectors, x space time per join: %1$8.3f ms\n", time); - // 10.5 ms + // 7.7 ms time = new TensorFunctionBenchmark().benchmark(1000, matrix(100, 300, TensorType.Dimension.Type.mapped), TensorType.Dimension.Type.mapped, true); System.out.printf("Mapped matrix, x space time per join: %1$8.3f ms\n", time); // ---------------- Mapped: - // 2.6 ms + // 2.1 ms time = new TensorFunctionBenchmark().benchmark(5000, vectors(100, 300, TensorType.Dimension.Type.mapped), TensorType.Dimension.Type.mapped, false); System.out.printf("Mapped vectors, time per join: %1$8.3f ms\n", time); - // 6.8 ms + // 7.0 ms time = new TensorFunctionBenchmark().benchmark(1000, matrix(100, 300, TensorType.Dimension.Type.mapped), TensorType.Dimension.Type.mapped, false); System.out.printf("Mapped matrix, time per join: %1$8.3f ms\n", time); // ---------------- Indexed (unbound) with extra space (sidesteps current special-case optimizations): - // 30 ms + // 14.5 ms time = new TensorFunctionBenchmark().benchmark(500, vectors(100, 300, TensorType.Dimension.Type.indexedUnbound), TensorType.Dimension.Type.indexedUnbound, true); System.out.printf("Indexed vectors, x space time per join: %1$8.3f ms\n", time); - // 27 ms + // 8.9 ms time = new 
TensorFunctionBenchmark().benchmark(500, matrix(100, 300, TensorType.Dimension.Type.indexedUnbound), TensorType.Dimension.Type.indexedUnbound, true); System.out.printf("Indexed matrix, x space time per join: %1$8.3f ms\n", time); @@ -134,15 +134,15 @@ public class TensorFunctionBenchmark { // 0.14 ms time = new TensorFunctionBenchmark().benchmark(50000, vectors(100, 300, TensorType.Dimension.Type.indexedUnbound), TensorType.Dimension.Type.indexedUnbound, false); System.out.printf("Indexed unbound vectors, time per join: %1$8.3f ms\n", time); - // 0.14 ms + // 0.44 ms time = new TensorFunctionBenchmark().benchmark(50000, matrix(100, 300, TensorType.Dimension.Type.indexedUnbound), TensorType.Dimension.Type.indexedUnbound, false); System.out.printf("Indexed unbound matrix, time per join: %1$8.3f ms\n", time); // ---------------- Indexed bound: - // 0.14 ms + // 0.32 ms time = new TensorFunctionBenchmark().benchmark(50000, vectors(100, 300, TensorType.Dimension.Type.indexedBound), TensorType.Dimension.Type.indexedBound, false); System.out.printf("Indexed bound vectors, time per join: %1$8.3f ms\n", time); - // 0.14 ms + // 0.44 ms time = new TensorFunctionBenchmark().benchmark(50000, matrix(100, 300, TensorType.Dimension.Type.indexedBound), TensorType.Dimension.Type.indexedBound, false); System.out.printf("Indexed bound matrix, time per join: %1$8.3f ms\n", time); } |