aboutsummaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin/src/main/java/com/yahoo
diff options
context:
space:
mode:
Diffstat (limited to 'vespaclient-container-plugin/src/main/java/com/yahoo')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java3
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java287
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientState.java40
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/DocumentOperationMessageV3.java83
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ErrorHttpResponse.java24
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java148
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java152
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReaderFactory.java62
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReplyReader.java94
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedResponse.java88
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java33
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/MetricNames.java (renamed from vespaclient-container-plugin/src/main/java/com/yahoo/documentapi/metrics/MetricNames.java)2
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ReplyContext.java25
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/StreamReaderV3.java85
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/UnknownClientException.java14
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/package-info.java7
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStream.java99
17 files changed, 1244 insertions, 2 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index ed068c77e11..8c2e39d595e 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -20,6 +20,7 @@ import com.yahoo.document.DocumentUpdate;
import com.yahoo.document.FixedBucketSpaces;
import com.yahoo.document.TestAndSetCondition;
import com.yahoo.document.config.DocumentmanagerConfig;
+import com.yahoo.document.fieldset.AllFields;
import com.yahoo.document.fieldset.DocIdOnly;
import com.yahoo.document.fieldset.DocumentOnly;
import com.yahoo.document.idstring.IdIdString;
@@ -45,7 +46,6 @@ import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
import com.yahoo.documentapi.metrics.DocumentApiMetrics;
import com.yahoo.documentapi.metrics.DocumentOperationStatus;
-import com.yahoo.documentapi.metrics.MetricNames;
import com.yahoo.jdisc.Metric;
import com.yahoo.jdisc.Request;
import com.yahoo.jdisc.Response;
@@ -68,6 +68,7 @@ import com.yahoo.restapi.Path;
import com.yahoo.search.query.ParameterParser;
import com.yahoo.text.Text;
import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
+import com.yahoo.vespa.http.server.MetricNames;
import com.yahoo.yolean.Exceptions;
import com.yahoo.yolean.Exceptions.RunnableThrowingIOException;
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java
new file mode 100644
index 00000000000..875ff3e5bf0
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java
@@ -0,0 +1,287 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.container.jdisc.HttpRequest;
+import com.yahoo.container.jdisc.HttpResponse;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.documentapi.messagebus.protocol.DocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.jdisc.ReferencedResource;
+import com.yahoo.jdisc.ResourceReference;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.Result;
+import com.yahoo.messagebus.shared.SharedSourceSession;
+import com.yahoo.net.HostName;
+import com.yahoo.vespa.http.client.core.ErrorCode;
+import com.yahoo.vespa.http.client.core.Headers;
+import com.yahoo.vespa.http.client.core.OperationStatus;
+import com.yahoo.vespaxmlparser.FeedOperation;
+import com.yahoo.yolean.Exceptions;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An instance of this class handles all requests from one client using VespaHttpClient.
+ *
+ * The implementation is based on the code from V2, but the object model is rewritten to simplify the logic and
+ * avoid using a threadpool that has no effect with all the extra that comes with it. V2 has one instance per thread
+ * on the client, while this is one instance for all threads.
+ *
+ * @author dybis
+ */
+class ClientFeederV3 {
+
+ protected static final Logger log = Logger.getLogger(ClientFeederV3.class.getName());
+ // This is for all clients on this gateway, for load balancing from client.
+ private final static AtomicInteger outstandingOperations = new AtomicInteger(0);
+ private final BlockingQueue<OperationStatus> feedReplies = new LinkedBlockingQueue<>();
+ private final ReferencedResource<SharedSourceSession> sourceSession;
+ private final String clientId;
+ private final ReplyHandler feedReplyHandler;
+ private final Metric metric;
+ private Instant prevOpsPerSecTime = Instant.now();
+ private double operationsForOpsPerSec = 0d;
+ private final Object monitor = new Object();
+ private final StreamReaderV3 streamReaderV3;
+ private final AtomicInteger ongoingRequests = new AtomicInteger(0);
+ private final String hostName;
+
+ ClientFeederV3(
+ ReferencedResource<SharedSourceSession> sourceSession,
+ FeedReaderFactory feedReaderFactory,
+ DocumentTypeManager docTypeManager,
+ String clientId,
+ Metric metric,
+ ReplyHandler feedReplyHandler) {
+ this.sourceSession = sourceSession;
+ this.clientId = clientId;
+ this.feedReplyHandler = feedReplyHandler;
+ this.metric = metric;
+ this.streamReaderV3 = new StreamReaderV3(feedReaderFactory, docTypeManager);
+ this.hostName = HostName.getLocalhost();
+ }
+
+ boolean timedOut() {
+ synchronized (monitor) {
+ return Instant.now().isAfter(prevOpsPerSecTime.plusSeconds(6000)) && ongoingRequests.get() == 0;
+ }
+ }
+
+ void kill() {
+ try (ResourceReference ignored = sourceSession.getReference()) {
+ // No new requests should be sent to this object, but there can be old one, even though this is very unlikely.
+ while (ongoingRequests.get() > 0) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ } catch (Exception e) {
+ log.log(Level.WARNING, "Failed to close reference to source session", e);
+ }
+ }
+
+ private void transferPreviousRepliesToResponse(BlockingQueue<OperationStatus> operations) throws InterruptedException {
+ OperationStatus status = feedReplies.poll();
+ while (status != null) {
+ outstandingOperations.decrementAndGet();
+ operations.put(status);
+ status = feedReplies.poll();
+ }
+ }
+
+ HttpResponse handleRequest(HttpRequest request) throws IOException {
+ ongoingRequests.incrementAndGet();
+ try {
+ FeederSettings feederSettings = new FeederSettings(request);
+ InputStream inputStream = StreamReaderV3.unzipStreamIfNeeded(request);
+ BlockingQueue<OperationStatus> replies = new LinkedBlockingQueue<>();
+ try {
+ feed(feederSettings, inputStream, replies);
+ synchronized (monitor) {
+ // Handshake requests do not have DATA_FORMAT, we do not want to give responses to
+ // handshakes as it won't be processed by the client.
+ if (request.getJDiscRequest().headers().get(Headers.DATA_FORMAT) != null) {
+ transferPreviousRepliesToResponse(replies);
+ }
+ }
+ } catch (InterruptedException e) {
+ log.log(Level.FINE, e, () -> "Feed handler was interrupted: " + e.getMessage());
+ // NOP, just terminate
+ } catch (Throwable e) {
+ log.log(Level.WARNING, "Unhandled exception while feeding: " + Exceptions.toMessageString(e), e);
+ } finally {
+ replies.add(createOperationStatus("-", "-", ErrorCode.END_OF_FEED, null));
+ }
+ return new FeedResponse(200, replies, 3, clientId, outstandingOperations.get(), hostName);
+ } finally {
+ ongoingRequests.decrementAndGet();
+ }
+ }
+
+ private Optional<DocumentOperationMessageV3> pullMessageFromRequest(FeederSettings settings,
+ InputStream requestInputStream,
+ BlockingQueue<OperationStatus> repliesFromOldMessages) {
+ while (true) {
+ Optional<String> operationId;
+ try {
+ operationId = streamReaderV3.getNextOperationId(requestInputStream);
+ if (operationId.isEmpty()) return Optional.empty();
+ } catch (IOException ioe) {
+ log.log(Level.FINE, () -> Exceptions.toMessageString(ioe));
+ return Optional.empty();
+ }
+
+ try {
+ DocumentOperationMessageV3 message = getNextMessage(operationId.get(), requestInputStream, settings);
+ if (message != null)
+ setRoute(message, settings);
+ return Optional.ofNullable(message);
+ } catch (Exception e) {
+ log.log(Level.WARNING, () -> Exceptions.toMessageString(e));
+ metric.add(MetricNames.PARSE_ERROR, 1, null);
+
+ repliesFromOldMessages.add(new OperationStatus(Exceptions.toMessageString(e),
+ operationId.get(),
+ ErrorCode.ERROR,
+ false,
+ ""));
+ }
+ }
+ }
+
+ private Result sendMessage(DocumentOperationMessageV3 msg) throws InterruptedException {
+ msg.getMessage().pushHandler(feedReplyHandler);
+ return sourceSession.getResource().sendMessageBlocking(msg.getMessage());
+ }
+
+ private void feed(FeederSettings settings,
+ InputStream requestInputStream,
+ BlockingQueue<OperationStatus> repliesFromOldMessages) throws InterruptedException {
+ while (true) {
+ Optional<DocumentOperationMessageV3> message = pullMessageFromRequest(settings,
+ requestInputStream,
+ repliesFromOldMessages);
+
+ if (message.isEmpty()) break;
+ setMessageParameters(message.get(), settings);
+
+ Result result;
+ try {
+ result = sendMessage(message.get());
+
+ } catch (RuntimeException e) {
+ repliesFromOldMessages.add(createOperationStatus(message.get().getOperationId(),
+ Exceptions.toMessageString(e),
+ ErrorCode.ERROR,
+ message.get().getMessage()));
+ continue;
+ }
+
+ if (result.isAccepted()) {
+ outstandingOperations.incrementAndGet();
+ updateOpsPerSec();
+ log(Level.FINE, "Sent message successfully, document id: ", message.get().getOperationId());
+ } else if (!result.getError().isFatal()) {
+ repliesFromOldMessages.add(createOperationStatus(message.get().getOperationId(),
+ result.getError().getMessage(),
+ ErrorCode.TRANSIENT_ERROR,
+ message.get().getMessage()));
+ } else {
+ repliesFromOldMessages.add(createOperationStatus(message.get().getOperationId(),
+ result.getError().getMessage(),
+ ErrorCode.ERROR,
+ message.get().getMessage()));
+ }
+ }
+ }
+
+ private OperationStatus createOperationStatus(String id, String message, ErrorCode code, Message msg) {
+ String traceMessage = msg != null && msg.getTrace() != null && msg.getTrace().getLevel() > 0
+ ? msg.getTrace().toString()
+ : "";
+ return new OperationStatus(message, id, code, false, traceMessage);
+ }
+
+ // protected for mocking
+ /** Returns the next message in the stream, or null if none */
+ protected DocumentOperationMessageV3 getNextMessage(String operationId,
+ InputStream requestInputStream,
+ FeederSettings settings) throws Exception {
+ FeedOperation operation = streamReaderV3.getNextOperation(requestInputStream, settings);
+
+ // This is a bit hard to set up while testing, so we accept that things are not perfect.
+ if (sourceSession.getResource().session() != null) {
+ metric.set(
+ MetricNames.PENDING,
+ Double.valueOf(sourceSession.getResource().session().getPendingCount()),
+ null);
+ }
+
+ DocumentOperationMessageV3 message = DocumentOperationMessageV3.create(operation, operationId, metric);
+ if (message == null) {
+ // typical end of feed
+ return null;
+ }
+ metric.add(MetricNames.NUM_OPERATIONS, 1, null /*metricContext*/);
+ log(Level.FINE, "Successfully deserialized document id: ", message.getOperationId());
+ return message;
+ }
+
+ private void setMessageParameters(DocumentOperationMessageV3 msg, FeederSettings settings) {
+ msg.getMessage().setContext(new ReplyContext(msg.getOperationId(), feedReplies));
+ if (settings.traceLevel != null) {
+ msg.getMessage().getTrace().setLevel(settings.traceLevel);
+ }
+ if (settings.priority != null) {
+ try {
+ DocumentProtocol.Priority priority = DocumentProtocol.Priority.valueOf(settings.priority);
+ }
+ catch (IllegalArgumentException i) {
+ log.severe(i.getMessage());
+ }
+ }
+ }
+
+ private void setRoute(DocumentOperationMessageV3 msg, FeederSettings settings) {
+ if (settings.route != null) {
+ msg.getMessage().setRoute(settings.route);
+ }
+ }
+
+ protected final void log(Level level, Object... msgParts) {
+ if (!log.isLoggable(level)) return;
+
+ StringBuilder s = new StringBuilder();
+ for (Object part : msgParts)
+ s.append(part.toString());
+ log.log(level, s.toString());
+ }
+
+ private void updateOpsPerSec() {
+ Instant now = Instant.now();
+ synchronized (monitor) {
+ if (now.plusSeconds(1).isAfter(prevOpsPerSecTime)) {
+ Duration duration = Duration.between(now, prevOpsPerSecTime);
+ double opsPerSec = operationsForOpsPerSec / (duration.toMillis() / 1000.);
+ metric.set(MetricNames.OPERATIONS_PER_SEC, opsPerSec, null /*metricContext*/);
+ operationsForOpsPerSec = 1.0d;
+ prevOpsPerSecTime = now;
+ } else {
+ operationsForOpsPerSec += 1.0d;
+ }
+ }
+ }
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientState.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientState.java
new file mode 100644
index 00000000000..13a12f707d9
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientState.java
@@ -0,0 +1,40 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.jdisc.Metric;
+import com.yahoo.jdisc.ReferencedResource;
+import com.yahoo.messagebus.shared.SharedSourceSession;
+import com.yahoo.vespa.http.client.core.OperationStatus;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * The state of a client session, used to save replies when client disconnects.
+ *
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ */
+public class ClientState {
+
+ public final int pending;
+ public final long creationTime;
+ public final BlockingQueue<OperationStatus> feedReplies;
+ public final ReferencedResource<SharedSourceSession> sourceSession;
+ public final Metric.Context metricContext;
+
+ public final long prevOpsPerSecTime; // previous measurement time of OPS
+ public final double operationsForOpsPerSec;
+
+ public ClientState(int pending, BlockingQueue<OperationStatus> feedReplies,
+ ReferencedResource<SharedSourceSession> sourceSession, Metric.Context metricContext,
+ long prevOpsPerSecTime, double operationsForOpsPerSec) {
+ super();
+ this.pending = pending;
+ this.feedReplies = feedReplies;
+ this.sourceSession = sourceSession;
+ this.metricContext = metricContext;
+ creationTime = System.currentTimeMillis();
+ this.prevOpsPerSecTime = prevOpsPerSecTime;
+ this.operationsForOpsPerSec = operationsForOpsPerSec;
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/DocumentOperationMessageV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/DocumentOperationMessageV3.java
new file mode 100644
index 00000000000..25bf5815907
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/DocumentOperationMessageV3.java
@@ -0,0 +1,83 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentRemove;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.messagebus.Message;
+import com.yahoo.vespaxmlparser.FeedOperation;
+
+/**
+ * Keeps an operation with its message.
+ *
+ * This implementation is based on V2, but the code is restructured.
+ *
+ * @author dybis
+ */
+class DocumentOperationMessageV3 {
+
+ private final String operationId;
+ private final Message message;
+
+ private DocumentOperationMessageV3(String operationId, Message message) {
+ this.operationId = operationId;
+ this.message = message;
+ }
+
+ Message getMessage() {
+ return message;
+ }
+
+ String getOperationId() {
+ return operationId;
+ }
+
+ private static DocumentOperationMessageV3 newUpdateMessage(FeedOperation op, String operationId) {
+ DocumentUpdate update = op.getDocumentUpdate();
+ update.setCondition(op.getCondition());
+ Message msg = new UpdateDocumentMessage(update);
+
+ String id = (operationId == null) ? update.getId().toString() : operationId;
+ return new DocumentOperationMessageV3(id, msg);
+ }
+
+ static DocumentOperationMessageV3 newRemoveMessage(FeedOperation op, String operationId) {
+ DocumentRemove remove = new DocumentRemove(op.getRemove());
+ remove.setCondition(op.getCondition());
+ Message msg = new RemoveDocumentMessage(remove);
+
+ String id = (operationId == null) ? remove.getId().toString() : operationId;
+ return new DocumentOperationMessageV3(id, msg);
+ }
+
+ private static DocumentOperationMessageV3 newPutMessage(FeedOperation op, String operationId) {
+ DocumentPut put = new DocumentPut(op.getDocument());
+ put.setCondition(op.getCondition());
+ Message msg = new PutDocumentMessage(put);
+
+ String id = (operationId == null) ? put.getId().toString() : operationId;
+ return new DocumentOperationMessageV3(id, msg);
+ }
+
+ static DocumentOperationMessageV3 create(FeedOperation operation, String operationId, Metric metric) {
+ switch (operation.getType()) {
+ case DOCUMENT:
+ metric.add(MetricNames.NUM_PUTS, 1, null /*metricContext*/);
+ return newPutMessage(operation, operationId);
+ case REMOVE:
+ metric.add(MetricNames.NUM_REMOVES, 1, null /*metricContext*/);
+ return newRemoveMessage(operation, operationId);
+ case UPDATE:
+ metric.add(MetricNames.NUM_UPDATES, 1, null /*metricContext*/);
+ return newUpdateMessage(operation, operationId);
+ default:
+ // typical end of feed
+ return null;
+ }
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ErrorHttpResponse.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ErrorHttpResponse.java
new file mode 100644
index 00000000000..a12cd1ec089
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ErrorHttpResponse.java
@@ -0,0 +1,24 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.container.jdisc.HttpResponse;
+import com.yahoo.text.Utf8;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class ErrorHttpResponse extends HttpResponse {
+
+ private final String msg;
+
+ public ErrorHttpResponse(final int statusCode, final String msg) {
+ super(statusCode);
+ this.msg = msg;
+ }
+
+ @Override
+ public void render(OutputStream outputStream) throws IOException {
+ outputStream.write(Utf8.toBytes(msg));
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java
new file mode 100644
index 00000000000..f99274d3f2b
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java
@@ -0,0 +1,148 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.collections.Tuple2;
+import com.yahoo.container.handler.threadpool.ContainerThreadPool;
+import com.yahoo.container.jdisc.HttpRequest;
+import com.yahoo.container.jdisc.HttpResponse;
+import com.yahoo.container.jdisc.ThreadedHttpRequestHandler;
+import com.yahoo.container.jdisc.messagebus.SessionCache;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.documentapi.metrics.DocumentApiMetrics;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.Response;
+import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.metrics.simple.MetricReceiver;
+import com.yahoo.vespa.http.client.core.Headers;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * Accept feeds from outside of the Vespa cluster.
+ *
+ * @author Steinar Knutsen
+ */
+public class FeedHandler extends ThreadedHttpRequestHandler {
+
+ protected final ReplyHandler feedReplyHandler;
+ private static final List<Integer> serverSupportedVersions = Collections.unmodifiableList(Arrays.asList(3));
+ private static final Pattern USER_AGENT_PATTERN = Pattern.compile("vespa-http-client \\((.+)\\)");
+ private final FeedHandlerV3 feedHandlerV3;
+ private final DocumentApiMetrics metricsHelper;
+
+ @Inject
+ public FeedHandler(ContainerThreadPool threadpool,
+ Metric metric,
+ DocumentTypeManager documentTypeManager,
+ SessionCache sessionCache,
+ MetricReceiver metricReceiver) {
+ super(threadpool.executor(), metric);
+ metricsHelper = new DocumentApiMetrics(metricReceiver, "vespa.http.server");
+ feedHandlerV3 = new FeedHandlerV3(threadpool.executor(), metric, documentTypeManager, sessionCache, metricsHelper);
+ feedReplyHandler = new FeedReplyReader(metric, metricsHelper);
+ }
+
+ private Tuple2<HttpResponse, Integer> checkProtocolVersion(HttpRequest request) {
+ return doCheckProtocolVersion(request.getJDiscRequest().headers().get(Headers.VERSION));
+ }
+
+ static Tuple2<HttpResponse, Integer> doCheckProtocolVersion(List<String> clientSupportedVersions) {
+ List<String> washedClientVersions = splitVersions(clientSupportedVersions);
+
+ if (washedClientVersions == null || washedClientVersions.isEmpty()) {
+ return new Tuple2<>(new ErrorHttpResponse(
+ Headers.HTTP_NOT_ACCEPTABLE,
+ "Request did not contain " + Headers.VERSION
+ + "header. Server supports protocol versions "
+ + serverSupportedVersions), -1);
+ }
+
+ //select the highest version supported by both parties
+ //this could be extended when we support a gazillion versions - but right now: keep it simple.
+ int version;
+ if (washedClientVersions.contains("3")) {
+ version = 3;
+ } else {
+ return new Tuple2<>(new ErrorHttpResponse(
+ Headers.HTTP_NOT_ACCEPTABLE,
+ "Could not parse " + Headers.VERSION
+ + "header of request (values: " + washedClientVersions +
+ "). Server supports protocol versions "
+ + serverSupportedVersions), -1);
+ }
+ return new Tuple2<>(null, version);
+ }
+
+ private static List<String> splitVersions(List<String> clientSupportedVersions) {
+ List<String> splittedVersions = new ArrayList<>();
+ for (String v : clientSupportedVersions) {
+ if (v == null || v.trim().isEmpty()) {
+ continue;
+ }
+ if (!v.contains(",")) {
+ splittedVersions.add(v.trim());
+ continue;
+ }
+ for (String part : v.split(",")) {
+ part = part.trim();
+ if (!part.isEmpty()) {
+ splittedVersions.add(part);
+ }
+ }
+ }
+ return splittedVersions;
+ }
+
+ @Override
+ public HttpResponse handle(HttpRequest request) {
+ metricsHelper.reportHttpRequest(findClientVersion(request).orElse(null));
+ Tuple2<HttpResponse, Integer> protocolVersion = checkProtocolVersion(request);
+
+ if (protocolVersion.first != null) {
+ return protocolVersion.first;
+ }
+ return feedHandlerV3.handle(request);
+ }
+
+ @Override
+ protected void writeErrorResponseOnOverload(Request request, ResponseHandler responseHandler) {
+ int responseCode = request.headers().getFirst(Headers.SILENTUPGRADE) != null ? 299 : 429;
+ responseHandler.handleResponse(new Response(responseCode)).close(null);
+ }
+
+ private static Optional<String> findClientVersion(HttpRequest request) {
+ String versionHeader = request.getHeader(Headers.CLIENT_VERSION);
+ if (versionHeader != null) {
+ return Optional.of(versionHeader);
+ }
+ return Optional.ofNullable(request.getHeader("User-Agent"))
+ .map(USER_AGENT_PATTERN::matcher)
+ .filter(Matcher::matches)
+ .map(matcher -> matcher.group(1));
+ }
+
+ // Protected for testing
+ protected static InputStream unzipStreamIfNeeded(InputStream inputStream, HttpRequest httpRequest)
+ throws IOException {
+ String contentEncodingHeader = httpRequest.getHeader("content-encoding");
+ if ("gzip".equals(contentEncodingHeader)) {
+ return new GZIPInputStream(inputStream);
+ } else {
+ return inputStream;
+ }
+ }
+
+ @Override protected void destroy() { feedHandlerV3.destroy(); }
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java
new file mode 100644
index 00000000000..c8828df6d54
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java
@@ -0,0 +1,152 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.concurrent.ThreadFactoryFactory;
+import com.yahoo.container.jdisc.HttpRequest;
+import com.yahoo.container.jdisc.HttpResponse;
+import com.yahoo.container.jdisc.ThreadedHttpRequestHandler;
+import com.yahoo.container.jdisc.messagebus.SessionCache;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.documentapi.metrics.DocumentApiMetrics;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.jdisc.ReferencedResource;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.SourceSessionParams;
+import com.yahoo.messagebus.shared.SharedSourceSession;
+import com.yahoo.vespa.http.client.core.Headers;
+import com.yahoo.yolean.Exceptions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * This code is based on v2 code, however, in v3, one client has one ClientFeederV3 shared between all client threads.
+ * The new API has more logic for shutting down cleanly as the server is more likely to be upgraded.
+ * The code is restructured a bit.
+ *
+ * @author dybis
+ */
+public class FeedHandlerV3 extends ThreadedHttpRequestHandler {
+
+ private DocumentTypeManager docTypeManager;
+ private final Map<String, ClientFeederV3> clientFeederByClientId = new HashMap<>();
+ private final ScheduledThreadPoolExecutor cron;
+ private final SessionCache sessionCache;
+ protected final ReplyHandler feedReplyHandler;
+ private final Metric metric;
+ private final Object monitor = new Object();
+ private static final Logger log = Logger.getLogger(FeedHandlerV3.class.getName());
+
+ public FeedHandlerV3(Executor executor,
+ Metric metric,
+ DocumentTypeManager documentTypeManager,
+ SessionCache sessionCache,
+ DocumentApiMetrics metricsHelper) {
+ super(executor, metric);
+ docTypeManager = documentTypeManager;
+ this.sessionCache = sessionCache;
+ feedReplyHandler = new FeedReplyReader(metric, metricsHelper);
+ cron = new ScheduledThreadPoolExecutor(1, ThreadFactoryFactory.getThreadFactory("feed-handler-v3-janitor"));
+ cron.scheduleWithFixedDelay(this::removeOldClients, 3, 3, TimeUnit.SECONDS);
+ this.metric = metric;
+ }
+
+ public void injectDocumentManangerForTests(DocumentTypeManager docTypeManager) {
+ this.docTypeManager = docTypeManager;
+ }
+
+ // TODO: If this is set up to run without first invoking the old FeedHandler code, we should
+ // verify the version header first. This is done in the old code.
+ @Override
+ public HttpResponse handle(HttpRequest request) {
+ String clientId = clientId(request);
+ ClientFeederV3 clientFeederV3;
+ synchronized (monitor) {
+ if (! clientFeederByClientId.containsKey(clientId)) {
+ SourceSessionParams sourceSessionParams = sourceSessionParams(request);
+ clientFeederByClientId.put(clientId,
+ new ClientFeederV3(retainSource(sessionCache, sourceSessionParams),
+ new FeedReaderFactory(true), //TODO make error debugging configurable
+ docTypeManager,
+ clientId,
+ metric,
+ feedReplyHandler));
+ }
+ clientFeederV3 = clientFeederByClientId.get(clientId);
+ }
+ try {
+ return clientFeederV3.handleRequest(request);
+ } catch (UnknownClientException uce) {
+ String msg = Exceptions.toMessageString(uce);
+ log.log(Level.WARNING, msg);
+ return new ErrorHttpResponse(com.yahoo.jdisc.http.HttpResponse.Status.BAD_REQUEST, msg);
+ } catch (Exception e) {
+ String msg = "Could not initialize document parsing: " + Exceptions.toMessageString(e);
+ log.log(Level.WARNING, msg);
+ return new ErrorHttpResponse(com.yahoo.jdisc.http.HttpResponse.Status.INTERNAL_SERVER_ERROR, msg);
+ }
+ }
+
+ // SessionCache is final and no easy way to mock it so we need this to be able to do testing.
+ protected ReferencedResource<SharedSourceSession> retainSource(SessionCache sessionCache, SourceSessionParams params) {
+ return sessionCache.retainSource(params);
+ }
+
+ @Override
+ protected void destroy() {
+ // We are forking this to avoid that accidental de-referencing causes any random thread doing destruction.
+ // This caused a deadlock when the single Messenger thread in MessageBus was the last one referring this
+ // and started destructing something that required something only the messenger thread could provide.
+ Thread destroyer = new Thread(() -> {
+ cron.shutdown();
+ synchronized (monitor) {
+ for (var iterator = clientFeederByClientId.values().iterator(); iterator.hasNext(); ) {
+ iterator.next().kill();
+ iterator.remove();
+ }
+ }
+ }, "feed-handler-v3-adhoc-destroyer");
+ destroyer.setDaemon(true);
+ destroyer.start();
+ }
+
+ private String clientId(HttpRequest request) {
+ String clientDictatedId = request.getHeader(Headers.CLIENT_ID);
+ if (clientDictatedId == null || clientDictatedId.isEmpty()) {
+ throw new IllegalArgumentException("Did not get any CLIENT_ID header (" + Headers.CLIENT_ID + ")");
+ }
+ return clientDictatedId;
+ }
+
+ private SourceSessionParams sourceSessionParams(HttpRequest request) {
+ SourceSessionParams params = new SourceSessionParams();
+ String timeout = request.getHeader(Headers.TIMEOUT);
+
+ if (timeout != null) {
+ try {
+ params.setTimeout(Double.parseDouble(timeout));
+ } catch (NumberFormatException e) {
+ // NOP
+ }
+ }
+ return params;
+ }
+
+ private void removeOldClients() {
+ synchronized (monitor) {
+ for (var iterator = clientFeederByClientId.values().iterator(); iterator.hasNext(); ) {
+ ClientFeederV3 client = iterator.next();
+ if (client.timedOut()) {
+ client.kill();
+ iterator.remove();
+ }
+ }
+ }
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReaderFactory.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReaderFactory.java
new file mode 100644
index 00000000000..069ccfd84f0
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReaderFactory.java
@@ -0,0 +1,62 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.json.JsonFeedReader;
+import com.yahoo.text.Utf8;
+import com.yahoo.vespa.http.client.config.FeedParams;
+import com.yahoo.vespaxmlparser.FeedReader;
+import com.yahoo.vespaxmlparser.VespaXMLFeedReader;
+
+import java.io.InputStream;
+
+/**
+ * Class for creating FeedReader based on dataFormat.
+ * @author dybis
+ */
+public class FeedReaderFactory {
+ private static final int MARK_READLIMIT = 200;
+
+ private final boolean debug;
+ public FeedReaderFactory(boolean debug) {
+ this.debug = debug;
+ }
+
+ /**
+ * Creates FeedReader
+ * @param inputStream source of feed data
+ * @param docTypeManager handles the parsing of the document
+ * @param dataFormat specifies the format
+ * @return a feedreader
+ */
+ public FeedReader createReader(
+ InputStream inputStream,
+ DocumentTypeManager docTypeManager,
+ FeedParams.DataFormat dataFormat) {
+ switch (dataFormat) {
+ case XML_UTF8:
+ byte [] peek = null;
+ int bytesPeeked = 0;
+ try {
+ if (debug && inputStream.markSupported()) {
+ peek = new byte[MARK_READLIMIT];
+ inputStream.mark(MARK_READLIMIT);
+ bytesPeeked = inputStream.read(peek);
+ inputStream.reset();
+ }
+ return new VespaXMLFeedReader(inputStream, docTypeManager);
+ } catch (Exception e) {
+ if (bytesPeeked > 0) {
+ throw new RuntimeException("Could not create VespaXMLFeedReader. First characters are: '" + Utf8.toString(peek, 0, bytesPeeked) + "'", e);
+ } else {
+ throw new RuntimeException("Could not create VespaXMLFeedReader.", e);
+ }
+ }
+ case JSON_UTF8:
+ return new JsonFeedReader(inputStream, docTypeManager);
+ default:
+ throw new IllegalStateException("Can not create feed reader for format: " + dataFormat);
+ }
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReplyReader.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReplyReader.java
new file mode 100644
index 00000000000..2fbb80d9fcc
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReplyReader.java
@@ -0,0 +1,94 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentReply;
+import com.yahoo.documentapi.metrics.DocumentApiMetrics;
+import com.yahoo.documentapi.metrics.DocumentOperationStatus;
+import com.yahoo.documentapi.metrics.DocumentOperationType;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.Trace;
+import com.yahoo.vespa.http.client.core.ErrorCode;
+import com.yahoo.vespa.http.client.core.OperationStatus;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Predicate;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static java.util.function.Predicate.not;
+
+/**
+ * Catch message bus replies and make the available to a given session.
+ *
+ * @author Steinar Knutsen
+ */
+public class FeedReplyReader implements ReplyHandler {
+
+ private static final Logger log = Logger.getLogger(FeedReplyReader.class.getName());
+ private final Metric metric;
+ private final DocumentApiMetrics metricsHelper;
+ private final Metric.Context testAndSetMetricCtx;
+
+ public FeedReplyReader(Metric metric, DocumentApiMetrics metricsHelper) {
+ this.metric = metric;
+ this.metricsHelper = metricsHelper;
+ this.testAndSetMetricCtx = metric.createContext(Map.of("operationType", "testAndSet"));
+ }
+
+ @Override
+ public void handleReply(Reply reply) {
+ Object o = reply.getContext();
+ if (!(o instanceof ReplyContext)) {
+ return;
+ }
+ ReplyContext context = (ReplyContext) o;
+ final double latencyInSeconds = (System.currentTimeMillis() - context.creationTime) / 1000.0d;
+ metric.set(MetricNames.LATENCY, latencyInSeconds, null);
+
+ DocumentOperationType type = DocumentOperationType.fromMessage(reply.getMessage());
+ boolean conditionMet = conditionMet(reply);
+ if (reply.hasErrors() && conditionMet) {
+ DocumentOperationStatus status = DocumentOperationStatus.fromMessageBusErrorCodes(reply.getErrorCodes());
+ metricsHelper.reportFailure(type, status);
+ metric.add(MetricNames.FAILED, 1, null);
+ enqueue(context, reply.getError(0).getMessage(), ErrorCode.ERROR, false, reply.getTrace());
+ } else {
+ metricsHelper.reportSuccessful(type, latencyInSeconds);
+ metric.add(MetricNames.SUCCEEDED, 1, null);
+ if ( ! conditionMet)
+ metric.add(MetricNames.CONDITION_NOT_MET, 1, testAndSetMetricCtx);
+ if ( ! updateNotFound(reply))
+ metric.add(MetricNames.NOT_FOUND, 1, null);
+ enqueue(context, "Document processed.", ErrorCode.OK, !conditionMet, reply.getTrace());
+ }
+ }
+
+ private static boolean conditionMet(Reply reply) {
+ return ! reply.hasErrors() || reply.getError(0).getCode() != DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED;
+ }
+
+ private static boolean updateNotFound(Reply reply) {
+ return reply instanceof UpdateDocumentReply
+ && ! ((UpdateDocumentReply) reply).wasFound()
+ && reply.getMessage() instanceof UpdateDocumentMessage
+ && ((UpdateDocumentMessage) reply.getMessage()).getDocumentUpdate() != null
+ && ! ((UpdateDocumentMessage) reply.getMessage()).getDocumentUpdate().getCreateIfNonExistent();
+ }
+
+ private void enqueue(ReplyContext context, String message, ErrorCode status, boolean isConditionNotMet, Trace trace) {
+ try {
+ String traceMessage = (trace != null && trace.getLevel() > 0) ? trace.toString() : "";
+
+ context.feedReplies.put(new OperationStatus(message, context.docId, status, isConditionNotMet, traceMessage));
+ } catch (InterruptedException e) {
+ log.log(Level.WARNING,
+ "Interrupted while enqueueing result from putting document with id: " + context.docId);
+ Thread.currentThread().interrupt();
+ }
+ }
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedResponse.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedResponse.java
new file mode 100644
index 00000000000..3e2a4a8795f
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedResponse.java
@@ -0,0 +1,88 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.container.jdisc.HttpResponse;
+import com.yahoo.vespa.http.client.core.Headers;
+import com.yahoo.vespa.http.client.core.ErrorCode;
+import com.yahoo.vespa.http.client.core.OperationStatus;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Reads feed responses from a queue and renders them continuously to the
+ * feeder.
+ *
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ * @since 5.1
+ */
+public class FeedResponse extends HttpResponse {
+
+ BlockingQueue<OperationStatus> operations;
+
+ public FeedResponse(
+ int status,
+ BlockingQueue<OperationStatus> operations,
+ int protocolVersion,
+ String sessionId) {
+ super(status);
+ this.operations = operations;
+ headers().add(Headers.SESSION_ID, sessionId);
+ headers().add(Headers.VERSION, Integer.toString(protocolVersion));
+ }
+
+ // This is used by the V3 protocol.
+ public FeedResponse(
+ int status,
+ BlockingQueue<OperationStatus> operations,
+ int protocolVersion,
+ String sessionId,
+ int outstandingClientOperations,
+ String hostName) {
+ super(status);
+ this.operations = operations;
+ headers().add(Headers.SESSION_ID, sessionId);
+ headers().add(Headers.VERSION, Integer.toString(protocolVersion));
+ headers().add(Headers.OUTSTANDING_REQUESTS, Integer.toString(outstandingClientOperations));
+ headers().add(Headers.HOSTNAME, hostName);
+ }
+
+ @Override
+ public void render(OutputStream output) throws IOException {
+ int i = 0;
+ OperationStatus status;
+ try {
+ status = operations.take();
+ while (status.errorCode != ErrorCode.END_OF_FEED) {
+ output.write(toBytes(status.render()));
+ if (++i % 5 == 0) {
+ output.flush();
+ }
+ status = operations.take();
+ }
+ } catch (InterruptedException e) {
+ output.flush();
+ }
+ }
+
+ private byte[] toBytes(String s) {
+ byte[] b = new byte[s.length()];
+ for (int i = 0; i < b.length; ++i) {
+ b[i] = (byte) s.charAt(i); // renderSingleStatus ensures ASCII only
+ }
+ return b;
+ }
+
+ @Override
+ public String getContentType() {
+ return "text/plain";
+ }
+
+ @Override
+ public String getCharacterEncoding() {
+ return StandardCharsets.US_ASCII.name();
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java
new file mode 100644
index 00000000000..725349f6ebe
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java
@@ -0,0 +1,33 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.container.jdisc.HttpRequest;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.vespa.http.client.config.FeedParams.DataFormat;
+import com.yahoo.vespa.http.client.core.Headers;
+
+import java.util.Optional;
+
+/**
+ * Wrapper for the feed feederSettings read from HTTP request.
+ *
+ * @author Steinar Knutsen
+ */
+public class FeederSettings {
+
+ private static final Route DEFAULT_ROUTE = Route.parse("default");
+ public final boolean drain; // TODO: Implement drain=true
+ public final Route route;
+ public final DataFormat dataFormat;
+ public final String priority;
+ public final Integer traceLevel;
+
+ public FeederSettings(HttpRequest request) {
+ this.drain = Optional.ofNullable(request.getHeader(Headers.DRAIN)).map(Boolean::parseBoolean).orElse(false);
+ this.route = Optional.ofNullable(request.getHeader(Headers.ROUTE)).map(Route::parse).orElse(DEFAULT_ROUTE);
+ this.dataFormat = Optional.ofNullable(request.getHeader(Headers.DATA_FORMAT)).map(DataFormat::valueOf).orElse(DataFormat.JSON_UTF8);
+ this.priority = request.getHeader(Headers.PRIORITY);
+ this.traceLevel = Optional.ofNullable(request.getHeader(Headers.TRACE_LEVEL)).map(Integer::valueOf).orElse(null);
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/documentapi/metrics/MetricNames.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/MetricNames.java
index c2d9f0b292e..a5987f2398e 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/documentapi/metrics/MetricNames.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/MetricNames.java
@@ -1,5 +1,5 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.documentapi.metrics;
+package com.yahoo.vespa.http.server;
/**
* Place to store the metric names so where the metrics are logged can be found
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ReplyContext.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ReplyContext.java
new file mode 100644
index 00000000000..aa2651595ef
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ReplyContext.java
@@ -0,0 +1,25 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.vespa.http.client.core.OperationStatus;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Mapping between document ID and client session.
+ *
+ * @author Steinar Knutsen
+ */
+public class ReplyContext {
+
+ public final String docId;
+ public final BlockingQueue<OperationStatus> feedReplies;
+ public final long creationTime;
+
+ public ReplyContext(String docId, BlockingQueue<OperationStatus> feedReplies) {
+ this.docId = docId;
+ this.feedReplies = feedReplies;
+ this.creationTime = System.currentTimeMillis();
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/StreamReaderV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/StreamReaderV3.java
new file mode 100644
index 00000000000..4ddc430b35f
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/StreamReaderV3.java
@@ -0,0 +1,85 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.container.jdisc.HttpRequest;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.vespa.http.client.core.Encoder;
+import com.yahoo.vespa.http.server.util.ByteLimitedInputStream;
+import com.yahoo.vespaxmlparser.FeedOperation;
+import com.yahoo.vespaxmlparser.FeedReader;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Optional;
+import java.util.logging.Logger;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * This code is based on v2 code, but restructured so stream reading code is in one dedicated class.
+ * @author dybis
+ */
+public class StreamReaderV3 {
+
+ protected static final Logger log = Logger.getLogger(StreamReaderV3.class.getName());
+
+ private final FeedReaderFactory feedReaderFactory;
+ private final DocumentTypeManager docTypeManager;
+
+ public StreamReaderV3(FeedReaderFactory feedReaderFactory, DocumentTypeManager docTypeManager) {
+ this.feedReaderFactory = feedReaderFactory;
+ this.docTypeManager = docTypeManager;
+ }
+
+ public FeedOperation getNextOperation(InputStream requestInputStream, FeederSettings settings) throws Exception {
+ FeedOperation op = null;
+
+ int length = readByteLength(requestInputStream);
+
+ try (InputStream limitedInputStream = new ByteLimitedInputStream(requestInputStream, length)){
+ FeedReader reader = feedReaderFactory.createReader(limitedInputStream, docTypeManager, settings.dataFormat);
+ op = reader.read();
+ }
+ return op;
+ }
+
+ public Optional<String> getNextOperationId(InputStream requestInputStream) throws IOException {
+ StringBuilder idBuf = new StringBuilder(100);
+ int c;
+ while ((c = requestInputStream.read()) != -1) {
+ if (c == 32) {
+ break;
+ }
+ idBuf.append((char) c); //it's ASCII
+ }
+ if (idBuf.length() == 0) {
+ return Optional.empty();
+ }
+ return Optional.of(Encoder.decode(idBuf.toString(), new StringBuilder(idBuf.length())).toString());
+ }
+
+ private int readByteLength(InputStream requestInputStream) throws IOException {
+ StringBuilder lenBuf = new StringBuilder(8);
+ int c;
+ while ((c = requestInputStream.read()) != -1) {
+ if (c == 10) {
+ break;
+ }
+ lenBuf.append((char) c); //it's ASCII
+ }
+ if (lenBuf.length() == 0) {
+ throw new IllegalStateException("Operation length missing.");
+ }
+ return Integer.valueOf(lenBuf.toString(), 16);
+ }
+
+ public static InputStream unzipStreamIfNeeded(final HttpRequest httpRequest)
+ throws IOException {
+ final String contentEncodingHeader = httpRequest.getHeader("content-encoding");
+ if ("gzip".equals(contentEncodingHeader)) {
+ return new GZIPInputStream(httpRequest.getData());
+ } else {
+ return httpRequest.getData();
+ }
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/UnknownClientException.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/UnknownClientException.java
new file mode 100644
index 00000000000..5324b86a98a
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/UnknownClientException.java
@@ -0,0 +1,14 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ * @since 5.5.0
+ */
+public class UnknownClientException extends RuntimeException {
+
+ public UnknownClientException(String message) {
+ super(message);
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/package-info.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/package-info.java
new file mode 100644
index 00000000000..ea01137d9af
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/package-info.java
@@ -0,0 +1,7 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/**
+ * Server side of programmatic API for feeding into Vespa from outside of the
+ * clusters. Not a public API, not meant for direct use.
+ */
+@com.yahoo.api.annotations.PackageMarker
+package com.yahoo.vespa.http.server;
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStream.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStream.java
new file mode 100644
index 00000000000..270ebe7796b
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStream.java
@@ -0,0 +1,99 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * @author Einar M R Rosenvinge
+ *
+ * @since 5.1.23
+ */
+public class ByteLimitedInputStream extends InputStream {
+
+ private final InputStream wrappedStream;
+ private int remaining;
+ private int remainingWhenMarked;
+
+ public ByteLimitedInputStream(InputStream wrappedStream, int limit) {
+ this.wrappedStream = wrappedStream;
+ if (limit < 0) {
+ throw new IllegalArgumentException("limit cannot be 0");
+ }
+ this.remaining = limit;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (remaining <= 0) {
+ return -1;
+ }
+ int retval = wrappedStream.read();
+ if (retval < 0) {
+ remaining = 0;
+ } else {
+ --remaining;
+ }
+ return retval;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (b == null) {
+ throw new NullPointerException();
+ } else if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ } else if (len == 0) {
+ return 0;
+ }
+
+ if (remaining <= 0) {
+ return -1;
+ }
+
+ int bytesToRead = Math.min(remaining, len);
+ int retval = wrappedStream.read(b, off, bytesToRead);
+
+ if (retval < 0) {
+ //end of underlying stream was reached, and nothing was read.
+ remaining = 0;
+ } else {
+ remaining -= retval;
+ }
+ return retval;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return remaining;
+ }
+
+ @Override
+ public void close() throws IOException {
+ //we will never close the underlying stream
+ if (remaining <= 0) {
+ return;
+ }
+ while (remaining > 0) {
+ skip(remaining);
+ }
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ wrappedStream.mark(readlimit);
+ remainingWhenMarked = remaining;
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ wrappedStream.reset();
+ remaining = remainingWhenMarked;
+ }
+
+ @Override
+ public boolean markSupported() {
+ return wrappedStream.markSupported();
+ }
+
+}