aboutsummaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@gmail.com>2022-06-07 11:59:03 +0200
committergjoranv <gv@verizonmedia.com>2022-06-08 11:45:30 +0200
commit033d6494edc17b554ab841c3f5ea70bc5f8925de (patch)
treefc24f1564b91ee6e7009f4a92adb0981ffb92924 /vespaclient-container-plugin
parent72e82db1739fd88a78aba7d55c7ee4ef7f953863 (diff)
Revert "Remove http client use"
This reverts commit a7fd13540d34de50ed3526576c62eebc476a1e1c.
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/pom.xml36
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java3
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java287
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientState.java40
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/DocumentOperationMessageV3.java83
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ErrorHttpResponse.java24
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java148
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java152
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReaderFactory.java62
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReplyReader.java94
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedResponse.java88
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java33
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/MetricNames.java (renamed from vespaclient-container-plugin/src/main/java/com/yahoo/documentapi/metrics/MetricNames.java)2
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ReplyContext.java25
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/StreamReaderV3.java85
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/UnknownClientException.java14
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/package-info.java7
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStream.java99
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/CollectingMetric.java38
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/DummyMetric.java31
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerCompressionTest.java69
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerTest.java45
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerV3Test.java152
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedReaderFactoryTestCase.java40
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/MetaStream.java39
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/MockNetwork.java69
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/MockReply.java35
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/VersionsTestCase.java118
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStreamTestCase.java106
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/vespaxmlparser/MockFeedReaderFactory.java32
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/vespaxmlparser/MockReader.java69
31 files changed, 2123 insertions, 2 deletions
diff --git a/vespaclient-container-plugin/pom.xml b/vespaclient-container-plugin/pom.xml
index d7b36e39c94..c960c2cca44 100644
--- a/vespaclient-container-plugin/pom.xml
+++ b/vespaclient-container-plugin/pom.xml
@@ -38,6 +38,42 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>vespa-http-client</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <!-- Exclude artifacts that are provided by Jdisc container -->
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>security-utils</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index ed068c77e11..8c2e39d595e 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -20,6 +20,7 @@ import com.yahoo.document.DocumentUpdate;
import com.yahoo.document.FixedBucketSpaces;
import com.yahoo.document.TestAndSetCondition;
import com.yahoo.document.config.DocumentmanagerConfig;
+import com.yahoo.document.fieldset.AllFields;
import com.yahoo.document.fieldset.DocIdOnly;
import com.yahoo.document.fieldset.DocumentOnly;
import com.yahoo.document.idstring.IdIdString;
@@ -45,7 +46,6 @@ import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
import com.yahoo.documentapi.metrics.DocumentApiMetrics;
import com.yahoo.documentapi.metrics.DocumentOperationStatus;
-import com.yahoo.documentapi.metrics.MetricNames;
import com.yahoo.jdisc.Metric;
import com.yahoo.jdisc.Request;
import com.yahoo.jdisc.Response;
@@ -68,6 +68,7 @@ import com.yahoo.restapi.Path;
import com.yahoo.search.query.ParameterParser;
import com.yahoo.text.Text;
import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
+import com.yahoo.vespa.http.server.MetricNames;
import com.yahoo.yolean.Exceptions;
import com.yahoo.yolean.Exceptions.RunnableThrowingIOException;
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java
new file mode 100644
index 00000000000..875ff3e5bf0
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java
@@ -0,0 +1,287 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.container.jdisc.HttpRequest;
+import com.yahoo.container.jdisc.HttpResponse;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.documentapi.messagebus.protocol.DocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.jdisc.ReferencedResource;
+import com.yahoo.jdisc.ResourceReference;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.Result;
+import com.yahoo.messagebus.shared.SharedSourceSession;
+import com.yahoo.net.HostName;
+import com.yahoo.vespa.http.client.core.ErrorCode;
+import com.yahoo.vespa.http.client.core.Headers;
+import com.yahoo.vespa.http.client.core.OperationStatus;
+import com.yahoo.vespaxmlparser.FeedOperation;
+import com.yahoo.yolean.Exceptions;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An instance of this class handles all requests from one client using VespaHttpClient.
+ *
+ * The implementation is based on the code from V2, but the object model is rewritten to simplify the logic and
+ * avoid using a threadpool that has no effect with all the extra that comes with it. V2 has one instance per thread
+ * on the client, while this is one instance for all threads.
+ *
+ * @author dybis
+ */
+class ClientFeederV3 {
+
+ protected static final Logger log = Logger.getLogger(ClientFeederV3.class.getName());
+ // This is for all clients on this gateway, for load balancing from client.
+ private final static AtomicInteger outstandingOperations = new AtomicInteger(0);
+ private final BlockingQueue<OperationStatus> feedReplies = new LinkedBlockingQueue<>();
+ private final ReferencedResource<SharedSourceSession> sourceSession;
+ private final String clientId;
+ private final ReplyHandler feedReplyHandler;
+ private final Metric metric;
+ private Instant prevOpsPerSecTime = Instant.now();
+ private double operationsForOpsPerSec = 0d;
+ private final Object monitor = new Object();
+ private final StreamReaderV3 streamReaderV3;
+ private final AtomicInteger ongoingRequests = new AtomicInteger(0);
+ private final String hostName;
+
+ ClientFeederV3(
+ ReferencedResource<SharedSourceSession> sourceSession,
+ FeedReaderFactory feedReaderFactory,
+ DocumentTypeManager docTypeManager,
+ String clientId,
+ Metric metric,
+ ReplyHandler feedReplyHandler) {
+ this.sourceSession = sourceSession;
+ this.clientId = clientId;
+ this.feedReplyHandler = feedReplyHandler;
+ this.metric = metric;
+ this.streamReaderV3 = new StreamReaderV3(feedReaderFactory, docTypeManager);
+ this.hostName = HostName.getLocalhost();
+ }
+
+ boolean timedOut() {
+ synchronized (monitor) {
+ return Instant.now().isAfter(prevOpsPerSecTime.plusSeconds(6000)) && ongoingRequests.get() == 0;
+ }
+ }
+
+ void kill() {
+ try (ResourceReference ignored = sourceSession.getReference()) {
+ // No new requests should be sent to this object, but there can be old one, even though this is very unlikely.
+ while (ongoingRequests.get() > 0) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ } catch (Exception e) {
+ log.log(Level.WARNING, "Failed to close reference to source session", e);
+ }
+ }
+
+ private void transferPreviousRepliesToResponse(BlockingQueue<OperationStatus> operations) throws InterruptedException {
+ OperationStatus status = feedReplies.poll();
+ while (status != null) {
+ outstandingOperations.decrementAndGet();
+ operations.put(status);
+ status = feedReplies.poll();
+ }
+ }
+
+ HttpResponse handleRequest(HttpRequest request) throws IOException {
+ ongoingRequests.incrementAndGet();
+ try {
+ FeederSettings feederSettings = new FeederSettings(request);
+ InputStream inputStream = StreamReaderV3.unzipStreamIfNeeded(request);
+ BlockingQueue<OperationStatus> replies = new LinkedBlockingQueue<>();
+ try {
+ feed(feederSettings, inputStream, replies);
+ synchronized (monitor) {
+ // Handshake requests do not have DATA_FORMAT, we do not want to give responses to
+ // handshakes as it won't be processed by the client.
+ if (request.getJDiscRequest().headers().get(Headers.DATA_FORMAT) != null) {
+ transferPreviousRepliesToResponse(replies);
+ }
+ }
+ } catch (InterruptedException e) {
+ log.log(Level.FINE, e, () -> "Feed handler was interrupted: " + e.getMessage());
+ // NOP, just terminate
+ } catch (Throwable e) {
+ log.log(Level.WARNING, "Unhandled exception while feeding: " + Exceptions.toMessageString(e), e);
+ } finally {
+ replies.add(createOperationStatus("-", "-", ErrorCode.END_OF_FEED, null));
+ }
+ return new FeedResponse(200, replies, 3, clientId, outstandingOperations.get(), hostName);
+ } finally {
+ ongoingRequests.decrementAndGet();
+ }
+ }
+
+ private Optional<DocumentOperationMessageV3> pullMessageFromRequest(FeederSettings settings,
+ InputStream requestInputStream,
+ BlockingQueue<OperationStatus> repliesFromOldMessages) {
+ while (true) {
+ Optional<String> operationId;
+ try {
+ operationId = streamReaderV3.getNextOperationId(requestInputStream);
+ if (operationId.isEmpty()) return Optional.empty();
+ } catch (IOException ioe) {
+ log.log(Level.FINE, () -> Exceptions.toMessageString(ioe));
+ return Optional.empty();
+ }
+
+ try {
+ DocumentOperationMessageV3 message = getNextMessage(operationId.get(), requestInputStream, settings);
+ if (message != null)
+ setRoute(message, settings);
+ return Optional.ofNullable(message);
+ } catch (Exception e) {
+ log.log(Level.WARNING, () -> Exceptions.toMessageString(e));
+ metric.add(MetricNames.PARSE_ERROR, 1, null);
+
+ repliesFromOldMessages.add(new OperationStatus(Exceptions.toMessageString(e),
+ operationId.get(),
+ ErrorCode.ERROR,
+ false,
+ ""));
+ }
+ }
+ }
+
+ private Result sendMessage(DocumentOperationMessageV3 msg) throws InterruptedException {
+ msg.getMessage().pushHandler(feedReplyHandler);
+ return sourceSession.getResource().sendMessageBlocking(msg.getMessage());
+ }
+
+ private void feed(FeederSettings settings,
+ InputStream requestInputStream,
+ BlockingQueue<OperationStatus> repliesFromOldMessages) throws InterruptedException {
+ while (true) {
+ Optional<DocumentOperationMessageV3> message = pullMessageFromRequest(settings,
+ requestInputStream,
+ repliesFromOldMessages);
+
+ if (message.isEmpty()) break;
+ setMessageParameters(message.get(), settings);
+
+ Result result;
+ try {
+ result = sendMessage(message.get());
+
+ } catch (RuntimeException e) {
+ repliesFromOldMessages.add(createOperationStatus(message.get().getOperationId(),
+ Exceptions.toMessageString(e),
+ ErrorCode.ERROR,
+ message.get().getMessage()));
+ continue;
+ }
+
+ if (result.isAccepted()) {
+ outstandingOperations.incrementAndGet();
+ updateOpsPerSec();
+ log(Level.FINE, "Sent message successfully, document id: ", message.get().getOperationId());
+ } else if (!result.getError().isFatal()) {
+ repliesFromOldMessages.add(createOperationStatus(message.get().getOperationId(),
+ result.getError().getMessage(),
+ ErrorCode.TRANSIENT_ERROR,
+ message.get().getMessage()));
+ } else {
+ repliesFromOldMessages.add(createOperationStatus(message.get().getOperationId(),
+ result.getError().getMessage(),
+ ErrorCode.ERROR,
+ message.get().getMessage()));
+ }
+ }
+ }
+
+ private OperationStatus createOperationStatus(String id, String message, ErrorCode code, Message msg) {
+ String traceMessage = msg != null && msg.getTrace() != null && msg.getTrace().getLevel() > 0
+ ? msg.getTrace().toString()
+ : "";
+ return new OperationStatus(message, id, code, false, traceMessage);
+ }
+
+ // protected for mocking
+ /** Returns the next message in the stream, or null if none */
+ protected DocumentOperationMessageV3 getNextMessage(String operationId,
+ InputStream requestInputStream,
+ FeederSettings settings) throws Exception {
+ FeedOperation operation = streamReaderV3.getNextOperation(requestInputStream, settings);
+
+ // This is a bit hard to set up while testing, so we accept that things are not perfect.
+ if (sourceSession.getResource().session() != null) {
+ metric.set(
+ MetricNames.PENDING,
+ Double.valueOf(sourceSession.getResource().session().getPendingCount()),
+ null);
+ }
+
+ DocumentOperationMessageV3 message = DocumentOperationMessageV3.create(operation, operationId, metric);
+ if (message == null) {
+ // typical end of feed
+ return null;
+ }
+ metric.add(MetricNames.NUM_OPERATIONS, 1, null /*metricContext*/);
+ log(Level.FINE, "Successfully deserialized document id: ", message.getOperationId());
+ return message;
+ }
+
+ private void setMessageParameters(DocumentOperationMessageV3 msg, FeederSettings settings) {
+ msg.getMessage().setContext(new ReplyContext(msg.getOperationId(), feedReplies));
+ if (settings.traceLevel != null) {
+ msg.getMessage().getTrace().setLevel(settings.traceLevel);
+ }
+ if (settings.priority != null) {
+ try {
+ DocumentProtocol.Priority priority = DocumentProtocol.Priority.valueOf(settings.priority);
+ }
+ catch (IllegalArgumentException i) {
+ log.severe(i.getMessage());
+ }
+ }
+ }
+
+ private void setRoute(DocumentOperationMessageV3 msg, FeederSettings settings) {
+ if (settings.route != null) {
+ msg.getMessage().setRoute(settings.route);
+ }
+ }
+
+ protected final void log(Level level, Object... msgParts) {
+ if (!log.isLoggable(level)) return;
+
+ StringBuilder s = new StringBuilder();
+ for (Object part : msgParts)
+ s.append(part.toString());
+ log.log(level, s.toString());
+ }
+
+ private void updateOpsPerSec() {
+ Instant now = Instant.now();
+ synchronized (monitor) {
+ if (now.plusSeconds(1).isAfter(prevOpsPerSecTime)) {
+ Duration duration = Duration.between(now, prevOpsPerSecTime);
+ double opsPerSec = operationsForOpsPerSec / (duration.toMillis() / 1000.);
+ metric.set(MetricNames.OPERATIONS_PER_SEC, opsPerSec, null /*metricContext*/);
+ operationsForOpsPerSec = 1.0d;
+ prevOpsPerSecTime = now;
+ } else {
+ operationsForOpsPerSec += 1.0d;
+ }
+ }
+ }
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientState.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientState.java
new file mode 100644
index 00000000000..13a12f707d9
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientState.java
@@ -0,0 +1,40 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.jdisc.Metric;
+import com.yahoo.jdisc.ReferencedResource;
+import com.yahoo.messagebus.shared.SharedSourceSession;
+import com.yahoo.vespa.http.client.core.OperationStatus;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * The state of a client session, used to save replies when client disconnects.
+ *
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ */
+public class ClientState {
+
+ public final int pending;
+ public final long creationTime;
+ public final BlockingQueue<OperationStatus> feedReplies;
+ public final ReferencedResource<SharedSourceSession> sourceSession;
+ public final Metric.Context metricContext;
+
+ public final long prevOpsPerSecTime; // previous measurement time of OPS
+ public final double operationsForOpsPerSec;
+
+ public ClientState(int pending, BlockingQueue<OperationStatus> feedReplies,
+ ReferencedResource<SharedSourceSession> sourceSession, Metric.Context metricContext,
+ long prevOpsPerSecTime, double operationsForOpsPerSec) {
+ super();
+ this.pending = pending;
+ this.feedReplies = feedReplies;
+ this.sourceSession = sourceSession;
+ this.metricContext = metricContext;
+ creationTime = System.currentTimeMillis();
+ this.prevOpsPerSecTime = prevOpsPerSecTime;
+ this.operationsForOpsPerSec = operationsForOpsPerSec;
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/DocumentOperationMessageV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/DocumentOperationMessageV3.java
new file mode 100644
index 00000000000..25bf5815907
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/DocumentOperationMessageV3.java
@@ -0,0 +1,83 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentRemove;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.messagebus.Message;
+import com.yahoo.vespaxmlparser.FeedOperation;
+
+/**
+ * Keeps an operation with its message.
+ *
+ * This implementation is based on V2, but the code is restructured.
+ *
+ * @author dybis
+ */
+class DocumentOperationMessageV3 {
+
+ private final String operationId;
+ private final Message message;
+
+ private DocumentOperationMessageV3(String operationId, Message message) {
+ this.operationId = operationId;
+ this.message = message;
+ }
+
+ Message getMessage() {
+ return message;
+ }
+
+ String getOperationId() {
+ return operationId;
+ }
+
+ private static DocumentOperationMessageV3 newUpdateMessage(FeedOperation op, String operationId) {
+ DocumentUpdate update = op.getDocumentUpdate();
+ update.setCondition(op.getCondition());
+ Message msg = new UpdateDocumentMessage(update);
+
+ String id = (operationId == null) ? update.getId().toString() : operationId;
+ return new DocumentOperationMessageV3(id, msg);
+ }
+
+ static DocumentOperationMessageV3 newRemoveMessage(FeedOperation op, String operationId) {
+ DocumentRemove remove = new DocumentRemove(op.getRemove());
+ remove.setCondition(op.getCondition());
+ Message msg = new RemoveDocumentMessage(remove);
+
+ String id = (operationId == null) ? remove.getId().toString() : operationId;
+ return new DocumentOperationMessageV3(id, msg);
+ }
+
+ private static DocumentOperationMessageV3 newPutMessage(FeedOperation op, String operationId) {
+ DocumentPut put = new DocumentPut(op.getDocument());
+ put.setCondition(op.getCondition());
+ Message msg = new PutDocumentMessage(put);
+
+ String id = (operationId == null) ? put.getId().toString() : operationId;
+ return new DocumentOperationMessageV3(id, msg);
+ }
+
+ static DocumentOperationMessageV3 create(FeedOperation operation, String operationId, Metric metric) {
+ switch (operation.getType()) {
+ case DOCUMENT:
+ metric.add(MetricNames.NUM_PUTS, 1, null /*metricContext*/);
+ return newPutMessage(operation, operationId);
+ case REMOVE:
+ metric.add(MetricNames.NUM_REMOVES, 1, null /*metricContext*/);
+ return newRemoveMessage(operation, operationId);
+ case UPDATE:
+ metric.add(MetricNames.NUM_UPDATES, 1, null /*metricContext*/);
+ return newUpdateMessage(operation, operationId);
+ default:
+ // typical end of feed
+ return null;
+ }
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ErrorHttpResponse.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ErrorHttpResponse.java
new file mode 100644
index 00000000000..a12cd1ec089
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ErrorHttpResponse.java
@@ -0,0 +1,24 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.container.jdisc.HttpResponse;
+import com.yahoo.text.Utf8;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class ErrorHttpResponse extends HttpResponse {
+
+ private final String msg;
+
+ public ErrorHttpResponse(final int statusCode, final String msg) {
+ super(statusCode);
+ this.msg = msg;
+ }
+
+ @Override
+ public void render(OutputStream outputStream) throws IOException {
+ outputStream.write(Utf8.toBytes(msg));
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java
new file mode 100644
index 00000000000..f99274d3f2b
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java
@@ -0,0 +1,148 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.collections.Tuple2;
+import com.yahoo.container.handler.threadpool.ContainerThreadPool;
+import com.yahoo.container.jdisc.HttpRequest;
+import com.yahoo.container.jdisc.HttpResponse;
+import com.yahoo.container.jdisc.ThreadedHttpRequestHandler;
+import com.yahoo.container.jdisc.messagebus.SessionCache;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.documentapi.metrics.DocumentApiMetrics;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.Response;
+import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.metrics.simple.MetricReceiver;
+import com.yahoo.vespa.http.client.core.Headers;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * Accept feeds from outside of the Vespa cluster.
+ *
+ * @author Steinar Knutsen
+ */
+public class FeedHandler extends ThreadedHttpRequestHandler {
+
+ protected final ReplyHandler feedReplyHandler;
+ private static final List<Integer> serverSupportedVersions = Collections.unmodifiableList(Arrays.asList(3));
+ private static final Pattern USER_AGENT_PATTERN = Pattern.compile("vespa-http-client \\((.+)\\)");
+ private final FeedHandlerV3 feedHandlerV3;
+ private final DocumentApiMetrics metricsHelper;
+
+ @Inject
+ public FeedHandler(ContainerThreadPool threadpool,
+ Metric metric,
+ DocumentTypeManager documentTypeManager,
+ SessionCache sessionCache,
+ MetricReceiver metricReceiver) {
+ super(threadpool.executor(), metric);
+ metricsHelper = new DocumentApiMetrics(metricReceiver, "vespa.http.server");
+ feedHandlerV3 = new FeedHandlerV3(threadpool.executor(), metric, documentTypeManager, sessionCache, metricsHelper);
+ feedReplyHandler = new FeedReplyReader(metric, metricsHelper);
+ }
+
+ private Tuple2<HttpResponse, Integer> checkProtocolVersion(HttpRequest request) {
+ return doCheckProtocolVersion(request.getJDiscRequest().headers().get(Headers.VERSION));
+ }
+
+ static Tuple2<HttpResponse, Integer> doCheckProtocolVersion(List<String> clientSupportedVersions) {
+ List<String> washedClientVersions = splitVersions(clientSupportedVersions);
+
+ if (washedClientVersions == null || washedClientVersions.isEmpty()) {
+ return new Tuple2<>(new ErrorHttpResponse(
+ Headers.HTTP_NOT_ACCEPTABLE,
+ "Request did not contain " + Headers.VERSION
+ + "header. Server supports protocol versions "
+ + serverSupportedVersions), -1);
+ }
+
+ //select the highest version supported by both parties
+ //this could be extended when we support a gazillion versions - but right now: keep it simple.
+ int version;
+ if (washedClientVersions.contains("3")) {
+ version = 3;
+ } else {
+ return new Tuple2<>(new ErrorHttpResponse(
+ Headers.HTTP_NOT_ACCEPTABLE,
+ "Could not parse " + Headers.VERSION
+ + "header of request (values: " + washedClientVersions +
+ "). Server supports protocol versions "
+ + serverSupportedVersions), -1);
+ }
+ return new Tuple2<>(null, version);
+ }
+
+ private static List<String> splitVersions(List<String> clientSupportedVersions) {
+ List<String> splittedVersions = new ArrayList<>();
+ for (String v : clientSupportedVersions) {
+ if (v == null || v.trim().isEmpty()) {
+ continue;
+ }
+ if (!v.contains(",")) {
+ splittedVersions.add(v.trim());
+ continue;
+ }
+ for (String part : v.split(",")) {
+ part = part.trim();
+ if (!part.isEmpty()) {
+ splittedVersions.add(part);
+ }
+ }
+ }
+ return splittedVersions;
+ }
+
+ @Override
+ public HttpResponse handle(HttpRequest request) {
+ metricsHelper.reportHttpRequest(findClientVersion(request).orElse(null));
+ Tuple2<HttpResponse, Integer> protocolVersion = checkProtocolVersion(request);
+
+ if (protocolVersion.first != null) {
+ return protocolVersion.first;
+ }
+ return feedHandlerV3.handle(request);
+ }
+
+ @Override
+ protected void writeErrorResponseOnOverload(Request request, ResponseHandler responseHandler) {
+ int responseCode = request.headers().getFirst(Headers.SILENTUPGRADE) != null ? 299 : 429;
+ responseHandler.handleResponse(new Response(responseCode)).close(null);
+ }
+
+ private static Optional<String> findClientVersion(HttpRequest request) {
+ String versionHeader = request.getHeader(Headers.CLIENT_VERSION);
+ if (versionHeader != null) {
+ return Optional.of(versionHeader);
+ }
+ return Optional.ofNullable(request.getHeader("User-Agent"))
+ .map(USER_AGENT_PATTERN::matcher)
+ .filter(Matcher::matches)
+ .map(matcher -> matcher.group(1));
+ }
+
+ // Protected for testing
+ protected static InputStream unzipStreamIfNeeded(InputStream inputStream, HttpRequest httpRequest)
+ throws IOException {
+ String contentEncodingHeader = httpRequest.getHeader("content-encoding");
+ if ("gzip".equals(contentEncodingHeader)) {
+ return new GZIPInputStream(inputStream);
+ } else {
+ return inputStream;
+ }
+ }
+
+ @Override protected void destroy() { feedHandlerV3.destroy(); }
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java
new file mode 100644
index 00000000000..c8828df6d54
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java
@@ -0,0 +1,152 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.concurrent.ThreadFactoryFactory;
+import com.yahoo.container.jdisc.HttpRequest;
+import com.yahoo.container.jdisc.HttpResponse;
+import com.yahoo.container.jdisc.ThreadedHttpRequestHandler;
+import com.yahoo.container.jdisc.messagebus.SessionCache;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.documentapi.metrics.DocumentApiMetrics;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.jdisc.ReferencedResource;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.SourceSessionParams;
+import com.yahoo.messagebus.shared.SharedSourceSession;
+import com.yahoo.vespa.http.client.core.Headers;
+import com.yahoo.yolean.Exceptions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * This code is based on v2 code, however, in v3, one client has one ClientFeederV3 shared between all client threads.
+ * The new API has more logic for shutting down cleanly as the server is more likely to be upgraded.
+ * The code is restructured a bit.
+ *
+ * @author dybis
+ */
+public class FeedHandlerV3 extends ThreadedHttpRequestHandler {
+
+ private DocumentTypeManager docTypeManager;
+ private final Map<String, ClientFeederV3> clientFeederByClientId = new HashMap<>();
+ private final ScheduledThreadPoolExecutor cron;
+ private final SessionCache sessionCache;
+ protected final ReplyHandler feedReplyHandler;
+ private final Metric metric;
+ private final Object monitor = new Object();
+ private static final Logger log = Logger.getLogger(FeedHandlerV3.class.getName());
+
+ public FeedHandlerV3(Executor executor,
+ Metric metric,
+ DocumentTypeManager documentTypeManager,
+ SessionCache sessionCache,
+ DocumentApiMetrics metricsHelper) {
+ super(executor, metric);
+ docTypeManager = documentTypeManager;
+ this.sessionCache = sessionCache;
+ feedReplyHandler = new FeedReplyReader(metric, metricsHelper);
+ cron = new ScheduledThreadPoolExecutor(1, ThreadFactoryFactory.getThreadFactory("feed-handler-v3-janitor"));
+ cron.scheduleWithFixedDelay(this::removeOldClients, 3, 3, TimeUnit.SECONDS);
+ this.metric = metric;
+ }
+
+ public void injectDocumentManangerForTests(DocumentTypeManager docTypeManager) {
+ this.docTypeManager = docTypeManager;
+ }
+
+ // TODO: If this is set up to run without first invoking the old FeedHandler code, we should
+ // verify the version header first. This is done in the old code.
+ @Override
+ public HttpResponse handle(HttpRequest request) {
+ String clientId = clientId(request);
+ ClientFeederV3 clientFeederV3;
+ synchronized (monitor) {
+ if (! clientFeederByClientId.containsKey(clientId)) {
+ SourceSessionParams sourceSessionParams = sourceSessionParams(request);
+ clientFeederByClientId.put(clientId,
+ new ClientFeederV3(retainSource(sessionCache, sourceSessionParams),
+ new FeedReaderFactory(true), //TODO make error debugging configurable
+ docTypeManager,
+ clientId,
+ metric,
+ feedReplyHandler));
+ }
+ clientFeederV3 = clientFeederByClientId.get(clientId);
+ }
+ try {
+ return clientFeederV3.handleRequest(request);
+ } catch (UnknownClientException uce) {
+ String msg = Exceptions.toMessageString(uce);
+ log.log(Level.WARNING, msg);
+ return new ErrorHttpResponse(com.yahoo.jdisc.http.HttpResponse.Status.BAD_REQUEST, msg);
+ } catch (Exception e) {
+ String msg = "Could not initialize document parsing: " + Exceptions.toMessageString(e);
+ log.log(Level.WARNING, msg);
+ return new ErrorHttpResponse(com.yahoo.jdisc.http.HttpResponse.Status.INTERNAL_SERVER_ERROR, msg);
+ }
+ }
+
+ // SessionCache is final and no easy way to mock it so we need this to be able to do testing.
+ protected ReferencedResource<SharedSourceSession> retainSource(SessionCache sessionCache, SourceSessionParams params) {
+ return sessionCache.retainSource(params);
+ }
+
+ @Override
+ protected void destroy() {
+ // We are forking this to avoid that accidental de-referencing causes any random thread doing destruction.
+ // This caused a deadlock when the single Messenger thread in MessageBus was the last one referring this
+ // and started destructing something that required something only the messenger thread could provide.
+ Thread destroyer = new Thread(() -> {
+ cron.shutdown();
+ synchronized (monitor) {
+ for (var iterator = clientFeederByClientId.values().iterator(); iterator.hasNext(); ) {
+ iterator.next().kill();
+ iterator.remove();
+ }
+ }
+ }, "feed-handler-v3-adhoc-destroyer");
+ destroyer.setDaemon(true);
+ destroyer.start();
+ }
+
+ private String clientId(HttpRequest request) {
+ String clientDictatedId = request.getHeader(Headers.CLIENT_ID);
+ if (clientDictatedId == null || clientDictatedId.isEmpty()) {
+ throw new IllegalArgumentException("Did not get any CLIENT_ID header (" + Headers.CLIENT_ID + ")");
+ }
+ return clientDictatedId;
+ }
+
+ private SourceSessionParams sourceSessionParams(HttpRequest request) {
+ SourceSessionParams params = new SourceSessionParams();
+ String timeout = request.getHeader(Headers.TIMEOUT);
+
+ if (timeout != null) {
+ try {
+ params.setTimeout(Double.parseDouble(timeout));
+ } catch (NumberFormatException e) {
+ // NOP
+ }
+ }
+ return params;
+ }
+
+ private void removeOldClients() {
+ synchronized (monitor) {
+ for (var iterator = clientFeederByClientId.values().iterator(); iterator.hasNext(); ) {
+ ClientFeederV3 client = iterator.next();
+ if (client.timedOut()) {
+ client.kill();
+ iterator.remove();
+ }
+ }
+ }
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReaderFactory.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReaderFactory.java
new file mode 100644
index 00000000000..069ccfd84f0
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReaderFactory.java
@@ -0,0 +1,62 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.json.JsonFeedReader;
+import com.yahoo.text.Utf8;
+import com.yahoo.vespa.http.client.config.FeedParams;
+import com.yahoo.vespaxmlparser.FeedReader;
+import com.yahoo.vespaxmlparser.VespaXMLFeedReader;
+
+import java.io.InputStream;
+
+/**
+ * Class for creating FeedReader based on dataFormat.
+ * @author dybis
+ */
+public class FeedReaderFactory {
+ private static final int MARK_READLIMIT = 200;
+
+ private final boolean debug;
+ public FeedReaderFactory(boolean debug) {
+ this.debug = debug;
+ }
+
+ /**
+ * Creates FeedReader
+ * @param inputStream source of feed data
+ * @param docTypeManager handles the parsing of the document
+ * @param dataFormat specifies the format
+ * @return a feedreader
+ */
+ public FeedReader createReader(
+ InputStream inputStream,
+ DocumentTypeManager docTypeManager,
+ FeedParams.DataFormat dataFormat) {
+ switch (dataFormat) {
+ case XML_UTF8:
+ byte [] peek = null;
+ int bytesPeeked = 0;
+ try {
+ if (debug && inputStream.markSupported()) {
+ peek = new byte[MARK_READLIMIT];
+ inputStream.mark(MARK_READLIMIT);
+ bytesPeeked = inputStream.read(peek);
+ inputStream.reset();
+ }
+ return new VespaXMLFeedReader(inputStream, docTypeManager);
+ } catch (Exception e) {
+ if (bytesPeeked > 0) {
+ throw new RuntimeException("Could not create VespaXMLFeedReader. First characters are: '" + Utf8.toString(peek, 0, bytesPeeked) + "'", e);
+ } else {
+ throw new RuntimeException("Could not create VespaXMLFeedReader.", e);
+ }
+ }
+ case JSON_UTF8:
+ return new JsonFeedReader(inputStream, docTypeManager);
+ default:
+ throw new IllegalStateException("Can not create feed reader for format: " + dataFormat);
+ }
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReplyReader.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReplyReader.java
new file mode 100644
index 00000000000..2fbb80d9fcc
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReplyReader.java
@@ -0,0 +1,94 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentReply;
+import com.yahoo.documentapi.metrics.DocumentApiMetrics;
+import com.yahoo.documentapi.metrics.DocumentOperationStatus;
+import com.yahoo.documentapi.metrics.DocumentOperationType;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.Trace;
+import com.yahoo.vespa.http.client.core.ErrorCode;
+import com.yahoo.vespa.http.client.core.OperationStatus;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Predicate;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static java.util.function.Predicate.not;
+
+/**
+ * Catch message bus replies and make the available to a given session.
+ *
+ * @author Steinar Knutsen
+ */
+public class FeedReplyReader implements ReplyHandler {
+
+ private static final Logger log = Logger.getLogger(FeedReplyReader.class.getName());
+ private final Metric metric;
+ private final DocumentApiMetrics metricsHelper;
+ private final Metric.Context testAndSetMetricCtx;
+
+ public FeedReplyReader(Metric metric, DocumentApiMetrics metricsHelper) {
+ this.metric = metric;
+ this.metricsHelper = metricsHelper;
+ this.testAndSetMetricCtx = metric.createContext(Map.of("operationType", "testAndSet"));
+ }
+
+ @Override
+ public void handleReply(Reply reply) {
+ Object o = reply.getContext();
+ if (!(o instanceof ReplyContext)) {
+ return;
+ }
+ ReplyContext context = (ReplyContext) o;
+ final double latencyInSeconds = (System.currentTimeMillis() - context.creationTime) / 1000.0d;
+ metric.set(MetricNames.LATENCY, latencyInSeconds, null);
+
+ DocumentOperationType type = DocumentOperationType.fromMessage(reply.getMessage());
+ boolean conditionMet = conditionMet(reply);
+ if (reply.hasErrors() && conditionMet) {
+ DocumentOperationStatus status = DocumentOperationStatus.fromMessageBusErrorCodes(reply.getErrorCodes());
+ metricsHelper.reportFailure(type, status);
+ metric.add(MetricNames.FAILED, 1, null);
+ enqueue(context, reply.getError(0).getMessage(), ErrorCode.ERROR, false, reply.getTrace());
+ } else {
+ metricsHelper.reportSuccessful(type, latencyInSeconds);
+ metric.add(MetricNames.SUCCEEDED, 1, null);
+ if ( ! conditionMet)
+ metric.add(MetricNames.CONDITION_NOT_MET, 1, testAndSetMetricCtx);
+ if ( ! updateNotFound(reply))
+ metric.add(MetricNames.NOT_FOUND, 1, null);
+ enqueue(context, "Document processed.", ErrorCode.OK, !conditionMet, reply.getTrace());
+ }
+ }
+
+ private static boolean conditionMet(Reply reply) {
+ return ! reply.hasErrors() || reply.getError(0).getCode() != DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED;
+ }
+
+ private static boolean updateNotFound(Reply reply) {
+ return reply instanceof UpdateDocumentReply
+ && ! ((UpdateDocumentReply) reply).wasFound()
+ && reply.getMessage() instanceof UpdateDocumentMessage
+ && ((UpdateDocumentMessage) reply.getMessage()).getDocumentUpdate() != null
+ && ! ((UpdateDocumentMessage) reply.getMessage()).getDocumentUpdate().getCreateIfNonExistent();
+ }
+
+ private void enqueue(ReplyContext context, String message, ErrorCode status, boolean isConditionNotMet, Trace trace) {
+ try {
+ String traceMessage = (trace != null && trace.getLevel() > 0) ? trace.toString() : "";
+
+ context.feedReplies.put(new OperationStatus(message, context.docId, status, isConditionNotMet, traceMessage));
+ } catch (InterruptedException e) {
+ log.log(Level.WARNING,
+ "Interrupted while enqueueing result from putting document with id: " + context.docId);
+ Thread.currentThread().interrupt();
+ }
+ }
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedResponse.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedResponse.java
new file mode 100644
index 00000000000..3e2a4a8795f
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedResponse.java
@@ -0,0 +1,88 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.container.jdisc.HttpResponse;
+import com.yahoo.vespa.http.client.core.Headers;
+import com.yahoo.vespa.http.client.core.ErrorCode;
+import com.yahoo.vespa.http.client.core.OperationStatus;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Reads feed responses from a queue and renders them continuously to the
+ * feeder.
+ *
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ * @since 5.1
+ */
+public class FeedResponse extends HttpResponse {
+
+ BlockingQueue<OperationStatus> operations;
+
+ public FeedResponse(
+ int status,
+ BlockingQueue<OperationStatus> operations,
+ int protocolVersion,
+ String sessionId) {
+ super(status);
+ this.operations = operations;
+ headers().add(Headers.SESSION_ID, sessionId);
+ headers().add(Headers.VERSION, Integer.toString(protocolVersion));
+ }
+
+ // This is used by the V3 protocol.
+ public FeedResponse(
+ int status,
+ BlockingQueue<OperationStatus> operations,
+ int protocolVersion,
+ String sessionId,
+ int outstandingClientOperations,
+ String hostName) {
+ super(status);
+ this.operations = operations;
+ headers().add(Headers.SESSION_ID, sessionId);
+ headers().add(Headers.VERSION, Integer.toString(protocolVersion));
+ headers().add(Headers.OUTSTANDING_REQUESTS, Integer.toString(outstandingClientOperations));
+ headers().add(Headers.HOSTNAME, hostName);
+ }
+
+ @Override
+ public void render(OutputStream output) throws IOException {
+ int i = 0;
+ OperationStatus status;
+ try {
+ status = operations.take();
+ while (status.errorCode != ErrorCode.END_OF_FEED) {
+ output.write(toBytes(status.render()));
+ if (++i % 5 == 0) {
+ output.flush();
+ }
+ status = operations.take();
+ }
+ } catch (InterruptedException e) {
+ output.flush();
+ }
+ }
+
+ private byte[] toBytes(String s) {
+ byte[] b = new byte[s.length()];
+ for (int i = 0; i < b.length; ++i) {
+ b[i] = (byte) s.charAt(i); // renderSingleStatus ensures ASCII only
+ }
+ return b;
+ }
+
+ @Override
+ public String getContentType() {
+ return "text/plain";
+ }
+
+ @Override
+ public String getCharacterEncoding() {
+ return StandardCharsets.US_ASCII.name();
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java
new file mode 100644
index 00000000000..725349f6ebe
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java
@@ -0,0 +1,33 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.container.jdisc.HttpRequest;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.vespa.http.client.config.FeedParams.DataFormat;
+import com.yahoo.vespa.http.client.core.Headers;
+
+import java.util.Optional;
+
+/**
+ * Wrapper for the feed feederSettings read from HTTP request.
+ *
+ * @author Steinar Knutsen
+ */
+public class FeederSettings {
+
+ private static final Route DEFAULT_ROUTE = Route.parse("default");
+ public final boolean drain; // TODO: Implement drain=true
+ public final Route route;
+ public final DataFormat dataFormat;
+ public final String priority;
+ public final Integer traceLevel;
+
+ public FeederSettings(HttpRequest request) {
+ this.drain = Optional.ofNullable(request.getHeader(Headers.DRAIN)).map(Boolean::parseBoolean).orElse(false);
+ this.route = Optional.ofNullable(request.getHeader(Headers.ROUTE)).map(Route::parse).orElse(DEFAULT_ROUTE);
+ this.dataFormat = Optional.ofNullable(request.getHeader(Headers.DATA_FORMAT)).map(DataFormat::valueOf).orElse(DataFormat.JSON_UTF8);
+ this.priority = request.getHeader(Headers.PRIORITY);
+ this.traceLevel = Optional.ofNullable(request.getHeader(Headers.TRACE_LEVEL)).map(Integer::valueOf).orElse(null);
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/documentapi/metrics/MetricNames.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/MetricNames.java
index c2d9f0b292e..a5987f2398e 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/documentapi/metrics/MetricNames.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/MetricNames.java
@@ -1,5 +1,5 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.documentapi.metrics;
+package com.yahoo.vespa.http.server;
/**
* Place to store the metric names so where the metrics are logged can be found
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ReplyContext.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ReplyContext.java
new file mode 100644
index 00000000000..aa2651595ef
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ReplyContext.java
@@ -0,0 +1,25 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.vespa.http.client.core.OperationStatus;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Mapping between document ID and client session.
+ *
+ * @author Steinar Knutsen
+ */
+public class ReplyContext {
+
+ public final String docId;
+ public final BlockingQueue<OperationStatus> feedReplies;
+ public final long creationTime;
+
+ public ReplyContext(String docId, BlockingQueue<OperationStatus> feedReplies) {
+ this.docId = docId;
+ this.feedReplies = feedReplies;
+ this.creationTime = System.currentTimeMillis();
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/StreamReaderV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/StreamReaderV3.java
new file mode 100644
index 00000000000..4ddc430b35f
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/StreamReaderV3.java
@@ -0,0 +1,85 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.container.jdisc.HttpRequest;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.vespa.http.client.core.Encoder;
+import com.yahoo.vespa.http.server.util.ByteLimitedInputStream;
+import com.yahoo.vespaxmlparser.FeedOperation;
+import com.yahoo.vespaxmlparser.FeedReader;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Optional;
+import java.util.logging.Logger;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * This code is based on v2 code, but restructured so stream reading code is in one dedicated class.
+ * @author dybis
+ */
+public class StreamReaderV3 {
+
+ protected static final Logger log = Logger.getLogger(StreamReaderV3.class.getName());
+
+ private final FeedReaderFactory feedReaderFactory;
+ private final DocumentTypeManager docTypeManager;
+
+ public StreamReaderV3(FeedReaderFactory feedReaderFactory, DocumentTypeManager docTypeManager) {
+ this.feedReaderFactory = feedReaderFactory;
+ this.docTypeManager = docTypeManager;
+ }
+
+ public FeedOperation getNextOperation(InputStream requestInputStream, FeederSettings settings) throws Exception {
+ FeedOperation op = null;
+
+ int length = readByteLength(requestInputStream);
+
+ try (InputStream limitedInputStream = new ByteLimitedInputStream(requestInputStream, length)){
+ FeedReader reader = feedReaderFactory.createReader(limitedInputStream, docTypeManager, settings.dataFormat);
+ op = reader.read();
+ }
+ return op;
+ }
+
+ public Optional<String> getNextOperationId(InputStream requestInputStream) throws IOException {
+ StringBuilder idBuf = new StringBuilder(100);
+ int c;
+ while ((c = requestInputStream.read()) != -1) {
+ if (c == 32) {
+ break;
+ }
+ idBuf.append((char) c); //it's ASCII
+ }
+ if (idBuf.length() == 0) {
+ return Optional.empty();
+ }
+ return Optional.of(Encoder.decode(idBuf.toString(), new StringBuilder(idBuf.length())).toString());
+ }
+
+ private int readByteLength(InputStream requestInputStream) throws IOException {
+ StringBuilder lenBuf = new StringBuilder(8);
+ int c;
+ while ((c = requestInputStream.read()) != -1) {
+ if (c == 10) {
+ break;
+ }
+ lenBuf.append((char) c); //it's ASCII
+ }
+ if (lenBuf.length() == 0) {
+ throw new IllegalStateException("Operation length missing.");
+ }
+ return Integer.valueOf(lenBuf.toString(), 16);
+ }
+
+ public static InputStream unzipStreamIfNeeded(final HttpRequest httpRequest)
+ throws IOException {
+ final String contentEncodingHeader = httpRequest.getHeader("content-encoding");
+ if ("gzip".equals(contentEncodingHeader)) {
+ return new GZIPInputStream(httpRequest.getData());
+ } else {
+ return httpRequest.getData();
+ }
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/UnknownClientException.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/UnknownClientException.java
new file mode 100644
index 00000000000..5324b86a98a
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/UnknownClientException.java
@@ -0,0 +1,14 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ * @since 5.5.0
+ */
+public class UnknownClientException extends RuntimeException {
+
+ public UnknownClientException(String message) {
+ super(message);
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/package-info.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/package-info.java
new file mode 100644
index 00000000000..ea01137d9af
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/package-info.java
@@ -0,0 +1,7 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/**
+ * Server side of programmatic API for feeding into Vespa from outside of the
+ * clusters. Not a public API, not meant for direct use.
+ */
+@com.yahoo.api.annotations.PackageMarker
+package com.yahoo.vespa.http.server;
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStream.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStream.java
new file mode 100644
index 00000000000..270ebe7796b
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStream.java
@@ -0,0 +1,99 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * @author Einar M R Rosenvinge
+ *
+ * @since 5.1.23
+ */
+public class ByteLimitedInputStream extends InputStream {
+
+ private final InputStream wrappedStream;
+ private int remaining;
+ private int remainingWhenMarked;
+
+ public ByteLimitedInputStream(InputStream wrappedStream, int limit) {
+ this.wrappedStream = wrappedStream;
+ if (limit < 0) {
+ throw new IllegalArgumentException("limit cannot be 0");
+ }
+ this.remaining = limit;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (remaining <= 0) {
+ return -1;
+ }
+ int retval = wrappedStream.read();
+ if (retval < 0) {
+ remaining = 0;
+ } else {
+ --remaining;
+ }
+ return retval;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (b == null) {
+ throw new NullPointerException();
+ } else if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ } else if (len == 0) {
+ return 0;
+ }
+
+ if (remaining <= 0) {
+ return -1;
+ }
+
+ int bytesToRead = Math.min(remaining, len);
+ int retval = wrappedStream.read(b, off, bytesToRead);
+
+ if (retval < 0) {
+ //end of underlying stream was reached, and nothing was read.
+ remaining = 0;
+ } else {
+ remaining -= retval;
+ }
+ return retval;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return remaining;
+ }
+
+ @Override
+ public void close() throws IOException {
+ //we will never close the underlying stream
+ if (remaining <= 0) {
+ return;
+ }
+ while (remaining > 0) {
+ skip(remaining);
+ }
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ wrappedStream.mark(readlimit);
+ remainingWhenMarked = remaining;
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ wrappedStream.reset();
+ remaining = remainingWhenMarked;
+ }
+
+ @Override
+ public boolean markSupported() {
+ return wrappedStream.markSupported();
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/CollectingMetric.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/CollectingMetric.java
new file mode 100644
index 00000000000..1b9a5eb6381
--- /dev/null
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/CollectingMetric.java
@@ -0,0 +1,38 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.jdisc.Metric;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * @author ollivir
+ */
+public final class CollectingMetric implements Metric {
+ private final Context DUMMY_CONTEXT = new Context() {};
+ private final Map<String, AtomicLong> values = new ConcurrentHashMap<>();
+
+ public CollectingMetric() {}
+
+ @Override
+ public void set(String key, Number val, Context ctx) {
+ values.computeIfAbsent(key, ignored -> new AtomicLong(0)).set(val.longValue());
+ }
+
+ @Override
+ public void add(String key, Number val, Context ctx) {
+ values.computeIfAbsent(key, ignored -> new AtomicLong(0)).addAndGet(val.longValue());
+ }
+
+ public long get(String key) {
+ return Optional.ofNullable(values.get(key)).map(AtomicLong::get).orElse(0L);
+ }
+
+ @Override
+ public Context createContext(Map<String, ?> properties) {
+ return DUMMY_CONTEXT;
+ }
+}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/DummyMetric.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/DummyMetric.java
new file mode 100644
index 00000000000..1cdac87f3df
--- /dev/null
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/DummyMetric.java
@@ -0,0 +1,31 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.jdisc.Metric;
+
+import java.util.Map;
+
+/**
+ * @author Einar M R Rosenvinge
+ * @since 5.1.20
+ */
+class DummyMetric implements Metric {
+
+ @Override
+ public void set(String key, Number val, Context ctx) {
+ }
+
+ @Override
+ public void add(String key, Number val, Context ctx) {
+ }
+
+ @Override
+ public Context createContext(Map<String, ?> properties) {
+ return DummyContext.INSTANCE;
+ }
+
+ private static class DummyContext implements Context {
+ private static final DummyContext INSTANCE = new DummyContext();
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerCompressionTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerCompressionTest.java
new file mode 100644
index 00000000000..6f1b5eebcc4
--- /dev/null
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerCompressionTest.java
@@ -0,0 +1,69 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.container.jdisc.HttpRequest;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.GZIPOutputStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class FeedHandlerCompressionTest {
+
+ public static byte[] compress(final String dataToBrCompressed) throws IOException {
+ final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(dataToBrCompressed.length());
+ final GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);
+ gzipOutputStream.write(dataToBrCompressed.getBytes());
+ gzipOutputStream.close();
+ byte[] compressedBytes = byteArrayOutputStream.toByteArray();
+ byteArrayOutputStream.close();
+ return compressedBytes;
+ }
+
+ @Test
+ public void testUnzipStreamIfNeeded() throws Exception {
+ final String testData = "foo bar";
+ InputStream inputStream = new ByteArrayInputStream(compress(testData));
+ HttpRequest httpRequest = mock(HttpRequest.class);
+ when(httpRequest.getHeader("content-encoding")).thenReturn("gzip");
+ InputStream decompressedStream = FeedHandler.unzipStreamIfNeeded(inputStream, httpRequest);
+ final StringBuilder processedInput = new StringBuilder();
+ while (true) {
+ int readValue = decompressedStream.read();
+ if (readValue < 0) {
+ break;
+ }
+ processedInput.append((char)readValue);
+ }
+ assertEquals(processedInput.toString(), testData);
+ }
+
+ /**
+ * Test by setting encoding, but not compressing data.
+ * @throws Exception
+ */
+ @Test
+ public void testUnzipFails() throws Exception {
+ final String testData = "foo bar";
+ InputStream inputStream = new ByteArrayInputStream(testData.getBytes());
+ HttpRequest httpRequest = mock(HttpRequest.class);
+ when(httpRequest.getHeader("Content-Encoding")).thenReturn("gzip");
+ InputStream decompressedStream = FeedHandler.unzipStreamIfNeeded(inputStream, httpRequest);
+ final StringBuilder processedInput = new StringBuilder();
+ while (true) {
+ int readValue = decompressedStream.read();
+ if (readValue < 0) {
+ break;
+ }
+ processedInput.append((char)readValue);
+ }
+ assertEquals(processedInput.toString(), testData);
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerTest.java
new file mode 100644
index 00000000000..f3ea8fb5a80
--- /dev/null
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerTest.java
@@ -0,0 +1,45 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+import com.yahoo.container.handler.threadpool.ContainerThreadPool;
+import com.yahoo.container.jdisc.RequestHandlerTestDriver;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.config.DocumentmanagerConfig;
+import com.yahoo.jdisc.handler.OverloadException;
+import com.yahoo.metrics.simple.MetricReceiver;
+import org.junit.Test;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+
+import static com.yahoo.vespa.http.server.FeedHandlerV3Test.createRequest;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * @author bjorncs
+ */
+public class FeedHandlerTest {
+
+ @Test
+ public void response_has_status_code_429_when_throttling() {
+ FeedHandler handler = new FeedHandler(
+ new RejectingContainerThreadpool(),
+ new CollectingMetric(),
+ new DocumentTypeManager(new DocumentmanagerConfig.Builder().enablecompression(true).build()),
+ null /* session cache */,
+ MetricReceiver.nullImplementation);
+ var responseHandler = new RequestHandlerTestDriver.MockResponseHandler();
+ try {
+ handler.handleRequest(createRequest(100).getJDiscRequest(), responseHandler);
+ fail();
+ } catch (OverloadException e) {}
+ assertEquals(429, responseHandler.getStatus());
+ }
+
+ private static class RejectingContainerThreadpool implements ContainerThreadPool {
+ private final Executor executor = ignored -> { throw new RejectedExecutionException(); };
+
+ @Override public Executor executor() { return executor; }
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerV3Test.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerV3Test.java
new file mode 100644
index 00000000000..a5a8f4cb5bd
--- /dev/null
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerV3Test.java
@@ -0,0 +1,152 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.google.common.base.Splitter;
+import com.yahoo.container.jdisc.HttpRequest;
+import com.yahoo.container.jdisc.HttpResponse;
+import com.yahoo.container.jdisc.messagebus.SessionCache;
+import com.yahoo.document.DataType;
+import com.yahoo.document.DocumentType;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.config.DocumentmanagerConfig;
+import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
+import com.yahoo.documentapi.metrics.DocumentApiMetrics;
+import com.yahoo.jdisc.ReferencedResource;
+import com.yahoo.messagebus.Result;
+import com.yahoo.messagebus.SourceSessionParams;
+import com.yahoo.messagebus.shared.SharedSourceSession;
+import com.yahoo.metrics.simple.MetricReceiver;
+import com.yahoo.text.Utf8;
+import com.yahoo.vespa.http.client.config.FeedParams;
+import com.yahoo.vespa.http.client.core.ErrorCode;
+import com.yahoo.vespa.http.client.core.Headers;
+import com.yahoo.vespa.http.client.core.OperationStatus;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class FeedHandlerV3Test {
+ final CollectingMetric metric = new CollectingMetric();
+ private final Executor simpleThreadpool = Executors.newCachedThreadPool();
+
+ @Test
+ public void feedOneDocument() throws Exception {
+ final FeedHandlerV3 feedHandlerV3 = setupFeederHandler(simpleThreadpool);
+ HttpResponse httpResponse = feedHandlerV3.handle(createRequest(1));
+ ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+ httpResponse.render(outStream);
+ assertEquals(httpResponse.getContentType(), "text/plain");
+ assertEquals(Utf8.toString(outStream.toByteArray()), "1230 OK message trace\n");
+ }
+
+ @Test
+ public void feedOneBrokenDocument() throws Exception {
+ final FeedHandlerV3 feedHandlerV3 = setupFeederHandler(simpleThreadpool);
+ HttpResponse httpResponse = feedHandlerV3.handle(createBrokenRequest());
+ ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+ httpResponse.render(outStream);
+ assertEquals(httpResponse.getContentType(), "text/plain");
+ assertTrue(Utf8.toString(outStream.toByteArray()).startsWith("1230 ERROR "));
+ assertEquals(1L, metric.get(MetricNames.PARSE_ERROR));
+ }
+
+ @Test
+ public void feedManyDocument() throws Exception {
+ final FeedHandlerV3 feedHandlerV3 = setupFeederHandler(simpleThreadpool);
+ HttpResponse httpResponse = feedHandlerV3.handle(createRequest(100));
+ ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+ httpResponse.render(outStream);
+ assertEquals(httpResponse.getContentType(), "text/plain");
+ String result = Utf8.toString(outStream.toByteArray());
+ assertEquals(101, Splitter.on("\n").splitToList(result).size());
+ }
+
+ private static DocumentTypeManager createDoctypeManager() {
+ DocumentTypeManager docTypeManager = new DocumentTypeManager();
+ DocumentType documentType = new DocumentType("testdocument");
+ documentType.addField("title", DataType.STRING);
+ documentType.addField("body", DataType.STRING);
+ docTypeManager.registerDocumentType(documentType);
+ return docTypeManager;
+ }
+
+ static HttpRequest createRequest(int numberOfDocs) {
+ StringBuilder wireData = new StringBuilder();
+ for (int x = 0; x < numberOfDocs; x++) {
+ String docData = "[{\"put\": \"id:testdocument:testdocument::c\", \"fields\": { \"title\": \"fooKey\", \"body\": \"value\"}}]";
+ String operationId = "123" + x;
+ wireData.append(operationId + " " + Integer.toHexString(docData.length()) + "\n" + docData);
+ }
+ return createRequestWithPayload(wireData.toString());
+ }
+
+ private static HttpRequest createBrokenRequest() {
+ String docData = "[{\"put oops I broke it]";
+ String wireData = "1230 " + Integer.toHexString(docData.length()) + "\n" + docData;
+ return createRequestWithPayload(wireData);
+ }
+
+ static HttpRequest createRequestWithPayload(String payload) {
+ InputStream inputStream = new ByteArrayInputStream(payload.getBytes());
+ HttpRequest request = HttpRequest.createTestRequest("http://dummyhostname:19020/reserved-for-internal-use/feedapi",
+ com.yahoo.jdisc.http.HttpRequest.Method.POST, inputStream);
+ request.getJDiscRequest().headers().add(Headers.VERSION, "3");
+ request.getJDiscRequest().headers().add(Headers.DATA_FORMAT, FeedParams.DataFormat.JSON_UTF8.name());
+ request.getJDiscRequest().headers().add(Headers.TIMEOUT, "1000000000");
+ request.getJDiscRequest().headers().add(Headers.CLIENT_ID, "client123");
+ request.getJDiscRequest().headers().add(Headers.PRIORITY, "LOWEST");
+ request.getJDiscRequest().headers().add(Headers.TRACE_LEVEL, "4");
+ request.getJDiscRequest().headers().add(Headers.DRAIN, "true");
+ return request;
+ }
+
+ private FeedHandlerV3 setupFeederHandler(Executor threadPool) {
+ DocumentTypeManager docMan = new DocumentTypeManager(new DocumentmanagerConfig.Builder().enablecompression(true).build());
+ FeedHandlerV3 feedHandlerV3 = new FeedHandlerV3(
+ threadPool,
+ metric,
+ docMan,
+ null /* session cache */,
+ new DocumentApiMetrics(MetricReceiver.nullImplementation, "test")) {
+ @Override
+ protected ReferencedResource<SharedSourceSession> retainSource(
+ SessionCache sessionCache, SourceSessionParams sessionParams) {
+ SharedSourceSession sharedSourceSession = mock(SharedSourceSession.class);
+
+ try {
+ when(sharedSourceSession.sendMessageBlocking(any())).thenAnswer((Answer<?>) invocation -> {
+ Object[] args = invocation.getArguments();
+ PutDocumentMessage putDocumentMessage = (PutDocumentMessage) args[0];
+ ReplyContext replyContext = (ReplyContext) putDocumentMessage.getContext();
+ replyContext.feedReplies.add(new OperationStatus("message", replyContext.docId, ErrorCode.OK, false, "trace"));
+ Result result = mock(Result.class);
+ when(result.isAccepted()).thenReturn(true);
+ return result;
+ });
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ Result result = mock(Result.class);
+ when(result.isAccepted()).thenReturn(true);
+ ReferencedResource<SharedSourceSession> refSharedSessopn =
+ new ReferencedResource<>(sharedSourceSession, () -> {});
+ return refSharedSessopn;
+ }
+ };
+ feedHandlerV3.injectDocumentManangerForTests(createDoctypeManager());
+ return feedHandlerV3;
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedReaderFactoryTestCase.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedReaderFactoryTestCase.java
new file mode 100644
index 00000000000..6b0bd1c9518
--- /dev/null
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedReaderFactoryTestCase.java
@@ -0,0 +1,40 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.text.Utf8;
+import com.yahoo.vespa.http.client.config.FeedParams;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class FeedReaderFactoryTestCase {
+ DocumentTypeManager manager = new DocumentTypeManager();
+
+ private InputStream createStream(String s) {
+ return new ByteArrayInputStream(Utf8.toBytes(s));
+ }
+
+ @Test
+ public void testXmlExceptionWithDebug() {
+ try {
+ new FeedReaderFactory(true).createReader(createStream("Some malformed xml"), manager, FeedParams.DataFormat.XML_UTF8);
+ fail();
+ } catch (RuntimeException e) {
+ assertEquals("Could not create VespaXMLFeedReader. First characters are: 'Some malformed xml'", e.getMessage());
+ }
+ }
+ @Test
+ public void testXmlException() {
+ try {
+ new FeedReaderFactory(false).createReader(createStream("Some malformed xml"), manager, FeedParams.DataFormat.XML_UTF8);
+ fail();
+ } catch (RuntimeException e) {
+ assertEquals("Could not create VespaXMLFeedReader.", e.getMessage());
+ }
+ }
+}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/MetaStream.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/MetaStream.java
new file mode 100644
index 00000000000..4dce8cb4e7d
--- /dev/null
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/MetaStream.java
@@ -0,0 +1,39 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.text.Utf8;
+
+import java.io.ByteArrayInputStream;
+
+/**
+ * Stream with extra data outside the actual input stream.
+ *
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ */
+public final class MetaStream extends ByteArrayInputStream {
+
+ private byte[] operations;
+ int i;
+
+ public MetaStream(byte[] buf) {
+ super(createPayload(buf));
+ this.operations = buf;
+ i = 0;
+ }
+
+ private static final byte[] createPayload(byte[] buf) {
+ StringBuilder bu = new StringBuilder();
+ for (int i = 0; i < buf.length; i++) {
+ bu.append("id:banana:banana::doc1 0\n");
+ }
+ return Utf8.toBytes(bu.toString());
+ }
+
+ public byte getNextOperation() {
+ if (i >= operations.length) {
+ return 0;
+ }
+ return operations[i++];
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/MockNetwork.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/MockNetwork.java
new file mode 100644
index 00000000000..7d3c0bb74ca
--- /dev/null
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/MockNetwork.java
@@ -0,0 +1,69 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import java.util.List;
+
+import com.yahoo.jrt.slobrok.api.IMirror;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.network.Network;
+import com.yahoo.messagebus.network.NetworkOwner;
+import com.yahoo.messagebus.routing.RoutingNode;
+
+/**
+ * NOP-network.
+ *
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ */
+class MockNetwork implements Network {
+
+ @Override
+ public boolean waitUntilReady(double seconds) {
+ return true;
+ }
+
+ @Override
+ public void attach(NetworkOwner owner) {
+ }
+
+ @Override
+ public void registerSession(String session) {
+ }
+
+ @Override
+ public void unregisterSession(String session) {
+
+ }
+
+ @Override
+ public boolean allocServiceAddress(RoutingNode recipient) {
+ return true;
+ }
+
+ @Override
+ public void freeServiceAddress(RoutingNode recipient) {
+
+ }
+
+ @Override
+ public void send(Message msg, List<RoutingNode> recipients) {
+ }
+
+ @Override
+ public void sync() {
+ }
+
+ @Override
+ public void shutdown() {
+ }
+
+ @Override
+ public String getConnectionSpec() {
+ return null;
+ }
+
+ @Override
+ public IMirror getMirror() {
+ return null;
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/MockReply.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/MockReply.java
new file mode 100644
index 00000000000..1cb00160bbd
--- /dev/null
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/MockReply.java
@@ -0,0 +1,35 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.messagebus.Reply;
+import com.yahoo.text.Utf8String;
+
+/**
+ * Minimal reply simulator.
+ *
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ */
+class MockReply extends Reply {
+
+ Object context;
+
+ public MockReply(Object context) {
+ this.context = context;
+ }
+
+ @Override
+ public Utf8String getProtocol() {
+ return null;
+ }
+
+ @Override
+ public int getType() {
+ return 0;
+ }
+
+ @Override
+ public Object getContext() {
+ return context;
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/VersionsTestCase.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/VersionsTestCase.java
new file mode 100644
index 00000000000..6858c4bede3
--- /dev/null
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/VersionsTestCase.java
@@ -0,0 +1,118 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server;
+
+import com.yahoo.collections.Tuple2;
+import com.yahoo.container.jdisc.HttpResponse;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ * @since 5.7.0
+ */
+public class VersionsTestCase {
+
+ private static final List<String> EMPTY = Collections.emptyList();
+ private static final List<String> ONE_TWO = Arrays.asList("1", "2");
+ private static final List<String> ONE_THREE = Arrays.asList("1", "3");
+ private static final List<String> TWO_THREE = Arrays.asList("3", "2");
+ private static final List<String> ONE_NULL_THREE = Arrays.asList("1", null, "3");
+ private static final List<String> ONE_COMMA_THREE = Collections.singletonList("1, 3");
+ private static final List<String> ONE_EMPTY_THREE = Arrays.asList("1", "", "3");
+ private static final List<String> TOO_LARGE_NUMBER = Collections.singletonList("1000000000");
+ private static final List<String> THREE_TOO_LARGE_NUMBER = Arrays.asList("3", "1000000000");
+ private static final List<String> THREE_COMMA_TOO_LARGE_NUMBER = Arrays.asList("3,1000000000");
+ private static final List<String> GARBAGE = Collections.singletonList("garbage");
+
+ @Test
+ public void testEmpty() {
+ Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(EMPTY);
+ assertTrue(v.first instanceof ErrorHttpResponse);
+ assertEquals(Integer.valueOf(-1), v.second);
+ }
+
+ @Test
+ public void testOneTwo() {
+ Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(ONE_TWO);
+ assertTrue(v.first instanceof ErrorHttpResponse);
+ assertEquals(Integer.valueOf(-1), v.second);
+ }
+
+ @Test
+ public void testOneThree() {
+ Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(ONE_THREE);
+ assertNull(v.first);
+ assertEquals(Integer.valueOf(3), v.second);
+ }
+
+ @Test
+ public void testTwoThree() {
+ Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(TWO_THREE);
+ assertNull(v.first);
+ assertEquals(Integer.valueOf(3), v.second);
+ }
+
+ @Test
+ public void testOneNullThree() {
+ Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(ONE_NULL_THREE);
+ assertNull(v.first);
+ assertEquals(Integer.valueOf(3), v.second);
+ }
+
+ @Test
+ public void testOneCommaThree() {
+ Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(ONE_COMMA_THREE);
+ assertNull(v.first);
+ assertEquals(Integer.valueOf(3), v.second);
+ }
+
+ @Test
+ public void testOneEmptyThree() {
+ Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(ONE_EMPTY_THREE);
+ assertNull(v.first);
+ assertEquals(Integer.valueOf(3), v.second);
+ }
+
+ @Test
+ public void testTooLarge() throws Exception {
+ Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(TOO_LARGE_NUMBER);
+ assertTrue(v.first instanceof ErrorHttpResponse);
+ ByteArrayOutputStream errorMsg = new ByteArrayOutputStream();
+ ErrorHttpResponse errorResponse = (ErrorHttpResponse) v.first;
+ errorResponse.render(errorMsg);
+ assertEquals(errorMsg.toString(),
+ "Could not parse X-Yahoo-Feed-Protocol-Versionheader of request (values: [1000000000]). " +
+ "Server supports protocol versions [3]");
+ assertEquals(Integer.valueOf(-1), v.second);
+ }
+
+ @Test
+ public void testThreeTooLarge() {
+ Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(THREE_TOO_LARGE_NUMBER);
+ assertNull(v.first);
+ assertEquals(Integer.valueOf(3), v.second);
+ }
+
+ @Test
+ public void testTwoCommaTooLarge() {
+ Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(THREE_COMMA_TOO_LARGE_NUMBER);
+ assertNull(v.first);
+ assertEquals(Integer.valueOf(3), v.second);
+ }
+
+ @Test
+ public void testGarbage() {
+ Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(GARBAGE);
+ assertTrue(v.first instanceof ErrorHttpResponse);
+ assertEquals(Integer.valueOf(-1), v.second);
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStreamTestCase.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStreamTestCase.java
new file mode 100644
index 00000000000..ed571c6baff
--- /dev/null
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStreamTestCase.java
@@ -0,0 +1,106 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.server.util;
+
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ * @since 5.1.23
+ */
+public class ByteLimitedInputStreamTestCase {
+
+ private static ByteLimitedInputStream create(byte[] source, int limit) {
+ if (limit > source.length) {
+ throw new IllegalArgumentException("Limit is greater than length of source buffer.");
+ }
+ InputStream wrappedStream = new ByteArrayInputStream(source);
+ return new ByteLimitedInputStream(wrappedStream, limit);
+ }
+
+ @Test
+ public void requireThatBasicsWork() throws IOException {
+ ByteLimitedInputStream stream = create("abcdefghijklmnopqr".getBytes(StandardCharsets.US_ASCII), 9);
+
+ assertEquals(9, stream.available());
+ assertEquals(97, stream.read());
+ assertEquals(8, stream.available());
+ assertEquals(98, stream.read());
+ assertEquals(7, stream.available());
+ assertEquals(99, stream.read());
+ assertEquals(6, stream.available());
+ assertEquals(100, stream.read());
+ assertEquals(5, stream.available());
+ assertEquals(101, stream.read());
+ assertEquals(4, stream.available());
+ assertEquals(102, stream.read());
+ assertEquals(3, stream.available());
+ assertEquals(103, stream.read());
+ assertEquals(2, stream.available());
+ assertEquals(104, stream.read());
+ assertEquals(1, stream.available());
+ assertEquals(105, stream.read());
+ assertEquals(0, stream.available());
+ assertEquals(-1, stream.read());
+ assertEquals(0, stream.available());
+ assertEquals(-1, stream.read());
+ assertEquals(0, stream.available());
+ assertEquals(-1, stream.read());
+ assertEquals(0, stream.available());
+ assertEquals(-1, stream.read());
+ assertEquals(0, stream.available());
+ assertEquals(-1, stream.read());
+ assertEquals(0, stream.available());
+ }
+
+ @Test
+ public void requireThatChunkedReadWorks() throws IOException {
+ ByteLimitedInputStream stream = create("abcdefghijklmnopqr".getBytes(StandardCharsets.US_ASCII), 9);
+
+ assertEquals(9, stream.available());
+ byte[] toBuf = new byte[4];
+ assertEquals(4, stream.read(toBuf));
+ assertEquals(97, toBuf[0]);
+ assertEquals(98, toBuf[1]);
+ assertEquals(99, toBuf[2]);
+ assertEquals(100, toBuf[3]);
+ assertEquals(5, stream.available());
+
+ assertEquals(4, stream.read(toBuf));
+ assertEquals(101, toBuf[0]);
+ assertEquals(102, toBuf[1]);
+ assertEquals(103, toBuf[2]);
+ assertEquals(104, toBuf[3]);
+ assertEquals(1, stream.available());
+
+ assertEquals(1, stream.read(toBuf));
+ assertEquals(105, toBuf[0]);
+ assertEquals(0, stream.available());
+
+ assertEquals(-1, stream.read(toBuf));
+ assertEquals(0, stream.available());
+ }
+
+ @Test
+ public void requireMarkWorks() throws IOException {
+ InputStream stream = create("abcdefghijklmnopqr".getBytes(StandardCharsets.US_ASCII), 9);
+ assertEquals(97, stream.read());
+ assertTrue(stream.markSupported());
+ stream.mark(5);
+ assertEquals(98, stream.read());
+ assertEquals(99, stream.read());
+ stream.reset();
+ assertEquals(98, stream.read());
+ assertEquals(99, stream.read());
+ assertEquals(100, stream.read());
+ assertEquals(101, stream.read());
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespaxmlparser/MockFeedReaderFactory.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespaxmlparser/MockFeedReaderFactory.java
new file mode 100644
index 00000000000..513892af213
--- /dev/null
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/vespaxmlparser/MockFeedReaderFactory.java
@@ -0,0 +1,32 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespaxmlparser;
+
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.vespa.http.client.config.FeedParams;
+import com.yahoo.vespa.http.server.FeedReaderFactory;
+
+import java.io.InputStream;
+
+/**
+ * For creating MockReader of innput stream.
+ * @author dybis
+ */
+public class MockFeedReaderFactory extends FeedReaderFactory {
+
+ public MockFeedReaderFactory() {
+ super(true);
+ }
+
+ @Override
+ public FeedReader createReader(
+ InputStream inputStream,
+ DocumentTypeManager docTypeManager,
+ FeedParams.DataFormat dataFormat) {
+ try {
+ return new MockReader(inputStream);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespaxmlparser/MockReader.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespaxmlparser/MockReader.java
new file mode 100644
index 00000000000..c751849b84e
--- /dev/null
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/vespaxmlparser/MockReader.java
@@ -0,0 +1,69 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespaxmlparser;
+
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentType;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.vespa.http.server.MetaStream;
+import com.yahoo.vespa.http.server.util.ByteLimitedInputStream;
+
+import java.io.InputStream;
+import java.lang.reflect.Field;
+
+/**
+ * Mock for ExternalFeedTestCase which had to override package private methods.
+ *
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ */
+public class MockReader implements FeedReader {
+
+ MetaStream stream;
+ boolean finished = false;
+
+ public MockReader(InputStream stream) throws Exception {
+ this.stream = getMetaStream(stream);
+ }
+
+ private static MetaStream getMetaStream(InputStream stream) {
+ if (stream instanceof MetaStream) {
+ return (MetaStream) stream;
+ }
+ if (!(stream instanceof ByteLimitedInputStream)) {
+ throw new IllegalStateException("Given unknown stream type.");
+ }
+ //Ooooooo this is so ugly
+ try {
+ ByteLimitedInputStream byteLimitedInputStream = (ByteLimitedInputStream) stream;
+ Field f = byteLimitedInputStream.getClass().getDeclaredField("wrappedStream"); //NoSuchFieldException
+ f.setAccessible(true);
+ return (MetaStream) f.get(byteLimitedInputStream);
+ } catch (Exception e) {
+ throw new IllegalStateException("Implementation of ByteLimitedInputStream has changed.", e);
+ }
+ }
+
+ @Override
+ public FeedOperation read() throws Exception {
+ if (finished) {
+ return FeedOperation.INVALID;
+ }
+
+ byte whatToDo = stream.getNextOperation();
+ DocumentId id = new DocumentId("id:banana:banana::doc1");
+ DocumentType docType = new DocumentType("banana");
+ switch (whatToDo) {
+ case 0:
+ return FeedOperation.INVALID;
+ case 1:
+ return new DocumentFeedOperation(new Document(docType, id));
+ case 2:
+ return new RemoveFeedOperation(id);
+ case 3:
+ return new DocumentUpdateFeedOperation(new DocumentUpdate(docType, id));
+ default:
+ throw new RuntimeException("boom");
+ }
+ }
+
+}