diff options
Diffstat (limited to 'vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java')
-rwxr-xr-x | vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java | 112 |
1 files changed, 29 insertions, 83 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java index 4587c84f9dc..892f3763805 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java @@ -1,16 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.feedhandler; -import com.google.inject.Inject; import com.yahoo.clientmetrics.RouteMetricSet; -import com.yahoo.cloud.config.ClusterListConfig; -import com.yahoo.cloud.config.SlobroksConfig; -import com.yahoo.container.jdisc.EmptyResponse; import com.yahoo.container.jdisc.HttpRequest; -import com.yahoo.container.jdisc.HttpResponse; import com.yahoo.container.protect.Error; -import com.yahoo.document.config.DocumentmanagerConfig; -import com.yahoo.feedapi.DocprocMessageProcessor; import com.yahoo.feedapi.FeedContext; import com.yahoo.feedapi.Feeder; import com.yahoo.feedapi.JsonFeeder; @@ -18,13 +11,8 @@ import com.yahoo.feedapi.MessagePropertyProcessor; import com.yahoo.feedapi.SimpleFeedAccess; import com.yahoo.feedapi.SingleSender; import com.yahoo.feedapi.XMLFeeder; -import com.yahoo.jdisc.Metric; -import com.yahoo.vespa.config.content.LoadTypeConfig; -import com.yahoo.vespaclient.config.FeederConfig; import java.util.List; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; /** @@ -37,89 +25,47 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase { public static final String JSON_INPUT = "jsonInput"; - private final AtomicInteger busyThreads = new AtomicInteger(0); - private final int maxBusyThreads; - - @SuppressWarnings("unused") - @Inject - public VespaFeedHandler(FeederConfig feederConfig, - LoadTypeConfig loadTypeConfig, - DocumentmanagerConfig documentmanagerConfig, - SlobroksConfig slobroksConfig, - Executor executor, - Metric metric) { - super(feederConfig, loadTypeConfig, documentmanagerConfig, slobroksConfig, executor, metric); - this.maxBusyThreads = feederConfig.maxbusythreads(); - } - - private VespaFeedHandler(FeedContext context, Executor executor) { - super(context, executor); - this.maxBusyThreads = 32; + private VespaFeedHandler(FeedContext context) { + super(context); } - public static VespaFeedHandler createFromContext(FeedContext context, Executor executor) { - return new VespaFeedHandler(context, executor); + public static VespaFeedHandler createFromContext(FeedContext context) { + return new VespaFeedHandler(context); } - @Override - public HttpResponse handle(HttpRequest request) { - return handle(request, null, 1); - } - - public HttpResponse handle(HttpRequest request, RouteMetricSet.ProgressCallback callback, int numThreads) { - if (request.getProperty("status") != null) { - return new MetricResponse(context.getMetrics().getMetricSet()); - } - try { - int busy = busyThreads.incrementAndGet(); - if (busy > maxBusyThreads) - return new EmptyResponse(com.yahoo.jdisc.http.HttpResponse.Status.SERVICE_UNAVAILABLE); + public FeedResponse handle(HttpRequest request, RouteMetricSet.ProgressCallback callback, int numThreads) { + MessagePropertyProcessor.PropertySetter properties = getPropertyProcessor().buildPropertySetter(request); - boolean asynchronous = request.getBooleanProperty("asynchronous"); + String route = properties.getRoute().toString(); + FeedResponse response = new FeedResponse(new RouteMetricSet(route, callback)); - MessagePropertyProcessor.PropertySetter properties = getPropertyProcessor().buildPropertySetter(request); + SingleSender sender = new SingleSender(response, getSharedSender(route)); + sender.addMessageProcessor(properties); + ThreadedFeedAccess feedAccess = new ThreadedFeedAccess(numThreads, sender); + Feeder feeder = createFeeder(feedAccess, request); + feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError()); + feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent()); + response.setAbortOnFeedError(properties.getAbortOnFeedError()); - String route = properties.getRoute().toString(); - FeedResponse response = new FeedResponse(new RouteMetricSet(route, callback)); - - SingleSender sender = new SingleSender(response, getSharedSender(route), !asynchronous); - sender.addMessageProcessor(properties); - sender.addMessageProcessor(new DocprocMessageProcessor(getDocprocChain(request), getDocprocServiceRegistry(request))); - ThreadedFeedAccess feedAccess = new ThreadedFeedAccess(numThreads, sender); - Feeder feeder = createFeeder(feedAccess, request); - feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError()); - feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent()); - response.setAbortOnFeedError(properties.getAbortOnFeedError()); - - List<String> errors = feeder.parse(); - for (String s : errors) { - response.addXMLParseError(s); - } - if (errors.size() > 0 && feeder instanceof XMLFeeder) { - response.addXMLParseError("If you are trying to feed JSON, set the Content-Type header to application/json."); - } - - sender.done(); - feedAccess.close(); + List<String> errors = feeder.parse(); + for (String s : errors) { + response.addXMLParseError(s); + } - if (asynchronous) { - return response; - } - long millis = getTimeoutMillis(request); - boolean completed = sender.waitForPending(millis); - if (!completed) { - response.addError(Error.TIMEOUT, "Timed out after " + millis + " ms waiting for responses"); - } - response.done(); - return response; - } finally { - busyThreads.decrementAndGet(); + sender.done(); + feedAccess.close(); + long millis = getTimeoutMillis(request); + boolean completed = sender.waitForPending(millis); + if (!completed) { + response.addError(Error.TIMEOUT, "Timed out after " + millis + " ms waiting for responses"); } + response.done(); + return response; + } private Feeder createFeeder(SimpleFeedAccess sender, HttpRequest request) { - String contentType = request.getHeader("Content-Type"); - if (Boolean.valueOf(request.getProperty(JSON_INPUT)) || (contentType != null && contentType.startsWith("application/json"))) { + if (Boolean.valueOf(request.getProperty(JSON_INPUT))) { return new JsonFeeder(getDocumentTypeManager(), sender, getRequestInputStream(request)); } else { return new XMLFeeder(getDocumentTypeManager(), sender, getRequestInputStream(request)); |