diff options
Diffstat (limited to 'vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java')
-rw-r--r-- | vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java | 193 |
1 files changed, 2 insertions, 191 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java index 5294545ad50..bd7d195b48b 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java @@ -51,16 +51,8 @@ import java.util.zip.GZIPInputStream; */ public class FeedHandler extends LoggingRequestHandler { - private final ExecutorService workers = Executors.newCachedThreadPool(ThreadFactoryFactory.getThreadFactory("feedhandler")); - private final DocumentTypeManager docTypeManager; - private final Map<String, ClientState> clients; - private final ScheduledThreadPoolExecutor cron; - private final SessionCache sessionCache; protected final ReplyHandler feedReplyHandler; - private final AtomicLong sessionId; - private final Metric metric; - private static final List<Integer> serverSupportedVersions = Collections.unmodifiableList(Arrays.asList(2, 3)); - private final String localHostname; + private static final List<Integer> serverSupportedVersions = Collections.unmodifiableList(Arrays.asList(3)); private final FeedHandlerV3 feedHandlerV3; @Inject @@ -73,46 +65,7 @@ public class FeedHandler extends LoggingRequestHandler { super(parentCtx); DocumentApiMetrics metricsHelper = new DocumentApiMetrics(metricReceiver, "vespa.http.server"); feedHandlerV3 = new FeedHandlerV3(parentCtx, documentManagerConfig, sessionCache, threadpoolConfig, metricsHelper); - docTypeManager = createDocumentManager(documentManagerConfig); - clients = new HashMap<>(); - this.sessionCache = sessionCache; - sessionId = new AtomicLong(ThreadLocalRandom.current().nextLong()); feedReplyHandler = new FeedReplyReader(parentCtx.getMetric(), metricsHelper); - cron = new ScheduledThreadPoolExecutor(1, ThreadFactoryFactory.getThreadFactory("feedhandler.cron")); - cron.scheduleWithFixedDelay(new CleanClients(), 16, 11, TimeUnit.MINUTES); - this.metric = parentCtx.getMetric(); - this.localHostname = resolveLocalHostname(); - } - - /** - * Exposed for creating mocks. - */ - protected DocumentTypeManager createDocumentManager(DocumentmanagerConfig documentManagerConfig) { - return new DocumentTypeManager(documentManagerConfig); - } - - private class CleanClients implements Runnable { - - @Override - public void run() { - List<ClientState> clientsToShutdown = new ArrayList<>(); - long now = System.currentTimeMillis(); - - synchronized (clients) { - for (Iterator<Map.Entry<String, ClientState>> i = clients - .entrySet().iterator(); i.hasNext();) { - ClientState client = i.next().getValue(); - - if (now - client.creationTime > 10 * 60 * 1000) { - clientsToShutdown.add(client); - i.remove(); - } - } - } - for (ClientState client : clientsToShutdown) { - client.sourceSession.getReference().close(); - } - } } private Tuple2<HttpResponse, Integer> checkProtocolVersion(HttpRequest request) { @@ -135,8 +88,6 @@ public class FeedHandler extends LoggingRequestHandler { int version; if (washedClientVersions.contains("3")) { version = 3; - } else if (washedClientVersions.contains("2")) { // TODO: Vespa 7: Remove support for Version 2 - version = 2; } else { return new Tuple2<>(new ErrorHttpResponse( Headers.HTTP_NOT_ACCEPTABLE, @@ -175,43 +126,7 @@ public class FeedHandler extends LoggingRequestHandler { if (protocolVersion.first != null) { return protocolVersion.first; } - if (3 == protocolVersion.second) { - return feedHandlerV3.handle(request); - } - final BlockingQueue<OperationStatus> operations = new LinkedBlockingQueue<>(); - Tuple2<String, Boolean> clientId; - clientId = sessionId(request); - - if (clientId.second != null && clientId.second) { - if (log.isLoggable(LogLevel.DEBUG)) { - log.log(LogLevel.DEBUG, "Received initial request from client with session ID " + - clientId.first + ", protocol version " + protocolVersion.second); - } - } - - Feeder feeder; - try { - feeder = createFeeder(request, request.getData(), operations, clientId.first, - clientId.second, protocolVersion.second); - // the synchronous FeedResponse blocks draining the InputStream, letting the Feeder read it - workers.submit(feeder); - } catch (UnknownClientException uce) { - String msg = Exceptions.toMessageString(uce); - log.log(LogLevel.WARNING, msg); - return new ErrorHttpResponse(Status.BAD_REQUEST, msg); - } catch (Exception e) { - String msg = "Could not initialize document parsing"; - log.log(LogLevel.WARNING, "Could not initialize document parsing", e); - return new ErrorHttpResponse(Status.INTERNAL_SERVER_ERROR, msg + ": " + Exceptions.toMessageString(e)); - } - - try { - feeder.waitForRequestReceived(); - } catch (InterruptedException e) { - return new ErrorHttpResponse(Status.INTERNAL_SERVER_ERROR, e.getMessage()); - } - - return new FeedResponse(200, operations, protocolVersion.second, clientId.first); + return feedHandlerV3.handle(request); } // Protected for testing @@ -225,82 +140,6 @@ public class FeedHandler extends LoggingRequestHandler { } } - /** - * Exposed for creating mocks. - */ - protected Feeder createFeeder( - HttpRequest request, - InputStream requestInputStream, - BlockingQueue<OperationStatus> operations, - String clientId, - boolean sessionIdWasGeneratedJustNow, - int protocolVersion) throws Exception { - if (protocolVersion != 2) - throw new IllegalStateException("Protocol version " + protocolVersion + " not supported."); - - return new Feeder( - unzipStreamIfNeeded(requestInputStream, request), - new FeedReaderFactory(), - docTypeManager, - operations, - popClient(clientId), - new FeederSettings(request), - clientId, - sessionIdWasGeneratedJustNow, - sourceSessionParams(request), - sessionCache, - this, - metric, - feedReplyHandler, - localHostname); - } - - private Tuple2<String, Boolean> sessionId(HttpRequest request) { - boolean sessionIdWasGeneratedJustNow = false; - String sessionId = request.getHeader(Headers.SESSION_ID); - if (sessionId == null) { - sessionId = Long.toString(this.sessionId.incrementAndGet()) + "-" + - remoteHostAddressAndPort(request.getJDiscRequest()) + "#" + - localHostname; - sessionIdWasGeneratedJustNow = true; - } - return new Tuple2<>(sessionId, sessionIdWasGeneratedJustNow); - } - - private static String remoteHostAddressAndPort(com.yahoo.jdisc.http.HttpRequest httpRequest) { - SocketAddress remoteAddress = httpRequest.getRemoteAddress(); - if (remoteAddress instanceof InetSocketAddress) { - InetSocketAddress isa = (InetSocketAddress) remoteAddress; - return isa.getAddress().getHostAddress() + "-" + isa.getPort(); - } - return ""; - } - - private static String resolveLocalHostname() { - String hostname = HostName.getLocalhost(); - if (hostname.equals("localhost")) { - return ""; - } - return hostname; - } - - /** - * Exposed for use when creating mocks. - */ - protected SourceSessionParams sourceSessionParams(HttpRequest request) { - SourceSessionParams params = new SourceSessionParams(); - String timeout = request.getHeader(Headers.TIMEOUT); - - if (timeout != null) { - try { - params.setTimeout(Double.parseDouble(timeout)); - } catch (NumberFormatException e) { - // NOP - } - } - return params; - } - @Override protected void destroy() { feedHandlerV3.destroy(); @@ -316,33 +155,5 @@ public class FeedHandler extends LoggingRequestHandler { private void internalDestroy() { super.destroy(); - workers.shutdown(); - cron.shutdown(); - synchronized (clients) { - for (ClientState client : clients.values()) { - client.sourceSession.getReference().close(); - } - clients.clear(); - } - } - - void putClient(final String sessionId, final ClientState value) { - synchronized (clients) { - clients.put(sessionId, value); - } } - - ClientState popClient(String sessionId) { - synchronized (clients) { - return clients.remove(sessionId); - } - } - - /** - * Guess what, testing only. - */ - void forceRunCleanClients() { - new CleanClients().run(); - } - } |