diff options
author | Arne H Juul <arnej@yahoo-inc.com> | 2016-12-22 14:56:47 +0100 |
---|---|---|
committer | Arne H Juul <arnej@yahoo-inc.com> | 2016-12-22 14:56:47 +0100 |
commit | d55caa9d68a66b87f5ecea344716d12fb19e6a27 (patch) | |
tree | 7e838389546c449516db87832a8dcf190c2806d2 /vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java | |
parent | 063a290e2bc16502e7cf691d29f3105c07cb768c (diff) |
limit the number of threads blocked in old /feed/ API
Diffstat (limited to 'vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java')
-rwxr-xr-x | vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java | 89 |
1 files changed, 54 insertions, 35 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java index 08e1ca0482f..b00d47b8e61 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java @@ -5,6 +5,7 @@ import com.google.inject.Inject; import com.yahoo.clientmetrics.RouteMetricSet; import com.yahoo.cloud.config.ClusterListConfig; import com.yahoo.cloud.config.SlobroksConfig; +import com.yahoo.container.jdisc.EmptyResponse; import com.yahoo.container.jdisc.HttpRequest; import com.yahoo.container.jdisc.HttpResponse; import com.yahoo.document.config.DocumentmanagerConfig; @@ -20,7 +21,10 @@ import com.yahoo.vespa.config.content.LoadTypeConfig; import com.yahoo.vespaclient.config.FeederConfig; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.Executor; +import java.util.logging.Logger; + /** * Feed documents from a com.yahoo.container.handler.Request. @@ -30,8 +34,13 @@ import java.util.concurrent.Executor; */ public final class VespaFeedHandler extends VespaFeedHandlerBase { + private final static Logger log = Logger.getLogger(VespaFeedHandler.class.getName()); public static final String JSON_INPUT = "jsonInput"; + private AtomicInteger busyThreads = new AtomicInteger(0); + private final int maxBusyThreads; + + @Inject public VespaFeedHandler(FeederConfig feederConfig, LoadTypeConfig loadTypeConfig, @@ -41,10 +50,12 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase { Executor executor, Metric metric) throws Exception { super(feederConfig, loadTypeConfig, documentmanagerConfig, slobroksConfig, clusterListConfig, executor, metric); + this.maxBusyThreads = feederConfig.maxbusythreads(); } VespaFeedHandler(FeedContext context, Executor executor) throws Exception { super(context, executor); + this.maxBusyThreads = 32; } public static VespaFeedHandler createFromContext(FeedContext context, Executor executor) throws Exception { @@ -60,42 +71,50 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase { if (request.getProperty("status") != null) { return new MetricResponse(context.getMetrics().getMetricSet()); } - - boolean asynchronous = request.getBooleanProperty("asynchronous"); - - MessagePropertyProcessor.PropertySetter properties = getPropertyProcessor().buildPropertySetter(request); - - String route = properties.getRoute().toString(); - FeedResponse response = new FeedResponse(new RouteMetricSet(route, callback)); - - SingleSender sender = new SingleSender(response, getSharedSender(route), !asynchronous); - sender.addMessageProcessor(properties); - sender.addMessageProcessor(new DocprocMessageProcessor(getDocprocChain(request), getDocprocServiceRegistry(request))); - - Feeder feeder = createFeeder(sender, request); - feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError()); - feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent()); - response.setAbortOnFeedError(properties.getAbortOnFeedError()); - - List<String> errors = feeder.parse(); - for (String s : errors) { - response.addXMLParseError(s); - } - if (errors.size() > 0 && feeder instanceof XMLFeeder) { - response.addXMLParseError("If you are trying to feed JSON, set the Content-Type header to application/json."); - } - - sender.done(); - - if (asynchronous) { - return response; + try { + int busy = busyThreads.incrementAndGet(); + if (busy > maxBusyThreads) { + log.warning("too many threads ["+busy+"] busy, returning SERVICE UNAVAILABLE"); + return new EmptyResponse(com.yahoo.jdisc.http.HttpResponse.Status.SERVICE_UNAVAILABLE); + } + boolean asynchronous = request.getBooleanProperty("asynchronous"); + + MessagePropertyProcessor.PropertySetter properties = getPropertyProcessor().buildPropertySetter(request); + + String route = properties.getRoute().toString(); + FeedResponse response = new FeedResponse(new RouteMetricSet(route, callback)); + + SingleSender sender = new SingleSender(response, getSharedSender(route), !asynchronous); + sender.addMessageProcessor(properties); + sender.addMessageProcessor(new DocprocMessageProcessor(getDocprocChain(request), getDocprocServiceRegistry(request))); + + Feeder feeder = createFeeder(sender, request); + feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError()); + feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent()); + response.setAbortOnFeedError(properties.getAbortOnFeedError()); + + List<String> errors = feeder.parse(); + for (String s : errors) { + response.addXMLParseError(s); + } + if (errors.size() > 0 && feeder instanceof XMLFeeder) { + response.addXMLParseError("If you are trying to feed JSON, set the Content-Type header to application/json."); + } + + sender.done(); + + if (asynchronous) { + return response; + } + long millis = getTimeoutMillis(request); + boolean completed = sender.waitForPending(millis); + if ( ! completed) + response.addError("Timed out after "+millis+" ms waiting for responses"); + response.done(); + return response; + } finally { + busyThreads.decrementAndGet(); } - long millis = getTimeoutMillis(request); - boolean completed = sender.waitForPending(millis); - if ( ! completed) - response.addError("Timed out after "+millis+" ms waiting for responses"); - response.done(); - return response; } private Feeder createFeeder(SingleSender sender, HttpRequest request) { |