diff options
author | Jon Bratseth <bratseth@gmail.com> | 2022-10-06 23:55:41 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@gmail.com> | 2022-10-06 23:55:41 +0200 |
commit | 8fcec55c82a1035dd17a59eb7bd1b1b65fb16f17 (patch) | |
tree | 4ea47e490d61b657601e338d70dbeb8977c2cdb8 /vespaclient-container-plugin | |
parent | 24c70d22397fad2c2d5d2e8b45d7da664283fd85 (diff) |
Return X-Vespa-Ignored-Fields if fields were ignored
Diffstat (limited to 'vespaclient-container-plugin')
11 files changed, 104 insertions, 110 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index c72bc1ef4c5..2d97c33741e 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -12,7 +12,6 @@ import com.yahoo.container.core.documentapi.VespaDocumentAccess; import com.yahoo.container.jdisc.ContentChannelOutputStream; import com.yahoo.document.Document; import com.yahoo.document.DocumentId; -import com.yahoo.document.DocumentOperation; import com.yahoo.document.DocumentPut; import com.yahoo.document.DocumentRemove; import com.yahoo.document.DocumentTypeManager; @@ -26,6 +25,7 @@ import com.yahoo.document.idstring.IdIdString; import com.yahoo.document.json.DocumentOperationType; import com.yahoo.document.json.JsonReader; import com.yahoo.document.json.JsonWriter; +import com.yahoo.document.json.ParsedDocumentOperation; import com.yahoo.document.restapi.DocumentOperationExecutorConfig; import com.yahoo.document.select.parser.ParseException; import com.yahoo.documentapi.AckToken; @@ -67,6 +67,7 @@ import com.yahoo.restapi.Path; import com.yahoo.search.query.ParameterParser; import com.yahoo.text.Text; import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig; +import com.yahoo.vespa.http.server.Headers; import com.yahoo.vespa.http.server.MetricNames; import com.yahoo.yolean.Exceptions; import com.yahoo.yolean.Exceptions.RunnableThrowingIOException; @@ -215,7 +216,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { this.operations = new ConcurrentLinkedDeque<>(); long resendDelayMS = SystemTimer.adjustTimeoutByDetectedHz(Duration.ofMillis(executorConfig.resendDelayMillis())).toMillis(); - // TODO: Here it would be better do have dedicated threads with different wait depending on blocked or empty. + // TODO: Here it would be better to have dedicated threads with different wait depending on blocked or empty. this.dispatcher.scheduleWithFixedDelay(this::dispatchEnqueued, resendDelayMS, resendDelayMS, MILLISECONDS); this.visitDispatcher.scheduleWithFixedDelay(this::dispatchVisitEnqueued, resendDelayMS, resendDelayMS, MILLISECONDS); } @@ -238,7 +239,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { MILLISECONDS); Path requestPath = Path.withoutValidation(request.getUri()); // No segment validation here, as document IDs can be anything. - for (String path : handlers.keySet()) + for (String path : handlers.keySet()) { if (requestPath.matches(path)) { Map<Method, Handler> methods = handlers.get(path); if (methods.containsKey(request.getMethod())) @@ -249,6 +250,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { methodNotAllowed(request, methods.keySet(), responseHandler); } + } notFound(request, handlers.keySet(), responseHandler); } catch (IllegalArgumentException e) { @@ -398,10 +400,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { parameters.setFieldSet(DocIdOnly.NAME); String type = path.documentType().orElseThrow(() -> new IllegalStateException("Document type must be specified for mass updates")); IdIdString dummyId = new IdIdString("dummy", type, "", ""); - DocumentUpdate update = parser.parseUpdate(in, dummyId.toString()); - update.setCondition(new TestAndSetCondition(requireProperty(request, SELECTION))); + ParsedDocumentOperation update = parser.parseUpdate(in, dummyId.toString()); + update.operation().setCondition(new TestAndSetCondition(requireProperty(request, SELECTION))); return () -> { - visitAndUpdate(request, parameters, handler, update, cluster.name()); + visitAndUpdate(request, parameters, update.fullyApplied(), handler, (DocumentUpdate)update.operation(), cluster.name()); return true; // VisitorSession has its own throttle handling. }; }); @@ -448,21 +450,21 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private ContentChannel postDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) { ResponseHandler handler = new MeasuringResponseHandler(rawHandler, com.yahoo.documentapi.metrics.DocumentOperationType.PUT, clock.instant()); if (getProperty(request, DRY_RUN, booleanParser).orElse(false)) { - handleFeedOperation(path, handler, new com.yahoo.documentapi.Response(-1)); + handleFeedOperation(path, true, handler, new com.yahoo.documentapi.Response(-1)); return ignoredContent; } return new ForwardingContentChannel(in -> { enqueueAndDispatch(request, handler, () -> { - DocumentPut put = parser.parsePut(in, path.id().toString()); - getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(put::setCondition); + ParsedDocumentOperation put = parser.parsePut(in, path.id().toString()); + getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(c -> put.operation().setCondition(c)); DocumentOperationParameters parameters = parametersFromRequest(request, ROUTE) .withResponseHandler(response -> { outstanding.decrementAndGet(); updatePutMetrics(response.outcome()); - handleFeedOperation(path, handler, response); + handleFeedOperation(path, put.fullyApplied(), handler, response); }); - return () -> dispatchOperation(() -> asyncSession.put(put, parameters)); + return () -> dispatchOperation(() -> asyncSession.put((DocumentPut)put.operation(), parameters)); }); }); } @@ -470,20 +472,21 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private ContentChannel putDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) { ResponseHandler handler = new MeasuringResponseHandler(rawHandler, com.yahoo.documentapi.metrics.DocumentOperationType.UPDATE, clock.instant()); if (getProperty(request, DRY_RUN, booleanParser).orElse(false)) { - handleFeedOperation(path, handler, new com.yahoo.documentapi.Response(-1)); + handleFeedOperation(path, true, handler, new com.yahoo.documentapi.Response(-1)); return ignoredContent; } return new ForwardingContentChannel(in -> { enqueueAndDispatch(request, handler, () -> { - DocumentUpdate update = parser.parseUpdate(in, path.id().toString()); + ParsedDocumentOperation parsed = parser.parseUpdate(in, path.id().toString()); + DocumentUpdate update = (DocumentUpdate)parsed.operation(); getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(update::setCondition); getProperty(request, CREATE, booleanParser).ifPresent(update::setCreateIfNonExistent); DocumentOperationParameters parameters = parametersFromRequest(request, ROUTE) .withResponseHandler(response -> { outstanding.decrementAndGet(); updateUpdateMetrics(response.outcome(), update.getCreateIfNonExistent()); - handleFeedOperation(path, handler, response); + handleFeedOperation(path, parsed.fullyApplied(), handler, response); }); return () -> dispatchOperation(() -> asyncSession.update(update, parameters)); }); @@ -493,7 +496,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private ContentChannel deleteDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) { ResponseHandler handler = new MeasuringResponseHandler(rawHandler, com.yahoo.documentapi.metrics.DocumentOperationType.REMOVE, clock.instant()); if (getProperty(request, DRY_RUN, booleanParser).orElse(false)) { - handleFeedOperation(path, handler, new com.yahoo.documentapi.Response(-1)); + handleFeedOperation(path, true, handler, new com.yahoo.documentapi.Response(-1)); return ignoredContent; } @@ -504,7 +507,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { .withResponseHandler(response -> { outstanding.decrementAndGet(); updateRemoveMetrics(response.outcome()); - handleFeedOperation(path, handler, response); + handleFeedOperation(path, true, handler, response); }); return () -> dispatchOperation(() -> asyncSession.remove(remove, parameters)); }); @@ -659,10 +662,16 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { return response; } - /** Commits a response with the given status code and some default headers, and writes whatever content is buffered. */ synchronized void commit(int status) throws IOException { + commit(status, true); + } + + /** Commits a response with the given status code and some default headers, and writes whatever content is buffered. */ + synchronized void commit(int status, boolean fullyApplied) throws IOException { Response response = new Response(status); - response.headers().addAll(Map.of("Content-Type", List.of("application/json; charset=UTF-8"))); + response.headers().add("Content-Type", List.of("application/json; charset=UTF-8")); + if (! fullyApplied) + response.headers().add(Headers.IGNORED_FIELDS, "true"); try { channel = handler.handleResponse(response); buffer.connectTo(channel); @@ -1023,15 +1032,15 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { this.manager = new DocumentTypeManager(config); } - DocumentPut parsePut(InputStream inputStream, String docId) { - return (DocumentPut) parse(inputStream, docId, DocumentOperationType.PUT); + ParsedDocumentOperation parsePut(InputStream inputStream, String docId) { + return parse(inputStream, docId, DocumentOperationType.PUT); } - DocumentUpdate parseUpdate(InputStream inputStream, String docId) { - return (DocumentUpdate) parse(inputStream, docId, DocumentOperationType.UPDATE); + ParsedDocumentOperation parseUpdate(InputStream inputStream, String docId) { + return parse(inputStream, docId, DocumentOperationType.UPDATE); } - private DocumentOperation parse(InputStream inputStream, String docId, DocumentOperationType operation) { + private ParsedDocumentOperation parse(InputStream inputStream, String docId, DocumentOperationType operation) { return new JsonReader(manager, inputStream, jsonFactory).readSingleDocument(operation, docId); } @@ -1041,7 +1050,11 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { void onSuccess(Document document, JsonResponse response) throws IOException; } - private static void handle(DocumentPath path, HttpRequest request, ResponseHandler handler, com.yahoo.documentapi.Response response, SuccessCallback callback) { + private static void handle(DocumentPath path, + HttpRequest request, + ResponseHandler handler, + com.yahoo.documentapi.Response response, + SuccessCallback callback) { try (JsonResponse jsonResponse = JsonResponse.create(path, handler, request)) { jsonResponse.writeTrace(response.getTrace()); if (response.isSuccess()) @@ -1049,25 +1062,18 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { else { jsonResponse.writeMessage(response.getTextMessage()); switch (response.outcome()) { - case NOT_FOUND: - jsonResponse.commit(Response.Status.NOT_FOUND); - break; - case CONDITION_FAILED: - jsonResponse.commit(Response.Status.PRECONDITION_FAILED); - break; - case INSUFFICIENT_STORAGE: - jsonResponse.commit(Response.Status.INSUFFICIENT_STORAGE); - break; - case TIMEOUT: - jsonResponse.commit(Response.Status.GATEWAY_TIMEOUT); - break; - case ERROR: + case NOT_FOUND -> jsonResponse.commit(Response.Status.NOT_FOUND); + case CONDITION_FAILED -> jsonResponse.commit(Response.Status.PRECONDITION_FAILED); + case INSUFFICIENT_STORAGE -> jsonResponse.commit(Response.Status.INSUFFICIENT_STORAGE); + case TIMEOUT -> jsonResponse.commit(Response.Status.GATEWAY_TIMEOUT); + case ERROR -> { log.log(FINE, () -> "Exception performing document operation: " + response.getTextMessage()); jsonResponse.commit(Response.Status.BAD_GATEWAY); - break; - default: + } + default -> { log.log(WARNING, "Unexpected document API operation outcome '" + response.outcome() + "' " + response.getTextMessage()); jsonResponse.commit(Response.Status.BAD_GATEWAY); + } } } } @@ -1076,8 +1082,11 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } } - private static void handleFeedOperation(DocumentPath path, ResponseHandler handler, com.yahoo.documentapi.Response response) { - handle(path, null, handler, response, (document, jsonResponse) -> jsonResponse.commit(Response.Status.OK)); + private static void handleFeedOperation(DocumentPath path, + boolean fullyApplied, + ResponseHandler handler, + com.yahoo.documentapi.Response response) { + handle(path, null, handler, response, (document, jsonResponse) -> jsonResponse.commit(Response.Status.OK, fullyApplied)); } private void updatePutMetrics(Outcome outcome) { @@ -1188,7 +1197,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private interface VisitCallback { /** Called at the start of response rendering. */ - default void onStart(JsonResponse response) throws IOException { } + default void onStart(JsonResponse response, boolean fullyApplied) throws IOException { } /** Called for every document received from backend visitors—must call the ack for these to proceed. */ default void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { } @@ -1199,25 +1208,26 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private void visitAndDelete(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, TestAndSetCondition condition, String route) { - visitAndProcess(request, parameters, handler, route, (id, operationParameters) -> { + visitAndProcess(request, parameters, true, handler, route, (id, operationParameters) -> { DocumentRemove remove = new DocumentRemove(id); remove.setCondition(condition); return asyncSession.remove(remove, operationParameters); }); } - private void visitAndUpdate(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, - DocumentUpdate protoUpdate, String route) { - visitAndProcess(request, parameters, handler, route, (id, operationParameters) -> { + private void visitAndUpdate(HttpRequest request, VisitorParameters parameters, boolean fullyApplied, + ResponseHandler handler, DocumentUpdate protoUpdate, String route) { + visitAndProcess(request, parameters, fullyApplied, handler, route, (id, operationParameters) -> { DocumentUpdate update = new DocumentUpdate(protoUpdate); update.setId(id); return asyncSession.update(update, operationParameters); }); } - private void visitAndProcess(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, + private void visitAndProcess(HttpRequest request, VisitorParameters parameters, boolean fullyApplied, + ResponseHandler handler, String route, BiFunction<DocumentId, DocumentOperationParameters, Result> operation) { - visit(request, parameters, false, handler, new VisitCallback() { + visit(request, parameters, false, fullyApplied, handler, new VisitCallback() { @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { DocumentOperationParameters operationParameters = parameters().withRoute(route) .withResponseHandler(operationResponse -> { @@ -1255,10 +1265,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, boolean streamed) { - visit(request, parameters, streamed, handler, new VisitCallback() { - @Override public void onStart(JsonResponse response) throws IOException { + visit(request, parameters, streamed, true, handler, new VisitCallback() { + @Override public void onStart(JsonResponse response, boolean fullyApplied) throws IOException { if (streamed) - response.commit(Response.Status.OK); + response.commit(Response.Status.OK, fullyApplied); response.writeDocumentsArrayStart(); } @@ -1288,16 +1298,16 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } private void visitWithRemote(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) { - visit(request, parameters, false, handler, new VisitCallback() { }); + visit(request, parameters, false, true, handler, new VisitCallback() { }); } @SuppressWarnings("fallthrough") - private void visit(HttpRequest request, VisitorParameters parameters, boolean streaming, ResponseHandler handler, VisitCallback callback) { + private void visit(HttpRequest request, VisitorParameters parameters, boolean streaming, boolean fullyApplied, ResponseHandler handler, VisitCallback callback) { try { JsonResponse response = JsonResponse.create(request, handler); Phaser phaser = new Phaser(2); // Synchronize this thread (dispatch) with the visitor callback thread. AtomicReference<String> error = new AtomicReference<>(); // Set if error occurs during processing of visited documents. - callback.onStart(response); + callback.onStart(response, fullyApplied); VisitorControlHandler controller = new VisitorControlHandler() { final ScheduledFuture<?> abort = streaming ? visitDispatcher.schedule(this::abort, request.getTimeout(MILLISECONDS), MILLISECONDS) : null; @Override public void onDone(CompletionCode code, String message) { @@ -1332,7 +1342,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed"); } if ( ! streaming) - response.commit(status); + response.commit(status, fullyApplied); } }); if (abort != null) abort.cancel(false); // Avoid keeping scheduled future alive if this completes in any other fashion. diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java index 8ea9234009d..438248f31a7 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java @@ -4,7 +4,6 @@ package com.yahoo.vespa.http.server; import com.yahoo.container.jdisc.HttpRequest; import com.yahoo.container.jdisc.HttpResponse; import com.yahoo.document.DocumentTypeManager; -import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import com.yahoo.jdisc.Metric; import com.yahoo.jdisc.ReferencedResource; import com.yahoo.jdisc.ResourceReference; @@ -53,13 +52,12 @@ class ClientFeederV3 { private final AtomicInteger ongoingRequests = new AtomicInteger(0); private final String hostName; - ClientFeederV3( - ReferencedResource<SharedSourceSession> sourceSession, - FeedReaderFactory feedReaderFactory, - DocumentTypeManager docTypeManager, - String clientId, - Metric metric, - ReplyHandler feedReplyHandler) { + ClientFeederV3(ReferencedResource<SharedSourceSession> sourceSession, + FeedReaderFactory feedReaderFactory, + DocumentTypeManager docTypeManager, + String clientId, + Metric metric, + ReplyHandler feedReplyHandler) { this.sourceSession = sourceSession; this.clientId = clientId; this.feedReplyHandler = feedReplyHandler; @@ -220,10 +218,7 @@ class ClientFeederV3 { // This is a bit hard to set up while testing, so we accept that things are not perfect. if (sourceSession.getResource().session() != null) { - metric.set( - MetricNames.PENDING, - Double.valueOf(sourceSession.getResource().session().getPendingCount()), - null); + metric.set(MetricNames.PENDING, (double) sourceSession.getResource().session().getPendingCount(), null); } DocumentOperationMessageV3 message = DocumentOperationMessageV3.create(operation, operationId, metric); @@ -231,7 +226,7 @@ class ClientFeederV3 { // typical end of feed return null; } - metric.add(MetricNames.NUM_OPERATIONS, 1, null /*metricContext*/); + metric.add(MetricNames.NUM_OPERATIONS, 1, null); log(Level.FINE, "Successfully deserialized document id: ", message.getOperationId()); return message; } @@ -241,14 +236,6 @@ class ClientFeederV3 { if (settings.traceLevel != null) { msg.getMessage().getTrace().setLevel(settings.traceLevel); } - if (settings.priority != null) { - try { - DocumentProtocol.Priority priority = DocumentProtocol.Priority.valueOf(settings.priority); - } - catch (IllegalArgumentException i) { - log.severe(i.getMessage()); - } - } } private void setRoute(DocumentOperationMessageV3 msg, FeederSettings settings) { @@ -272,7 +259,7 @@ class ClientFeederV3 { if (now.plusSeconds(1).isAfter(prevOpsPerSecTime)) { Duration duration = Duration.between(now, prevOpsPerSecTime); double opsPerSec = operationsForOpsPerSec / (duration.toMillis() / 1000.); - metric.set(MetricNames.OPERATIONS_PER_SEC, opsPerSec, null /*metricContext*/); + metric.set(MetricNames.OPERATIONS_PER_SEC, opsPerSec, null); operationsForOpsPerSec = 1.0d; prevOpsPerSecTime = now; } else { @@ -280,4 +267,5 @@ class ClientFeederV3 { } } } + } diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/DocumentOperationMessageV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/DocumentOperationMessageV3.java index 25bf5815907..a12fe4efa8b 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/DocumentOperationMessageV3.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/DocumentOperationMessageV3.java @@ -14,8 +14,6 @@ import com.yahoo.vespaxmlparser.FeedOperation; /** * Keeps an operation with its message. * - * This implementation is based on V2, but the code is restructured. - * * @author dybis */ class DocumentOperationMessageV3 { @@ -66,13 +64,13 @@ class DocumentOperationMessageV3 { static DocumentOperationMessageV3 create(FeedOperation operation, String operationId, Metric metric) { switch (operation.getType()) { case DOCUMENT: - metric.add(MetricNames.NUM_PUTS, 1, null /*metricContext*/); + metric.add(MetricNames.NUM_PUTS, 1, null); return newPutMessage(operation, operationId); case REMOVE: - metric.add(MetricNames.NUM_REMOVES, 1, null /*metricContext*/); + metric.add(MetricNames.NUM_REMOVES, 1, null); return newRemoveMessage(operation, operationId); case UPDATE: - metric.add(MetricNames.NUM_UPDATES, 1, null /*metricContext*/); + metric.add(MetricNames.NUM_UPDATES, 1, null); return newUpdateMessage(operation, operationId); default: // typical end of feed diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java index 74665d60a04..3c1f376b4eb 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java @@ -29,14 +29,14 @@ import java.util.regex.Pattern; import java.util.zip.GZIPInputStream; /** - * Accept feeds from outside of the Vespa cluster. + * Accept feeds from outside the Vespa cluster. * * @author Steinar Knutsen */ public class FeedHandler extends ThreadedHttpRequestHandler { protected final ReplyHandler feedReplyHandler; - private static final List<Integer> serverSupportedVersions = Collections.unmodifiableList(Arrays.asList(3)); + private static final List<Integer> serverSupportedVersions = List.of(3); private static final Pattern USER_AGENT_PATTERN = Pattern.compile("vespa-http-client \\((.+)\\)"); private final FeedHandlerV3 feedHandlerV3; private final DocumentApiMetrics metricsHelper; @@ -144,4 +144,5 @@ public class FeedHandler extends ThreadedHttpRequestHandler { } @Override protected void destroy() { feedHandlerV3.destroy(); } + } diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java index f9ae04623e6..4de3eebec2d 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java @@ -24,9 +24,8 @@ import java.util.logging.Level; import java.util.logging.Logger; /** - * This code is based on v2 code, however, in v3, one client has one ClientFeederV3 shared between all client threads. - * The new API has more logic for shutting down cleanly as the server is more likely to be upgraded. - * The code is restructured a bit. + * One client has one ClientFeederV3 shared between all client threads. + * Contains logic for shutting down cleanly as the server is upgraded. * * @author dybis */ @@ -60,7 +59,7 @@ public class FeedHandlerV3 extends ThreadedHttpRequestHandler { } // TODO: If this is set up to run without first invoking the old FeedHandler code, we should - // verify the version header first. This is done in the old code. + // verify the version header first. @Override public HttpResponse handle(HttpRequest request) { String clientId = clientId(request); @@ -70,7 +69,7 @@ public class FeedHandlerV3 extends ThreadedHttpRequestHandler { SourceSessionParams sourceSessionParams = sourceSessionParams(request); clientFeederByClientId.put(clientId, new ClientFeederV3(retainSource(sessionCache, sourceSessionParams), - new FeedReaderFactory(true), //TODO make error debugging configurable + new FeedReaderFactory(true), // TODO: Make error debugging configurable docTypeManager, clientId, metric, diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java index 9bb8a58d6f6..a8175a48a39 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java @@ -17,14 +17,12 @@ public class FeederSettings { public final boolean drain; // TODO: Implement drain=true public final Route route; public final FeedParams.DataFormat dataFormat; - public final String priority; public final Integer traceLevel; public FeederSettings(HttpRequest request) { this.drain = Optional.ofNullable(request.getHeader(Headers.DRAIN)).map(Boolean::parseBoolean).orElse(false); this.route = Optional.ofNullable(request.getHeader(Headers.ROUTE)).map(Route::parse).orElse(DEFAULT_ROUTE); this.dataFormat = Optional.ofNullable(request.getHeader(Headers.DATA_FORMAT)).map(FeedParams.DataFormat::valueOf).orElse(FeedParams.DataFormat.JSON_UTF8); - this.priority = request.getHeader(Headers.PRIORITY); this.traceLevel = Optional.ofNullable(request.getHeader(Headers.TRACE_LEVEL)).map(Integer::valueOf).orElse(null); } diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/Headers.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/Headers.java index 16bff38af4b..657c22ba7ee 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/Headers.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/Headers.java @@ -6,7 +6,7 @@ package com.yahoo.vespa.http.server; * * @author Steinar Knutsen */ -final class Headers { +public final class Headers { private Headers() { } @@ -23,7 +23,6 @@ final class Headers { // This value can be used to route the request to a specific server when using // several servers. It is a random value that is the same for the whole session. public static final String SHARDING_KEY = "X-Yahoo-Feed-Sharding-Key"; - public static final String PRIORITY = "X-Yahoo-Feed-Priority"; public static final String TRACE_LEVEL = "X-Yahoo-Feed-Trace-Level"; public static final int HTTP_NOT_ACCEPTABLE = 406; @@ -34,4 +33,8 @@ final class Headers { public static final String HOSTNAME = "X-Yahoo-Hostname"; public static final String SILENTUPGRADE = "X-Yahoo-Silent-Upgrade"; + // A response header present and set to "true" onlynif any fields of a document operation were ignored + // because they were not declared in the target document type. + public static final String IGNORED_FIELDS = "X-Vespa-Ignored-Fields"; + } diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/StreamReaderV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/StreamReaderV3.java index c2c6d00fa25..3d82919d503 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/StreamReaderV3.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/StreamReaderV3.java @@ -10,17 +10,15 @@ import com.yahoo.vespaxmlparser.FeedReader; import java.io.IOException; import java.io.InputStream; import java.util.Optional; -import java.util.logging.Logger; import java.util.zip.GZIPInputStream; /** * This code is based on v2 code, but restructured so stream reading code is in one dedicated class. + * * @author dybis */ public class StreamReaderV3 { - protected static final Logger log = Logger.getLogger(StreamReaderV3.class.getName()); - private final FeedReaderFactory feedReaderFactory; private final DocumentTypeManager docTypeManager; @@ -30,15 +28,11 @@ public class StreamReaderV3 { } public FeedOperation getNextOperation(InputStream requestInputStream, FeederSettings settings) throws Exception { - FeedOperation op = null; - int length = readByteLength(requestInputStream); - try (InputStream limitedInputStream = new ByteLimitedInputStream(requestInputStream, length)){ FeedReader reader = feedReaderFactory.createReader(limitedInputStream, docTypeManager, settings.dataFormat); - op = reader.read(); + return reader.read(); } - return op; } public Optional<String> getNextOperationId(InputStream requestInputStream) throws IOException { @@ -48,7 +42,7 @@ public class StreamReaderV3 { if (c == 32) { break; } - idBuf.append((char) c); //it's ASCII + idBuf.append((char) c); // it's ASCII } if (idBuf.length() == 0) { return Optional.empty(); @@ -63,7 +57,7 @@ public class StreamReaderV3 { if (c == 10) { break; } - lenBuf.append((char) c); //it's ASCII + lenBuf.append((char) c); // it's ASCII } if (lenBuf.length() == 0) { throw new IllegalStateException("Operation length missing."); @@ -71,9 +65,8 @@ public class StreamReaderV3 { return Integer.valueOf(lenBuf.toString(), 16); } - public static InputStream unzipStreamIfNeeded(final HttpRequest httpRequest) - throws IOException { - final String contentEncodingHeader = httpRequest.getHeader("content-encoding"); + public static InputStream unzipStreamIfNeeded(final HttpRequest httpRequest) throws IOException { + String contentEncodingHeader = httpRequest.getHeader("content-encoding"); if ("gzip".equals(contentEncodingHeader)) { return new GZIPInputStream(httpRequest.getData()); } else { diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/package-info.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/package-info.java index ea01137d9af..3678a0b9fac 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/package-info.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/package-info.java @@ -1,6 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** - * Server side of programmatic API for feeding into Vespa from outside of the + * Server side of programmatic API for feeding into Vespa from outside the * clusters. Not a public API, not meant for direct use. */ @com.yahoo.api.annotations.PackageMarker diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java index 7f77ce9d0d5..cd57818e74e 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java @@ -113,7 +113,8 @@ public class DocumentV1ApiTest { .maxThrottled(2) .resendDelayMillis(1 << 30) .build(); - final DocumentmanagerConfig docConfig = Deriver.getDocumentManagerConfig("src/test/cfg/music.sd").build(); + final DocumentmanagerConfig docConfig = Deriver.getDocumentManagerConfig("src/test/cfg/music.sd") + .ignoreundefinedfields(true).build(); final DocumentTypeManager manager = new DocumentTypeManager(docConfig); final Document doc1 = new Document(manager.getDocumentType("music"), "id:space:music::one"); final Document doc2 = new Document(manager.getDocumentType("music"), "id:space:music:n=1:two"); @@ -330,6 +331,7 @@ public class DocumentV1ApiTest { " \"message\": \"failure?\"" + "}", response.readAll()); assertEquals(200, response.getStatus()); + assertNull(response.getResponse().headers().get("X-Vespa-Ignored-Fields")); // POST with namespace and document type is a restricted visit with a required destination cluster ("destinationCluster") access.expect(parameters -> { @@ -376,13 +378,15 @@ public class DocumentV1ApiTest { response = driver.sendRequest("http://localhost/document/v1/space/music/docid?selection=true&cluster=content&timeChunk=10", PUT, "{" + " \"fields\": {" + - " \"artist\": { \"assign\": \"Lisa Ekdahl\" }" + + " \"artist\": { \"assign\": \"Lisa Ekdahl\" }, " + + " \"nonexisting\": { \"assign\": \"Ignored\" }" + " }" + "}"); assertSameJson("{" + " \"pathId\": \"/document/v1/space/music/docid\"" + "}", response.readAll()); assertEquals(200, response.getStatus()); + assertEquals("true", response.getResponse().headers().get("X-Vespa-Ignored-Fields").get(0).toString()); // PUT with namespace, document type and group is also a restricted visit which requires a cluster. access.expect(parameters -> { diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerV3Test.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerV3Test.java index 5b8b5b1827f..dbbe664c9f8 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerV3Test.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerV3Test.java @@ -33,6 +33,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class FeedHandlerV3Test { + final CollectingMetric metric = new CollectingMetric(); private final Executor simpleThreadpool = Executors.newCachedThreadPool(); @@ -101,7 +102,6 @@ public class FeedHandlerV3Test { request.getJDiscRequest().headers().add(Headers.DATA_FORMAT, FeedParams.DataFormat.JSON_UTF8.name()); request.getJDiscRequest().headers().add(Headers.TIMEOUT, "1000000000"); request.getJDiscRequest().headers().add(Headers.CLIENT_ID, "client123"); - request.getJDiscRequest().headers().add(Headers.PRIORITY, "LOWEST"); request.getJDiscRequest().headers().add(Headers.TRACE_LEVEL, "4"); request.getJDiscRequest().headers().add(Headers.DRAIN, "true"); return request; |