summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@broadpark.no>2018-11-13 14:38:50 +0100
committergjoranv <gv@oath.com>2019-01-21 15:09:26 +0100
commit08b6fe261b5b88635cb561208476fd1ac2607040 (patch)
tree86ee45fa34d554f571918eb530e4f340159b8ac4 /vespaclient-container-plugin
parent01705ccfef4a08cebd98a4eb4251e7032b1eac98 (diff)
Remove internal http feed handler protocol version 2.
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java193
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/Feeder.java537
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/feedhandler/FeedHandlerTest.java107
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2ErrorsInResultTestCase.java240
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2ExternalFeedTestCase.java530
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2FailingMessagebusTestCase.java226
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2NoXmlReaderTestCase.java164
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/VersionsTestCase.java50
8 files changed, 31 insertions, 2016 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java
index 5294545ad50..bd7d195b48b 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java
@@ -51,16 +51,8 @@ import java.util.zip.GZIPInputStream;
*/
public class FeedHandler extends LoggingRequestHandler {
- private final ExecutorService workers = Executors.newCachedThreadPool(ThreadFactoryFactory.getThreadFactory("feedhandler"));
- private final DocumentTypeManager docTypeManager;
- private final Map<String, ClientState> clients;
- private final ScheduledThreadPoolExecutor cron;
- private final SessionCache sessionCache;
protected final ReplyHandler feedReplyHandler;
- private final AtomicLong sessionId;
- private final Metric metric;
- private static final List<Integer> serverSupportedVersions = Collections.unmodifiableList(Arrays.asList(2, 3));
- private final String localHostname;
+ private static final List<Integer> serverSupportedVersions = Collections.unmodifiableList(Arrays.asList(3));
private final FeedHandlerV3 feedHandlerV3;
@Inject
@@ -73,46 +65,7 @@ public class FeedHandler extends LoggingRequestHandler {
super(parentCtx);
DocumentApiMetrics metricsHelper = new DocumentApiMetrics(metricReceiver, "vespa.http.server");
feedHandlerV3 = new FeedHandlerV3(parentCtx, documentManagerConfig, sessionCache, threadpoolConfig, metricsHelper);
- docTypeManager = createDocumentManager(documentManagerConfig);
- clients = new HashMap<>();
- this.sessionCache = sessionCache;
- sessionId = new AtomicLong(ThreadLocalRandom.current().nextLong());
feedReplyHandler = new FeedReplyReader(parentCtx.getMetric(), metricsHelper);
- cron = new ScheduledThreadPoolExecutor(1, ThreadFactoryFactory.getThreadFactory("feedhandler.cron"));
- cron.scheduleWithFixedDelay(new CleanClients(), 16, 11, TimeUnit.MINUTES);
- this.metric = parentCtx.getMetric();
- this.localHostname = resolveLocalHostname();
- }
-
- /**
- * Exposed for creating mocks.
- */
- protected DocumentTypeManager createDocumentManager(DocumentmanagerConfig documentManagerConfig) {
- return new DocumentTypeManager(documentManagerConfig);
- }
-
- private class CleanClients implements Runnable {
-
- @Override
- public void run() {
- List<ClientState> clientsToShutdown = new ArrayList<>();
- long now = System.currentTimeMillis();
-
- synchronized (clients) {
- for (Iterator<Map.Entry<String, ClientState>> i = clients
- .entrySet().iterator(); i.hasNext();) {
- ClientState client = i.next().getValue();
-
- if (now - client.creationTime > 10 * 60 * 1000) {
- clientsToShutdown.add(client);
- i.remove();
- }
- }
- }
- for (ClientState client : clientsToShutdown) {
- client.sourceSession.getReference().close();
- }
- }
}
private Tuple2<HttpResponse, Integer> checkProtocolVersion(HttpRequest request) {
@@ -135,8 +88,6 @@ public class FeedHandler extends LoggingRequestHandler {
int version;
if (washedClientVersions.contains("3")) {
version = 3;
- } else if (washedClientVersions.contains("2")) { // TODO: Vespa 7: Remove support for Version 2
- version = 2;
} else {
return new Tuple2<>(new ErrorHttpResponse(
Headers.HTTP_NOT_ACCEPTABLE,
@@ -175,43 +126,7 @@ public class FeedHandler extends LoggingRequestHandler {
if (protocolVersion.first != null) {
return protocolVersion.first;
}
- if (3 == protocolVersion.second) {
- return feedHandlerV3.handle(request);
- }
- final BlockingQueue<OperationStatus> operations = new LinkedBlockingQueue<>();
- Tuple2<String, Boolean> clientId;
- clientId = sessionId(request);
-
- if (clientId.second != null && clientId.second) {
- if (log.isLoggable(LogLevel.DEBUG)) {
- log.log(LogLevel.DEBUG, "Received initial request from client with session ID " +
- clientId.first + ", protocol version " + protocolVersion.second);
- }
- }
-
- Feeder feeder;
- try {
- feeder = createFeeder(request, request.getData(), operations, clientId.first,
- clientId.second, protocolVersion.second);
- // the synchronous FeedResponse blocks draining the InputStream, letting the Feeder read it
- workers.submit(feeder);
- } catch (UnknownClientException uce) {
- String msg = Exceptions.toMessageString(uce);
- log.log(LogLevel.WARNING, msg);
- return new ErrorHttpResponse(Status.BAD_REQUEST, msg);
- } catch (Exception e) {
- String msg = "Could not initialize document parsing";
- log.log(LogLevel.WARNING, "Could not initialize document parsing", e);
- return new ErrorHttpResponse(Status.INTERNAL_SERVER_ERROR, msg + ": " + Exceptions.toMessageString(e));
- }
-
- try {
- feeder.waitForRequestReceived();
- } catch (InterruptedException e) {
- return new ErrorHttpResponse(Status.INTERNAL_SERVER_ERROR, e.getMessage());
- }
-
- return new FeedResponse(200, operations, protocolVersion.second, clientId.first);
+ return feedHandlerV3.handle(request);
}
// Protected for testing
@@ -225,82 +140,6 @@ public class FeedHandler extends LoggingRequestHandler {
}
}
- /**
- * Exposed for creating mocks.
- */
- protected Feeder createFeeder(
- HttpRequest request,
- InputStream requestInputStream,
- BlockingQueue<OperationStatus> operations,
- String clientId,
- boolean sessionIdWasGeneratedJustNow,
- int protocolVersion) throws Exception {
- if (protocolVersion != 2)
- throw new IllegalStateException("Protocol version " + protocolVersion + " not supported.");
-
- return new Feeder(
- unzipStreamIfNeeded(requestInputStream, request),
- new FeedReaderFactory(),
- docTypeManager,
- operations,
- popClient(clientId),
- new FeederSettings(request),
- clientId,
- sessionIdWasGeneratedJustNow,
- sourceSessionParams(request),
- sessionCache,
- this,
- metric,
- feedReplyHandler,
- localHostname);
- }
-
- private Tuple2<String, Boolean> sessionId(HttpRequest request) {
- boolean sessionIdWasGeneratedJustNow = false;
- String sessionId = request.getHeader(Headers.SESSION_ID);
- if (sessionId == null) {
- sessionId = Long.toString(this.sessionId.incrementAndGet()) + "-" +
- remoteHostAddressAndPort(request.getJDiscRequest()) + "#" +
- localHostname;
- sessionIdWasGeneratedJustNow = true;
- }
- return new Tuple2<>(sessionId, sessionIdWasGeneratedJustNow);
- }
-
- private static String remoteHostAddressAndPort(com.yahoo.jdisc.http.HttpRequest httpRequest) {
- SocketAddress remoteAddress = httpRequest.getRemoteAddress();
- if (remoteAddress instanceof InetSocketAddress) {
- InetSocketAddress isa = (InetSocketAddress) remoteAddress;
- return isa.getAddress().getHostAddress() + "-" + isa.getPort();
- }
- return "";
- }
-
- private static String resolveLocalHostname() {
- String hostname = HostName.getLocalhost();
- if (hostname.equals("localhost")) {
- return "";
- }
- return hostname;
- }
-
- /**
- * Exposed for use when creating mocks.
- */
- protected SourceSessionParams sourceSessionParams(HttpRequest request) {
- SourceSessionParams params = new SourceSessionParams();
- String timeout = request.getHeader(Headers.TIMEOUT);
-
- if (timeout != null) {
- try {
- params.setTimeout(Double.parseDouble(timeout));
- } catch (NumberFormatException e) {
- // NOP
- }
- }
- return params;
- }
-
@Override
protected void destroy() {
feedHandlerV3.destroy();
@@ -316,33 +155,5 @@ public class FeedHandler extends LoggingRequestHandler {
private void internalDestroy() {
super.destroy();
- workers.shutdown();
- cron.shutdown();
- synchronized (clients) {
- for (ClientState client : clients.values()) {
- client.sourceSession.getReference().close();
- }
- clients.clear();
- }
- }
-
- void putClient(final String sessionId, final ClientState value) {
- synchronized (clients) {
- clients.put(sessionId, value);
- }
}
-
- ClientState popClient(String sessionId) {
- synchronized (clients) {
- return clients.remove(sessionId);
- }
- }
-
- /**
- * Guess what, testing only.
- */
- void forceRunCleanClients() {
- new CleanClients().run();
- }
-
}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/Feeder.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/Feeder.java
deleted file mode 100644
index f7890db3b35..00000000000
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/Feeder.java
+++ /dev/null
@@ -1,537 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.server;
-
-import com.yahoo.collections.Tuple2;
-import com.yahoo.container.jdisc.messagebus.SessionCache;
-import com.yahoo.document.DocumentId;
-import com.yahoo.document.DocumentUpdate;
-import com.yahoo.document.DocumentRemove;
-import com.yahoo.document.DocumentPut;
-import com.yahoo.document.DocumentTypeManager;
-import com.yahoo.documentapi.messagebus.protocol.DocumentMessage;
-import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
-import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
-import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
-import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
-import com.yahoo.documentapi.metrics.DocumentOperationType;
-import com.yahoo.jdisc.Metric;
-import com.yahoo.jdisc.ReferencedResource;
-import com.yahoo.log.LogLevel;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.ReplyHandler;
-import com.yahoo.messagebus.Result;
-import com.yahoo.messagebus.SourceSessionParams;
-import com.yahoo.messagebus.routing.ErrorDirective;
-import com.yahoo.messagebus.routing.Hop;
-import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.shared.SharedSourceSession;
-import com.yahoo.yolean.Exceptions;
-import com.yahoo.text.Utf8String;
-import com.yahoo.vespa.http.client.core.Encoder;
-import com.yahoo.vespa.http.client.core.ErrorCode;
-import com.yahoo.vespa.http.client.core.OperationStatus;
-import com.yahoo.vespa.http.server.util.ByteLimitedInputStream;
-import com.yahoo.vespaxmlparser.FeedReader;
-import com.yahoo.vespaxmlparser.VespaXMLFeedReader;
-import com.yahoo.vespaxmlparser.VespaXMLFeedReader.Operation;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
-
-import static com.yahoo.messagebus.ErrorCode.SEND_QUEUE_FULL;
-
-/**
- * Read documents from client, and send them through message bus.
- *
- * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
- */
-public class Feeder implements Runnable {
-
- protected static final Logger log = Logger.getLogger(Feeder.class.getName());
-
- final InputStream requestInputStream;
- final DocumentTypeManager docTypeManager;
- final BlockingQueue<OperationStatus> operations;
- final BlockingQueue<OperationStatus> feedReplies;
- int numPending;
- final FeederSettings settings;
- final String clientId;
- final ReferencedResource<SharedSourceSession> sourceSession;
- final FeedHandler handler;
- final Metric metric;
- final Metric.Context metricContext;
- private long prevOpsPerSecTime; // previous measurement time of OPS
- private double operationsForOpsPerSec;
- private final ReplyHandler feedReplyHandler;
- protected final static String EOF = "End of stream";
- protected final boolean sessionIdWasGeneratedJustNow;
- private final CountDownLatch requestReceived = new CountDownLatch(1);
- private final FeedReaderFactory feedReaderFactory;
-
- public Feeder(InputStream requestInputStream,
- FeedReaderFactory feedReaderFactory,
- DocumentTypeManager docTypeManager,
- BlockingQueue<OperationStatus> operations,
- ClientState storedState,
- FeederSettings settings,
- String clientId, boolean sessionIdWasGeneratedJustNow, SourceSessionParams sessionParams,
- SessionCache sessionCache,
- FeedHandler handler, Metric metric, ReplyHandler feedReplyHandler,
- String localHostname) throws Exception {
- super();
- this.feedReaderFactory = feedReaderFactory;
- if (storedState == null) {
- if (!sessionIdWasGeneratedJustNow) {
- // We do not have a stored state, BUT the session ID came in with the request.
- // Possible session timeout, server restart, server reconfig, or VIP usage. Examine.
- examineClientId(clientId, localHostname);
- }
- numPending = 0;
- feedReplies = new LinkedBlockingQueue<>();
- sourceSession = retainSession(sessionParams, sessionCache);
- metricContext = createClientMetricContext(metric, clientId);
- prevOpsPerSecTime = System.currentTimeMillis();
- operationsForOpsPerSec = 0.0;
- } else {
- //we have a stored state, and the session ID was obviously not generated now. All OK.
- numPending = storedState.pending;
- feedReplies = storedState.feedReplies;
- sourceSession = storedState.sourceSession;
- metricContext = storedState.metricContext;
- prevOpsPerSecTime = storedState.prevOpsPerSecTime;
- operationsForOpsPerSec = storedState.operationsForOpsPerSec;
- }
- this.clientId = clientId;
- this.sessionIdWasGeneratedJustNow = sessionIdWasGeneratedJustNow;
- this.requestInputStream = requestInputStream;
- this.docTypeManager = docTypeManager;
- this.operations = operations;
- this.settings = settings;
- this.handler = handler;
- this.metric = metric;
- this.feedReplyHandler = feedReplyHandler;
- }
- protected void examineClientId(String clientId, String localHostname) {
- if (!clientId.contains("#")) {
- throw new UnknownClientException("Got request from client with id '" + clientId +
- "', but found no session for this client. " +
- "This is expected during upgrades of gateways and infrastructure nodes.");
- }
- int hashPos = clientId.indexOf("#");
- String supposedHostname = clientId.substring(hashPos + 1, clientId.length());
- if (supposedHostname.isEmpty()) {
- throw new UnknownClientException("Got request from client with id '" + clientId +
- "', but found no session for this client. Possible session " +
- "timeout due to inactivity, server restart or reconfig, " +
- "or bad VIP usage. " +
- "This is expected during upgrades of gateways and infrastructure nodes.");
- }
-
- if (!supposedHostname.equals(localHostname)) {
- throw new UnknownClientException("Got request from client with id '" + clientId +
- "', but found no session for this client. " +
- "Session was originally established towards host " +
- supposedHostname + ", but our hostname is " +
- localHostname + ". " +
- "If using VIP rotation, this could be due to a session was rotated from one server to another. " +
- "Configure VIP with persistence=enabled. " +
- "This is expected during upgrades of gateways and infrastructure nodes.");
- }
- log.log(LogLevel.DEBUG, "Client '" + clientId + "' reconnected after session inactivity, or server restart " +
- "or reconfig. Re-establishing session.");
- }
-
-
-
- private static Metric.Context createClientMetricContext(Metric metric, String clientId) {
- // No real value in separate metric dimensions per client.
- return null;
- }
-
- /**
- * Exposed for creating mocks.
- */
- protected ReferencedResource<SharedSourceSession> retainSession(
- SourceSessionParams sessionParams, SessionCache sessionCache) {
- return sessionCache.retainSource(sessionParams);
- }
-
- @Override
- public void run() {
- try {
- if (handshake()) {
- return; //will putClient in finally block below
- }
- flushResponseQueue();
- feed();
- requestReceived.countDown();
- drain();
- } catch (InterruptedException e) {
- // NOP, just terminate
- } catch (Exception e) {
- log.log(LogLevel.WARNING, "Unhandled exception while feeding: "
- + Exceptions.toMessageString(e), e);
- } catch (Throwable e) {
- log.log(LogLevel.WARNING, "Unhandled error while feeding: "
- + Exceptions.toMessageString(e), e);
- throw e;
- } finally {
- requestReceived.countDown();
- putClient();
- try {
- enqueue("-", "-", ErrorCode.END_OF_FEED, false, null);
- } catch (InterruptedException e) {
- // NOP, we are already exiting the thread
- }
- }
- }
-
- protected boolean handshake() throws IOException {
- if (sessionIdWasGeneratedJustNow) {
- if (log.isLoggable(LogLevel.DEBUG)) {
- log.log(LogLevel.DEBUG, "Handshake completed for client " + clientId + ".");
- }
- requestInputStream.close();
- return true;
- }
- return false;
- }
-
- void feed() throws InterruptedException {
- while (true) {
- Result result;
- String operationId;
- try {
- operationId = getNextOperationId();
- } catch (IOException ioe) {
- if (log.isLoggable(LogLevel.DEBUG)) {
- log.log(LogLevel.DEBUG, Exceptions.toMessageString(ioe), ioe);
- }
- break;
- }
-
- //noinspection StringEquality
- if (operationId == EOF) {
- break;
- }
-
- Tuple2<String, Message> msg;
- try {
- msg = getNextMessage(operationId);
- setRoute(msg);
- } catch (Exception e) {
- if (log.isLoggable(LogLevel.DEBUG)) {
- log.log(LogLevel.DEBUG, Exceptions.toMessageString(e), e);
- }
- //noinspection StringEquality
- if (operationId != null) { //v1 always returns null, all others return something useful, or throw an exception above
- msg = newErrorMessage(operationId, e);
- } else {
- break;
- }
- }
-
- if (msg == null) {
- break;
- }
-
- setMessageParameters(msg);
-
- while (true) {
- try {
- msg.second.pushHandler(feedReplyHandler);
- if (settings.denyIfBusy) {
- result = sourceSession.getResource().sendMessage(msg.second);
- } else {
- result = sourceSession.getResource().sendMessageBlocking(msg.second);
- }
- } catch (RuntimeException e) {
- enqueue(msg.first, Exceptions.toMessageString(e),
- ErrorCode.ERROR, false, msg.second);
- return;
- }
- if (result.isAccepted() || result.getError().getCode() != SEND_QUEUE_FULL) {
- break;
- }
- if (settings.denyIfBusy) {
- break;
- } else {
- //This will never happen
- Thread.sleep(100);
- }
- }
-
- if (result.isAccepted()) {
- ++numPending;
- updateMetrics(msg.second);
- updateOpsPerSec();
- log(LogLevel.DEBUG, "Sent message successfully, document id: ", msg.first);
- } else if (!result.getError().isFatal()) {
- enqueue(msg.first, result.getError().getMessage(), ErrorCode.TRANSIENT_ERROR, false, msg.second);
- break;
- } else {
- // should probably not happen, but everybody knows stuff that
- // shouldn't happen, happens all the time
- boolean isConditionNotMet = result.getError().getCode() == DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED;
- enqueue(msg.first, result.getError().getMessage(), ErrorCode.ERROR, isConditionNotMet, msg.second);
- break;
- }
- }
- }
-
- private Tuple2<String, Message> newErrorMessage(String operationId, Exception e) {
- Message m = new FeedErrorMessage(operationId);
- Tuple2<String, Message> msg = new Tuple2<>(operationId, m);
- Hop hop = new Hop();
- hop.addDirective(new ErrorDirective(Exceptions.toMessageString(e)));
- Route route = new Route();
- route.addHop(hop);
- m.setRoute(route);
- return msg;
- }
-
- private void updateMetrics(Message m) {
- metric.set(
- MetricNames.PENDING,
- Double.valueOf(sourceSession.getResource().session().getPendingCount()),
- null);
-
- metric.add(MetricNames.NUM_OPERATIONS, 1, metricContext);
-
- if (m instanceof PutDocumentMessage) {
- metric.add(MetricNames.NUM_PUTS, 1, metricContext);
- } else if (m instanceof RemoveDocumentMessage) {
- metric.add(MetricNames.NUM_REMOVES, 1, metricContext);
- } else if (m instanceof UpdateDocumentMessage) {
- metric.add(MetricNames.NUM_UPDATES, 1, metricContext);
- }
- }
-
- private void updateOpsPerSec() {
- long now = System.currentTimeMillis();
- if ((now - prevOpsPerSecTime) >= 1000) { //every second
- double ms = (double) (now - prevOpsPerSecTime);
- final double opsPerSec = operationsForOpsPerSec / (ms / 1000);
- metric.set(MetricNames.OPERATIONS_PER_SEC, opsPerSec, metricContext);
- operationsForOpsPerSec = 1.0d;
- prevOpsPerSecTime = now;
- } else {
- operationsForOpsPerSec += 1.0d;
- }
- }
-
- private Tuple2<String, Message> getNextMessage(String operationId) throws Exception {
- VespaXMLFeedReader.Operation op = new VespaXMLFeedReader.Operation();
- Tuple2<String, Message> msg;
-
- getNextOperation(op);
-
- switch (op.getType()) {
- case DOCUMENT:
- msg = newPutMessage(op, operationId);
- break;
- case REMOVE:
- msg = newRemoveMessage(op, operationId);
- break;
- case UPDATE:
- msg = newUpdateMessage(op, operationId);
- break;
- default:
- // typical end of feed
- return null;
- }
- log(LogLevel.DEBUG, "Successfully deserialized document id: ", msg.first);
- return msg;
- }
-
- private void setMessageParameters(Tuple2<String, Message> msg) {
- msg.second.setContext(new ReplyContext(msg.first, feedReplies, DocumentOperationType.fromMessage(msg.second)));
- if (settings.traceLevel != null) {
- msg.second.getTrace().setLevel(settings.traceLevel);
- }
- if (settings.priority != null) {
- try {
- DocumentProtocol.Priority priority = DocumentProtocol.Priority.valueOf(settings.priority);
- if (msg.second instanceof DocumentMessage) {
- ((DocumentMessage) msg.second).setPriority(priority);
- }
- }
- catch (IllegalArgumentException i) {
- log.severe(i.getMessage());
- }
- }
- }
-
- private void setRoute(Tuple2<String, Message> msg) {
- if (settings.route != null) {
- msg.second.setRoute(settings.route);
- }
- }
-
- protected void getNextOperation(VespaXMLFeedReader.Operation op) throws Exception {
- int length = readByteLength();
-
- try (InputStream limitedInputStream = new ByteLimitedInputStream(requestInputStream, length)){
- FeedReader reader = feedReaderFactory.createReader(limitedInputStream, docTypeManager, settings.dataFormat);
- reader.read(op);
- }
- }
-
- protected String getNextOperationId() throws IOException {
- return readOperationId();
- }
-
- private String readOperationId() throws IOException {
- StringBuilder idBuf = new StringBuilder(100);
- int c;
- while ((c = requestInputStream.read()) != -1) {
- if (c == 32) {
- break;
- }
- idBuf.append((char) c); //it's ASCII
- }
- if (idBuf.length() == 0) {
- return EOF;
- }
- return Encoder.decode(idBuf.toString(), new StringBuilder(idBuf.length())).toString();
- }
-
- private int readByteLength() throws IOException {
- StringBuilder lenBuf = new StringBuilder(8);
- int c;
- while ((c = requestInputStream.read()) != -1) {
- if (c == 10) {
- break;
- }
- lenBuf.append((char) c); //it's ASCII
- }
- if (lenBuf.length() == 0) {
- throw new IllegalStateException("Operation length missing.");
- }
- return Integer.valueOf(lenBuf.toString(), 16);
- }
-
- protected final void log(LogLevel level, Object... msgParts) {
- StringBuilder s;
-
- if (!log.isLoggable(level)) {
- return;
- }
-
- s = new StringBuilder();
- for (Object part : msgParts) {
- s.append(part.toString());
- }
-
- log.log(level, s.toString());
- }
-
- private Tuple2<String, Message> newUpdateMessage(Operation op, String operationId) {
- DocumentUpdate update = op.getDocumentUpdate();
- update.setCondition(op.getCondition());
- Message msg = new UpdateDocumentMessage(update);
-
- String id = (operationId == null) ? update.getId().toString() : operationId;
- return new Tuple2<>(id, msg);
- }
-
- private Tuple2<String, Message> newRemoveMessage(Operation op, String operationId) {
- DocumentRemove remove = new DocumentRemove(op.getRemove());
- remove.setCondition(op.getCondition());
- Message msg = new RemoveDocumentMessage(remove);
-
- String id = (operationId == null) ? remove.getId().toString() : operationId;
- return new Tuple2<>(id, msg);
- }
-
- private Tuple2<String, Message> newPutMessage(Operation op, String operationId) {
- DocumentPut put = new DocumentPut(op.getDocument());
- put.setCondition(op.getCondition());
- Message msg = new PutDocumentMessage(put);
-
- String id = (operationId == null) ? put.getId().toString() : operationId;
- return new Tuple2<>(id, msg);
- }
-
-
- void flushResponseQueue() throws InterruptedException {
- OperationStatus status = feedReplies.poll();
- while (status != null) {
- decreasePending(status);
- status = feedReplies.poll();
- }
- }
-
- void putClient() {
- handler.putClient(clientId,
- new ClientState(numPending,
- feedReplies, sourceSession, metricContext,
- prevOpsPerSecTime, operationsForOpsPerSec));
- }
-
- void drain() throws InterruptedException {
- if (settings.drain) {
- while (numPending > 0) {
- OperationStatus o = feedReplies.take();
- decreasePending(o);
- }
- }
- }
-
- private void decreasePending(OperationStatus o) throws InterruptedException {
- operations.put(o);
- --numPending;
- }
-
- private void enqueue(String id, String message, ErrorCode code, boolean isConditionalNotMet, Message msg)
- throws InterruptedException {
- String traceMessage = msg != null && msg.getTrace() != null && msg.getTrace().getLevel() > 0
- ? msg.getTrace().toString()
- : "";
- operations.put(new OperationStatus(message, id, code, isConditionalNotMet, traceMessage));
- }
-
- public void waitForRequestReceived() throws InterruptedException {
- requestReceived.await(1, TimeUnit.HOURS);
- }
-
- public class FeedErrorMessage extends Message {
- private long sequenceId;
-
- private FeedErrorMessage(String operationId) {
- try {
- DocumentId id = new DocumentId(operationId);
- sequenceId = Arrays.hashCode(id.getGlobalId());
- } catch (Exception e) {
- sequenceId = 0;
- }
- }
-
- @Override
- public Utf8String getProtocol() {
- return new Utf8String("vespa-feed-handler-internal-bogus-protocol");
- }
-
- @Override
- public int getType() {
- return 1234;
- }
-
- @Override
- public boolean hasSequenceId() {
- return true;
- }
-
- @Override
- public long getSequenceId() {
- return sequenceId;
- }
- }
-
-}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/feedhandler/FeedHandlerTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/feedhandler/FeedHandlerTest.java
deleted file mode 100644
index c0cc907c671..00000000000
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/feedhandler/FeedHandlerTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.feedhandler;
-
-import com.yahoo.container.jdisc.HttpRequest;
-import com.yahoo.jdisc.HeaderFields;
-import com.yahoo.jdisc.Metric;
-import com.yahoo.container.logging.AccessLog;
-import com.yahoo.metrics.simple.MetricReceiver;
-import com.yahoo.vespa.http.client.core.Headers;
-import com.yahoo.vespa.http.client.core.OperationStatus;
-import com.yahoo.vespa.http.server.FeedHandler;
-import com.yahoo.vespa.http.server.Feeder;
-import org.junit.Test;
-
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Unit test for FeedHandler class.
- *
- * @author dybis
- */
-public class FeedHandlerTest {
-
- /**
- * This class extends FeedHandler and allows to create a custom Feeder.
- */
- static class TestFeedHandler extends FeedHandler {
- private final CountDownLatch countDownLatch = new CountDownLatch(1);
-
- public TestFeedHandler() throws Exception {
- super(new FeedHandler.Context(Executors.newCachedThreadPool(),
- mock(AccessLog.class),
- mock(Metric.class)),
- null, null, null, MetricReceiver.nullImplementation);
- }
-
- /**
- * Builds a feeder that blocks until countDownLatch is stepped down.
- */
- @Override
- protected Feeder createFeeder(
- com.yahoo.container.jdisc.HttpRequest request,
- InputStream requestInputStream,
- final BlockingQueue<OperationStatus> operations,
- String clientId,
- boolean sessionIdWasGeneratedJustNow,
- int protocolVersion) throws Exception {
- Feeder feeder = mock(Feeder.class);
- doAnswer(invocation -> {
- try {
- countDownLatch.await();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- return null;
- }).when(feeder).waitForRequestReceived();
- return feeder;
- }
- }
-
- /**
- * nginx require that a post is finished before the server ack with a response. This behaviour is verified
- * in this test
- */
- @Test
- public void testResponseIsSentAfterWaitForRequestReceivedReturns() throws Exception {
- HttpRequest request = mock(HttpRequest.class);
-
- // Create a request with valid version.
- com.yahoo.jdisc.http.HttpRequest jdiscRequest = mock(com.yahoo.jdisc.http.HttpRequest.class);
- HeaderFields headerFields = mock(HeaderFields.class);
- List<String> version = new ArrayList<>();
- version.add("2");
- when(headerFields.get(Headers.VERSION)).thenReturn(version);
- when(jdiscRequest.headers()).thenReturn(headerFields);
- when(request.getJDiscRequest()).thenReturn(jdiscRequest);
-
- TestFeedHandler feedHandler = new TestFeedHandler();
- // After a short period, make the feed finish.
- new Thread(() -> {
- try {
- Thread.sleep(50);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- feedHandler.countDownLatch.countDown();
- }).start();
- // This should not return before countdown latch is stepped down.
- feedHandler.handle(request);
- // This should always returns after the countDownLatch has become zero. This might cause false positive,
- // but not false negatives. This is fine.
- assertThat(feedHandler.countDownLatch.getCount(), is(0L));
-
- }
-
-}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2ErrorsInResultTestCase.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2ErrorsInResultTestCase.java
deleted file mode 100644
index 47745b29032..00000000000
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2ErrorsInResultTestCase.java
+++ /dev/null
@@ -1,240 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.server;
-
-import com.yahoo.container.jdisc.HttpRequest;
-import com.yahoo.container.jdisc.HttpResponse;
-import com.yahoo.container.jdisc.messagebus.SessionCache;
-import com.yahoo.container.logging.AccessLog;
-import com.yahoo.document.DocumentTypeManager;
-import com.yahoo.document.config.DocumentmanagerConfig;
-import com.yahoo.jdisc.ReferencedResource;
-import com.yahoo.jdisc.References;
-import com.yahoo.jdisc.http.HttpRequest.Method;
-import com.yahoo.messagebus.*;
-import com.yahoo.messagebus.Error;
-import com.yahoo.messagebus.shared.SharedMessageBus;
-import com.yahoo.messagebus.shared.SharedSourceSession;
-import com.yahoo.metrics.simple.MetricReceiver;
-import com.yahoo.text.Utf8;
-import com.yahoo.text.Utf8String;
-import com.yahoo.vespa.http.client.core.Headers;
-import com.yahoo.vespa.http.client.core.OperationStatus;
-import com.yahoo.vespaxmlparser.MockFeedReaderFactory;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Check FeedHandler APIs.
- *
- * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
- */
-public class V2ErrorsInResultTestCase {
-
- LessConfiguredHandler handler;
- ExecutorService workers;
-
- @Before
- public void setUp() throws Exception {
- workers = Executors.newCachedThreadPool();
- handler = new LessConfiguredHandler(workers);
- }
-
- @After
- public void tearDown() throws Exception {
- handler.destroy();
- workers.shutdown();
- }
-
- private static class LessConfiguredHandler extends FeedHandler {
-
- public LessConfiguredHandler(Executor executor) throws Exception {
- super(new FeedHandler.Context(executor,
- AccessLog.voidAccessLog(),
- new DummyMetric()),
- null, null, null, MetricReceiver.nullImplementation);
- }
-
-
- @Override
- protected Feeder createFeeder(HttpRequest request, InputStream requestInputStream,
- BlockingQueue<OperationStatus> operations, String clientId,
- boolean sessionIdWasGeneratedJustNow, int protocolVersion)
- throws Exception {
- return new LessConfiguredFeeder(requestInputStream, operations,
- popClient(clientId), new FeederSettings(request), clientId, sessionIdWasGeneratedJustNow,
- sourceSessionParams(request), null, this, this.feedReplyHandler, "");
- }
-
- @Override
- protected DocumentTypeManager createDocumentManager(
- DocumentmanagerConfig documentManagerConfig) {
- return null;
- }
- }
-
- private static class MockSharedSession extends SharedSourceSession {
- int count;
-
- public MockSharedSession(SourceSessionParams params) {
- super(new SharedMessageBus(new MessageBus(new MockNetwork(),
- new MessageBusParams())), params);
- count = 0;
- }
-
- @Override
- public Result sendMessageBlocking(Message msg) throws InterruptedException {
- return sendMessage(msg);
- }
-
- @Override
- public Result sendMessage(Message msg) {
- Result r;
- ReplyHandler handler = msg.popHandler();
-
- switch (count++) {
- case 0:
- r = new Result(ErrorCode.FATAL_ERROR,
- "boom");
- break;
- case 1:
- r = new Result(ErrorCode.TRANSIENT_ERROR,
- "transient boom");
- break;
- case 2:
- final FailedReply reply = new FailedReply(msg.getContext());
- reply.addError(new Error(
- ErrorCode.FATAL_ERROR,
- "bad mojo, dude"));
- handler.handleReply(reply);
- r = Result.ACCEPTED;
- break;
- default:
- handler.handleReply(new MockReply(msg.getContext()));
- r = Result.ACCEPTED;
- }
- return r;
- }
-
- }
-
- private static class FailedReply extends Reply {
- Object context;
-
- public FailedReply(Object context) {
- this.context = context;
- }
-
- @Override
- public Utf8String getProtocol() {
- return null;
- }
-
- @Override
- public int getType() {
- return 0;
- }
-
- @Override
- public Object getContext() {
- return context;
- }
- }
-
- private static class LessConfiguredFeeder extends Feeder {
-
- public LessConfiguredFeeder(InputStream stream,
- BlockingQueue<OperationStatus> operations,
- ClientState storedState, FeederSettings settings,
- String clientId, boolean sessionIdWasGeneratedJustNow, SourceSessionParams sessionParams,
- SessionCache sessionCache, FeedHandler handler, ReplyHandler feedReplyHandler,
- String localHostname) throws Exception {
- super(stream, new MockFeedReaderFactory(), null, operations, storedState, settings, clientId, sessionIdWasGeneratedJustNow,
- sessionParams, sessionCache, handler, new DummyMetric(), feedReplyHandler, localHostname);
- }
-
- protected ReferencedResource<SharedSourceSession> retainSession(
- SourceSessionParams sessionParams, SessionCache sessionCache) {
- final SharedSourceSession session = new MockSharedSession(sessionParams);
- return new ReferencedResource<>(session, References.fromResource(session));
- }
- }
-
- @Test
- public final void test() throws IOException {
- String sessionId;
- {
- InputStream in = new MetaStream(new byte[] { 1 });
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- HttpRequest nalle = HttpRequest
- .createTestRequest(
- "http://test4-steinar:19020/reserved-for-internal-use/feedapi",
- Method.POST, in);
- nalle.getJDiscRequest().headers().add(Headers.VERSION, "2");
- nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false");
- HttpResponse r = handler.handle(nalle);
- sessionId = r.headers().getFirst(Headers.SESSION_ID);
- r.render(out);
- assertEquals("",
- Utf8.toString(out.toByteArray()));
- }
- {
- InputStream in = new MetaStream(new byte[] { 1 });
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- HttpRequest nalle = HttpRequest
- .createTestRequest(
- "http://test4-steinar:19020/reserved-for-internal-use/feedapi",
- Method.POST, in);
- nalle.getJDiscRequest().headers().add(Headers.VERSION, "2");
- nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false");
- nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId);
- HttpResponse r = handler.handle(nalle);
- r.render(out);
- assertEquals("id:banana:banana::doc1 ERROR boom \n",
- Utf8.toString(out.toByteArray()));
- }
- {
- InputStream in = new MetaStream(new byte[] { 1 });
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- HttpRequest nalle = HttpRequest
- .createTestRequest(
- "http://test4-steinar:19020/reserved-for-internal-use/feedapi",
- Method.POST, in);
- nalle.getJDiscRequest().headers().add(Headers.VERSION, "2");
- nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false");
- nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId);
- HttpResponse r = handler.handle(nalle);
- r.render(out);
- assertEquals("id:banana:banana::doc1 TRANSIENT_ERROR transient{20}boom \n",
- Utf8.toString(out.toByteArray()));
- }
- {
- InputStream in = new MetaStream(new byte[] { 1 });
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- HttpRequest nalle = HttpRequest
- .createTestRequest(
- "http://test4-steinar:19020/reserved-for-internal-use/feedapi",
- Method.POST, in);
- nalle.getJDiscRequest().headers().add(Headers.VERSION, "2");
- nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId);
- nalle.getJDiscRequest().headers().add(Headers.DRAIN, "true");
- HttpResponse r = handler.handle(nalle);
- r.render(out);
- assertEquals("id:banana:banana::doc1 ERROR bad{20}mojo,{20}dude \n",
- Utf8.toString(out.toByteArray()));
- }
-
- }
-
-}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2ExternalFeedTestCase.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2ExternalFeedTestCase.java
deleted file mode 100644
index 9960d98f7f1..00000000000
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2ExternalFeedTestCase.java
+++ /dev/null
@@ -1,530 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.server;
-
-import com.yahoo.container.jdisc.HttpRequest;
-import com.yahoo.container.jdisc.HttpResponse;
-import com.yahoo.container.jdisc.messagebus.SessionCache;
-import com.yahoo.container.logging.AccessLog;
-import com.yahoo.document.DocumentTypeManager;
-import com.yahoo.document.config.DocumentmanagerConfig;
-import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
-import com.yahoo.jdisc.ReferencedResource;
-import com.yahoo.jdisc.References;
-import com.yahoo.jdisc.http.HttpRequest.Method;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.MessageBus;
-import com.yahoo.messagebus.MessageBusParams;
-import com.yahoo.messagebus.ReplyHandler;
-import com.yahoo.messagebus.Result;
-import com.yahoo.messagebus.SourceSessionParams;
-import com.yahoo.messagebus.network.Network;
-import com.yahoo.messagebus.shared.SharedMessageBus;
-import com.yahoo.messagebus.shared.SharedSourceSession;
-import com.yahoo.metrics.simple.MetricReceiver;
-import com.yahoo.text.Utf8;
-import com.yahoo.vespa.http.client.config.FeedParams.DataFormat;
-import com.yahoo.vespa.http.client.core.Headers;
-import com.yahoo.vespa.http.client.core.OperationStatus;
-import com.yahoo.vespaxmlparser.MockFeedReaderFactory;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Handler;
-import java.util.logging.Level;
-import java.util.logging.LogRecord;
-import java.util.logging.Logger;
-
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Check FeedHandler APIs.
- *
- * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
- */
-public class V2ExternalFeedTestCase {
-
- LessConfiguredHandler handler;
- ExecutorService workers;
- Level logLevel;
- Logger logger;
- boolean initUseParentHandlers;
- LogBuffer logChecker;
-
- @Before
- public void setUp() throws Exception {
- workers = Executors.newCachedThreadPool();
- handler = new LessConfiguredHandler(workers);
- logger = Logger.getLogger(Feeder.class.getName());
- logLevel = logger.getLevel();
- logger.setLevel(Level.ALL);
- initUseParentHandlers = logger.getUseParentHandlers();
- logChecker = new LogBuffer();
- logger.setUseParentHandlers(false);
- logger.addHandler(logChecker);
- }
-
- @After
- public void tearDown() throws Exception {
- handler.destroy();
- workers.shutdown();
- logger.setLevel(logLevel);
- logger.removeHandler(logChecker);
- logger.setUseParentHandlers(initUseParentHandlers);
- }
-
- private static class LogBuffer extends Handler {
- public final BlockingQueue<LogRecord> records = new LinkedBlockingQueue<>();
-
- @Override
- public void publish(LogRecord record) {
- try {
- records.put(record);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void flush() {
- }
-
- @Override
- public void close() throws SecurityException {
- }
- }
-
- private static class LessConfiguredHandler extends FeedHandler {
- volatile DataFormat lastFormatSeen;
-
- public LessConfiguredHandler(Executor executor) throws Exception {
- super(new FeedHandler.Context(executor,
- AccessLog.voidAccessLog(),
- new DummyMetric()),
- null, null, null, MetricReceiver.nullImplementation);
- }
-
- @Override
- protected Feeder createFeeder(HttpRequest request,
- InputStream requestInputStream,
- BlockingQueue<OperationStatus> operations,
- String clientId,
- boolean sessionIdWasGeneratedJustNow, int protocolVersion)
- throws Exception {
- LessConfiguredFeeder f = new LessConfiguredFeeder(requestInputStream, operations,
- popClient(clientId), new FeederSettings(request), clientId, sessionIdWasGeneratedJustNow,
- sourceSessionParams(request), null, this, this.feedReplyHandler, "ourHostname");
- lastFormatSeen = f.settings.dataFormat;
- return f;
- }
-
- @Override
- protected DocumentTypeManager createDocumentManager(
- DocumentmanagerConfig documentManagerConfig) {
- return null;
- }
- }
-
- private static class MockSharedSession extends SharedSourceSession {
-
- public MockSharedSession(SourceSessionParams params) {
- super(new SharedMessageBus(new MessageBus(new MockNetwork(),
- new MessageBusParams())), params);
- }
-
- @Override
- public Result sendMessageBlocking(Message msg) throws InterruptedException {
- return sendMessage(msg);
- }
-
- @Override
- public Result sendMessage(Message msg) {
- ReplyHandler handler = msg.popHandler();
- MockReply mockReply = new MockReply(msg.getContext());
- if (msg instanceof Feeder.FeedErrorMessage) {
- mockReply.addError(new com.yahoo.messagebus.Error(123, "Could not feed this"));
- }
- if (msg instanceof PutDocumentMessage) {
- assert(msg.getTrace().getLevel() == 4);
- assert(((PutDocumentMessage) msg).getPriority().name().equals("LOWEST"));
- }
- handler.handleReply(mockReply);
- return Result.ACCEPTED;
- }
-
- }
-
- private static class LessConfiguredFeeder extends Feeder {
- public LessConfiguredFeeder(InputStream stream,
- BlockingQueue<OperationStatus> operations,
- ClientState storedState, FeederSettings settings,
- String clientId, boolean sessionIdWasGeneratedJustNow, SourceSessionParams sessionParams,
- SessionCache sessionCache, FeedHandler handler, ReplyHandler feedReplyHandler,
- String localHostname) throws Exception {
- super(stream, new MockFeedReaderFactory(), null, operations, storedState, settings, clientId, sessionIdWasGeneratedJustNow,
- sessionParams, sessionCache, handler, new DummyMetric(), feedReplyHandler, localHostname);
- }
-
- protected ReferencedResource<SharedSourceSession> retainSession(
- SourceSessionParams sessionParams, SessionCache sessionCache) {
- final SharedSourceSession session = new MockSharedSession(sessionParams);
- return new ReferencedResource<>(session, References.fromResource(session));
- }
- }
-
- @Test
- public final void test() throws IOException, InterruptedException {
- String sessionId;
- {
- InputStream in = new MetaStream(new byte[] { 1 });
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- HttpRequest nalle = HttpRequest
- .createTestRequest(
- "http://test4-steinar:19020/reserved-for-internal-use/feedapi",
- Method.POST, in);
- nalle.getJDiscRequest().headers().add(Headers.VERSION, "2");
- nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false");
- HttpResponse r = handler.handle(nalle);
- sessionId = r.headers().getFirst(Headers.SESSION_ID);
- r.render(out);
- assertEquals("",
- Utf8.toString(out.toByteArray()));
- }
- {
- InputStream in = new MetaStream(new byte[]{1, 3, 2});
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- HttpRequest nalle = HttpRequest.createTestRequest(
- "http://test4-steinar:19020/reserved-for-internal-use/feedapi",
- Method.POST, in);
- nalle.getJDiscRequest().headers().add(Headers.VERSION, "2");
- nalle.getJDiscRequest().headers().add(Headers.TIMEOUT, "1000000000");
- nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId);
- nalle.getJDiscRequest().headers().add(Headers.PRIORITY, "LOWEST");
- nalle.getJDiscRequest().headers().add(Headers.TRACE_LEVEL, "4");
- nalle.getJDiscRequest().headers().add(Headers.DRAIN, "true");
- HttpResponse r = handler.handle(nalle);
- r.render(out);
- assertEquals("id:banana:banana::doc1 OK Document{20}processed. \n"
- + "id:banana:banana::doc1 OK Document{20}processed. \n"
- + "id:banana:banana::doc1 OK Document{20}processed. \n",
- Utf8.toString(out.toByteArray()));
- assertEquals("text/plain", r.getContentType());
- assertEquals(StandardCharsets.US_ASCII.name(), r.getCharacterEncoding());
- assertEquals(7, logChecker.records.size());
- String actualHandshake = logChecker.records.take().getMessage();
- assertThat(actualHandshake, actualHandshake.matches("Handshake completed for client (-?)(.+?)-#(.*?)\\."), is(true));
- assertEquals("Successfully deserialized document id: id:banana:banana::doc1",
- logChecker.records.take().getMessage());
- assertEquals("Sent message successfully, document id: id:banana:banana::doc1",
- logChecker.records.take().getMessage());
- }
-
- //test session ID without #, i.e. something fishy related to VIPs is going on
- sessionId = "something";
-
- {
- InputStream in = new MetaStream(new byte[]{1, 3, 2});
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- HttpRequest nalle = HttpRequest.createTestRequest(
- "http://test4-steinar:19020/reserved-for-internal-use/feedapi",
- Method.POST, in);
- nalle.getJDiscRequest().headers().add(Headers.VERSION, "2");
- nalle.getJDiscRequest().headers().add(Headers.TIMEOUT, "1000000000");
- nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId);
- nalle.getJDiscRequest().headers().add(Headers.DRAIN, "true");
- nalle.getJDiscRequest().headers().add(Headers.PRIORITY, "LOWEST");
- nalle.getJDiscRequest().headers().add(Headers.TRACE_LEVEL, "4");
- nalle.getJDiscRequest().headers().add(Headers.TRACE_LEVEL, "2");
-
- HttpResponse r = handler.handle(nalle);
- r.render(out);
- String expectedErrorMsg = "Got request from client with id 'something', but found no session for this client.";
- assertThat(Utf8.toString(out.toByteArray()), containsString(expectedErrorMsg));
- assertEquals("text/plain", r.getContentType());
- assertEquals(StandardCharsets.UTF_8.name(), r.getCharacterEncoding());
- }
-
- //test session ID with trailing # but no hostname
- sessionId = "something#";
-
- {
- InputStream in = new MetaStream(new byte[]{1, 3, 2});
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- HttpRequest nalle = HttpRequest.createTestRequest(
- "http://test4-steinar:19020/reserved-for-internal-use/feedapi",
- Method.POST, in);
- nalle.getJDiscRequest().headers().add(Headers.VERSION, "2");
- nalle.getJDiscRequest().headers().add(Headers.TIMEOUT, "1000000000");
- nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId);
- nalle.getJDiscRequest().headers().add(Headers.DRAIN, "true");
- nalle.getJDiscRequest().headers().add(Headers.PRIORITY, "LOWEST");
- nalle.getJDiscRequest().headers().add(Headers.TRACE_LEVEL, "4");
- HttpResponse r = handler.handle(nalle);
- r.render(out);
- String expectedErrorMsg = "Got request from client with id 'something#', but found no session for this client.";
- assertThat(Utf8.toString(out.toByteArray()), containsString(expectedErrorMsg));
- assertEquals("text/plain", r.getContentType());
- assertEquals(StandardCharsets.UTF_8.name(), r.getCharacterEncoding());
- }
-
- //test session ID with trailing # and some unknown hostname at the end
- sessionId = "something#thisHostnameDoesNotExistAnywhere";
-
- {
- InputStream in = new MetaStream(new byte[]{1, 3, 2});
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- HttpRequest nalle = HttpRequest.createTestRequest(
- "http://test4-steinar:19020/reserved-for-internal-use/feedapi",
- Method.POST, in);
- nalle.getJDiscRequest().headers().add(Headers.VERSION, "2");
- nalle.getJDiscRequest().headers().add(Headers.TIMEOUT, "1000000000");
- nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId);
- nalle.getJDiscRequest().headers().add(Headers.DRAIN, "true");
- nalle.getJDiscRequest().headers().add(Headers.PRIORITY, "LOWEST");
- nalle.getJDiscRequest().headers().add(Headers.TRACE_LEVEL, "4");
- HttpResponse r = handler.handle(nalle);
- r.render(out);
- String expectedErrorMsg = "Got request from client with id 'something#thisHostnameDoesNotExistAnywhere', " +
- "but found no session for this client. Session was originally established " +
- "towards host thisHostnameDoesNotExistAnywhere, but our hostname is " +
- "ourHostname.";
- assertThat(Utf8.toString(out.toByteArray()), containsString(expectedErrorMsg));
- assertEquals("text/plain", r.getContentType());
- assertEquals(StandardCharsets.UTF_8.name(), r.getCharacterEncoding());
- }
-
- //test session ID with trailing # and some unknown hostname at the end
- sessionId = "something#ourHostname";
-
- {
- InputStream in = new MetaStream(new byte[]{1, 3, 2});
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- HttpRequest nalle = HttpRequest.createTestRequest(
- "http://test4-steinar:19020/reserved-for-internal-use/feedapi",
- Method.POST, in);
- nalle.getJDiscRequest().headers().add(Headers.VERSION, "2");
- nalle.getJDiscRequest().headers().add(Headers.TIMEOUT, "1000000000");
- nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId);
- nalle.getJDiscRequest().headers().add(Headers.PRIORITY, "LOWEST");
- nalle.getJDiscRequest().headers().add(Headers.TRACE_LEVEL, "4");
- nalle.getJDiscRequest().headers().add(Headers.DRAIN, "true");
- HttpResponse r = handler.handle(nalle);
- r.render(out);
- assertEquals("id:banana:banana::doc1 OK Document{20}processed. \n" +
- "id:banana:banana::doc1 OK Document{20}processed. \n" +
- "id:banana:banana::doc1 OK Document{20}processed. \n",
- Utf8.toString(out.toByteArray()));
- assertEquals("text/plain", r.getContentType());
- assertEquals(StandardCharsets.US_ASCII.name(), r.getCharacterEncoding());
- Thread.sleep(1000);
- }
- }
-
- @Test
- public final void testFailedReading() throws IOException {
- String sessionId;
- {
- InputStream in = new MetaStream(new byte[] { 1 });
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- HttpRequest nalle = HttpRequest
- .createTestRequest(
- "http://test4-steinar:19020/reserved-for-internal-use/feedapi",
- Method.POST, in);
- nalle.getJDiscRequest().headers().add(Headers.VERSION, "2");
- nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false");
- HttpResponse r = handler.handle(nalle);
- sessionId = r.headers().getFirst(Headers.SESSION_ID);
- r.render(out);
- assertEquals("",
- Utf8.toString(out.toByteArray()));
- }
- {
- InputStream in = new MetaStream(new byte[] { 4 });
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- HttpRequest nalle = HttpRequest.createTestRequest(
- "http://test4-steinar:19020/reserved-for-internal-use/feedapi",
- Method.POST, in);
- nalle.getJDiscRequest().headers().add(Headers.VERSION, "2");
- nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId);
- nalle.getJDiscRequest().headers().add(Headers.DRAIN, "true");
- HttpResponse r = handler.handle(nalle);
- r.render(out);
- assertEquals("id:banana:banana::doc1 ERROR Could{20}not{20}feed{20}this \n",
- Utf8.toString(out.toByteArray()));
- }
- }
-
- @Test
- public final void testCleaningDoesNotBlowUp() throws IOException {
- InputStream in = new MetaStream(new byte[] { 1 });
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- HttpRequest nalle = HttpRequest.createTestRequest(
- "http://test4-steinar:19020/reserved-for-internal-use/feedapi",
- Method.POST, in);
- nalle.getJDiscRequest().headers().add(Headers.VERSION, "2");
- nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false");
- HttpResponse r = handler.handle(nalle);
- r.render(out);
- assertEquals("",
- Utf8.toString(out.toByteArray()));
- handler.forceRunCleanClients();
- }
-
- @Test
- public final void testMockNetworkDoesNotBlowUp() {
- Network n = new MockNetwork();
- n.registerSession(null);
- n.unregisterSession(null);
- assertTrue(n.allocServiceAddress(null));
- n.freeServiceAddress(null);
- n.send(null, null);
- assertNull(n.getConnectionSpec());
- assertNull(n.getMirror());
- }
-
- @Test
- public final void testMockReplyDoesNotBlowUp() {
- MockReply r = new MockReply(null);
- assertNull(r.getProtocol());
- assertEquals(0, r.getType());
- assertFalse(r.hasFatalErrors());
- }
-
- @Test
- public final void testFlush() throws IOException {
- String sessionId;
- {
- InputStream in = new MetaStream(new byte[] { 1 });
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- HttpRequest nalle = HttpRequest
- .createTestRequest(
- "http://test4-steinar:19020/reserved-for-internal-use/feedapi",
- Method.POST, in);
- nalle.getJDiscRequest().headers().add(Headers.VERSION, "2");
- nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false");
- HttpResponse r = handler.handle(nalle);
- sessionId = r.headers().getFirst(Headers.SESSION_ID);
- r.render(out);
- assertEquals("",
- Utf8.toString(out.toByteArray()));
- }
- {
- InputStream in = new MetaStream(new byte[] { 1, 1, 1, 1, 1, 1, 1});
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- HttpRequest nalle = HttpRequest.createTestRequest(
- "http://test4-steinar:19020/reserved-for-internal-use/feedapi",
- Method.POST, in);
- nalle.getJDiscRequest().headers().add(Headers.VERSION, "2");
- nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId);
- nalle.getJDiscRequest().headers().add(Headers.PRIORITY, "LOWEST");
- nalle.getJDiscRequest().headers().add(Headers.TRACE_LEVEL, "4");
- nalle.getJDiscRequest().headers().add(Headers.DRAIN, "true");
- HttpResponse r = handler.handle(nalle);
- r.render(out);
- assertEquals("id:banana:banana::doc1 OK Document{20}processed. \n"
- + "id:banana:banana::doc1 OK Document{20}processed. \n"
- + "id:banana:banana::doc1 OK Document{20}processed. \n"
- + "id:banana:banana::doc1 OK Document{20}processed. \n"
- + "id:banana:banana::doc1 OK Document{20}processed. \n"
- + "id:banana:banana::doc1 OK Document{20}processed. \n"
- + "id:banana:banana::doc1 OK Document{20}processed. \n",
- Utf8.toString(out.toByteArray()));
- }
- }
-
- @Test
- public final void testIllegalVersion() throws IOException {
- InputStream in = new MetaStream(new byte[] { 1 });
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- HttpRequest nalle = HttpRequest.createTestRequest(
- "http://test4-steinar:19020/reserved-for-internal-use/feedapi",
- Method.POST, in);
- nalle.getJDiscRequest().headers()
- .add(Headers.VERSION, Integer.toString(Integer.MAX_VALUE));
- HttpResponse r = handler.handle(nalle);
- r.render(out);
- assertEquals(Headers.HTTP_NOT_ACCEPTABLE, r.getStatus());
- }
-
- @Test
- public final void testSettings() {
- HttpRequest nalle = HttpRequest.createTestRequest(
- "http://test4-steinar:19020/reserved-for-internal-use/feedapi",
- Method.POST);
- nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false");
- nalle.getJDiscRequest().headers().add(Headers.ROUTE, "bamse brakar");
- nalle.getJDiscRequest().headers().add(Headers.DENY_IF_BUSY, "true");
- FeederSettings settings = new FeederSettings(nalle);
- assertEquals(false, settings.drain);
- assertEquals(2, settings.route.getNumHops());
- assertEquals(true, settings.denyIfBusy);
- }
-
- @Test
- public final void testJsonInputFormat() throws IOException, InterruptedException {
- String sessionId;
- {
- InputStream in = new MetaStream(new byte[] { 1 });
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- HttpRequest nalle = HttpRequest
- .createTestRequest(
- "http://test4-steinar:19020/reserved-for-internal-use/feedapi",
- Method.POST, in);
- nalle.getJDiscRequest().headers().add(Headers.VERSION, "2");
- nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false");
- HttpResponse r = handler.handle(nalle);
- sessionId = r.headers().getFirst(Headers.SESSION_ID);
- r.render(out);
- assertEquals("",
- Utf8.toString(out.toByteArray()));
- }
- {
- InputStream in = new MetaStream(new byte[]{1, 3, 2});
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- HttpRequest nalle = HttpRequest.createTestRequest(
- "http://test4-steinar:19020/reserved-for-internal-use/feedapi",
- Method.POST, in);
- nalle.getJDiscRequest().headers().add(Headers.VERSION, "2");
- nalle.getJDiscRequest().headers().add(Headers.TIMEOUT, "1000000000");
- nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId);
- nalle.getJDiscRequest().headers().add(Headers.DATA_FORMAT, DataFormat.JSON_UTF8.name());
- nalle.getJDiscRequest().headers().add(Headers.PRIORITY, "LOWEST");
- nalle.getJDiscRequest().headers().add(Headers.TRACE_LEVEL, "4");
- nalle.getJDiscRequest().headers().add(Headers.DRAIN, "true");
- HttpResponse r = handler.handle(nalle);
- r.render(out);
- assertEquals("id:banana:banana::doc1 OK Document{20}processed. \n"
- + "id:banana:banana::doc1 OK Document{20}processed. \n"
- + "id:banana:banana::doc1 OK Document{20}processed. \n",
- Utf8.toString(out.toByteArray()));
- assertEquals("text/plain", r.getContentType());
- assertEquals(StandardCharsets.US_ASCII.name(), r.getCharacterEncoding());
- assertEquals(7, logChecker.records.size());
- String actualHandshake = logChecker.records.take().getMessage();
- assertThat(actualHandshake, actualHandshake.matches("Handshake completed for client (-?)(.+?)-#(.*?)\\."), is(true));
- assertEquals("Successfully deserialized document id: id:banana:banana::doc1",
- logChecker.records.take().getMessage());
- assertEquals("Sent message successfully, document id: id:banana:banana::doc1",
- logChecker.records.take().getMessage());
- assertSame(DataFormat.JSON_UTF8, handler.lastFormatSeen);
- }
- }
-
-}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2FailingMessagebusTestCase.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2FailingMessagebusTestCase.java
deleted file mode 100644
index 6290c22f694..00000000000
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2FailingMessagebusTestCase.java
+++ /dev/null
@@ -1,226 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.server;
-
-import com.yahoo.container.jdisc.HttpRequest;
-import com.yahoo.container.jdisc.HttpResponse;
-import com.yahoo.container.jdisc.messagebus.SessionCache;
-import com.yahoo.container.logging.AccessLog;
-import com.yahoo.document.DocumentTypeManager;
-import com.yahoo.document.config.DocumentmanagerConfig;
-import com.yahoo.jdisc.ReferencedResource;
-import com.yahoo.jdisc.References;
-import com.yahoo.jdisc.http.HttpRequest.Method;
-import com.yahoo.messagebus.*;
-import com.yahoo.messagebus.shared.SharedMessageBus;
-import com.yahoo.messagebus.shared.SharedSourceSession;
-import com.yahoo.metrics.simple.MetricReceiver;
-import com.yahoo.text.Utf8;
-import com.yahoo.vespa.http.client.core.Headers;
-import com.yahoo.vespa.http.client.core.OperationStatus;
-import com.yahoo.vespaxmlparser.MockFeedReaderFactory;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Check FeedHandler APIs.
- *
- * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
- */
-public class V2FailingMessagebusTestCase {
-
- LessConfiguredHandler handler;
- ExecutorService workers;
- int mbus;
-
- @Before
- public void setUp() throws Exception {
- workers = Executors.newCachedThreadPool();
- handler = new LessConfiguredHandler(workers);
- mbus = 0;
- }
-
- @After
- public void tearDown() throws Exception {
- handler.destroy();
- workers.shutdown();
- mbus = 0;
- }
-
- private class LessConfiguredHandler extends FeedHandler {
-
- public LessConfiguredHandler(Executor executor) throws Exception {
- super(new FeedHandler.Context(executor, AccessLog.voidAccessLog(), new DummyMetric()),
- null, null, null, MetricReceiver.nullImplementation);
- }
-
- @Override
- protected Feeder createFeeder(HttpRequest request,
- InputStream requestInputStream,
- BlockingQueue<OperationStatus> operations,
- String clientId,
- boolean sessionIdWasGeneratedJustNow, int protocolVersion) throws Exception {
- return new LessConfiguredFeeder(requestInputStream, operations,
- popClient(clientId), new FeederSettings(request), clientId, sessionIdWasGeneratedJustNow,
- sourceSessionParams(request), null, this, this.feedReplyHandler, "");
- }
-
- @Override
- protected DocumentTypeManager createDocumentManager(
- DocumentmanagerConfig documentManagerConfig) {
- return null;
- }
- }
-
- private class MockSharedSession extends SharedSourceSession {
-
- public MockSharedSession(SourceSessionParams params) {
- super(new SharedMessageBus(new MessageBus(new MockNetwork(),
- new MessageBusParams())), params);
- }
-
- @Override
- public Result sendMessageBlocking(Message msg) throws InterruptedException {
- return sendMessage(msg);
- }
-
- @Override
- public Result sendMessage(Message msg) {
- ReplyHandler handler = msg.popHandler();
-
- switch (mbus) {
- case 0:
- throw new RuntimeException("boom");
- case 1:
- Result r = new Result(ErrorCode.SEND_QUEUE_FULL, "tralala");
- mbus = 2;
- return r;
- case 2:
- handler.handleReply(new MockReply(msg.getContext()));
- return Result.ACCEPTED;
- default:
- throw new IllegalStateException("WTF?!");
- }
- }
- }
-
- private class LessConfiguredFeeder extends Feeder {
-
- public LessConfiguredFeeder(InputStream inputStream,
- BlockingQueue<OperationStatus> operations,
- ClientState storedState, FeederSettings settings,
- String clientId, boolean sessionIdWasGeneratedJustNow, SourceSessionParams sessionParams,
- SessionCache sessionCache, FeedHandler handler, ReplyHandler feedReplyHandler,
- String localHostname) throws Exception {
- super(inputStream, new MockFeedReaderFactory(), null, operations, storedState, settings, clientId, sessionIdWasGeneratedJustNow,
- sessionParams, sessionCache, handler, new DummyMetric(), feedReplyHandler, localHostname);
- }
-
- protected ReferencedResource<SharedSourceSession> retainSession(
- SourceSessionParams sessionParams, SessionCache sessionCache) {
- final SharedSourceSession session = new MockSharedSession(sessionParams);
- return new ReferencedResource<>(session, References.fromResource(session));
- }
- }
-
- @Test
- public final void testFailingMbus() throws IOException {
- String sessionId;
- {
- InputStream in = new MetaStream(new byte[]{1});
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- HttpRequest nalle = HttpRequest
- .createTestRequest(
- "http://test4-steinar:19020/reserved-for-internal-use/feedapi",
- Method.POST, in);
- nalle.getJDiscRequest().headers().add(Headers.VERSION, "2");
- nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false");
- HttpResponse r = handler.handle(nalle);
- sessionId = r.headers().getFirst(Headers.SESSION_ID);
- r.render(out);
- assertEquals("",
- Utf8.toString(out.toByteArray()));
- }
- {
- InputStream in = new MetaStream(new byte[]{1});
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- HttpRequest nalle = HttpRequest.createTestRequest(
- "http://test4-steinar:19020/reserved-for-internal-use/feedapi",
- Method.POST, in);
- nalle.getJDiscRequest().headers().add(Headers.VERSION, "2");
- nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId);
- HttpResponse r = handler.handle(nalle);
- r.render(out);
- assertEquals("id:banana:banana::doc1 ERROR boom \n",
- Utf8.toString(out.toByteArray()));
- }
- }
-
- @Test
- public final void testBusyMbus() throws IOException {
- String sessionId;
- {
- InputStream in = new MetaStream(new byte[]{1});
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- HttpRequest nalle = HttpRequest
- .createTestRequest(
- "http://test4-steinar:19020/reserved-for-internal-use/feedapi",
- Method.POST, in);
- mbus = 2;
- nalle.getJDiscRequest().headers().add(Headers.VERSION, "2");
- nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false");
- HttpResponse r = handler.handle(nalle);
- sessionId = r.headers().getFirst(Headers.SESSION_ID);
- r.render(out);
- assertEquals("",
- Utf8.toString(out.toByteArray()));
- }
- {
- InputStream in = new MetaStream(new byte[] { 1 });
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- HttpRequest nalle = HttpRequest
- .createTestRequest(
- "http://test4-steinar:19020/reserved-for-internal-use/feedapi",
- Method.POST, in);
- mbus = 1;
- nalle.getJDiscRequest().headers().add(Headers.VERSION, "2");
- nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId);
- nalle.getJDiscRequest().headers().add(Headers.DRAIN, "true");
- nalle.getJDiscRequest().headers()
- .add(Headers.DENY_IF_BUSY, "false");
- HttpResponse r = handler.handle(nalle);
- r.render(out);
- assertEquals("id:banana:banana::doc1 OK Document{20}processed. \n",
- Utf8.toString(out.toByteArray()));
- }
- {
- InputStream in = new MetaStream(new byte[] { 1 });
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- HttpRequest nalle = HttpRequest
- .createTestRequest(
- "http://test4-steinar:19020/reserved-for-internal-use/feedapi",
- Method.POST, in);
- mbus = 1;
- nalle.getJDiscRequest().headers().add(Headers.VERSION, "2");
- nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId);
- nalle.getJDiscRequest().headers().add(Headers.DRAIN, "true");
- nalle.getJDiscRequest().headers().add(Headers.DENY_IF_BUSY, "true");
- HttpResponse r = handler.handle(nalle);
- r.render(out);
- assertEquals("id:banana:banana::doc1 TRANSIENT_ERROR tralala \n",
- Utf8.toString(out.toByteArray()));
- }
- }
-
-}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2NoXmlReaderTestCase.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2NoXmlReaderTestCase.java
deleted file mode 100644
index 633477dcc79..00000000000
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2NoXmlReaderTestCase.java
+++ /dev/null
@@ -1,164 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.server;
-
-import com.yahoo.container.jdisc.HttpRequest;
-import com.yahoo.container.jdisc.HttpResponse;
-import com.yahoo.container.jdisc.messagebus.SessionCache;
-import com.yahoo.container.logging.AccessLog;
-import com.yahoo.document.DocumentTypeManager;
-import com.yahoo.document.config.DocumentmanagerConfig;
-import com.yahoo.jdisc.ReferencedResource;
-import com.yahoo.jdisc.References;
-import com.yahoo.jdisc.http.HttpRequest.Method;
-import com.yahoo.messagebus.*;
-import com.yahoo.messagebus.Error;
-import com.yahoo.messagebus.shared.SharedMessageBus;
-import com.yahoo.messagebus.shared.SharedSourceSession;
-import com.yahoo.metrics.simple.MetricReceiver;
-import com.yahoo.text.Utf8;
-import com.yahoo.vespa.http.client.core.Headers;
-import com.yahoo.vespa.http.client.core.OperationStatus;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Check FeedHandler APIs.
- *
- * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
- */
-public class V2NoXmlReaderTestCase {
-
- LessConfiguredHandler handler;
- ExecutorService workers;
-
- @Before
- public void setUp() throws Exception {
- workers = Executors.newCachedThreadPool();
- handler = new LessConfiguredHandler(workers);
- }
-
- @After
- public void tearDown() throws Exception {
- handler.destroy();
- workers.shutdown();
- }
-
- private static class LessConfiguredHandler extends FeedHandler {
-
- public LessConfiguredHandler(Executor executor) throws Exception {
- super(new FeedHandler.Context(executor, AccessLog.voidAccessLog(), new DummyMetric()),
- null, null, null, MetricReceiver.nullImplementation);
- }
-
-
- @Override
- protected Feeder createFeeder(HttpRequest request, InputStream requestInputStream,
- BlockingQueue<OperationStatus> operations, String clientId,
- boolean sessionIdWasGeneratedJustNow, int protocolVersion)
- throws Exception {
- return new LessConfiguredFeeder(requestInputStream, operations,
- popClient(clientId), new FeederSettings(request), clientId, sessionIdWasGeneratedJustNow,
- sourceSessionParams(request), null, this, this.feedReplyHandler, "");
- }
-
- @Override
- protected DocumentTypeManager createDocumentManager(
- DocumentmanagerConfig documentManagerConfig) {
- return null;
- }
- }
-
- private static class MockSharedSession extends SharedSourceSession {
-
- public MockSharedSession(SourceSessionParams params) {
- super(new SharedMessageBus(new MessageBus(new MockNetwork(),
- new MessageBusParams())), params);
- }
-
- @Override
- public Result sendMessageBlocking(Message msg) throws InterruptedException {
- return sendMessage(msg);
- }
-
- @Override
- public Result sendMessage(Message msg) {
- ReplyHandler handler = msg.popHandler();
- MockReply mockReply = new MockReply(msg.getContext());
- if (msg instanceof Feeder.FeedErrorMessage) {
- mockReply.addError(new Error(123, "Could not feed this"));
- }
- handler.handleReply(mockReply);
- return Result.ACCEPTED;
- }
-
- }
-
- private static class LessConfiguredFeeder extends Feeder {
-
- public LessConfiguredFeeder(InputStream inputStream,
- BlockingQueue<OperationStatus> operations,
- ClientState storedState, FeederSettings settings,
- String clientId, boolean sessionIdWasGeneratedJustNow, SourceSessionParams sessionParams,
- SessionCache sessionCache, FeedHandler handler, ReplyHandler feedReplyHandler,
- String localHostname) throws Exception {
- super(inputStream, null, null, operations, storedState, settings, clientId, sessionIdWasGeneratedJustNow,
- sessionParams, sessionCache, handler, new DummyMetric(), feedReplyHandler, localHostname);
- }
-
- protected ReferencedResource<SharedSourceSession> retainSession(
- SourceSessionParams sessionParams, SessionCache sessionCache) {
- final SharedSourceSession session = new MockSharedSession(sessionParams);
- return new ReferencedResource<>(session, References.fromResource(session));
- }
- }
-
- @Test
- public final void test() throws IOException {
- String sessionId;
- {
- InputStream in = new MetaStream(new byte[] { 1 });
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- HttpRequest nalle = HttpRequest
- .createTestRequest(
- "http://test4-steinar:19020/reserved-for-internal-use/feedapi",
- Method.POST, in);
- nalle.getJDiscRequest().headers().add(Headers.VERSION, "2");
- nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false");
- HttpResponse r = handler.handle(nalle);
- sessionId = r.headers().getFirst(Headers.SESSION_ID);
- r.render(out);
- assertEquals("",
- Utf8.toString(out.toByteArray()));
- }
- {
- InputStream in = new MetaStream(new byte[] { 1 });
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- HttpRequest nalle = HttpRequest.createTestRequest(
- "http://test4-steinar:19020/reserved-for-internal-use/feedapi",
- Method.POST, in);
- nalle.getJDiscRequest().headers().add(Headers.VERSION, "2");
- nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId);
- nalle.getJDiscRequest().headers().add(Headers.DRAIN, "true");
- HttpResponse r = handler.handle(nalle);
- r.render(out);
- //This is different from v1. If we cannot parse XML, we will still get response code 200, but with a sensible
- //error message in the response.
- assertEquals(200, r.getStatus());
- assertEquals("id:banana:banana::doc1 ERROR Could{20}not{20}feed{20}this \n",
- Utf8.toString(out.toByteArray()));
- }
- }
-
-}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/VersionsTestCase.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/VersionsTestCase.java
index d6f605b0379..128664dda9e 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/VersionsTestCase.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/VersionsTestCase.java
@@ -23,13 +23,14 @@ public class VersionsTestCase {
private static final List<String> EMPTY = Collections.emptyList();
private static final List<String> ONE_TWO = Arrays.asList("1", "2");
+ private static final List<String> ONE_THREE = Arrays.asList("1", "3");
private static final List<String> TWO_THREE = Arrays.asList("3", "2");
- private static final List<String> ONE_NULL_TWO = Arrays.asList("1", null, "2");
- private static final List<String> ONE_COMMA_TWO = Collections.singletonList("1, 2");
- private static final List<String> ONE_EMPTY_TWO = Arrays.asList("1", "", "2");
+ private static final List<String> ONE_NULL_THREE = Arrays.asList("1", null, "3");
+ private static final List<String> ONE_COMMA_THREE = Collections.singletonList("1, 3");
+ private static final List<String> ONE_EMPTY_THREE = Arrays.asList("1", "", "3");
private static final List<String> TOO_LARGE_NUMBER = Collections.singletonList("1000000000");
- private static final List<String> TWO_TOO_LARGE_NUMBER = Arrays.asList("2", "1000000000");
- private static final List<String> TWO_COMMA_TOO_LARGE_NUMBER = Arrays.asList("2,1000000000");
+ private static final List<String> THREE_TOO_LARGE_NUMBER = Arrays.asList("3", "1000000000");
+ private static final List<String> THREE_COMMA_TOO_LARGE_NUMBER = Arrays.asList("3,1000000000");
private static final List<String> GARBAGE = Collections.singletonList("garbage");
@Test
@@ -42,8 +43,15 @@ public class VersionsTestCase {
@Test
public void testOneTwo() throws Exception {
Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(ONE_TWO);
+ assertThat(v.first, instanceOf(ErrorHttpResponse.class));
+ assertThat(v.second, is(-1));
+ }
+
+ @Test
+ public void testOneThree() throws Exception {
+ Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(ONE_THREE);
assertThat(v.first, nullValue());
- assertThat(v.second, is(2));
+ assertThat(v.second, is(3));
}
@Test
@@ -54,24 +62,24 @@ public class VersionsTestCase {
}
@Test
- public void testOneNullTwo() throws Exception {
- Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(ONE_NULL_TWO);
+ public void testOneNullThree() throws Exception {
+ Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(ONE_NULL_THREE);
assertThat(v.first, nullValue());
- assertThat(v.second, is(2));
+ assertThat(v.second, is(3));
}
@Test
- public void testOneCommaTwo() throws Exception {
- Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(ONE_COMMA_TWO);
+ public void testOneCommaThree() throws Exception {
+ Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(ONE_COMMA_THREE);
assertThat(v.first, nullValue());
- assertThat(v.second, is(2));
+ assertThat(v.second, is(3));
}
@Test
- public void testOneEmptyTwo() throws Exception {
- Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(ONE_EMPTY_TWO);
+ public void testOneEmptyThree() throws Exception {
+ Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(ONE_EMPTY_THREE);
assertThat(v.first, nullValue());
- assertThat(v.second, is(2));
+ assertThat(v.second, is(3));
}
@Test
@@ -83,22 +91,22 @@ public class VersionsTestCase {
errorResponse.render(errorMsg);
assertThat(errorMsg.toString(),
is("Could not parse X-Yahoo-Feed-Protocol-Versionheader of request (values: [1000000000]). " +
- "Server supports protocol versions [2, 3]"));
+ "Server supports protocol versions [3]"));
assertThat(v.second, is(-1));
}
@Test
- public void testTwoTooLarge() throws Exception {
- Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(TWO_TOO_LARGE_NUMBER);
+ public void testThreeTooLarge() throws Exception {
+ Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(THREE_TOO_LARGE_NUMBER);
assertThat(v.first, nullValue());
- assertThat(v.second, is(2));
+ assertThat(v.second, is(3));
}
@Test
public void testTwoCommaTooLarge() throws Exception {
- Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(TWO_COMMA_TOO_LARGE_NUMBER);
+ Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(THREE_COMMA_TOO_LARGE_NUMBER);
assertThat(v.first, nullValue());
- assertThat(v.second, is(2));
+ assertThat(v.second, is(3));
}
@Test