aboutsummaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java193
1 files changed, 2 insertions, 191 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java
index 5294545ad50..bd7d195b48b 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java
@@ -51,16 +51,8 @@ import java.util.zip.GZIPInputStream;
*/
public class FeedHandler extends LoggingRequestHandler {
- private final ExecutorService workers = Executors.newCachedThreadPool(ThreadFactoryFactory.getThreadFactory("feedhandler"));
- private final DocumentTypeManager docTypeManager;
- private final Map<String, ClientState> clients;
- private final ScheduledThreadPoolExecutor cron;
- private final SessionCache sessionCache;
protected final ReplyHandler feedReplyHandler;
- private final AtomicLong sessionId;
- private final Metric metric;
- private static final List<Integer> serverSupportedVersions = Collections.unmodifiableList(Arrays.asList(2, 3));
- private final String localHostname;
+ private static final List<Integer> serverSupportedVersions = Collections.unmodifiableList(Arrays.asList(3));
private final FeedHandlerV3 feedHandlerV3;
@Inject
@@ -73,46 +65,7 @@ public class FeedHandler extends LoggingRequestHandler {
super(parentCtx);
DocumentApiMetrics metricsHelper = new DocumentApiMetrics(metricReceiver, "vespa.http.server");
feedHandlerV3 = new FeedHandlerV3(parentCtx, documentManagerConfig, sessionCache, threadpoolConfig, metricsHelper);
- docTypeManager = createDocumentManager(documentManagerConfig);
- clients = new HashMap<>();
- this.sessionCache = sessionCache;
- sessionId = new AtomicLong(ThreadLocalRandom.current().nextLong());
feedReplyHandler = new FeedReplyReader(parentCtx.getMetric(), metricsHelper);
- cron = new ScheduledThreadPoolExecutor(1, ThreadFactoryFactory.getThreadFactory("feedhandler.cron"));
- cron.scheduleWithFixedDelay(new CleanClients(), 16, 11, TimeUnit.MINUTES);
- this.metric = parentCtx.getMetric();
- this.localHostname = resolveLocalHostname();
- }
-
- /**
- * Exposed for creating mocks.
- */
- protected DocumentTypeManager createDocumentManager(DocumentmanagerConfig documentManagerConfig) {
- return new DocumentTypeManager(documentManagerConfig);
- }
-
- private class CleanClients implements Runnable {
-
- @Override
- public void run() {
- List<ClientState> clientsToShutdown = new ArrayList<>();
- long now = System.currentTimeMillis();
-
- synchronized (clients) {
- for (Iterator<Map.Entry<String, ClientState>> i = clients
- .entrySet().iterator(); i.hasNext();) {
- ClientState client = i.next().getValue();
-
- if (now - client.creationTime > 10 * 60 * 1000) {
- clientsToShutdown.add(client);
- i.remove();
- }
- }
- }
- for (ClientState client : clientsToShutdown) {
- client.sourceSession.getReference().close();
- }
- }
}
private Tuple2<HttpResponse, Integer> checkProtocolVersion(HttpRequest request) {
@@ -135,8 +88,6 @@ public class FeedHandler extends LoggingRequestHandler {
int version;
if (washedClientVersions.contains("3")) {
version = 3;
- } else if (washedClientVersions.contains("2")) { // TODO: Vespa 7: Remove support for Version 2
- version = 2;
} else {
return new Tuple2<>(new ErrorHttpResponse(
Headers.HTTP_NOT_ACCEPTABLE,
@@ -175,43 +126,7 @@ public class FeedHandler extends LoggingRequestHandler {
if (protocolVersion.first != null) {
return protocolVersion.first;
}
- if (3 == protocolVersion.second) {
- return feedHandlerV3.handle(request);
- }
- final BlockingQueue<OperationStatus> operations = new LinkedBlockingQueue<>();
- Tuple2<String, Boolean> clientId;
- clientId = sessionId(request);
-
- if (clientId.second != null && clientId.second) {
- if (log.isLoggable(LogLevel.DEBUG)) {
- log.log(LogLevel.DEBUG, "Received initial request from client with session ID " +
- clientId.first + ", protocol version " + protocolVersion.second);
- }
- }
-
- Feeder feeder;
- try {
- feeder = createFeeder(request, request.getData(), operations, clientId.first,
- clientId.second, protocolVersion.second);
- // the synchronous FeedResponse blocks draining the InputStream, letting the Feeder read it
- workers.submit(feeder);
- } catch (UnknownClientException uce) {
- String msg = Exceptions.toMessageString(uce);
- log.log(LogLevel.WARNING, msg);
- return new ErrorHttpResponse(Status.BAD_REQUEST, msg);
- } catch (Exception e) {
- String msg = "Could not initialize document parsing";
- log.log(LogLevel.WARNING, "Could not initialize document parsing", e);
- return new ErrorHttpResponse(Status.INTERNAL_SERVER_ERROR, msg + ": " + Exceptions.toMessageString(e));
- }
-
- try {
- feeder.waitForRequestReceived();
- } catch (InterruptedException e) {
- return new ErrorHttpResponse(Status.INTERNAL_SERVER_ERROR, e.getMessage());
- }
-
- return new FeedResponse(200, operations, protocolVersion.second, clientId.first);
+ return feedHandlerV3.handle(request);
}
// Protected for testing
@@ -225,82 +140,6 @@ public class FeedHandler extends LoggingRequestHandler {
}
}
- /**
- * Exposed for creating mocks.
- */
- protected Feeder createFeeder(
- HttpRequest request,
- InputStream requestInputStream,
- BlockingQueue<OperationStatus> operations,
- String clientId,
- boolean sessionIdWasGeneratedJustNow,
- int protocolVersion) throws Exception {
- if (protocolVersion != 2)
- throw new IllegalStateException("Protocol version " + protocolVersion + " not supported.");
-
- return new Feeder(
- unzipStreamIfNeeded(requestInputStream, request),
- new FeedReaderFactory(),
- docTypeManager,
- operations,
- popClient(clientId),
- new FeederSettings(request),
- clientId,
- sessionIdWasGeneratedJustNow,
- sourceSessionParams(request),
- sessionCache,
- this,
- metric,
- feedReplyHandler,
- localHostname);
- }
-
- private Tuple2<String, Boolean> sessionId(HttpRequest request) {
- boolean sessionIdWasGeneratedJustNow = false;
- String sessionId = request.getHeader(Headers.SESSION_ID);
- if (sessionId == null) {
- sessionId = Long.toString(this.sessionId.incrementAndGet()) + "-" +
- remoteHostAddressAndPort(request.getJDiscRequest()) + "#" +
- localHostname;
- sessionIdWasGeneratedJustNow = true;
- }
- return new Tuple2<>(sessionId, sessionIdWasGeneratedJustNow);
- }
-
- private static String remoteHostAddressAndPort(com.yahoo.jdisc.http.HttpRequest httpRequest) {
- SocketAddress remoteAddress = httpRequest.getRemoteAddress();
- if (remoteAddress instanceof InetSocketAddress) {
- InetSocketAddress isa = (InetSocketAddress) remoteAddress;
- return isa.getAddress().getHostAddress() + "-" + isa.getPort();
- }
- return "";
- }
-
- private static String resolveLocalHostname() {
- String hostname = HostName.getLocalhost();
- if (hostname.equals("localhost")) {
- return "";
- }
- return hostname;
- }
-
- /**
- * Exposed for use when creating mocks.
- */
- protected SourceSessionParams sourceSessionParams(HttpRequest request) {
- SourceSessionParams params = new SourceSessionParams();
- String timeout = request.getHeader(Headers.TIMEOUT);
-
- if (timeout != null) {
- try {
- params.setTimeout(Double.parseDouble(timeout));
- } catch (NumberFormatException e) {
- // NOP
- }
- }
- return params;
- }
-
@Override
protected void destroy() {
feedHandlerV3.destroy();
@@ -316,33 +155,5 @@ public class FeedHandler extends LoggingRequestHandler {
private void internalDestroy() {
super.destroy();
- workers.shutdown();
- cron.shutdown();
- synchronized (clients) {
- for (ClientState client : clients.values()) {
- client.sourceSession.getReference().close();
- }
- clients.clear();
- }
- }
-
- void putClient(final String sessionId, final ClientState value) {
- synchronized (clients) {
- clients.put(sessionId, value);
- }
}
-
- ClientState popClient(String sessionId) {
- synchronized (clients) {
- return clients.remove(sessionId);
- }
- }
-
- /**
- * Guess what, testing only.
- */
- void forceRunCleanClients() {
- new CleanClients().run();
- }
-
}