diff options
Diffstat (limited to 'vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java')
-rwxr-xr-x | vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java | 87 |
1 files changed, 32 insertions, 55 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java index 59cd86670e1..ca2da28d0dd 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java @@ -2,11 +2,8 @@ package com.yahoo.feedhandler; import com.yahoo.clientmetrics.RouteMetricSet; -import com.yahoo.container.jdisc.EmptyResponse; import com.yahoo.container.jdisc.HttpRequest; -import com.yahoo.container.jdisc.HttpResponse; import com.yahoo.container.protect.Error; -import com.yahoo.feedapi.DocprocMessageProcessor; import com.yahoo.feedapi.FeedContext; import com.yahoo.feedapi.Feeder; import com.yahoo.feedapi.JsonFeeder; @@ -16,8 +13,6 @@ import com.yahoo.feedapi.SingleSender; import com.yahoo.feedapi.XMLFeeder; import java.util.List; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; /** @@ -30,68 +25,50 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase { public static final String JSON_INPUT = "jsonInput"; - private final AtomicInteger busyThreads = new AtomicInteger(0); - private final int maxBusyThreads; - - private VespaFeedHandler(FeedContext context, Executor executor) { - super(context, executor); - this.maxBusyThreads = 32; - } - - public static VespaFeedHandler createFromContext(FeedContext context, Executor executor) { - return new VespaFeedHandler(context, executor); + private VespaFeedHandler(FeedContext context) { + super(context); } - @Override - public HttpResponse handle(HttpRequest request) { - return handle(request, null, 1); + public static VespaFeedHandler createFromContext(FeedContext context) { + return new VespaFeedHandler(context); } - public HttpResponse handle(HttpRequest request, RouteMetricSet.ProgressCallback callback, int numThreads) { - try { - int busy = busyThreads.incrementAndGet(); - if (busy > maxBusyThreads) - return new EmptyResponse(com.yahoo.jdisc.http.HttpResponse.Status.SERVICE_UNAVAILABLE); - - MessagePropertyProcessor.PropertySetter properties = getPropertyProcessor().buildPropertySetter(request); + public FeedResponse handle(HttpRequest request, RouteMetricSet.ProgressCallback callback, int numThreads) { + MessagePropertyProcessor.PropertySetter properties = getPropertyProcessor().buildPropertySetter(request); - String route = properties.getRoute().toString(); - FeedResponse response = new FeedResponse(new RouteMetricSet(route, callback)); + String route = properties.getRoute().toString(); + FeedResponse response = new FeedResponse(new RouteMetricSet(route, callback)); - SingleSender sender = new SingleSender(response, getSharedSender(route)); - sender.addMessageProcessor(properties); - sender.addMessageProcessor(new DocprocMessageProcessor(getDocprocChain(request), getDocprocServiceRegistry(request))); - ThreadedFeedAccess feedAccess = new ThreadedFeedAccess(numThreads, sender); - Feeder feeder = createFeeder(feedAccess, request); - feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError()); - feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent()); - response.setAbortOnFeedError(properties.getAbortOnFeedError()); + SingleSender sender = new SingleSender(response, getSharedSender(route)); + sender.addMessageProcessor(properties); + ThreadedFeedAccess feedAccess = new ThreadedFeedAccess(numThreads, sender); + Feeder feeder = createFeeder(feedAccess, request); + feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError()); + feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent()); + response.setAbortOnFeedError(properties.getAbortOnFeedError()); - List<String> errors = feeder.parse(); - for (String s : errors) { - response.addXMLParseError(s); - } - if (errors.size() > 0 && feeder instanceof XMLFeeder) { - response.addXMLParseError("If you are trying to feed JSON, set the Content-Type header to application/json."); - } + List<String> errors = feeder.parse(); + for (String s : errors) { + response.addXMLParseError(s); + } + if (errors.size() > 0 && feeder instanceof XMLFeeder) { + response.addXMLParseError("If you are trying to feed JSON, set the Content-Type header to application/json."); + } - sender.done(); - feedAccess.close(); - long millis = getTimeoutMillis(request); - boolean completed = sender.waitForPending(millis); - if (!completed) { - response.addError(Error.TIMEOUT, "Timed out after " + millis + " ms waiting for responses"); - } - response.done(); - return response; - } finally { - busyThreads.decrementAndGet(); + sender.done(); + feedAccess.close(); + long millis = getTimeoutMillis(request); + boolean completed = sender.waitForPending(millis); + if (!completed) { + response.addError(Error.TIMEOUT, "Timed out after " + millis + " ms waiting for responses"); } + response.done(); + return response; + } private Feeder createFeeder(SimpleFeedAccess sender, HttpRequest request) { - String contentType = request.getHeader("Content-Type"); - if (Boolean.valueOf(request.getProperty(JSON_INPUT)) || (contentType != null && contentType.startsWith("application/json"))) { + if (Boolean.valueOf(request.getProperty(JSON_INPUT))) { return new JsonFeeder(getDocumentTypeManager(), sender, getRequestInputStream(request)); } else { return new XMLFeeder(getDocumentTypeManager(), sender, getRequestInputStream(request)); |