diff options
author | Jon Bratseth <bratseth@gmail.com> | 2022-06-07 11:59:03 +0200 |
---|---|---|
committer | gjoranv <gv@verizonmedia.com> | 2022-06-08 11:45:30 +0200 |
commit | 033d6494edc17b554ab841c3f5ea70bc5f8925de (patch) | |
tree | fc24f1564b91ee6e7009f4a92adb0981ffb92924 /vespaclient-container-plugin/src/main/java | |
parent | 72e82db1739fd88a78aba7d55c7ee4ef7f953863 (diff) |
Revert "Remove http client use"
This reverts commit a7fd13540d34de50ed3526576c62eebc476a1e1c.
Diffstat (limited to 'vespaclient-container-plugin/src/main/java')
17 files changed, 1244 insertions, 2 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index ed068c77e11..8c2e39d595e 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -20,6 +20,7 @@ import com.yahoo.document.DocumentUpdate; import com.yahoo.document.FixedBucketSpaces; import com.yahoo.document.TestAndSetCondition; import com.yahoo.document.config.DocumentmanagerConfig; +import com.yahoo.document.fieldset.AllFields; import com.yahoo.document.fieldset.DocIdOnly; import com.yahoo.document.fieldset.DocumentOnly; import com.yahoo.document.idstring.IdIdString; @@ -45,7 +46,6 @@ import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; import com.yahoo.documentapi.metrics.DocumentApiMetrics; import com.yahoo.documentapi.metrics.DocumentOperationStatus; -import com.yahoo.documentapi.metrics.MetricNames; import com.yahoo.jdisc.Metric; import com.yahoo.jdisc.Request; import com.yahoo.jdisc.Response; @@ -68,6 +68,7 @@ import com.yahoo.restapi.Path; import com.yahoo.search.query.ParameterParser; import com.yahoo.text.Text; import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig; +import com.yahoo.vespa.http.server.MetricNames; import com.yahoo.yolean.Exceptions; import com.yahoo.yolean.Exceptions.RunnableThrowingIOException; diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java new file mode 100644 index 00000000000..875ff3e5bf0 --- /dev/null +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java @@ -0,0 +1,287 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.http.server; + +import com.yahoo.container.jdisc.HttpRequest; +import com.yahoo.container.jdisc.HttpResponse; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.documentapi.messagebus.protocol.DocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.jdisc.Metric; +import com.yahoo.jdisc.ReferencedResource; +import com.yahoo.jdisc.ResourceReference; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.Result; +import com.yahoo.messagebus.shared.SharedSourceSession; +import com.yahoo.net.HostName; +import com.yahoo.vespa.http.client.core.ErrorCode; +import com.yahoo.vespa.http.client.core.Headers; +import com.yahoo.vespa.http.client.core.OperationStatus; +import com.yahoo.vespaxmlparser.FeedOperation; +import com.yahoo.yolean.Exceptions; + +import java.io.IOException; +import java.io.InputStream; +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * An instance of this class handles all requests from one client using VespaHttpClient. + * + * The implementation is based on the code from V2, but the object model is rewritten to simplify the logic and + * avoid using a threadpool that has no effect with all the extra that comes with it. V2 has one instance per thread + * on the client, while this is one instance for all threads. + * + * @author dybis + */ +class ClientFeederV3 { + + protected static final Logger log = Logger.getLogger(ClientFeederV3.class.getName()); + // This is for all clients on this gateway, for load balancing from client. + private final static AtomicInteger outstandingOperations = new AtomicInteger(0); + private final BlockingQueue<OperationStatus> feedReplies = new LinkedBlockingQueue<>(); + private final ReferencedResource<SharedSourceSession> sourceSession; + private final String clientId; + private final ReplyHandler feedReplyHandler; + private final Metric metric; + private Instant prevOpsPerSecTime = Instant.now(); + private double operationsForOpsPerSec = 0d; + private final Object monitor = new Object(); + private final StreamReaderV3 streamReaderV3; + private final AtomicInteger ongoingRequests = new AtomicInteger(0); + private final String hostName; + + ClientFeederV3( + ReferencedResource<SharedSourceSession> sourceSession, + FeedReaderFactory feedReaderFactory, + DocumentTypeManager docTypeManager, + String clientId, + Metric metric, + ReplyHandler feedReplyHandler) { + this.sourceSession = sourceSession; + this.clientId = clientId; + this.feedReplyHandler = feedReplyHandler; + this.metric = metric; + this.streamReaderV3 = new StreamReaderV3(feedReaderFactory, docTypeManager); + this.hostName = HostName.getLocalhost(); + } + + boolean timedOut() { + synchronized (monitor) { + return Instant.now().isAfter(prevOpsPerSecTime.plusSeconds(6000)) && ongoingRequests.get() == 0; + } + } + + void kill() { + try (ResourceReference ignored = sourceSession.getReference()) { + // No new requests should be sent to this object, but there can be old one, even though this is very unlikely. + while (ongoingRequests.get() > 0) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + return; + } + } + } catch (Exception e) { + log.log(Level.WARNING, "Failed to close reference to source session", e); + } + } + + private void transferPreviousRepliesToResponse(BlockingQueue<OperationStatus> operations) throws InterruptedException { + OperationStatus status = feedReplies.poll(); + while (status != null) { + outstandingOperations.decrementAndGet(); + operations.put(status); + status = feedReplies.poll(); + } + } + + HttpResponse handleRequest(HttpRequest request) throws IOException { + ongoingRequests.incrementAndGet(); + try { + FeederSettings feederSettings = new FeederSettings(request); + InputStream inputStream = StreamReaderV3.unzipStreamIfNeeded(request); + BlockingQueue<OperationStatus> replies = new LinkedBlockingQueue<>(); + try { + feed(feederSettings, inputStream, replies); + synchronized (monitor) { + // Handshake requests do not have DATA_FORMAT, we do not want to give responses to + // handshakes as it won't be processed by the client. + if (request.getJDiscRequest().headers().get(Headers.DATA_FORMAT) != null) { + transferPreviousRepliesToResponse(replies); + } + } + } catch (InterruptedException e) { + log.log(Level.FINE, e, () -> "Feed handler was interrupted: " + e.getMessage()); + // NOP, just terminate + } catch (Throwable e) { + log.log(Level.WARNING, "Unhandled exception while feeding: " + Exceptions.toMessageString(e), e); + } finally { + replies.add(createOperationStatus("-", "-", ErrorCode.END_OF_FEED, null)); + } + return new FeedResponse(200, replies, 3, clientId, outstandingOperations.get(), hostName); + } finally { + ongoingRequests.decrementAndGet(); + } + } + + private Optional<DocumentOperationMessageV3> pullMessageFromRequest(FeederSettings settings, + InputStream requestInputStream, + BlockingQueue<OperationStatus> repliesFromOldMessages) { + while (true) { + Optional<String> operationId; + try { + operationId = streamReaderV3.getNextOperationId(requestInputStream); + if (operationId.isEmpty()) return Optional.empty(); + } catch (IOException ioe) { + log.log(Level.FINE, () -> Exceptions.toMessageString(ioe)); + return Optional.empty(); + } + + try { + DocumentOperationMessageV3 message = getNextMessage(operationId.get(), requestInputStream, settings); + if (message != null) + setRoute(message, settings); + return Optional.ofNullable(message); + } catch (Exception e) { + log.log(Level.WARNING, () -> Exceptions.toMessageString(e)); + metric.add(MetricNames.PARSE_ERROR, 1, null); + + repliesFromOldMessages.add(new OperationStatus(Exceptions.toMessageString(e), + operationId.get(), + ErrorCode.ERROR, + false, + "")); + } + } + } + + private Result sendMessage(DocumentOperationMessageV3 msg) throws InterruptedException { + msg.getMessage().pushHandler(feedReplyHandler); + return sourceSession.getResource().sendMessageBlocking(msg.getMessage()); + } + + private void feed(FeederSettings settings, + InputStream requestInputStream, + BlockingQueue<OperationStatus> repliesFromOldMessages) throws InterruptedException { + while (true) { + Optional<DocumentOperationMessageV3> message = pullMessageFromRequest(settings, + requestInputStream, + repliesFromOldMessages); + + if (message.isEmpty()) break; + setMessageParameters(message.get(), settings); + + Result result; + try { + result = sendMessage(message.get()); + + } catch (RuntimeException e) { + repliesFromOldMessages.add(createOperationStatus(message.get().getOperationId(), + Exceptions.toMessageString(e), + ErrorCode.ERROR, + message.get().getMessage())); + continue; + } + + if (result.isAccepted()) { + outstandingOperations.incrementAndGet(); + updateOpsPerSec(); + log(Level.FINE, "Sent message successfully, document id: ", message.get().getOperationId()); + } else if (!result.getError().isFatal()) { + repliesFromOldMessages.add(createOperationStatus(message.get().getOperationId(), + result.getError().getMessage(), + ErrorCode.TRANSIENT_ERROR, + message.get().getMessage())); + } else { + repliesFromOldMessages.add(createOperationStatus(message.get().getOperationId(), + result.getError().getMessage(), + ErrorCode.ERROR, + message.get().getMessage())); + } + } + } + + private OperationStatus createOperationStatus(String id, String message, ErrorCode code, Message msg) { + String traceMessage = msg != null && msg.getTrace() != null && msg.getTrace().getLevel() > 0 + ? msg.getTrace().toString() + : ""; + return new OperationStatus(message, id, code, false, traceMessage); + } + + // protected for mocking + /** Returns the next message in the stream, or null if none */ + protected DocumentOperationMessageV3 getNextMessage(String operationId, + InputStream requestInputStream, + FeederSettings settings) throws Exception { + FeedOperation operation = streamReaderV3.getNextOperation(requestInputStream, settings); + + // This is a bit hard to set up while testing, so we accept that things are not perfect. + if (sourceSession.getResource().session() != null) { + metric.set( + MetricNames.PENDING, + Double.valueOf(sourceSession.getResource().session().getPendingCount()), + null); + } + + DocumentOperationMessageV3 message = DocumentOperationMessageV3.create(operation, operationId, metric); + if (message == null) { + // typical end of feed + return null; + } + metric.add(MetricNames.NUM_OPERATIONS, 1, null /*metricContext*/); + log(Level.FINE, "Successfully deserialized document id: ", message.getOperationId()); + return message; + } + + private void setMessageParameters(DocumentOperationMessageV3 msg, FeederSettings settings) { + msg.getMessage().setContext(new ReplyContext(msg.getOperationId(), feedReplies)); + if (settings.traceLevel != null) { + msg.getMessage().getTrace().setLevel(settings.traceLevel); + } + if (settings.priority != null) { + try { + DocumentProtocol.Priority priority = DocumentProtocol.Priority.valueOf(settings.priority); + } + catch (IllegalArgumentException i) { + log.severe(i.getMessage()); + } + } + } + + private void setRoute(DocumentOperationMessageV3 msg, FeederSettings settings) { + if (settings.route != null) { + msg.getMessage().setRoute(settings.route); + } + } + + protected final void log(Level level, Object... msgParts) { + if (!log.isLoggable(level)) return; + + StringBuilder s = new StringBuilder(); + for (Object part : msgParts) + s.append(part.toString()); + log.log(level, s.toString()); + } + + private void updateOpsPerSec() { + Instant now = Instant.now(); + synchronized (monitor) { + if (now.plusSeconds(1).isAfter(prevOpsPerSecTime)) { + Duration duration = Duration.between(now, prevOpsPerSecTime); + double opsPerSec = operationsForOpsPerSec / (duration.toMillis() / 1000.); + metric.set(MetricNames.OPERATIONS_PER_SEC, opsPerSec, null /*metricContext*/); + operationsForOpsPerSec = 1.0d; + prevOpsPerSecTime = now; + } else { + operationsForOpsPerSec += 1.0d; + } + } + } +} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientState.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientState.java new file mode 100644 index 00000000000..13a12f707d9 --- /dev/null +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientState.java @@ -0,0 +1,40 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.http.server; + +import com.yahoo.jdisc.Metric; +import com.yahoo.jdisc.ReferencedResource; +import com.yahoo.messagebus.shared.SharedSourceSession; +import com.yahoo.vespa.http.client.core.OperationStatus; + +import java.util.concurrent.BlockingQueue; + +/** + * The state of a client session, used to save replies when client disconnects. + * + * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> + */ +public class ClientState { + + public final int pending; + public final long creationTime; + public final BlockingQueue<OperationStatus> feedReplies; + public final ReferencedResource<SharedSourceSession> sourceSession; + public final Metric.Context metricContext; + + public final long prevOpsPerSecTime; // previous measurement time of OPS + public final double operationsForOpsPerSec; + + public ClientState(int pending, BlockingQueue<OperationStatus> feedReplies, + ReferencedResource<SharedSourceSession> sourceSession, Metric.Context metricContext, + long prevOpsPerSecTime, double operationsForOpsPerSec) { + super(); + this.pending = pending; + this.feedReplies = feedReplies; + this.sourceSession = sourceSession; + this.metricContext = metricContext; + creationTime = System.currentTimeMillis(); + this.prevOpsPerSecTime = prevOpsPerSecTime; + this.operationsForOpsPerSec = operationsForOpsPerSec; + } + +} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/DocumentOperationMessageV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/DocumentOperationMessageV3.java new file mode 100644 index 00000000000..25bf5815907 --- /dev/null +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/DocumentOperationMessageV3.java @@ -0,0 +1,83 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.http.server; + +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentRemove; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage; +import com.yahoo.jdisc.Metric; +import com.yahoo.messagebus.Message; +import com.yahoo.vespaxmlparser.FeedOperation; + +/** + * Keeps an operation with its message. + * + * This implementation is based on V2, but the code is restructured. + * + * @author dybis + */ +class DocumentOperationMessageV3 { + + private final String operationId; + private final Message message; + + private DocumentOperationMessageV3(String operationId, Message message) { + this.operationId = operationId; + this.message = message; + } + + Message getMessage() { + return message; + } + + String getOperationId() { + return operationId; + } + + private static DocumentOperationMessageV3 newUpdateMessage(FeedOperation op, String operationId) { + DocumentUpdate update = op.getDocumentUpdate(); + update.setCondition(op.getCondition()); + Message msg = new UpdateDocumentMessage(update); + + String id = (operationId == null) ? update.getId().toString() : operationId; + return new DocumentOperationMessageV3(id, msg); + } + + static DocumentOperationMessageV3 newRemoveMessage(FeedOperation op, String operationId) { + DocumentRemove remove = new DocumentRemove(op.getRemove()); + remove.setCondition(op.getCondition()); + Message msg = new RemoveDocumentMessage(remove); + + String id = (operationId == null) ? remove.getId().toString() : operationId; + return new DocumentOperationMessageV3(id, msg); + } + + private static DocumentOperationMessageV3 newPutMessage(FeedOperation op, String operationId) { + DocumentPut put = new DocumentPut(op.getDocument()); + put.setCondition(op.getCondition()); + Message msg = new PutDocumentMessage(put); + + String id = (operationId == null) ? put.getId().toString() : operationId; + return new DocumentOperationMessageV3(id, msg); + } + + static DocumentOperationMessageV3 create(FeedOperation operation, String operationId, Metric metric) { + switch (operation.getType()) { + case DOCUMENT: + metric.add(MetricNames.NUM_PUTS, 1, null /*metricContext*/); + return newPutMessage(operation, operationId); + case REMOVE: + metric.add(MetricNames.NUM_REMOVES, 1, null /*metricContext*/); + return newRemoveMessage(operation, operationId); + case UPDATE: + metric.add(MetricNames.NUM_UPDATES, 1, null /*metricContext*/); + return newUpdateMessage(operation, operationId); + default: + // typical end of feed + return null; + } + } + +} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ErrorHttpResponse.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ErrorHttpResponse.java new file mode 100644 index 00000000000..a12cd1ec089 --- /dev/null +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ErrorHttpResponse.java @@ -0,0 +1,24 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.http.server; + +import com.yahoo.container.jdisc.HttpResponse; +import com.yahoo.text.Utf8; + +import java.io.IOException; +import java.io.OutputStream; + +public class ErrorHttpResponse extends HttpResponse { + + private final String msg; + + public ErrorHttpResponse(final int statusCode, final String msg) { + super(statusCode); + this.msg = msg; + } + + @Override + public void render(OutputStream outputStream) throws IOException { + outputStream.write(Utf8.toBytes(msg)); + } + +} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java new file mode 100644 index 00000000000..f99274d3f2b --- /dev/null +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java @@ -0,0 +1,148 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.http.server; + +import com.yahoo.collections.Tuple2; +import com.yahoo.container.handler.threadpool.ContainerThreadPool; +import com.yahoo.container.jdisc.HttpRequest; +import com.yahoo.container.jdisc.HttpResponse; +import com.yahoo.container.jdisc.ThreadedHttpRequestHandler; +import com.yahoo.container.jdisc.messagebus.SessionCache; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.documentapi.metrics.DocumentApiMetrics; +import com.yahoo.jdisc.Metric; +import com.yahoo.jdisc.Request; +import com.yahoo.jdisc.Response; +import com.yahoo.jdisc.handler.ResponseHandler; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.metrics.simple.MetricReceiver; +import com.yahoo.vespa.http.client.core.Headers; + +import javax.inject.Inject; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.zip.GZIPInputStream; + +/** + * Accept feeds from outside of the Vespa cluster. + * + * @author Steinar Knutsen + */ +public class FeedHandler extends ThreadedHttpRequestHandler { + + protected final ReplyHandler feedReplyHandler; + private static final List<Integer> serverSupportedVersions = Collections.unmodifiableList(Arrays.asList(3)); + private static final Pattern USER_AGENT_PATTERN = Pattern.compile("vespa-http-client \\((.+)\\)"); + private final FeedHandlerV3 feedHandlerV3; + private final DocumentApiMetrics metricsHelper; + + @Inject + public FeedHandler(ContainerThreadPool threadpool, + Metric metric, + DocumentTypeManager documentTypeManager, + SessionCache sessionCache, + MetricReceiver metricReceiver) { + super(threadpool.executor(), metric); + metricsHelper = new DocumentApiMetrics(metricReceiver, "vespa.http.server"); + feedHandlerV3 = new FeedHandlerV3(threadpool.executor(), metric, documentTypeManager, sessionCache, metricsHelper); + feedReplyHandler = new FeedReplyReader(metric, metricsHelper); + } + + private Tuple2<HttpResponse, Integer> checkProtocolVersion(HttpRequest request) { + return doCheckProtocolVersion(request.getJDiscRequest().headers().get(Headers.VERSION)); + } + + static Tuple2<HttpResponse, Integer> doCheckProtocolVersion(List<String> clientSupportedVersions) { + List<String> washedClientVersions = splitVersions(clientSupportedVersions); + + if (washedClientVersions == null || washedClientVersions.isEmpty()) { + return new Tuple2<>(new ErrorHttpResponse( + Headers.HTTP_NOT_ACCEPTABLE, + "Request did not contain " + Headers.VERSION + + "header. Server supports protocol versions " + + serverSupportedVersions), -1); + } + + //select the highest version supported by both parties + //this could be extended when we support a gazillion versions - but right now: keep it simple. + int version; + if (washedClientVersions.contains("3")) { + version = 3; + } else { + return new Tuple2<>(new ErrorHttpResponse( + Headers.HTTP_NOT_ACCEPTABLE, + "Could not parse " + Headers.VERSION + + "header of request (values: " + washedClientVersions + + "). Server supports protocol versions " + + serverSupportedVersions), -1); + } + return new Tuple2<>(null, version); + } + + private static List<String> splitVersions(List<String> clientSupportedVersions) { + List<String> splittedVersions = new ArrayList<>(); + for (String v : clientSupportedVersions) { + if (v == null || v.trim().isEmpty()) { + continue; + } + if (!v.contains(",")) { + splittedVersions.add(v.trim()); + continue; + } + for (String part : v.split(",")) { + part = part.trim(); + if (!part.isEmpty()) { + splittedVersions.add(part); + } + } + } + return splittedVersions; + } + + @Override + public HttpResponse handle(HttpRequest request) { + metricsHelper.reportHttpRequest(findClientVersion(request).orElse(null)); + Tuple2<HttpResponse, Integer> protocolVersion = checkProtocolVersion(request); + + if (protocolVersion.first != null) { + return protocolVersion.first; + } + return feedHandlerV3.handle(request); + } + + @Override + protected void writeErrorResponseOnOverload(Request request, ResponseHandler responseHandler) { + int responseCode = request.headers().getFirst(Headers.SILENTUPGRADE) != null ? 299 : 429; + responseHandler.handleResponse(new Response(responseCode)).close(null); + } + + private static Optional<String> findClientVersion(HttpRequest request) { + String versionHeader = request.getHeader(Headers.CLIENT_VERSION); + if (versionHeader != null) { + return Optional.of(versionHeader); + } + return Optional.ofNullable(request.getHeader("User-Agent")) + .map(USER_AGENT_PATTERN::matcher) + .filter(Matcher::matches) + .map(matcher -> matcher.group(1)); + } + + // Protected for testing + protected static InputStream unzipStreamIfNeeded(InputStream inputStream, HttpRequest httpRequest) + throws IOException { + String contentEncodingHeader = httpRequest.getHeader("content-encoding"); + if ("gzip".equals(contentEncodingHeader)) { + return new GZIPInputStream(inputStream); + } else { + return inputStream; + } + } + + @Override protected void destroy() { feedHandlerV3.destroy(); } +} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java new file mode 100644 index 00000000000..c8828df6d54 --- /dev/null +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java @@ -0,0 +1,152 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.http.server; + +import com.yahoo.concurrent.ThreadFactoryFactory; +import com.yahoo.container.jdisc.HttpRequest; +import com.yahoo.container.jdisc.HttpResponse; +import com.yahoo.container.jdisc.ThreadedHttpRequestHandler; +import com.yahoo.container.jdisc.messagebus.SessionCache; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.documentapi.metrics.DocumentApiMetrics; +import com.yahoo.jdisc.Metric; +import com.yahoo.jdisc.ReferencedResource; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.shared.SharedSourceSession; +import com.yahoo.vespa.http.client.core.Headers; +import com.yahoo.yolean.Exceptions; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * This code is based on v2 code, however, in v3, one client has one ClientFeederV3 shared between all client threads. + * The new API has more logic for shutting down cleanly as the server is more likely to be upgraded. + * The code is restructured a bit. + * + * @author dybis + */ +public class FeedHandlerV3 extends ThreadedHttpRequestHandler { + + private DocumentTypeManager docTypeManager; + private final Map<String, ClientFeederV3> clientFeederByClientId = new HashMap<>(); + private final ScheduledThreadPoolExecutor cron; + private final SessionCache sessionCache; + protected final ReplyHandler feedReplyHandler; + private final Metric metric; + private final Object monitor = new Object(); + private static final Logger log = Logger.getLogger(FeedHandlerV3.class.getName()); + + public FeedHandlerV3(Executor executor, + Metric metric, + DocumentTypeManager documentTypeManager, + SessionCache sessionCache, + DocumentApiMetrics metricsHelper) { + super(executor, metric); + docTypeManager = documentTypeManager; + this.sessionCache = sessionCache; + feedReplyHandler = new FeedReplyReader(metric, metricsHelper); + cron = new ScheduledThreadPoolExecutor(1, ThreadFactoryFactory.getThreadFactory("feed-handler-v3-janitor")); + cron.scheduleWithFixedDelay(this::removeOldClients, 3, 3, TimeUnit.SECONDS); + this.metric = metric; + } + + public void injectDocumentManangerForTests(DocumentTypeManager docTypeManager) { + this.docTypeManager = docTypeManager; + } + + // TODO: If this is set up to run without first invoking the old FeedHandler code, we should + // verify the version header first. This is done in the old code. + @Override + public HttpResponse handle(HttpRequest request) { + String clientId = clientId(request); + ClientFeederV3 clientFeederV3; + synchronized (monitor) { + if (! clientFeederByClientId.containsKey(clientId)) { + SourceSessionParams sourceSessionParams = sourceSessionParams(request); + clientFeederByClientId.put(clientId, + new ClientFeederV3(retainSource(sessionCache, sourceSessionParams), + new FeedReaderFactory(true), //TODO make error debugging configurable + docTypeManager, + clientId, + metric, + feedReplyHandler)); + } + clientFeederV3 = clientFeederByClientId.get(clientId); + } + try { + return clientFeederV3.handleRequest(request); + } catch (UnknownClientException uce) { + String msg = Exceptions.toMessageString(uce); + log.log(Level.WARNING, msg); + return new ErrorHttpResponse(com.yahoo.jdisc.http.HttpResponse.Status.BAD_REQUEST, msg); + } catch (Exception e) { + String msg = "Could not initialize document parsing: " + Exceptions.toMessageString(e); + log.log(Level.WARNING, msg); + return new ErrorHttpResponse(com.yahoo.jdisc.http.HttpResponse.Status.INTERNAL_SERVER_ERROR, msg); + } + } + + // SessionCache is final and no easy way to mock it so we need this to be able to do testing. + protected ReferencedResource<SharedSourceSession> retainSource(SessionCache sessionCache, SourceSessionParams params) { + return sessionCache.retainSource(params); + } + + @Override + protected void destroy() { + // We are forking this to avoid that accidental de-referencing causes any random thread doing destruction. + // This caused a deadlock when the single Messenger thread in MessageBus was the last one referring this + // and started destructing something that required something only the messenger thread could provide. + Thread destroyer = new Thread(() -> { + cron.shutdown(); + synchronized (monitor) { + for (var iterator = clientFeederByClientId.values().iterator(); iterator.hasNext(); ) { + iterator.next().kill(); + iterator.remove(); + } + } + }, "feed-handler-v3-adhoc-destroyer"); + destroyer.setDaemon(true); + destroyer.start(); + } + + private String clientId(HttpRequest request) { + String clientDictatedId = request.getHeader(Headers.CLIENT_ID); + if (clientDictatedId == null || clientDictatedId.isEmpty()) { + throw new IllegalArgumentException("Did not get any CLIENT_ID header (" + Headers.CLIENT_ID + ")"); + } + return clientDictatedId; + } + + private SourceSessionParams sourceSessionParams(HttpRequest request) { + SourceSessionParams params = new SourceSessionParams(); + String timeout = request.getHeader(Headers.TIMEOUT); + + if (timeout != null) { + try { + params.setTimeout(Double.parseDouble(timeout)); + } catch (NumberFormatException e) { + // NOP + } + } + return params; + } + + private void removeOldClients() { + synchronized (monitor) { + for (var iterator = clientFeederByClientId.values().iterator(); iterator.hasNext(); ) { + ClientFeederV3 client = iterator.next(); + if (client.timedOut()) { + client.kill(); + iterator.remove(); + } + } + } + } + +} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReaderFactory.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReaderFactory.java new file mode 100644 index 00000000000..069ccfd84f0 --- /dev/null +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReaderFactory.java @@ -0,0 +1,62 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.http.server; + +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.document.json.JsonFeedReader; +import com.yahoo.text.Utf8; +import com.yahoo.vespa.http.client.config.FeedParams; +import com.yahoo.vespaxmlparser.FeedReader; +import com.yahoo.vespaxmlparser.VespaXMLFeedReader; + +import java.io.InputStream; + +/** + * Class for creating FeedReader based on dataFormat. + * @author dybis + */ +public class FeedReaderFactory { + private static final int MARK_READLIMIT = 200; + + private final boolean debug; + public FeedReaderFactory(boolean debug) { + this.debug = debug; + } + + /** + * Creates FeedReader + * @param inputStream source of feed data + * @param docTypeManager handles the parsing of the document + * @param dataFormat specifies the format + * @return a feedreader + */ + public FeedReader createReader( + InputStream inputStream, + DocumentTypeManager docTypeManager, + FeedParams.DataFormat dataFormat) { + switch (dataFormat) { + case XML_UTF8: + byte [] peek = null; + int bytesPeeked = 0; + try { + if (debug && inputStream.markSupported()) { + peek = new byte[MARK_READLIMIT]; + inputStream.mark(MARK_READLIMIT); + bytesPeeked = inputStream.read(peek); + inputStream.reset(); + } + return new VespaXMLFeedReader(inputStream, docTypeManager); + } catch (Exception e) { + if (bytesPeeked > 0) { + throw new RuntimeException("Could not create VespaXMLFeedReader. First characters are: '" + Utf8.toString(peek, 0, bytesPeeked) + "'", e); + } else { + throw new RuntimeException("Could not create VespaXMLFeedReader.", e); + } + } + case JSON_UTF8: + return new JsonFeedReader(inputStream, docTypeManager); + default: + throw new IllegalStateException("Can not create feed reader for format: " + dataFormat); + } + } + +} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReplyReader.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReplyReader.java new file mode 100644 index 00000000000..2fbb80d9fcc --- /dev/null +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReplyReader.java @@ -0,0 +1,94 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.http.server; + +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentReply; +import com.yahoo.documentapi.metrics.DocumentApiMetrics; +import com.yahoo.documentapi.metrics.DocumentOperationStatus; +import com.yahoo.documentapi.metrics.DocumentOperationType; +import com.yahoo.jdisc.Metric; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.Trace; +import com.yahoo.vespa.http.client.core.ErrorCode; +import com.yahoo.vespa.http.client.core.OperationStatus; + +import java.util.Map; +import java.util.Optional; +import java.util.function.Predicate; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static java.util.function.Predicate.not; + +/** + * Catch message bus replies and make the available to a given session. + * + * @author Steinar Knutsen + */ +public class FeedReplyReader implements ReplyHandler { + + private static final Logger log = Logger.getLogger(FeedReplyReader.class.getName()); + private final Metric metric; + private final DocumentApiMetrics metricsHelper; + private final Metric.Context testAndSetMetricCtx; + + public FeedReplyReader(Metric metric, DocumentApiMetrics metricsHelper) { + this.metric = metric; + this.metricsHelper = metricsHelper; + this.testAndSetMetricCtx = metric.createContext(Map.of("operationType", "testAndSet")); + } + + @Override + public void handleReply(Reply reply) { + Object o = reply.getContext(); + if (!(o instanceof ReplyContext)) { + return; + } + ReplyContext context = (ReplyContext) o; + final double latencyInSeconds = (System.currentTimeMillis() - context.creationTime) / 1000.0d; + metric.set(MetricNames.LATENCY, latencyInSeconds, null); + + DocumentOperationType type = DocumentOperationType.fromMessage(reply.getMessage()); + boolean conditionMet = conditionMet(reply); + if (reply.hasErrors() && conditionMet) { + DocumentOperationStatus status = DocumentOperationStatus.fromMessageBusErrorCodes(reply.getErrorCodes()); + metricsHelper.reportFailure(type, status); + metric.add(MetricNames.FAILED, 1, null); + enqueue(context, reply.getError(0).getMessage(), ErrorCode.ERROR, false, reply.getTrace()); + } else { + metricsHelper.reportSuccessful(type, latencyInSeconds); + metric.add(MetricNames.SUCCEEDED, 1, null); + if ( ! conditionMet) + metric.add(MetricNames.CONDITION_NOT_MET, 1, testAndSetMetricCtx); + if ( ! updateNotFound(reply)) + metric.add(MetricNames.NOT_FOUND, 1, null); + enqueue(context, "Document processed.", ErrorCode.OK, !conditionMet, reply.getTrace()); + } + } + + private static boolean conditionMet(Reply reply) { + return ! reply.hasErrors() || reply.getError(0).getCode() != DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED; + } + + private static boolean updateNotFound(Reply reply) { + return reply instanceof UpdateDocumentReply + && ! ((UpdateDocumentReply) reply).wasFound() + && reply.getMessage() instanceof UpdateDocumentMessage + && ((UpdateDocumentMessage) reply.getMessage()).getDocumentUpdate() != null + && ! ((UpdateDocumentMessage) reply.getMessage()).getDocumentUpdate().getCreateIfNonExistent(); + } + + private void enqueue(ReplyContext context, String message, ErrorCode status, boolean isConditionNotMet, Trace trace) { + try { + String traceMessage = (trace != null && trace.getLevel() > 0) ? trace.toString() : ""; + + context.feedReplies.put(new OperationStatus(message, context.docId, status, isConditionNotMet, traceMessage)); + } catch (InterruptedException e) { + log.log(Level.WARNING, + "Interrupted while enqueueing result from putting document with id: " + context.docId); + Thread.currentThread().interrupt(); + } + } +} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedResponse.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedResponse.java new file mode 100644 index 00000000000..3e2a4a8795f --- /dev/null +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedResponse.java @@ -0,0 +1,88 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.http.server; + +import com.yahoo.container.jdisc.HttpResponse; +import com.yahoo.vespa.http.client.core.Headers; +import com.yahoo.vespa.http.client.core.ErrorCode; +import com.yahoo.vespa.http.client.core.OperationStatus; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.BlockingQueue; + +/** + * Reads feed responses from a queue and renders them continuously to the + * feeder. + * + * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> + * @since 5.1 + */ +public class FeedResponse extends HttpResponse { + + BlockingQueue<OperationStatus> operations; + + public FeedResponse( + int status, + BlockingQueue<OperationStatus> operations, + int protocolVersion, + String sessionId) { + super(status); + this.operations = operations; + headers().add(Headers.SESSION_ID, sessionId); + headers().add(Headers.VERSION, Integer.toString(protocolVersion)); + } + + // This is used by the V3 protocol. + public FeedResponse( + int status, + BlockingQueue<OperationStatus> operations, + int protocolVersion, + String sessionId, + int outstandingClientOperations, + String hostName) { + super(status); + this.operations = operations; + headers().add(Headers.SESSION_ID, sessionId); + headers().add(Headers.VERSION, Integer.toString(protocolVersion)); + headers().add(Headers.OUTSTANDING_REQUESTS, Integer.toString(outstandingClientOperations)); + headers().add(Headers.HOSTNAME, hostName); + } + + @Override + public void render(OutputStream output) throws IOException { + int i = 0; + OperationStatus status; + try { + status = operations.take(); + while (status.errorCode != ErrorCode.END_OF_FEED) { + output.write(toBytes(status.render())); + if (++i % 5 == 0) { + output.flush(); + } + status = operations.take(); + } + } catch (InterruptedException e) { + output.flush(); + } + } + + private byte[] toBytes(String s) { + byte[] b = new byte[s.length()]; + for (int i = 0; i < b.length; ++i) { + b[i] = (byte) s.charAt(i); // renderSingleStatus ensures ASCII only + } + return b; + } + + @Override + public String getContentType() { + return "text/plain"; + } + + @Override + public String getCharacterEncoding() { + return StandardCharsets.US_ASCII.name(); + } + +} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java new file mode 100644 index 00000000000..725349f6ebe --- /dev/null +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java @@ -0,0 +1,33 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.http.server; + +import com.yahoo.container.jdisc.HttpRequest; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.vespa.http.client.config.FeedParams.DataFormat; +import com.yahoo.vespa.http.client.core.Headers; + +import java.util.Optional; + +/** + * Wrapper for the feed feederSettings read from HTTP request. + * + * @author Steinar Knutsen + */ +public class FeederSettings { + + private static final Route DEFAULT_ROUTE = Route.parse("default"); + public final boolean drain; // TODO: Implement drain=true + public final Route route; + public final DataFormat dataFormat; + public final String priority; + public final Integer traceLevel; + + public FeederSettings(HttpRequest request) { + this.drain = Optional.ofNullable(request.getHeader(Headers.DRAIN)).map(Boolean::parseBoolean).orElse(false); + this.route = Optional.ofNullable(request.getHeader(Headers.ROUTE)).map(Route::parse).orElse(DEFAULT_ROUTE); + this.dataFormat = Optional.ofNullable(request.getHeader(Headers.DATA_FORMAT)).map(DataFormat::valueOf).orElse(DataFormat.JSON_UTF8); + this.priority = request.getHeader(Headers.PRIORITY); + this.traceLevel = Optional.ofNullable(request.getHeader(Headers.TRACE_LEVEL)).map(Integer::valueOf).orElse(null); + } + +} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/documentapi/metrics/MetricNames.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/MetricNames.java index c2d9f0b292e..a5987f2398e 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/documentapi/metrics/MetricNames.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/MetricNames.java @@ -1,5 +1,5 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.metrics; +package com.yahoo.vespa.http.server; /** * Place to store the metric names so where the metrics are logged can be found diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ReplyContext.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ReplyContext.java new file mode 100644 index 00000000000..aa2651595ef --- /dev/null +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ReplyContext.java @@ -0,0 +1,25 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.http.server; + +import com.yahoo.vespa.http.client.core.OperationStatus; + +import java.util.concurrent.BlockingQueue; + +/** + * Mapping between document ID and client session. + * + * @author Steinar Knutsen + */ +public class ReplyContext { + + public final String docId; + public final BlockingQueue<OperationStatus> feedReplies; + public final long creationTime; + + public ReplyContext(String docId, BlockingQueue<OperationStatus> feedReplies) { + this.docId = docId; + this.feedReplies = feedReplies; + this.creationTime = System.currentTimeMillis(); + } + +} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/StreamReaderV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/StreamReaderV3.java new file mode 100644 index 00000000000..4ddc430b35f --- /dev/null +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/StreamReaderV3.java @@ -0,0 +1,85 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.http.server; + +import com.yahoo.container.jdisc.HttpRequest; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.vespa.http.client.core.Encoder; +import com.yahoo.vespa.http.server.util.ByteLimitedInputStream; +import com.yahoo.vespaxmlparser.FeedOperation; +import com.yahoo.vespaxmlparser.FeedReader; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Optional; +import java.util.logging.Logger; +import java.util.zip.GZIPInputStream; + +/** + * This code is based on v2 code, but restructured so stream reading code is in one dedicated class. + * @author dybis + */ +public class StreamReaderV3 { + + protected static final Logger log = Logger.getLogger(StreamReaderV3.class.getName()); + + private final FeedReaderFactory feedReaderFactory; + private final DocumentTypeManager docTypeManager; + + public StreamReaderV3(FeedReaderFactory feedReaderFactory, DocumentTypeManager docTypeManager) { + this.feedReaderFactory = feedReaderFactory; + this.docTypeManager = docTypeManager; + } + + public FeedOperation getNextOperation(InputStream requestInputStream, FeederSettings settings) throws Exception { + FeedOperation op = null; + + int length = readByteLength(requestInputStream); + + try (InputStream limitedInputStream = new ByteLimitedInputStream(requestInputStream, length)){ + FeedReader reader = feedReaderFactory.createReader(limitedInputStream, docTypeManager, settings.dataFormat); + op = reader.read(); + } + return op; + } + + public Optional<String> getNextOperationId(InputStream requestInputStream) throws IOException { + StringBuilder idBuf = new StringBuilder(100); + int c; + while ((c = requestInputStream.read()) != -1) { + if (c == 32) { + break; + } + idBuf.append((char) c); //it's ASCII + } + if (idBuf.length() == 0) { + return Optional.empty(); + } + return Optional.of(Encoder.decode(idBuf.toString(), new StringBuilder(idBuf.length())).toString()); + } + + private int readByteLength(InputStream requestInputStream) throws IOException { + StringBuilder lenBuf = new StringBuilder(8); + int c; + while ((c = requestInputStream.read()) != -1) { + if (c == 10) { + break; + } + lenBuf.append((char) c); //it's ASCII + } + if (lenBuf.length() == 0) { + throw new IllegalStateException("Operation length missing."); + } + return Integer.valueOf(lenBuf.toString(), 16); + } + + public static InputStream unzipStreamIfNeeded(final HttpRequest httpRequest) + throws IOException { + final String contentEncodingHeader = httpRequest.getHeader("content-encoding"); + if ("gzip".equals(contentEncodingHeader)) { + return new GZIPInputStream(httpRequest.getData()); + } else { + return httpRequest.getData(); + } + } + +} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/UnknownClientException.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/UnknownClientException.java new file mode 100644 index 00000000000..5324b86a98a --- /dev/null +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/UnknownClientException.java @@ -0,0 +1,14 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.http.server; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + * @since 5.5.0 + */ +public class UnknownClientException extends RuntimeException { + + public UnknownClientException(String message) { + super(message); + } + +} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/package-info.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/package-info.java new file mode 100644 index 00000000000..ea01137d9af --- /dev/null +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/package-info.java @@ -0,0 +1,7 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * Server side of programmatic API for feeding into Vespa from outside of the + * clusters. Not a public API, not meant for direct use. + */ +@com.yahoo.api.annotations.PackageMarker +package com.yahoo.vespa.http.server; diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStream.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStream.java new file mode 100644 index 00000000000..270ebe7796b --- /dev/null +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStream.java @@ -0,0 +1,99 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.http.server.util; + +import java.io.IOException; +import java.io.InputStream; + +/** + * @author Einar M R Rosenvinge + * + * @since 5.1.23 + */ +public class ByteLimitedInputStream extends InputStream { + + private final InputStream wrappedStream; + private int remaining; + private int remainingWhenMarked; + + public ByteLimitedInputStream(InputStream wrappedStream, int limit) { + this.wrappedStream = wrappedStream; + if (limit < 0) { + throw new IllegalArgumentException("limit cannot be 0"); + } + this.remaining = limit; + } + + @Override + public int read() throws IOException { + if (remaining <= 0) { + return -1; + } + int retval = wrappedStream.read(); + if (retval < 0) { + remaining = 0; + } else { + --remaining; + } + return retval; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (b == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return 0; + } + + if (remaining <= 0) { + return -1; + } + + int bytesToRead = Math.min(remaining, len); + int retval = wrappedStream.read(b, off, bytesToRead); + + if (retval < 0) { + //end of underlying stream was reached, and nothing was read. + remaining = 0; + } else { + remaining -= retval; + } + return retval; + } + + @Override + public int available() throws IOException { + return remaining; + } + + @Override + public void close() throws IOException { + //we will never close the underlying stream + if (remaining <= 0) { + return; + } + while (remaining > 0) { + skip(remaining); + } + } + + @Override + public synchronized void mark(int readlimit) { + wrappedStream.mark(readlimit); + remainingWhenMarked = remaining; + } + + @Override + public synchronized void reset() throws IOException { + wrappedStream.reset(); + remaining = remainingWhenMarked; + } + + @Override + public boolean markSupported() { + return wrappedStream.markSupported(); + } + +} |