summaryrefslogtreecommitdiffstats
path: root/vespaclient-core/src
diff options
context:
space:
mode:
authorArne H Juul <arnej@yahoo-inc.com>2016-12-22 14:56:47 +0100
committerArne H Juul <arnej@yahoo-inc.com>2016-12-22 14:56:47 +0100
commitd55caa9d68a66b87f5ecea344716d12fb19e6a27 (patch)
tree7e838389546c449516db87832a8dcf190c2806d2 /vespaclient-core/src
parent063a290e2bc16502e7cf691d29f3105c07cb768c (diff)
limit the number of threads blocked in old /feed/ API
Diffstat (limited to 'vespaclient-core/src')
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java89
-rw-r--r--vespaclient-core/src/main/resources/configdefinitions/feeder.def3
2 files changed, 57 insertions, 35 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java
index 08e1ca0482f..b00d47b8e61 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java
@@ -5,6 +5,7 @@ import com.google.inject.Inject;
import com.yahoo.clientmetrics.RouteMetricSet;
import com.yahoo.cloud.config.ClusterListConfig;
import com.yahoo.cloud.config.SlobroksConfig;
+import com.yahoo.container.jdisc.EmptyResponse;
import com.yahoo.container.jdisc.HttpRequest;
import com.yahoo.container.jdisc.HttpResponse;
import com.yahoo.document.config.DocumentmanagerConfig;
@@ -20,7 +21,10 @@ import com.yahoo.vespa.config.content.LoadTypeConfig;
import com.yahoo.vespaclient.config.FeederConfig;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.Executor;
+import java.util.logging.Logger;
+
/**
* Feed documents from a com.yahoo.container.handler.Request.
@@ -30,8 +34,13 @@ import java.util.concurrent.Executor;
*/
public final class VespaFeedHandler extends VespaFeedHandlerBase {
+ private final static Logger log = Logger.getLogger(VespaFeedHandler.class.getName());
public static final String JSON_INPUT = "jsonInput";
+ private AtomicInteger busyThreads = new AtomicInteger(0);
+ private final int maxBusyThreads;
+
+
@Inject
public VespaFeedHandler(FeederConfig feederConfig,
LoadTypeConfig loadTypeConfig,
@@ -41,10 +50,12 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase {
Executor executor,
Metric metric) throws Exception {
super(feederConfig, loadTypeConfig, documentmanagerConfig, slobroksConfig, clusterListConfig, executor, metric);
+ this.maxBusyThreads = feederConfig.maxbusythreads();
}
VespaFeedHandler(FeedContext context, Executor executor) throws Exception {
super(context, executor);
+ this.maxBusyThreads = 32;
}
public static VespaFeedHandler createFromContext(FeedContext context, Executor executor) throws Exception {
@@ -60,42 +71,50 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase {
if (request.getProperty("status") != null) {
return new MetricResponse(context.getMetrics().getMetricSet());
}
-
- boolean asynchronous = request.getBooleanProperty("asynchronous");
-
- MessagePropertyProcessor.PropertySetter properties = getPropertyProcessor().buildPropertySetter(request);
-
- String route = properties.getRoute().toString();
- FeedResponse response = new FeedResponse(new RouteMetricSet(route, callback));
-
- SingleSender sender = new SingleSender(response, getSharedSender(route), !asynchronous);
- sender.addMessageProcessor(properties);
- sender.addMessageProcessor(new DocprocMessageProcessor(getDocprocChain(request), getDocprocServiceRegistry(request)));
-
- Feeder feeder = createFeeder(sender, request);
- feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError());
- feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent());
- response.setAbortOnFeedError(properties.getAbortOnFeedError());
-
- List<String> errors = feeder.parse();
- for (String s : errors) {
- response.addXMLParseError(s);
- }
- if (errors.size() > 0 && feeder instanceof XMLFeeder) {
- response.addXMLParseError("If you are trying to feed JSON, set the Content-Type header to application/json.");
- }
-
- sender.done();
-
- if (asynchronous) {
- return response;
+ try {
+ int busy = busyThreads.incrementAndGet();
+ if (busy > maxBusyThreads) {
+ log.warning("too many threads ["+busy+"] busy, returning SERVICE UNAVAILABLE");
+ return new EmptyResponse(com.yahoo.jdisc.http.HttpResponse.Status.SERVICE_UNAVAILABLE);
+ }
+ boolean asynchronous = request.getBooleanProperty("asynchronous");
+
+ MessagePropertyProcessor.PropertySetter properties = getPropertyProcessor().buildPropertySetter(request);
+
+ String route = properties.getRoute().toString();
+ FeedResponse response = new FeedResponse(new RouteMetricSet(route, callback));
+
+ SingleSender sender = new SingleSender(response, getSharedSender(route), !asynchronous);
+ sender.addMessageProcessor(properties);
+ sender.addMessageProcessor(new DocprocMessageProcessor(getDocprocChain(request), getDocprocServiceRegistry(request)));
+
+ Feeder feeder = createFeeder(sender, request);
+ feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError());
+ feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent());
+ response.setAbortOnFeedError(properties.getAbortOnFeedError());
+
+ List<String> errors = feeder.parse();
+ for (String s : errors) {
+ response.addXMLParseError(s);
+ }
+ if (errors.size() > 0 && feeder instanceof XMLFeeder) {
+ response.addXMLParseError("If you are trying to feed JSON, set the Content-Type header to application/json.");
+ }
+
+ sender.done();
+
+ if (asynchronous) {
+ return response;
+ }
+ long millis = getTimeoutMillis(request);
+ boolean completed = sender.waitForPending(millis);
+ if ( ! completed)
+ response.addError("Timed out after "+millis+" ms waiting for responses");
+ response.done();
+ return response;
+ } finally {
+ busyThreads.decrementAndGet();
}
- long millis = getTimeoutMillis(request);
- boolean completed = sender.waitForPending(millis);
- if ( ! completed)
- response.addError("Timed out after "+millis+" ms waiting for responses");
- response.done();
- return response;
}
private Feeder createFeeder(SingleSender sender, HttpRequest request) {
diff --git a/vespaclient-core/src/main/resources/configdefinitions/feeder.def b/vespaclient-core/src/main/resources/configdefinitions/feeder.def
index c5a77ef59e6..e324d98e7a8 100644
--- a/vespaclient-core/src/main/resources/configdefinitions/feeder.def
+++ b/vespaclient-core/src/main/resources/configdefinitions/feeder.def
@@ -18,6 +18,9 @@ maxpendingdocs int default=0
## Max number of bytes in pending operations.
maxpendingbytes int default=0
+## Max number of busy threads
+maxbusythreads int default=32
+
## Max number of operations to perform per second (0 == no max)
maxfeedrate double default=0.0