diff options
Diffstat (limited to 'vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java')
-rw-r--r-- | vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java | 152 |
1 files changed, 152 insertions, 0 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java new file mode 100644 index 00000000000..c8828df6d54 --- /dev/null +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java @@ -0,0 +1,152 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.http.server; + +import com.yahoo.concurrent.ThreadFactoryFactory; +import com.yahoo.container.jdisc.HttpRequest; +import com.yahoo.container.jdisc.HttpResponse; +import com.yahoo.container.jdisc.ThreadedHttpRequestHandler; +import com.yahoo.container.jdisc.messagebus.SessionCache; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.documentapi.metrics.DocumentApiMetrics; +import com.yahoo.jdisc.Metric; +import com.yahoo.jdisc.ReferencedResource; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.shared.SharedSourceSession; +import com.yahoo.vespa.http.client.core.Headers; +import com.yahoo.yolean.Exceptions; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * This code is based on v2 code, however, in v3, one client has one ClientFeederV3 shared between all client threads. + * The new API has more logic for shutting down cleanly as the server is more likely to be upgraded. + * The code is restructured a bit. + * + * @author dybis + */ +public class FeedHandlerV3 extends ThreadedHttpRequestHandler { + + private DocumentTypeManager docTypeManager; + private final Map<String, ClientFeederV3> clientFeederByClientId = new HashMap<>(); + private final ScheduledThreadPoolExecutor cron; + private final SessionCache sessionCache; + protected final ReplyHandler feedReplyHandler; + private final Metric metric; + private final Object monitor = new Object(); + private static final Logger log = Logger.getLogger(FeedHandlerV3.class.getName()); + + public FeedHandlerV3(Executor executor, + Metric metric, + DocumentTypeManager documentTypeManager, + SessionCache sessionCache, + DocumentApiMetrics metricsHelper) { + super(executor, metric); + docTypeManager = documentTypeManager; + this.sessionCache = sessionCache; + feedReplyHandler = new FeedReplyReader(metric, metricsHelper); + cron = new ScheduledThreadPoolExecutor(1, ThreadFactoryFactory.getThreadFactory("feed-handler-v3-janitor")); + cron.scheduleWithFixedDelay(this::removeOldClients, 3, 3, TimeUnit.SECONDS); + this.metric = metric; + } + + public void injectDocumentManangerForTests(DocumentTypeManager docTypeManager) { + this.docTypeManager = docTypeManager; + } + + // TODO: If this is set up to run without first invoking the old FeedHandler code, we should + // verify the version header first. This is done in the old code. + @Override + public HttpResponse handle(HttpRequest request) { + String clientId = clientId(request); + ClientFeederV3 clientFeederV3; + synchronized (monitor) { + if (! clientFeederByClientId.containsKey(clientId)) { + SourceSessionParams sourceSessionParams = sourceSessionParams(request); + clientFeederByClientId.put(clientId, + new ClientFeederV3(retainSource(sessionCache, sourceSessionParams), + new FeedReaderFactory(true), //TODO make error debugging configurable + docTypeManager, + clientId, + metric, + feedReplyHandler)); + } + clientFeederV3 = clientFeederByClientId.get(clientId); + } + try { + return clientFeederV3.handleRequest(request); + } catch (UnknownClientException uce) { + String msg = Exceptions.toMessageString(uce); + log.log(Level.WARNING, msg); + return new ErrorHttpResponse(com.yahoo.jdisc.http.HttpResponse.Status.BAD_REQUEST, msg); + } catch (Exception e) { + String msg = "Could not initialize document parsing: " + Exceptions.toMessageString(e); + log.log(Level.WARNING, msg); + return new ErrorHttpResponse(com.yahoo.jdisc.http.HttpResponse.Status.INTERNAL_SERVER_ERROR, msg); + } + } + + // SessionCache is final and no easy way to mock it so we need this to be able to do testing. + protected ReferencedResource<SharedSourceSession> retainSource(SessionCache sessionCache, SourceSessionParams params) { + return sessionCache.retainSource(params); + } + + @Override + protected void destroy() { + // We are forking this to avoid that accidental de-referencing causes any random thread doing destruction. + // This caused a deadlock when the single Messenger thread in MessageBus was the last one referring this + // and started destructing something that required something only the messenger thread could provide. + Thread destroyer = new Thread(() -> { + cron.shutdown(); + synchronized (monitor) { + for (var iterator = clientFeederByClientId.values().iterator(); iterator.hasNext(); ) { + iterator.next().kill(); + iterator.remove(); + } + } + }, "feed-handler-v3-adhoc-destroyer"); + destroyer.setDaemon(true); + destroyer.start(); + } + + private String clientId(HttpRequest request) { + String clientDictatedId = request.getHeader(Headers.CLIENT_ID); + if (clientDictatedId == null || clientDictatedId.isEmpty()) { + throw new IllegalArgumentException("Did not get any CLIENT_ID header (" + Headers.CLIENT_ID + ")"); + } + return clientDictatedId; + } + + private SourceSessionParams sourceSessionParams(HttpRequest request) { + SourceSessionParams params = new SourceSessionParams(); + String timeout = request.getHeader(Headers.TIMEOUT); + + if (timeout != null) { + try { + params.setTimeout(Double.parseDouble(timeout)); + } catch (NumberFormatException e) { + // NOP + } + } + return params; + } + + private void removeOldClients() { + synchronized (monitor) { + for (var iterator = clientFeederByClientId.values().iterator(); iterator.hasNext(); ) { + ClientFeederV3 client = iterator.next(); + if (client.timedOut()) { + client.kill(); + iterator.remove(); + } + } + } + } + +} |