summaryrefslogtreecommitdiffstats
path: root/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java')
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java112
1 files changed, 29 insertions, 83 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java
index 4587c84f9dc..892f3763805 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java
@@ -1,16 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.feedhandler;
-import com.google.inject.Inject;
import com.yahoo.clientmetrics.RouteMetricSet;
-import com.yahoo.cloud.config.ClusterListConfig;
-import com.yahoo.cloud.config.SlobroksConfig;
-import com.yahoo.container.jdisc.EmptyResponse;
import com.yahoo.container.jdisc.HttpRequest;
-import com.yahoo.container.jdisc.HttpResponse;
import com.yahoo.container.protect.Error;
-import com.yahoo.document.config.DocumentmanagerConfig;
-import com.yahoo.feedapi.DocprocMessageProcessor;
import com.yahoo.feedapi.FeedContext;
import com.yahoo.feedapi.Feeder;
import com.yahoo.feedapi.JsonFeeder;
@@ -18,13 +11,8 @@ import com.yahoo.feedapi.MessagePropertyProcessor;
import com.yahoo.feedapi.SimpleFeedAccess;
import com.yahoo.feedapi.SingleSender;
import com.yahoo.feedapi.XMLFeeder;
-import com.yahoo.jdisc.Metric;
-import com.yahoo.vespa.config.content.LoadTypeConfig;
-import com.yahoo.vespaclient.config.FeederConfig;
import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -37,89 +25,47 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase {
public static final String JSON_INPUT = "jsonInput";
- private final AtomicInteger busyThreads = new AtomicInteger(0);
- private final int maxBusyThreads;
-
- @SuppressWarnings("unused")
- @Inject
- public VespaFeedHandler(FeederConfig feederConfig,
- LoadTypeConfig loadTypeConfig,
- DocumentmanagerConfig documentmanagerConfig,
- SlobroksConfig slobroksConfig,
- Executor executor,
- Metric metric) {
- super(feederConfig, loadTypeConfig, documentmanagerConfig, slobroksConfig, executor, metric);
- this.maxBusyThreads = feederConfig.maxbusythreads();
- }
-
- private VespaFeedHandler(FeedContext context, Executor executor) {
- super(context, executor);
- this.maxBusyThreads = 32;
+ private VespaFeedHandler(FeedContext context) {
+ super(context);
}
- public static VespaFeedHandler createFromContext(FeedContext context, Executor executor) {
- return new VespaFeedHandler(context, executor);
+ public static VespaFeedHandler createFromContext(FeedContext context) {
+ return new VespaFeedHandler(context);
}
- @Override
- public HttpResponse handle(HttpRequest request) {
- return handle(request, null, 1);
- }
-
- public HttpResponse handle(HttpRequest request, RouteMetricSet.ProgressCallback callback, int numThreads) {
- if (request.getProperty("status") != null) {
- return new MetricResponse(context.getMetrics().getMetricSet());
- }
- try {
- int busy = busyThreads.incrementAndGet();
- if (busy > maxBusyThreads)
- return new EmptyResponse(com.yahoo.jdisc.http.HttpResponse.Status.SERVICE_UNAVAILABLE);
+ public FeedResponse handle(HttpRequest request, RouteMetricSet.ProgressCallback callback, int numThreads) {
+ MessagePropertyProcessor.PropertySetter properties = getPropertyProcessor().buildPropertySetter(request);
- boolean asynchronous = request.getBooleanProperty("asynchronous");
+ String route = properties.getRoute().toString();
+ FeedResponse response = new FeedResponse(new RouteMetricSet(route, callback));
- MessagePropertyProcessor.PropertySetter properties = getPropertyProcessor().buildPropertySetter(request);
+ SingleSender sender = new SingleSender(response, getSharedSender(route));
+ sender.addMessageProcessor(properties);
+ ThreadedFeedAccess feedAccess = new ThreadedFeedAccess(numThreads, sender);
+ Feeder feeder = createFeeder(feedAccess, request);
+ feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError());
+ feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent());
+ response.setAbortOnFeedError(properties.getAbortOnFeedError());
- String route = properties.getRoute().toString();
- FeedResponse response = new FeedResponse(new RouteMetricSet(route, callback));
-
- SingleSender sender = new SingleSender(response, getSharedSender(route), !asynchronous);
- sender.addMessageProcessor(properties);
- sender.addMessageProcessor(new DocprocMessageProcessor(getDocprocChain(request), getDocprocServiceRegistry(request)));
- ThreadedFeedAccess feedAccess = new ThreadedFeedAccess(numThreads, sender);
- Feeder feeder = createFeeder(feedAccess, request);
- feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError());
- feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent());
- response.setAbortOnFeedError(properties.getAbortOnFeedError());
-
- List<String> errors = feeder.parse();
- for (String s : errors) {
- response.addXMLParseError(s);
- }
- if (errors.size() > 0 && feeder instanceof XMLFeeder) {
- response.addXMLParseError("If you are trying to feed JSON, set the Content-Type header to application/json.");
- }
-
- sender.done();
- feedAccess.close();
+ List<String> errors = feeder.parse();
+ for (String s : errors) {
+ response.addXMLParseError(s);
+ }
- if (asynchronous) {
- return response;
- }
- long millis = getTimeoutMillis(request);
- boolean completed = sender.waitForPending(millis);
- if (!completed) {
- response.addError(Error.TIMEOUT, "Timed out after " + millis + " ms waiting for responses");
- }
- response.done();
- return response;
- } finally {
- busyThreads.decrementAndGet();
+ sender.done();
+ feedAccess.close();
+ long millis = getTimeoutMillis(request);
+ boolean completed = sender.waitForPending(millis);
+ if (!completed) {
+ response.addError(Error.TIMEOUT, "Timed out after " + millis + " ms waiting for responses");
}
+ response.done();
+ return response;
+
}
private Feeder createFeeder(SimpleFeedAccess sender, HttpRequest request) {
- String contentType = request.getHeader("Content-Type");
- if (Boolean.valueOf(request.getProperty(JSON_INPUT)) || (contentType != null && contentType.startsWith("application/json"))) {
+ if (Boolean.valueOf(request.getProperty(JSON_INPUT))) {
return new JsonFeeder(getDocumentTypeManager(), sender, getRequestInputStream(request));
} else {
return new XMLFeeder(getDocumentTypeManager(), sender, getRequestInputStream(request));