diff options
author | Tor Egge <Tor.Egge@broadpark.no> | 2018-11-13 14:38:50 +0100 |
---|---|---|
committer | gjoranv <gv@oath.com> | 2019-01-21 15:09:26 +0100 |
commit | 08b6fe261b5b88635cb561208476fd1ac2607040 (patch) | |
tree | 86ee45fa34d554f571918eb530e4f340159b8ac4 /vespaclient-container-plugin | |
parent | 01705ccfef4a08cebd98a4eb4251e7032b1eac98 (diff) |
Remove internal http feed handler protocol version 2.
Diffstat (limited to 'vespaclient-container-plugin')
8 files changed, 31 insertions, 2016 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java index 5294545ad50..bd7d195b48b 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java @@ -51,16 +51,8 @@ import java.util.zip.GZIPInputStream; */ public class FeedHandler extends LoggingRequestHandler { - private final ExecutorService workers = Executors.newCachedThreadPool(ThreadFactoryFactory.getThreadFactory("feedhandler")); - private final DocumentTypeManager docTypeManager; - private final Map<String, ClientState> clients; - private final ScheduledThreadPoolExecutor cron; - private final SessionCache sessionCache; protected final ReplyHandler feedReplyHandler; - private final AtomicLong sessionId; - private final Metric metric; - private static final List<Integer> serverSupportedVersions = Collections.unmodifiableList(Arrays.asList(2, 3)); - private final String localHostname; + private static final List<Integer> serverSupportedVersions = Collections.unmodifiableList(Arrays.asList(3)); private final FeedHandlerV3 feedHandlerV3; @Inject @@ -73,46 +65,7 @@ public class FeedHandler extends LoggingRequestHandler { super(parentCtx); DocumentApiMetrics metricsHelper = new DocumentApiMetrics(metricReceiver, "vespa.http.server"); feedHandlerV3 = new FeedHandlerV3(parentCtx, documentManagerConfig, sessionCache, threadpoolConfig, metricsHelper); - docTypeManager = createDocumentManager(documentManagerConfig); - clients = new HashMap<>(); - this.sessionCache = sessionCache; - sessionId = new AtomicLong(ThreadLocalRandom.current().nextLong()); feedReplyHandler = new FeedReplyReader(parentCtx.getMetric(), metricsHelper); - cron = new ScheduledThreadPoolExecutor(1, ThreadFactoryFactory.getThreadFactory("feedhandler.cron")); - cron.scheduleWithFixedDelay(new CleanClients(), 16, 11, TimeUnit.MINUTES); - this.metric = parentCtx.getMetric(); - this.localHostname = resolveLocalHostname(); - } - - /** - * Exposed for creating mocks. - */ - protected DocumentTypeManager createDocumentManager(DocumentmanagerConfig documentManagerConfig) { - return new DocumentTypeManager(documentManagerConfig); - } - - private class CleanClients implements Runnable { - - @Override - public void run() { - List<ClientState> clientsToShutdown = new ArrayList<>(); - long now = System.currentTimeMillis(); - - synchronized (clients) { - for (Iterator<Map.Entry<String, ClientState>> i = clients - .entrySet().iterator(); i.hasNext();) { - ClientState client = i.next().getValue(); - - if (now - client.creationTime > 10 * 60 * 1000) { - clientsToShutdown.add(client); - i.remove(); - } - } - } - for (ClientState client : clientsToShutdown) { - client.sourceSession.getReference().close(); - } - } } private Tuple2<HttpResponse, Integer> checkProtocolVersion(HttpRequest request) { @@ -135,8 +88,6 @@ public class FeedHandler extends LoggingRequestHandler { int version; if (washedClientVersions.contains("3")) { version = 3; - } else if (washedClientVersions.contains("2")) { // TODO: Vespa 7: Remove support for Version 2 - version = 2; } else { return new Tuple2<>(new ErrorHttpResponse( Headers.HTTP_NOT_ACCEPTABLE, @@ -175,43 +126,7 @@ public class FeedHandler extends LoggingRequestHandler { if (protocolVersion.first != null) { return protocolVersion.first; } - if (3 == protocolVersion.second) { - return feedHandlerV3.handle(request); - } - final BlockingQueue<OperationStatus> operations = new LinkedBlockingQueue<>(); - Tuple2<String, Boolean> clientId; - clientId = sessionId(request); - - if (clientId.second != null && clientId.second) { - if (log.isLoggable(LogLevel.DEBUG)) { - log.log(LogLevel.DEBUG, "Received initial request from client with session ID " + - clientId.first + ", protocol version " + protocolVersion.second); - } - } - - Feeder feeder; - try { - feeder = createFeeder(request, request.getData(), operations, clientId.first, - clientId.second, protocolVersion.second); - // the synchronous FeedResponse blocks draining the InputStream, letting the Feeder read it - workers.submit(feeder); - } catch (UnknownClientException uce) { - String msg = Exceptions.toMessageString(uce); - log.log(LogLevel.WARNING, msg); - return new ErrorHttpResponse(Status.BAD_REQUEST, msg); - } catch (Exception e) { - String msg = "Could not initialize document parsing"; - log.log(LogLevel.WARNING, "Could not initialize document parsing", e); - return new ErrorHttpResponse(Status.INTERNAL_SERVER_ERROR, msg + ": " + Exceptions.toMessageString(e)); - } - - try { - feeder.waitForRequestReceived(); - } catch (InterruptedException e) { - return new ErrorHttpResponse(Status.INTERNAL_SERVER_ERROR, e.getMessage()); - } - - return new FeedResponse(200, operations, protocolVersion.second, clientId.first); + return feedHandlerV3.handle(request); } // Protected for testing @@ -225,82 +140,6 @@ public class FeedHandler extends LoggingRequestHandler { } } - /** - * Exposed for creating mocks. - */ - protected Feeder createFeeder( - HttpRequest request, - InputStream requestInputStream, - BlockingQueue<OperationStatus> operations, - String clientId, - boolean sessionIdWasGeneratedJustNow, - int protocolVersion) throws Exception { - if (protocolVersion != 2) - throw new IllegalStateException("Protocol version " + protocolVersion + " not supported."); - - return new Feeder( - unzipStreamIfNeeded(requestInputStream, request), - new FeedReaderFactory(), - docTypeManager, - operations, - popClient(clientId), - new FeederSettings(request), - clientId, - sessionIdWasGeneratedJustNow, - sourceSessionParams(request), - sessionCache, - this, - metric, - feedReplyHandler, - localHostname); - } - - private Tuple2<String, Boolean> sessionId(HttpRequest request) { - boolean sessionIdWasGeneratedJustNow = false; - String sessionId = request.getHeader(Headers.SESSION_ID); - if (sessionId == null) { - sessionId = Long.toString(this.sessionId.incrementAndGet()) + "-" + - remoteHostAddressAndPort(request.getJDiscRequest()) + "#" + - localHostname; - sessionIdWasGeneratedJustNow = true; - } - return new Tuple2<>(sessionId, sessionIdWasGeneratedJustNow); - } - - private static String remoteHostAddressAndPort(com.yahoo.jdisc.http.HttpRequest httpRequest) { - SocketAddress remoteAddress = httpRequest.getRemoteAddress(); - if (remoteAddress instanceof InetSocketAddress) { - InetSocketAddress isa = (InetSocketAddress) remoteAddress; - return isa.getAddress().getHostAddress() + "-" + isa.getPort(); - } - return ""; - } - - private static String resolveLocalHostname() { - String hostname = HostName.getLocalhost(); - if (hostname.equals("localhost")) { - return ""; - } - return hostname; - } - - /** - * Exposed for use when creating mocks. - */ - protected SourceSessionParams sourceSessionParams(HttpRequest request) { - SourceSessionParams params = new SourceSessionParams(); - String timeout = request.getHeader(Headers.TIMEOUT); - - if (timeout != null) { - try { - params.setTimeout(Double.parseDouble(timeout)); - } catch (NumberFormatException e) { - // NOP - } - } - return params; - } - @Override protected void destroy() { feedHandlerV3.destroy(); @@ -316,33 +155,5 @@ public class FeedHandler extends LoggingRequestHandler { private void internalDestroy() { super.destroy(); - workers.shutdown(); - cron.shutdown(); - synchronized (clients) { - for (ClientState client : clients.values()) { - client.sourceSession.getReference().close(); - } - clients.clear(); - } - } - - void putClient(final String sessionId, final ClientState value) { - synchronized (clients) { - clients.put(sessionId, value); - } } - - ClientState popClient(String sessionId) { - synchronized (clients) { - return clients.remove(sessionId); - } - } - - /** - * Guess what, testing only. - */ - void forceRunCleanClients() { - new CleanClients().run(); - } - } diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/Feeder.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/Feeder.java deleted file mode 100644 index f7890db3b35..00000000000 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/Feeder.java +++ /dev/null @@ -1,537 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import com.yahoo.collections.Tuple2; -import com.yahoo.container.jdisc.messagebus.SessionCache; -import com.yahoo.document.DocumentId; -import com.yahoo.document.DocumentUpdate; -import com.yahoo.document.DocumentRemove; -import com.yahoo.document.DocumentPut; -import com.yahoo.document.DocumentTypeManager; -import com.yahoo.documentapi.messagebus.protocol.DocumentMessage; -import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; -import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; -import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; -import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage; -import com.yahoo.documentapi.metrics.DocumentOperationType; -import com.yahoo.jdisc.Metric; -import com.yahoo.jdisc.ReferencedResource; -import com.yahoo.log.LogLevel; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.ReplyHandler; -import com.yahoo.messagebus.Result; -import com.yahoo.messagebus.SourceSessionParams; -import com.yahoo.messagebus.routing.ErrorDirective; -import com.yahoo.messagebus.routing.Hop; -import com.yahoo.messagebus.routing.Route; -import com.yahoo.messagebus.shared.SharedSourceSession; -import com.yahoo.yolean.Exceptions; -import com.yahoo.text.Utf8String; -import com.yahoo.vespa.http.client.core.Encoder; -import com.yahoo.vespa.http.client.core.ErrorCode; -import com.yahoo.vespa.http.client.core.OperationStatus; -import com.yahoo.vespa.http.server.util.ByteLimitedInputStream; -import com.yahoo.vespaxmlparser.FeedReader; -import com.yahoo.vespaxmlparser.VespaXMLFeedReader; -import com.yahoo.vespaxmlparser.VespaXMLFeedReader.Operation; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Arrays; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.logging.Logger; - -import static com.yahoo.messagebus.ErrorCode.SEND_QUEUE_FULL; - -/** - * Read documents from client, and send them through message bus. - * - * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> - */ -public class Feeder implements Runnable { - - protected static final Logger log = Logger.getLogger(Feeder.class.getName()); - - final InputStream requestInputStream; - final DocumentTypeManager docTypeManager; - final BlockingQueue<OperationStatus> operations; - final BlockingQueue<OperationStatus> feedReplies; - int numPending; - final FeederSettings settings; - final String clientId; - final ReferencedResource<SharedSourceSession> sourceSession; - final FeedHandler handler; - final Metric metric; - final Metric.Context metricContext; - private long prevOpsPerSecTime; // previous measurement time of OPS - private double operationsForOpsPerSec; - private final ReplyHandler feedReplyHandler; - protected final static String EOF = "End of stream"; - protected final boolean sessionIdWasGeneratedJustNow; - private final CountDownLatch requestReceived = new CountDownLatch(1); - private final FeedReaderFactory feedReaderFactory; - - public Feeder(InputStream requestInputStream, - FeedReaderFactory feedReaderFactory, - DocumentTypeManager docTypeManager, - BlockingQueue<OperationStatus> operations, - ClientState storedState, - FeederSettings settings, - String clientId, boolean sessionIdWasGeneratedJustNow, SourceSessionParams sessionParams, - SessionCache sessionCache, - FeedHandler handler, Metric metric, ReplyHandler feedReplyHandler, - String localHostname) throws Exception { - super(); - this.feedReaderFactory = feedReaderFactory; - if (storedState == null) { - if (!sessionIdWasGeneratedJustNow) { - // We do not have a stored state, BUT the session ID came in with the request. - // Possible session timeout, server restart, server reconfig, or VIP usage. Examine. - examineClientId(clientId, localHostname); - } - numPending = 0; - feedReplies = new LinkedBlockingQueue<>(); - sourceSession = retainSession(sessionParams, sessionCache); - metricContext = createClientMetricContext(metric, clientId); - prevOpsPerSecTime = System.currentTimeMillis(); - operationsForOpsPerSec = 0.0; - } else { - //we have a stored state, and the session ID was obviously not generated now. All OK. - numPending = storedState.pending; - feedReplies = storedState.feedReplies; - sourceSession = storedState.sourceSession; - metricContext = storedState.metricContext; - prevOpsPerSecTime = storedState.prevOpsPerSecTime; - operationsForOpsPerSec = storedState.operationsForOpsPerSec; - } - this.clientId = clientId; - this.sessionIdWasGeneratedJustNow = sessionIdWasGeneratedJustNow; - this.requestInputStream = requestInputStream; - this.docTypeManager = docTypeManager; - this.operations = operations; - this.settings = settings; - this.handler = handler; - this.metric = metric; - this.feedReplyHandler = feedReplyHandler; - } - protected void examineClientId(String clientId, String localHostname) { - if (!clientId.contains("#")) { - throw new UnknownClientException("Got request from client with id '" + clientId + - "', but found no session for this client. " + - "This is expected during upgrades of gateways and infrastructure nodes."); - } - int hashPos = clientId.indexOf("#"); - String supposedHostname = clientId.substring(hashPos + 1, clientId.length()); - if (supposedHostname.isEmpty()) { - throw new UnknownClientException("Got request from client with id '" + clientId + - "', but found no session for this client. Possible session " + - "timeout due to inactivity, server restart or reconfig, " + - "or bad VIP usage. " + - "This is expected during upgrades of gateways and infrastructure nodes."); - } - - if (!supposedHostname.equals(localHostname)) { - throw new UnknownClientException("Got request from client with id '" + clientId + - "', but found no session for this client. " + - "Session was originally established towards host " + - supposedHostname + ", but our hostname is " + - localHostname + ". " + - "If using VIP rotation, this could be due to a session was rotated from one server to another. " + - "Configure VIP with persistence=enabled. " + - "This is expected during upgrades of gateways and infrastructure nodes."); - } - log.log(LogLevel.DEBUG, "Client '" + clientId + "' reconnected after session inactivity, or server restart " + - "or reconfig. Re-establishing session."); - } - - - - private static Metric.Context createClientMetricContext(Metric metric, String clientId) { - // No real value in separate metric dimensions per client. - return null; - } - - /** - * Exposed for creating mocks. - */ - protected ReferencedResource<SharedSourceSession> retainSession( - SourceSessionParams sessionParams, SessionCache sessionCache) { - return sessionCache.retainSource(sessionParams); - } - - @Override - public void run() { - try { - if (handshake()) { - return; //will putClient in finally block below - } - flushResponseQueue(); - feed(); - requestReceived.countDown(); - drain(); - } catch (InterruptedException e) { - // NOP, just terminate - } catch (Exception e) { - log.log(LogLevel.WARNING, "Unhandled exception while feeding: " - + Exceptions.toMessageString(e), e); - } catch (Throwable e) { - log.log(LogLevel.WARNING, "Unhandled error while feeding: " - + Exceptions.toMessageString(e), e); - throw e; - } finally { - requestReceived.countDown(); - putClient(); - try { - enqueue("-", "-", ErrorCode.END_OF_FEED, false, null); - } catch (InterruptedException e) { - // NOP, we are already exiting the thread - } - } - } - - protected boolean handshake() throws IOException { - if (sessionIdWasGeneratedJustNow) { - if (log.isLoggable(LogLevel.DEBUG)) { - log.log(LogLevel.DEBUG, "Handshake completed for client " + clientId + "."); - } - requestInputStream.close(); - return true; - } - return false; - } - - void feed() throws InterruptedException { - while (true) { - Result result; - String operationId; - try { - operationId = getNextOperationId(); - } catch (IOException ioe) { - if (log.isLoggable(LogLevel.DEBUG)) { - log.log(LogLevel.DEBUG, Exceptions.toMessageString(ioe), ioe); - } - break; - } - - //noinspection StringEquality - if (operationId == EOF) { - break; - } - - Tuple2<String, Message> msg; - try { - msg = getNextMessage(operationId); - setRoute(msg); - } catch (Exception e) { - if (log.isLoggable(LogLevel.DEBUG)) { - log.log(LogLevel.DEBUG, Exceptions.toMessageString(e), e); - } - //noinspection StringEquality - if (operationId != null) { //v1 always returns null, all others return something useful, or throw an exception above - msg = newErrorMessage(operationId, e); - } else { - break; - } - } - - if (msg == null) { - break; - } - - setMessageParameters(msg); - - while (true) { - try { - msg.second.pushHandler(feedReplyHandler); - if (settings.denyIfBusy) { - result = sourceSession.getResource().sendMessage(msg.second); - } else { - result = sourceSession.getResource().sendMessageBlocking(msg.second); - } - } catch (RuntimeException e) { - enqueue(msg.first, Exceptions.toMessageString(e), - ErrorCode.ERROR, false, msg.second); - return; - } - if (result.isAccepted() || result.getError().getCode() != SEND_QUEUE_FULL) { - break; - } - if (settings.denyIfBusy) { - break; - } else { - //This will never happen - Thread.sleep(100); - } - } - - if (result.isAccepted()) { - ++numPending; - updateMetrics(msg.second); - updateOpsPerSec(); - log(LogLevel.DEBUG, "Sent message successfully, document id: ", msg.first); - } else if (!result.getError().isFatal()) { - enqueue(msg.first, result.getError().getMessage(), ErrorCode.TRANSIENT_ERROR, false, msg.second); - break; - } else { - // should probably not happen, but everybody knows stuff that - // shouldn't happen, happens all the time - boolean isConditionNotMet = result.getError().getCode() == DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED; - enqueue(msg.first, result.getError().getMessage(), ErrorCode.ERROR, isConditionNotMet, msg.second); - break; - } - } - } - - private Tuple2<String, Message> newErrorMessage(String operationId, Exception e) { - Message m = new FeedErrorMessage(operationId); - Tuple2<String, Message> msg = new Tuple2<>(operationId, m); - Hop hop = new Hop(); - hop.addDirective(new ErrorDirective(Exceptions.toMessageString(e))); - Route route = new Route(); - route.addHop(hop); - m.setRoute(route); - return msg; - } - - private void updateMetrics(Message m) { - metric.set( - MetricNames.PENDING, - Double.valueOf(sourceSession.getResource().session().getPendingCount()), - null); - - metric.add(MetricNames.NUM_OPERATIONS, 1, metricContext); - - if (m instanceof PutDocumentMessage) { - metric.add(MetricNames.NUM_PUTS, 1, metricContext); - } else if (m instanceof RemoveDocumentMessage) { - metric.add(MetricNames.NUM_REMOVES, 1, metricContext); - } else if (m instanceof UpdateDocumentMessage) { - metric.add(MetricNames.NUM_UPDATES, 1, metricContext); - } - } - - private void updateOpsPerSec() { - long now = System.currentTimeMillis(); - if ((now - prevOpsPerSecTime) >= 1000) { //every second - double ms = (double) (now - prevOpsPerSecTime); - final double opsPerSec = operationsForOpsPerSec / (ms / 1000); - metric.set(MetricNames.OPERATIONS_PER_SEC, opsPerSec, metricContext); - operationsForOpsPerSec = 1.0d; - prevOpsPerSecTime = now; - } else { - operationsForOpsPerSec += 1.0d; - } - } - - private Tuple2<String, Message> getNextMessage(String operationId) throws Exception { - VespaXMLFeedReader.Operation op = new VespaXMLFeedReader.Operation(); - Tuple2<String, Message> msg; - - getNextOperation(op); - - switch (op.getType()) { - case DOCUMENT: - msg = newPutMessage(op, operationId); - break; - case REMOVE: - msg = newRemoveMessage(op, operationId); - break; - case UPDATE: - msg = newUpdateMessage(op, operationId); - break; - default: - // typical end of feed - return null; - } - log(LogLevel.DEBUG, "Successfully deserialized document id: ", msg.first); - return msg; - } - - private void setMessageParameters(Tuple2<String, Message> msg) { - msg.second.setContext(new ReplyContext(msg.first, feedReplies, DocumentOperationType.fromMessage(msg.second))); - if (settings.traceLevel != null) { - msg.second.getTrace().setLevel(settings.traceLevel); - } - if (settings.priority != null) { - try { - DocumentProtocol.Priority priority = DocumentProtocol.Priority.valueOf(settings.priority); - if (msg.second instanceof DocumentMessage) { - ((DocumentMessage) msg.second).setPriority(priority); - } - } - catch (IllegalArgumentException i) { - log.severe(i.getMessage()); - } - } - } - - private void setRoute(Tuple2<String, Message> msg) { - if (settings.route != null) { - msg.second.setRoute(settings.route); - } - } - - protected void getNextOperation(VespaXMLFeedReader.Operation op) throws Exception { - int length = readByteLength(); - - try (InputStream limitedInputStream = new ByteLimitedInputStream(requestInputStream, length)){ - FeedReader reader = feedReaderFactory.createReader(limitedInputStream, docTypeManager, settings.dataFormat); - reader.read(op); - } - } - - protected String getNextOperationId() throws IOException { - return readOperationId(); - } - - private String readOperationId() throws IOException { - StringBuilder idBuf = new StringBuilder(100); - int c; - while ((c = requestInputStream.read()) != -1) { - if (c == 32) { - break; - } - idBuf.append((char) c); //it's ASCII - } - if (idBuf.length() == 0) { - return EOF; - } - return Encoder.decode(idBuf.toString(), new StringBuilder(idBuf.length())).toString(); - } - - private int readByteLength() throws IOException { - StringBuilder lenBuf = new StringBuilder(8); - int c; - while ((c = requestInputStream.read()) != -1) { - if (c == 10) { - break; - } - lenBuf.append((char) c); //it's ASCII - } - if (lenBuf.length() == 0) { - throw new IllegalStateException("Operation length missing."); - } - return Integer.valueOf(lenBuf.toString(), 16); - } - - protected final void log(LogLevel level, Object... msgParts) { - StringBuilder s; - - if (!log.isLoggable(level)) { - return; - } - - s = new StringBuilder(); - for (Object part : msgParts) { - s.append(part.toString()); - } - - log.log(level, s.toString()); - } - - private Tuple2<String, Message> newUpdateMessage(Operation op, String operationId) { - DocumentUpdate update = op.getDocumentUpdate(); - update.setCondition(op.getCondition()); - Message msg = new UpdateDocumentMessage(update); - - String id = (operationId == null) ? update.getId().toString() : operationId; - return new Tuple2<>(id, msg); - } - - private Tuple2<String, Message> newRemoveMessage(Operation op, String operationId) { - DocumentRemove remove = new DocumentRemove(op.getRemove()); - remove.setCondition(op.getCondition()); - Message msg = new RemoveDocumentMessage(remove); - - String id = (operationId == null) ? remove.getId().toString() : operationId; - return new Tuple2<>(id, msg); - } - - private Tuple2<String, Message> newPutMessage(Operation op, String operationId) { - DocumentPut put = new DocumentPut(op.getDocument()); - put.setCondition(op.getCondition()); - Message msg = new PutDocumentMessage(put); - - String id = (operationId == null) ? put.getId().toString() : operationId; - return new Tuple2<>(id, msg); - } - - - void flushResponseQueue() throws InterruptedException { - OperationStatus status = feedReplies.poll(); - while (status != null) { - decreasePending(status); - status = feedReplies.poll(); - } - } - - void putClient() { - handler.putClient(clientId, - new ClientState(numPending, - feedReplies, sourceSession, metricContext, - prevOpsPerSecTime, operationsForOpsPerSec)); - } - - void drain() throws InterruptedException { - if (settings.drain) { - while (numPending > 0) { - OperationStatus o = feedReplies.take(); - decreasePending(o); - } - } - } - - private void decreasePending(OperationStatus o) throws InterruptedException { - operations.put(o); - --numPending; - } - - private void enqueue(String id, String message, ErrorCode code, boolean isConditionalNotMet, Message msg) - throws InterruptedException { - String traceMessage = msg != null && msg.getTrace() != null && msg.getTrace().getLevel() > 0 - ? msg.getTrace().toString() - : ""; - operations.put(new OperationStatus(message, id, code, isConditionalNotMet, traceMessage)); - } - - public void waitForRequestReceived() throws InterruptedException { - requestReceived.await(1, TimeUnit.HOURS); - } - - public class FeedErrorMessage extends Message { - private long sequenceId; - - private FeedErrorMessage(String operationId) { - try { - DocumentId id = new DocumentId(operationId); - sequenceId = Arrays.hashCode(id.getGlobalId()); - } catch (Exception e) { - sequenceId = 0; - } - } - - @Override - public Utf8String getProtocol() { - return new Utf8String("vespa-feed-handler-internal-bogus-protocol"); - } - - @Override - public int getType() { - return 1234; - } - - @Override - public boolean hasSequenceId() { - return true; - } - - @Override - public long getSequenceId() { - return sequenceId; - } - } - -} diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/feedhandler/FeedHandlerTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/feedhandler/FeedHandlerTest.java deleted file mode 100644 index c0cc907c671..00000000000 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/feedhandler/FeedHandlerTest.java +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.feedhandler; - -import com.yahoo.container.jdisc.HttpRequest; -import com.yahoo.jdisc.HeaderFields; -import com.yahoo.jdisc.Metric; -import com.yahoo.container.logging.AccessLog; -import com.yahoo.metrics.simple.MetricReceiver; -import com.yahoo.vespa.http.client.core.Headers; -import com.yahoo.vespa.http.client.core.OperationStatus; -import com.yahoo.vespa.http.server.FeedHandler; -import com.yahoo.vespa.http.server.Feeder; -import org.junit.Test; - -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; - -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -/** - * Unit test for FeedHandler class. - * - * @author dybis - */ -public class FeedHandlerTest { - - /** - * This class extends FeedHandler and allows to create a custom Feeder. - */ - static class TestFeedHandler extends FeedHandler { - private final CountDownLatch countDownLatch = new CountDownLatch(1); - - public TestFeedHandler() throws Exception { - super(new FeedHandler.Context(Executors.newCachedThreadPool(), - mock(AccessLog.class), - mock(Metric.class)), - null, null, null, MetricReceiver.nullImplementation); - } - - /** - * Builds a feeder that blocks until countDownLatch is stepped down. - */ - @Override - protected Feeder createFeeder( - com.yahoo.container.jdisc.HttpRequest request, - InputStream requestInputStream, - final BlockingQueue<OperationStatus> operations, - String clientId, - boolean sessionIdWasGeneratedJustNow, - int protocolVersion) throws Exception { - Feeder feeder = mock(Feeder.class); - doAnswer(invocation -> { - try { - countDownLatch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - return null; - }).when(feeder).waitForRequestReceived(); - return feeder; - } - } - - /** - * nginx require that a post is finished before the server ack with a response. This behaviour is verified - * in this test - */ - @Test - public void testResponseIsSentAfterWaitForRequestReceivedReturns() throws Exception { - HttpRequest request = mock(HttpRequest.class); - - // Create a request with valid version. - com.yahoo.jdisc.http.HttpRequest jdiscRequest = mock(com.yahoo.jdisc.http.HttpRequest.class); - HeaderFields headerFields = mock(HeaderFields.class); - List<String> version = new ArrayList<>(); - version.add("2"); - when(headerFields.get(Headers.VERSION)).thenReturn(version); - when(jdiscRequest.headers()).thenReturn(headerFields); - when(request.getJDiscRequest()).thenReturn(jdiscRequest); - - TestFeedHandler feedHandler = new TestFeedHandler(); - // After a short period, make the feed finish. - new Thread(() -> { - try { - Thread.sleep(50); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - feedHandler.countDownLatch.countDown(); - }).start(); - // This should not return before countdown latch is stepped down. - feedHandler.handle(request); - // This should always returns after the countDownLatch has become zero. This might cause false positive, - // but not false negatives. This is fine. - assertThat(feedHandler.countDownLatch.getCount(), is(0L)); - - } - -} diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2ErrorsInResultTestCase.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2ErrorsInResultTestCase.java deleted file mode 100644 index 47745b29032..00000000000 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2ErrorsInResultTestCase.java +++ /dev/null @@ -1,240 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import com.yahoo.container.jdisc.HttpRequest; -import com.yahoo.container.jdisc.HttpResponse; -import com.yahoo.container.jdisc.messagebus.SessionCache; -import com.yahoo.container.logging.AccessLog; -import com.yahoo.document.DocumentTypeManager; -import com.yahoo.document.config.DocumentmanagerConfig; -import com.yahoo.jdisc.ReferencedResource; -import com.yahoo.jdisc.References; -import com.yahoo.jdisc.http.HttpRequest.Method; -import com.yahoo.messagebus.*; -import com.yahoo.messagebus.Error; -import com.yahoo.messagebus.shared.SharedMessageBus; -import com.yahoo.messagebus.shared.SharedSourceSession; -import com.yahoo.metrics.simple.MetricReceiver; -import com.yahoo.text.Utf8; -import com.yahoo.text.Utf8String; -import com.yahoo.vespa.http.client.core.Headers; -import com.yahoo.vespa.http.client.core.OperationStatus; -import com.yahoo.vespaxmlparser.MockFeedReaderFactory; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import static org.junit.Assert.assertEquals; - -/** - * Check FeedHandler APIs. - * - * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> - */ -public class V2ErrorsInResultTestCase { - - LessConfiguredHandler handler; - ExecutorService workers; - - @Before - public void setUp() throws Exception { - workers = Executors.newCachedThreadPool(); - handler = new LessConfiguredHandler(workers); - } - - @After - public void tearDown() throws Exception { - handler.destroy(); - workers.shutdown(); - } - - private static class LessConfiguredHandler extends FeedHandler { - - public LessConfiguredHandler(Executor executor) throws Exception { - super(new FeedHandler.Context(executor, - AccessLog.voidAccessLog(), - new DummyMetric()), - null, null, null, MetricReceiver.nullImplementation); - } - - - @Override - protected Feeder createFeeder(HttpRequest request, InputStream requestInputStream, - BlockingQueue<OperationStatus> operations, String clientId, - boolean sessionIdWasGeneratedJustNow, int protocolVersion) - throws Exception { - return new LessConfiguredFeeder(requestInputStream, operations, - popClient(clientId), new FeederSettings(request), clientId, sessionIdWasGeneratedJustNow, - sourceSessionParams(request), null, this, this.feedReplyHandler, ""); - } - - @Override - protected DocumentTypeManager createDocumentManager( - DocumentmanagerConfig documentManagerConfig) { - return null; - } - } - - private static class MockSharedSession extends SharedSourceSession { - int count; - - public MockSharedSession(SourceSessionParams params) { - super(new SharedMessageBus(new MessageBus(new MockNetwork(), - new MessageBusParams())), params); - count = 0; - } - - @Override - public Result sendMessageBlocking(Message msg) throws InterruptedException { - return sendMessage(msg); - } - - @Override - public Result sendMessage(Message msg) { - Result r; - ReplyHandler handler = msg.popHandler(); - - switch (count++) { - case 0: - r = new Result(ErrorCode.FATAL_ERROR, - "boom"); - break; - case 1: - r = new Result(ErrorCode.TRANSIENT_ERROR, - "transient boom"); - break; - case 2: - final FailedReply reply = new FailedReply(msg.getContext()); - reply.addError(new Error( - ErrorCode.FATAL_ERROR, - "bad mojo, dude")); - handler.handleReply(reply); - r = Result.ACCEPTED; - break; - default: - handler.handleReply(new MockReply(msg.getContext())); - r = Result.ACCEPTED; - } - return r; - } - - } - - private static class FailedReply extends Reply { - Object context; - - public FailedReply(Object context) { - this.context = context; - } - - @Override - public Utf8String getProtocol() { - return null; - } - - @Override - public int getType() { - return 0; - } - - @Override - public Object getContext() { - return context; - } - } - - private static class LessConfiguredFeeder extends Feeder { - - public LessConfiguredFeeder(InputStream stream, - BlockingQueue<OperationStatus> operations, - ClientState storedState, FeederSettings settings, - String clientId, boolean sessionIdWasGeneratedJustNow, SourceSessionParams sessionParams, - SessionCache sessionCache, FeedHandler handler, ReplyHandler feedReplyHandler, - String localHostname) throws Exception { - super(stream, new MockFeedReaderFactory(), null, operations, storedState, settings, clientId, sessionIdWasGeneratedJustNow, - sessionParams, sessionCache, handler, new DummyMetric(), feedReplyHandler, localHostname); - } - - protected ReferencedResource<SharedSourceSession> retainSession( - SourceSessionParams sessionParams, SessionCache sessionCache) { - final SharedSourceSession session = new MockSharedSession(sessionParams); - return new ReferencedResource<>(session, References.fromResource(session)); - } - } - - @Test - public final void test() throws IOException { - String sessionId; - { - InputStream in = new MetaStream(new byte[] { 1 }); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - HttpRequest nalle = HttpRequest - .createTestRequest( - "http://test4-steinar:19020/reserved-for-internal-use/feedapi", - Method.POST, in); - nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); - nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false"); - HttpResponse r = handler.handle(nalle); - sessionId = r.headers().getFirst(Headers.SESSION_ID); - r.render(out); - assertEquals("", - Utf8.toString(out.toByteArray())); - } - { - InputStream in = new MetaStream(new byte[] { 1 }); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - HttpRequest nalle = HttpRequest - .createTestRequest( - "http://test4-steinar:19020/reserved-for-internal-use/feedapi", - Method.POST, in); - nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); - nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false"); - nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId); - HttpResponse r = handler.handle(nalle); - r.render(out); - assertEquals("id:banana:banana::doc1 ERROR boom \n", - Utf8.toString(out.toByteArray())); - } - { - InputStream in = new MetaStream(new byte[] { 1 }); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - HttpRequest nalle = HttpRequest - .createTestRequest( - "http://test4-steinar:19020/reserved-for-internal-use/feedapi", - Method.POST, in); - nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); - nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false"); - nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId); - HttpResponse r = handler.handle(nalle); - r.render(out); - assertEquals("id:banana:banana::doc1 TRANSIENT_ERROR transient{20}boom \n", - Utf8.toString(out.toByteArray())); - } - { - InputStream in = new MetaStream(new byte[] { 1 }); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - HttpRequest nalle = HttpRequest - .createTestRequest( - "http://test4-steinar:19020/reserved-for-internal-use/feedapi", - Method.POST, in); - nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); - nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId); - nalle.getJDiscRequest().headers().add(Headers.DRAIN, "true"); - HttpResponse r = handler.handle(nalle); - r.render(out); - assertEquals("id:banana:banana::doc1 ERROR bad{20}mojo,{20}dude \n", - Utf8.toString(out.toByteArray())); - } - - } - -} diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2ExternalFeedTestCase.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2ExternalFeedTestCase.java deleted file mode 100644 index 9960d98f7f1..00000000000 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2ExternalFeedTestCase.java +++ /dev/null @@ -1,530 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import com.yahoo.container.jdisc.HttpRequest; -import com.yahoo.container.jdisc.HttpResponse; -import com.yahoo.container.jdisc.messagebus.SessionCache; -import com.yahoo.container.logging.AccessLog; -import com.yahoo.document.DocumentTypeManager; -import com.yahoo.document.config.DocumentmanagerConfig; -import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; -import com.yahoo.jdisc.ReferencedResource; -import com.yahoo.jdisc.References; -import com.yahoo.jdisc.http.HttpRequest.Method; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.MessageBus; -import com.yahoo.messagebus.MessageBusParams; -import com.yahoo.messagebus.ReplyHandler; -import com.yahoo.messagebus.Result; -import com.yahoo.messagebus.SourceSessionParams; -import com.yahoo.messagebus.network.Network; -import com.yahoo.messagebus.shared.SharedMessageBus; -import com.yahoo.messagebus.shared.SharedSourceSession; -import com.yahoo.metrics.simple.MetricReceiver; -import com.yahoo.text.Utf8; -import com.yahoo.vespa.http.client.config.FeedParams.DataFormat; -import com.yahoo.vespa.http.client.core.Headers; -import com.yahoo.vespa.http.client.core.OperationStatus; -import com.yahoo.vespaxmlparser.MockFeedReaderFactory; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.logging.Handler; -import java.util.logging.Level; -import java.util.logging.LogRecord; -import java.util.logging.Logger; - -import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; - -/** - * Check FeedHandler APIs. - * - * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> - */ -public class V2ExternalFeedTestCase { - - LessConfiguredHandler handler; - ExecutorService workers; - Level logLevel; - Logger logger; - boolean initUseParentHandlers; - LogBuffer logChecker; - - @Before - public void setUp() throws Exception { - workers = Executors.newCachedThreadPool(); - handler = new LessConfiguredHandler(workers); - logger = Logger.getLogger(Feeder.class.getName()); - logLevel = logger.getLevel(); - logger.setLevel(Level.ALL); - initUseParentHandlers = logger.getUseParentHandlers(); - logChecker = new LogBuffer(); - logger.setUseParentHandlers(false); - logger.addHandler(logChecker); - } - - @After - public void tearDown() throws Exception { - handler.destroy(); - workers.shutdown(); - logger.setLevel(logLevel); - logger.removeHandler(logChecker); - logger.setUseParentHandlers(initUseParentHandlers); - } - - private static class LogBuffer extends Handler { - public final BlockingQueue<LogRecord> records = new LinkedBlockingQueue<>(); - - @Override - public void publish(LogRecord record) { - try { - records.put(record); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - @Override - public void flush() { - } - - @Override - public void close() throws SecurityException { - } - } - - private static class LessConfiguredHandler extends FeedHandler { - volatile DataFormat lastFormatSeen; - - public LessConfiguredHandler(Executor executor) throws Exception { - super(new FeedHandler.Context(executor, - AccessLog.voidAccessLog(), - new DummyMetric()), - null, null, null, MetricReceiver.nullImplementation); - } - - @Override - protected Feeder createFeeder(HttpRequest request, - InputStream requestInputStream, - BlockingQueue<OperationStatus> operations, - String clientId, - boolean sessionIdWasGeneratedJustNow, int protocolVersion) - throws Exception { - LessConfiguredFeeder f = new LessConfiguredFeeder(requestInputStream, operations, - popClient(clientId), new FeederSettings(request), clientId, sessionIdWasGeneratedJustNow, - sourceSessionParams(request), null, this, this.feedReplyHandler, "ourHostname"); - lastFormatSeen = f.settings.dataFormat; - return f; - } - - @Override - protected DocumentTypeManager createDocumentManager( - DocumentmanagerConfig documentManagerConfig) { - return null; - } - } - - private static class MockSharedSession extends SharedSourceSession { - - public MockSharedSession(SourceSessionParams params) { - super(new SharedMessageBus(new MessageBus(new MockNetwork(), - new MessageBusParams())), params); - } - - @Override - public Result sendMessageBlocking(Message msg) throws InterruptedException { - return sendMessage(msg); - } - - @Override - public Result sendMessage(Message msg) { - ReplyHandler handler = msg.popHandler(); - MockReply mockReply = new MockReply(msg.getContext()); - if (msg instanceof Feeder.FeedErrorMessage) { - mockReply.addError(new com.yahoo.messagebus.Error(123, "Could not feed this")); - } - if (msg instanceof PutDocumentMessage) { - assert(msg.getTrace().getLevel() == 4); - assert(((PutDocumentMessage) msg).getPriority().name().equals("LOWEST")); - } - handler.handleReply(mockReply); - return Result.ACCEPTED; - } - - } - - private static class LessConfiguredFeeder extends Feeder { - public LessConfiguredFeeder(InputStream stream, - BlockingQueue<OperationStatus> operations, - ClientState storedState, FeederSettings settings, - String clientId, boolean sessionIdWasGeneratedJustNow, SourceSessionParams sessionParams, - SessionCache sessionCache, FeedHandler handler, ReplyHandler feedReplyHandler, - String localHostname) throws Exception { - super(stream, new MockFeedReaderFactory(), null, operations, storedState, settings, clientId, sessionIdWasGeneratedJustNow, - sessionParams, sessionCache, handler, new DummyMetric(), feedReplyHandler, localHostname); - } - - protected ReferencedResource<SharedSourceSession> retainSession( - SourceSessionParams sessionParams, SessionCache sessionCache) { - final SharedSourceSession session = new MockSharedSession(sessionParams); - return new ReferencedResource<>(session, References.fromResource(session)); - } - } - - @Test - public final void test() throws IOException, InterruptedException { - String sessionId; - { - InputStream in = new MetaStream(new byte[] { 1 }); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - HttpRequest nalle = HttpRequest - .createTestRequest( - "http://test4-steinar:19020/reserved-for-internal-use/feedapi", - Method.POST, in); - nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); - nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false"); - HttpResponse r = handler.handle(nalle); - sessionId = r.headers().getFirst(Headers.SESSION_ID); - r.render(out); - assertEquals("", - Utf8.toString(out.toByteArray())); - } - { - InputStream in = new MetaStream(new byte[]{1, 3, 2}); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - HttpRequest nalle = HttpRequest.createTestRequest( - "http://test4-steinar:19020/reserved-for-internal-use/feedapi", - Method.POST, in); - nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); - nalle.getJDiscRequest().headers().add(Headers.TIMEOUT, "1000000000"); - nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId); - nalle.getJDiscRequest().headers().add(Headers.PRIORITY, "LOWEST"); - nalle.getJDiscRequest().headers().add(Headers.TRACE_LEVEL, "4"); - nalle.getJDiscRequest().headers().add(Headers.DRAIN, "true"); - HttpResponse r = handler.handle(nalle); - r.render(out); - assertEquals("id:banana:banana::doc1 OK Document{20}processed. \n" - + "id:banana:banana::doc1 OK Document{20}processed. \n" - + "id:banana:banana::doc1 OK Document{20}processed. \n", - Utf8.toString(out.toByteArray())); - assertEquals("text/plain", r.getContentType()); - assertEquals(StandardCharsets.US_ASCII.name(), r.getCharacterEncoding()); - assertEquals(7, logChecker.records.size()); - String actualHandshake = logChecker.records.take().getMessage(); - assertThat(actualHandshake, actualHandshake.matches("Handshake completed for client (-?)(.+?)-#(.*?)\\."), is(true)); - assertEquals("Successfully deserialized document id: id:banana:banana::doc1", - logChecker.records.take().getMessage()); - assertEquals("Sent message successfully, document id: id:banana:banana::doc1", - logChecker.records.take().getMessage()); - } - - //test session ID without #, i.e. something fishy related to VIPs is going on - sessionId = "something"; - - { - InputStream in = new MetaStream(new byte[]{1, 3, 2}); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - HttpRequest nalle = HttpRequest.createTestRequest( - "http://test4-steinar:19020/reserved-for-internal-use/feedapi", - Method.POST, in); - nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); - nalle.getJDiscRequest().headers().add(Headers.TIMEOUT, "1000000000"); - nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId); - nalle.getJDiscRequest().headers().add(Headers.DRAIN, "true"); - nalle.getJDiscRequest().headers().add(Headers.PRIORITY, "LOWEST"); - nalle.getJDiscRequest().headers().add(Headers.TRACE_LEVEL, "4"); - nalle.getJDiscRequest().headers().add(Headers.TRACE_LEVEL, "2"); - - HttpResponse r = handler.handle(nalle); - r.render(out); - String expectedErrorMsg = "Got request from client with id 'something', but found no session for this client."; - assertThat(Utf8.toString(out.toByteArray()), containsString(expectedErrorMsg)); - assertEquals("text/plain", r.getContentType()); - assertEquals(StandardCharsets.UTF_8.name(), r.getCharacterEncoding()); - } - - //test session ID with trailing # but no hostname - sessionId = "something#"; - - { - InputStream in = new MetaStream(new byte[]{1, 3, 2}); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - HttpRequest nalle = HttpRequest.createTestRequest( - "http://test4-steinar:19020/reserved-for-internal-use/feedapi", - Method.POST, in); - nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); - nalle.getJDiscRequest().headers().add(Headers.TIMEOUT, "1000000000"); - nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId); - nalle.getJDiscRequest().headers().add(Headers.DRAIN, "true"); - nalle.getJDiscRequest().headers().add(Headers.PRIORITY, "LOWEST"); - nalle.getJDiscRequest().headers().add(Headers.TRACE_LEVEL, "4"); - HttpResponse r = handler.handle(nalle); - r.render(out); - String expectedErrorMsg = "Got request from client with id 'something#', but found no session for this client."; - assertThat(Utf8.toString(out.toByteArray()), containsString(expectedErrorMsg)); - assertEquals("text/plain", r.getContentType()); - assertEquals(StandardCharsets.UTF_8.name(), r.getCharacterEncoding()); - } - - //test session ID with trailing # and some unknown hostname at the end - sessionId = "something#thisHostnameDoesNotExistAnywhere"; - - { - InputStream in = new MetaStream(new byte[]{1, 3, 2}); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - HttpRequest nalle = HttpRequest.createTestRequest( - "http://test4-steinar:19020/reserved-for-internal-use/feedapi", - Method.POST, in); - nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); - nalle.getJDiscRequest().headers().add(Headers.TIMEOUT, "1000000000"); - nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId); - nalle.getJDiscRequest().headers().add(Headers.DRAIN, "true"); - nalle.getJDiscRequest().headers().add(Headers.PRIORITY, "LOWEST"); - nalle.getJDiscRequest().headers().add(Headers.TRACE_LEVEL, "4"); - HttpResponse r = handler.handle(nalle); - r.render(out); - String expectedErrorMsg = "Got request from client with id 'something#thisHostnameDoesNotExistAnywhere', " + - "but found no session for this client. Session was originally established " + - "towards host thisHostnameDoesNotExistAnywhere, but our hostname is " + - "ourHostname."; - assertThat(Utf8.toString(out.toByteArray()), containsString(expectedErrorMsg)); - assertEquals("text/plain", r.getContentType()); - assertEquals(StandardCharsets.UTF_8.name(), r.getCharacterEncoding()); - } - - //test session ID with trailing # and some unknown hostname at the end - sessionId = "something#ourHostname"; - - { - InputStream in = new MetaStream(new byte[]{1, 3, 2}); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - HttpRequest nalle = HttpRequest.createTestRequest( - "http://test4-steinar:19020/reserved-for-internal-use/feedapi", - Method.POST, in); - nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); - nalle.getJDiscRequest().headers().add(Headers.TIMEOUT, "1000000000"); - nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId); - nalle.getJDiscRequest().headers().add(Headers.PRIORITY, "LOWEST"); - nalle.getJDiscRequest().headers().add(Headers.TRACE_LEVEL, "4"); - nalle.getJDiscRequest().headers().add(Headers.DRAIN, "true"); - HttpResponse r = handler.handle(nalle); - r.render(out); - assertEquals("id:banana:banana::doc1 OK Document{20}processed. \n" + - "id:banana:banana::doc1 OK Document{20}processed. \n" + - "id:banana:banana::doc1 OK Document{20}processed. \n", - Utf8.toString(out.toByteArray())); - assertEquals("text/plain", r.getContentType()); - assertEquals(StandardCharsets.US_ASCII.name(), r.getCharacterEncoding()); - Thread.sleep(1000); - } - } - - @Test - public final void testFailedReading() throws IOException { - String sessionId; - { - InputStream in = new MetaStream(new byte[] { 1 }); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - HttpRequest nalle = HttpRequest - .createTestRequest( - "http://test4-steinar:19020/reserved-for-internal-use/feedapi", - Method.POST, in); - nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); - nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false"); - HttpResponse r = handler.handle(nalle); - sessionId = r.headers().getFirst(Headers.SESSION_ID); - r.render(out); - assertEquals("", - Utf8.toString(out.toByteArray())); - } - { - InputStream in = new MetaStream(new byte[] { 4 }); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - HttpRequest nalle = HttpRequest.createTestRequest( - "http://test4-steinar:19020/reserved-for-internal-use/feedapi", - Method.POST, in); - nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); - nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId); - nalle.getJDiscRequest().headers().add(Headers.DRAIN, "true"); - HttpResponse r = handler.handle(nalle); - r.render(out); - assertEquals("id:banana:banana::doc1 ERROR Could{20}not{20}feed{20}this \n", - Utf8.toString(out.toByteArray())); - } - } - - @Test - public final void testCleaningDoesNotBlowUp() throws IOException { - InputStream in = new MetaStream(new byte[] { 1 }); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - HttpRequest nalle = HttpRequest.createTestRequest( - "http://test4-steinar:19020/reserved-for-internal-use/feedapi", - Method.POST, in); - nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); - nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false"); - HttpResponse r = handler.handle(nalle); - r.render(out); - assertEquals("", - Utf8.toString(out.toByteArray())); - handler.forceRunCleanClients(); - } - - @Test - public final void testMockNetworkDoesNotBlowUp() { - Network n = new MockNetwork(); - n.registerSession(null); - n.unregisterSession(null); - assertTrue(n.allocServiceAddress(null)); - n.freeServiceAddress(null); - n.send(null, null); - assertNull(n.getConnectionSpec()); - assertNull(n.getMirror()); - } - - @Test - public final void testMockReplyDoesNotBlowUp() { - MockReply r = new MockReply(null); - assertNull(r.getProtocol()); - assertEquals(0, r.getType()); - assertFalse(r.hasFatalErrors()); - } - - @Test - public final void testFlush() throws IOException { - String sessionId; - { - InputStream in = new MetaStream(new byte[] { 1 }); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - HttpRequest nalle = HttpRequest - .createTestRequest( - "http://test4-steinar:19020/reserved-for-internal-use/feedapi", - Method.POST, in); - nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); - nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false"); - HttpResponse r = handler.handle(nalle); - sessionId = r.headers().getFirst(Headers.SESSION_ID); - r.render(out); - assertEquals("", - Utf8.toString(out.toByteArray())); - } - { - InputStream in = new MetaStream(new byte[] { 1, 1, 1, 1, 1, 1, 1}); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - HttpRequest nalle = HttpRequest.createTestRequest( - "http://test4-steinar:19020/reserved-for-internal-use/feedapi", - Method.POST, in); - nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); - nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId); - nalle.getJDiscRequest().headers().add(Headers.PRIORITY, "LOWEST"); - nalle.getJDiscRequest().headers().add(Headers.TRACE_LEVEL, "4"); - nalle.getJDiscRequest().headers().add(Headers.DRAIN, "true"); - HttpResponse r = handler.handle(nalle); - r.render(out); - assertEquals("id:banana:banana::doc1 OK Document{20}processed. \n" - + "id:banana:banana::doc1 OK Document{20}processed. \n" - + "id:banana:banana::doc1 OK Document{20}processed. \n" - + "id:banana:banana::doc1 OK Document{20}processed. \n" - + "id:banana:banana::doc1 OK Document{20}processed. \n" - + "id:banana:banana::doc1 OK Document{20}processed. \n" - + "id:banana:banana::doc1 OK Document{20}processed. \n", - Utf8.toString(out.toByteArray())); - } - } - - @Test - public final void testIllegalVersion() throws IOException { - InputStream in = new MetaStream(new byte[] { 1 }); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - HttpRequest nalle = HttpRequest.createTestRequest( - "http://test4-steinar:19020/reserved-for-internal-use/feedapi", - Method.POST, in); - nalle.getJDiscRequest().headers() - .add(Headers.VERSION, Integer.toString(Integer.MAX_VALUE)); - HttpResponse r = handler.handle(nalle); - r.render(out); - assertEquals(Headers.HTTP_NOT_ACCEPTABLE, r.getStatus()); - } - - @Test - public final void testSettings() { - HttpRequest nalle = HttpRequest.createTestRequest( - "http://test4-steinar:19020/reserved-for-internal-use/feedapi", - Method.POST); - nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false"); - nalle.getJDiscRequest().headers().add(Headers.ROUTE, "bamse brakar"); - nalle.getJDiscRequest().headers().add(Headers.DENY_IF_BUSY, "true"); - FeederSettings settings = new FeederSettings(nalle); - assertEquals(false, settings.drain); - assertEquals(2, settings.route.getNumHops()); - assertEquals(true, settings.denyIfBusy); - } - - @Test - public final void testJsonInputFormat() throws IOException, InterruptedException { - String sessionId; - { - InputStream in = new MetaStream(new byte[] { 1 }); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - HttpRequest nalle = HttpRequest - .createTestRequest( - "http://test4-steinar:19020/reserved-for-internal-use/feedapi", - Method.POST, in); - nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); - nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false"); - HttpResponse r = handler.handle(nalle); - sessionId = r.headers().getFirst(Headers.SESSION_ID); - r.render(out); - assertEquals("", - Utf8.toString(out.toByteArray())); - } - { - InputStream in = new MetaStream(new byte[]{1, 3, 2}); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - HttpRequest nalle = HttpRequest.createTestRequest( - "http://test4-steinar:19020/reserved-for-internal-use/feedapi", - Method.POST, in); - nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); - nalle.getJDiscRequest().headers().add(Headers.TIMEOUT, "1000000000"); - nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId); - nalle.getJDiscRequest().headers().add(Headers.DATA_FORMAT, DataFormat.JSON_UTF8.name()); - nalle.getJDiscRequest().headers().add(Headers.PRIORITY, "LOWEST"); - nalle.getJDiscRequest().headers().add(Headers.TRACE_LEVEL, "4"); - nalle.getJDiscRequest().headers().add(Headers.DRAIN, "true"); - HttpResponse r = handler.handle(nalle); - r.render(out); - assertEquals("id:banana:banana::doc1 OK Document{20}processed. \n" - + "id:banana:banana::doc1 OK Document{20}processed. \n" - + "id:banana:banana::doc1 OK Document{20}processed. \n", - Utf8.toString(out.toByteArray())); - assertEquals("text/plain", r.getContentType()); - assertEquals(StandardCharsets.US_ASCII.name(), r.getCharacterEncoding()); - assertEquals(7, logChecker.records.size()); - String actualHandshake = logChecker.records.take().getMessage(); - assertThat(actualHandshake, actualHandshake.matches("Handshake completed for client (-?)(.+?)-#(.*?)\\."), is(true)); - assertEquals("Successfully deserialized document id: id:banana:banana::doc1", - logChecker.records.take().getMessage()); - assertEquals("Sent message successfully, document id: id:banana:banana::doc1", - logChecker.records.take().getMessage()); - assertSame(DataFormat.JSON_UTF8, handler.lastFormatSeen); - } - } - -} diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2FailingMessagebusTestCase.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2FailingMessagebusTestCase.java deleted file mode 100644 index 6290c22f694..00000000000 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2FailingMessagebusTestCase.java +++ /dev/null @@ -1,226 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import com.yahoo.container.jdisc.HttpRequest; -import com.yahoo.container.jdisc.HttpResponse; -import com.yahoo.container.jdisc.messagebus.SessionCache; -import com.yahoo.container.logging.AccessLog; -import com.yahoo.document.DocumentTypeManager; -import com.yahoo.document.config.DocumentmanagerConfig; -import com.yahoo.jdisc.ReferencedResource; -import com.yahoo.jdisc.References; -import com.yahoo.jdisc.http.HttpRequest.Method; -import com.yahoo.messagebus.*; -import com.yahoo.messagebus.shared.SharedMessageBus; -import com.yahoo.messagebus.shared.SharedSourceSession; -import com.yahoo.metrics.simple.MetricReceiver; -import com.yahoo.text.Utf8; -import com.yahoo.vespa.http.client.core.Headers; -import com.yahoo.vespa.http.client.core.OperationStatus; -import com.yahoo.vespaxmlparser.MockFeedReaderFactory; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import static org.junit.Assert.assertEquals; - -/** - * Check FeedHandler APIs. - * - * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> - */ -public class V2FailingMessagebusTestCase { - - LessConfiguredHandler handler; - ExecutorService workers; - int mbus; - - @Before - public void setUp() throws Exception { - workers = Executors.newCachedThreadPool(); - handler = new LessConfiguredHandler(workers); - mbus = 0; - } - - @After - public void tearDown() throws Exception { - handler.destroy(); - workers.shutdown(); - mbus = 0; - } - - private class LessConfiguredHandler extends FeedHandler { - - public LessConfiguredHandler(Executor executor) throws Exception { - super(new FeedHandler.Context(executor, AccessLog.voidAccessLog(), new DummyMetric()), - null, null, null, MetricReceiver.nullImplementation); - } - - @Override - protected Feeder createFeeder(HttpRequest request, - InputStream requestInputStream, - BlockingQueue<OperationStatus> operations, - String clientId, - boolean sessionIdWasGeneratedJustNow, int protocolVersion) throws Exception { - return new LessConfiguredFeeder(requestInputStream, operations, - popClient(clientId), new FeederSettings(request), clientId, sessionIdWasGeneratedJustNow, - sourceSessionParams(request), null, this, this.feedReplyHandler, ""); - } - - @Override - protected DocumentTypeManager createDocumentManager( - DocumentmanagerConfig documentManagerConfig) { - return null; - } - } - - private class MockSharedSession extends SharedSourceSession { - - public MockSharedSession(SourceSessionParams params) { - super(new SharedMessageBus(new MessageBus(new MockNetwork(), - new MessageBusParams())), params); - } - - @Override - public Result sendMessageBlocking(Message msg) throws InterruptedException { - return sendMessage(msg); - } - - @Override - public Result sendMessage(Message msg) { - ReplyHandler handler = msg.popHandler(); - - switch (mbus) { - case 0: - throw new RuntimeException("boom"); - case 1: - Result r = new Result(ErrorCode.SEND_QUEUE_FULL, "tralala"); - mbus = 2; - return r; - case 2: - handler.handleReply(new MockReply(msg.getContext())); - return Result.ACCEPTED; - default: - throw new IllegalStateException("WTF?!"); - } - } - } - - private class LessConfiguredFeeder extends Feeder { - - public LessConfiguredFeeder(InputStream inputStream, - BlockingQueue<OperationStatus> operations, - ClientState storedState, FeederSettings settings, - String clientId, boolean sessionIdWasGeneratedJustNow, SourceSessionParams sessionParams, - SessionCache sessionCache, FeedHandler handler, ReplyHandler feedReplyHandler, - String localHostname) throws Exception { - super(inputStream, new MockFeedReaderFactory(), null, operations, storedState, settings, clientId, sessionIdWasGeneratedJustNow, - sessionParams, sessionCache, handler, new DummyMetric(), feedReplyHandler, localHostname); - } - - protected ReferencedResource<SharedSourceSession> retainSession( - SourceSessionParams sessionParams, SessionCache sessionCache) { - final SharedSourceSession session = new MockSharedSession(sessionParams); - return new ReferencedResource<>(session, References.fromResource(session)); - } - } - - @Test - public final void testFailingMbus() throws IOException { - String sessionId; - { - InputStream in = new MetaStream(new byte[]{1}); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - HttpRequest nalle = HttpRequest - .createTestRequest( - "http://test4-steinar:19020/reserved-for-internal-use/feedapi", - Method.POST, in); - nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); - nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false"); - HttpResponse r = handler.handle(nalle); - sessionId = r.headers().getFirst(Headers.SESSION_ID); - r.render(out); - assertEquals("", - Utf8.toString(out.toByteArray())); - } - { - InputStream in = new MetaStream(new byte[]{1}); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - HttpRequest nalle = HttpRequest.createTestRequest( - "http://test4-steinar:19020/reserved-for-internal-use/feedapi", - Method.POST, in); - nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); - nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId); - HttpResponse r = handler.handle(nalle); - r.render(out); - assertEquals("id:banana:banana::doc1 ERROR boom \n", - Utf8.toString(out.toByteArray())); - } - } - - @Test - public final void testBusyMbus() throws IOException { - String sessionId; - { - InputStream in = new MetaStream(new byte[]{1}); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - HttpRequest nalle = HttpRequest - .createTestRequest( - "http://test4-steinar:19020/reserved-for-internal-use/feedapi", - Method.POST, in); - mbus = 2; - nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); - nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false"); - HttpResponse r = handler.handle(nalle); - sessionId = r.headers().getFirst(Headers.SESSION_ID); - r.render(out); - assertEquals("", - Utf8.toString(out.toByteArray())); - } - { - InputStream in = new MetaStream(new byte[] { 1 }); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - HttpRequest nalle = HttpRequest - .createTestRequest( - "http://test4-steinar:19020/reserved-for-internal-use/feedapi", - Method.POST, in); - mbus = 1; - nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); - nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId); - nalle.getJDiscRequest().headers().add(Headers.DRAIN, "true"); - nalle.getJDiscRequest().headers() - .add(Headers.DENY_IF_BUSY, "false"); - HttpResponse r = handler.handle(nalle); - r.render(out); - assertEquals("id:banana:banana::doc1 OK Document{20}processed. \n", - Utf8.toString(out.toByteArray())); - } - { - InputStream in = new MetaStream(new byte[] { 1 }); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - HttpRequest nalle = HttpRequest - .createTestRequest( - "http://test4-steinar:19020/reserved-for-internal-use/feedapi", - Method.POST, in); - mbus = 1; - nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); - nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId); - nalle.getJDiscRequest().headers().add(Headers.DRAIN, "true"); - nalle.getJDiscRequest().headers().add(Headers.DENY_IF_BUSY, "true"); - HttpResponse r = handler.handle(nalle); - r.render(out); - assertEquals("id:banana:banana::doc1 TRANSIENT_ERROR tralala \n", - Utf8.toString(out.toByteArray())); - } - } - -} diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2NoXmlReaderTestCase.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2NoXmlReaderTestCase.java deleted file mode 100644 index 633477dcc79..00000000000 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V2NoXmlReaderTestCase.java +++ /dev/null @@ -1,164 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import com.yahoo.container.jdisc.HttpRequest; -import com.yahoo.container.jdisc.HttpResponse; -import com.yahoo.container.jdisc.messagebus.SessionCache; -import com.yahoo.container.logging.AccessLog; -import com.yahoo.document.DocumentTypeManager; -import com.yahoo.document.config.DocumentmanagerConfig; -import com.yahoo.jdisc.ReferencedResource; -import com.yahoo.jdisc.References; -import com.yahoo.jdisc.http.HttpRequest.Method; -import com.yahoo.messagebus.*; -import com.yahoo.messagebus.Error; -import com.yahoo.messagebus.shared.SharedMessageBus; -import com.yahoo.messagebus.shared.SharedSourceSession; -import com.yahoo.metrics.simple.MetricReceiver; -import com.yahoo.text.Utf8; -import com.yahoo.vespa.http.client.core.Headers; -import com.yahoo.vespa.http.client.core.OperationStatus; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import static org.junit.Assert.assertEquals; - -/** - * Check FeedHandler APIs. - * - * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> - */ -public class V2NoXmlReaderTestCase { - - LessConfiguredHandler handler; - ExecutorService workers; - - @Before - public void setUp() throws Exception { - workers = Executors.newCachedThreadPool(); - handler = new LessConfiguredHandler(workers); - } - - @After - public void tearDown() throws Exception { - handler.destroy(); - workers.shutdown(); - } - - private static class LessConfiguredHandler extends FeedHandler { - - public LessConfiguredHandler(Executor executor) throws Exception { - super(new FeedHandler.Context(executor, AccessLog.voidAccessLog(), new DummyMetric()), - null, null, null, MetricReceiver.nullImplementation); - } - - - @Override - protected Feeder createFeeder(HttpRequest request, InputStream requestInputStream, - BlockingQueue<OperationStatus> operations, String clientId, - boolean sessionIdWasGeneratedJustNow, int protocolVersion) - throws Exception { - return new LessConfiguredFeeder(requestInputStream, operations, - popClient(clientId), new FeederSettings(request), clientId, sessionIdWasGeneratedJustNow, - sourceSessionParams(request), null, this, this.feedReplyHandler, ""); - } - - @Override - protected DocumentTypeManager createDocumentManager( - DocumentmanagerConfig documentManagerConfig) { - return null; - } - } - - private static class MockSharedSession extends SharedSourceSession { - - public MockSharedSession(SourceSessionParams params) { - super(new SharedMessageBus(new MessageBus(new MockNetwork(), - new MessageBusParams())), params); - } - - @Override - public Result sendMessageBlocking(Message msg) throws InterruptedException { - return sendMessage(msg); - } - - @Override - public Result sendMessage(Message msg) { - ReplyHandler handler = msg.popHandler(); - MockReply mockReply = new MockReply(msg.getContext()); - if (msg instanceof Feeder.FeedErrorMessage) { - mockReply.addError(new Error(123, "Could not feed this")); - } - handler.handleReply(mockReply); - return Result.ACCEPTED; - } - - } - - private static class LessConfiguredFeeder extends Feeder { - - public LessConfiguredFeeder(InputStream inputStream, - BlockingQueue<OperationStatus> operations, - ClientState storedState, FeederSettings settings, - String clientId, boolean sessionIdWasGeneratedJustNow, SourceSessionParams sessionParams, - SessionCache sessionCache, FeedHandler handler, ReplyHandler feedReplyHandler, - String localHostname) throws Exception { - super(inputStream, null, null, operations, storedState, settings, clientId, sessionIdWasGeneratedJustNow, - sessionParams, sessionCache, handler, new DummyMetric(), feedReplyHandler, localHostname); - } - - protected ReferencedResource<SharedSourceSession> retainSession( - SourceSessionParams sessionParams, SessionCache sessionCache) { - final SharedSourceSession session = new MockSharedSession(sessionParams); - return new ReferencedResource<>(session, References.fromResource(session)); - } - } - - @Test - public final void test() throws IOException { - String sessionId; - { - InputStream in = new MetaStream(new byte[] { 1 }); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - HttpRequest nalle = HttpRequest - .createTestRequest( - "http://test4-steinar:19020/reserved-for-internal-use/feedapi", - Method.POST, in); - nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); - nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false"); - HttpResponse r = handler.handle(nalle); - sessionId = r.headers().getFirst(Headers.SESSION_ID); - r.render(out); - assertEquals("", - Utf8.toString(out.toByteArray())); - } - { - InputStream in = new MetaStream(new byte[] { 1 }); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - HttpRequest nalle = HttpRequest.createTestRequest( - "http://test4-steinar:19020/reserved-for-internal-use/feedapi", - Method.POST, in); - nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); - nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId); - nalle.getJDiscRequest().headers().add(Headers.DRAIN, "true"); - HttpResponse r = handler.handle(nalle); - r.render(out); - //This is different from v1. If we cannot parse XML, we will still get response code 200, but with a sensible - //error message in the response. - assertEquals(200, r.getStatus()); - assertEquals("id:banana:banana::doc1 ERROR Could{20}not{20}feed{20}this \n", - Utf8.toString(out.toByteArray())); - } - } - -} diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/VersionsTestCase.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/VersionsTestCase.java index d6f605b0379..128664dda9e 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/VersionsTestCase.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/VersionsTestCase.java @@ -23,13 +23,14 @@ public class VersionsTestCase { private static final List<String> EMPTY = Collections.emptyList(); private static final List<String> ONE_TWO = Arrays.asList("1", "2"); + private static final List<String> ONE_THREE = Arrays.asList("1", "3"); private static final List<String> TWO_THREE = Arrays.asList("3", "2"); - private static final List<String> ONE_NULL_TWO = Arrays.asList("1", null, "2"); - private static final List<String> ONE_COMMA_TWO = Collections.singletonList("1, 2"); - private static final List<String> ONE_EMPTY_TWO = Arrays.asList("1", "", "2"); + private static final List<String> ONE_NULL_THREE = Arrays.asList("1", null, "3"); + private static final List<String> ONE_COMMA_THREE = Collections.singletonList("1, 3"); + private static final List<String> ONE_EMPTY_THREE = Arrays.asList("1", "", "3"); private static final List<String> TOO_LARGE_NUMBER = Collections.singletonList("1000000000"); - private static final List<String> TWO_TOO_LARGE_NUMBER = Arrays.asList("2", "1000000000"); - private static final List<String> TWO_COMMA_TOO_LARGE_NUMBER = Arrays.asList("2,1000000000"); + private static final List<String> THREE_TOO_LARGE_NUMBER = Arrays.asList("3", "1000000000"); + private static final List<String> THREE_COMMA_TOO_LARGE_NUMBER = Arrays.asList("3,1000000000"); private static final List<String> GARBAGE = Collections.singletonList("garbage"); @Test @@ -42,8 +43,15 @@ public class VersionsTestCase { @Test public void testOneTwo() throws Exception { Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(ONE_TWO); + assertThat(v.first, instanceOf(ErrorHttpResponse.class)); + assertThat(v.second, is(-1)); + } + + @Test + public void testOneThree() throws Exception { + Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(ONE_THREE); assertThat(v.first, nullValue()); - assertThat(v.second, is(2)); + assertThat(v.second, is(3)); } @Test @@ -54,24 +62,24 @@ public class VersionsTestCase { } @Test - public void testOneNullTwo() throws Exception { - Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(ONE_NULL_TWO); + public void testOneNullThree() throws Exception { + Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(ONE_NULL_THREE); assertThat(v.first, nullValue()); - assertThat(v.second, is(2)); + assertThat(v.second, is(3)); } @Test - public void testOneCommaTwo() throws Exception { - Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(ONE_COMMA_TWO); + public void testOneCommaThree() throws Exception { + Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(ONE_COMMA_THREE); assertThat(v.first, nullValue()); - assertThat(v.second, is(2)); + assertThat(v.second, is(3)); } @Test - public void testOneEmptyTwo() throws Exception { - Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(ONE_EMPTY_TWO); + public void testOneEmptyThree() throws Exception { + Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(ONE_EMPTY_THREE); assertThat(v.first, nullValue()); - assertThat(v.second, is(2)); + assertThat(v.second, is(3)); } @Test @@ -83,22 +91,22 @@ public class VersionsTestCase { errorResponse.render(errorMsg); assertThat(errorMsg.toString(), is("Could not parse X-Yahoo-Feed-Protocol-Versionheader of request (values: [1000000000]). " + - "Server supports protocol versions [2, 3]")); + "Server supports protocol versions [3]")); assertThat(v.second, is(-1)); } @Test - public void testTwoTooLarge() throws Exception { - Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(TWO_TOO_LARGE_NUMBER); + public void testThreeTooLarge() throws Exception { + Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(THREE_TOO_LARGE_NUMBER); assertThat(v.first, nullValue()); - assertThat(v.second, is(2)); + assertThat(v.second, is(3)); } @Test public void testTwoCommaTooLarge() throws Exception { - Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(TWO_COMMA_TOO_LARGE_NUMBER); + Tuple2<HttpResponse, Integer> v = FeedHandler.doCheckProtocolVersion(THREE_COMMA_TOO_LARGE_NUMBER); assertThat(v.first, nullValue()); - assertThat(v.second, is(2)); + assertThat(v.second, is(3)); } @Test |