// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.feedhandler; import com.google.inject.Inject; import com.yahoo.clientmetrics.RouteMetricSet; import com.yahoo.cloud.config.ClusterListConfig; import com.yahoo.cloud.config.SlobroksConfig; import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.container.jdisc.EmptyResponse; import com.yahoo.container.jdisc.HttpRequest; import com.yahoo.container.jdisc.HttpResponse; import com.yahoo.container.protect.Error; import com.yahoo.document.Document; import com.yahoo.document.DocumentId; import com.yahoo.document.DocumentUpdate; import com.yahoo.document.TestAndSetCondition; import com.yahoo.document.config.DocumentmanagerConfig; import com.yahoo.feedapi.DocprocMessageProcessor; import com.yahoo.feedapi.FeedContext; import com.yahoo.feedapi.Feeder; import com.yahoo.feedapi.JsonFeeder; import com.yahoo.feedapi.MessagePropertyProcessor; import com.yahoo.feedapi.SimpleFeedAccess; import com.yahoo.feedapi.SingleSender; import com.yahoo.feedapi.XMLFeeder; import com.yahoo.jdisc.Metric; import com.yahoo.vespa.config.content.LoadTypeConfig; import com.yahoo.vespaclient.config.FeederConfig; import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * Feed documents from a com.yahoo.container.handler.Request. * * @author Thomas Gundersen * @author steinar */ public final class VespaFeedHandler extends VespaFeedHandlerBase { public static final String JSON_INPUT = "jsonInput"; private final AtomicInteger busyThreads = new AtomicInteger(0); private final int maxBusyThreads; @SuppressWarnings("unused") @Inject public VespaFeedHandler(FeederConfig feederConfig, LoadTypeConfig loadTypeConfig, DocumentmanagerConfig documentmanagerConfig, SlobroksConfig slobroksConfig, ClusterListConfig clusterListConfig, Executor executor, Metric metric) throws Exception { super(feederConfig, loadTypeConfig, documentmanagerConfig, slobroksConfig, clusterListConfig, executor, metric); this.maxBusyThreads = feederConfig.maxbusythreads(); } VespaFeedHandler(FeedContext context, Executor executor) throws Exception { super(context, executor); this.maxBusyThreads = 32; } public static VespaFeedHandler createFromContext(FeedContext context, Executor executor) throws Exception { return new VespaFeedHandler(context, executor); } @Override public HttpResponse handle(HttpRequest request) { return handle(request, null, 1); } static final class ThreadedFeedAccess implements SimpleFeedAccess { private final SimpleFeedAccess simpleFeedAccess; private final ExecutorService executorService; private final Executor executor; ThreadedFeedAccess(int numThreads, SimpleFeedAccess simpleFeedAccess) { this.simpleFeedAccess = simpleFeedAccess; if (numThreads <= 0) { numThreads = Runtime.getRuntime().availableProcessors(); } if (numThreads > 1) { executorService = new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS, new SynchronousQueue<>(false), ThreadFactoryFactory.getDaemonThreadFactory("feeder"), new ThreadPoolExecutor.CallerRunsPolicy()); executor = executorService; } else { executorService = null; executor = new Executor() { @Override public void execute(Runnable command) { command.run(); } }; } } @Override public void put(Document doc) { executor.execute(() -> simpleFeedAccess.put(doc)); } @Override public void remove(DocumentId docId) { executor.execute(() -> simpleFeedAccess.remove(docId)); } @Override public void update(DocumentUpdate update) { executor.execute(() -> simpleFeedAccess.update(update)); } @Override public void put(Document doc, TestAndSetCondition condition) { executor.execute(() -> simpleFeedAccess.put(doc, condition)); } @Override public void remove(DocumentId docId, TestAndSetCondition condition) { executor.execute(() -> simpleFeedAccess.remove(docId, condition)); } @Override public void update(DocumentUpdate update, TestAndSetCondition condition) { executor.execute(() -> simpleFeedAccess.update(update, condition)); } @Override public boolean isAborted() { return simpleFeedAccess.isAborted(); } @Override public void close() { if (executorService != null) { executorService.shutdown(); } } } public HttpResponse handle(HttpRequest request, RouteMetricSet.ProgressCallback callback, int numThreads) { if (request.getProperty("status") != null) { return new MetricResponse(context.getMetrics().getMetricSet()); } try { int busy = busyThreads.incrementAndGet(); if (busy > maxBusyThreads) return new EmptyResponse(com.yahoo.jdisc.http.HttpResponse.Status.SERVICE_UNAVAILABLE); boolean asynchronous = request.getBooleanProperty("asynchronous"); MessagePropertyProcessor.PropertySetter properties = getPropertyProcessor().buildPropertySetter(request); String route = properties.getRoute().toString(); FeedResponse response = new FeedResponse(new RouteMetricSet(route, callback)); SingleSender sender = new SingleSender(response, getSharedSender(route), !asynchronous); sender.addMessageProcessor(properties); sender.addMessageProcessor(new DocprocMessageProcessor(getDocprocChain(request), getDocprocServiceRegistry(request))); ThreadedFeedAccess feedAccess = new ThreadedFeedAccess(numThreads, sender); Feeder feeder = createFeeder(sender, request); feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError()); feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent()); response.setAbortOnFeedError(properties.getAbortOnFeedError()); List errors = feeder.parse(); for (String s : errors) { response.addXMLParseError(s); } if (errors.size() > 0 && feeder instanceof XMLFeeder) { response.addXMLParseError("If you are trying to feed JSON, set the Content-Type header to application/json."); } sender.done(); feedAccess.close(); if (asynchronous) { return response; } long millis = getTimeoutMillis(request); boolean completed = sender.waitForPending(millis); if (!completed) { response.addError(Error.TIMEOUT, "Timed out after " + millis + " ms waiting for responses"); } response.done(); return response; } finally { busyThreads.decrementAndGet(); } } private Feeder createFeeder(SimpleFeedAccess sender, HttpRequest request) { String contentType = request.getHeader("Content-Type"); if (Boolean.valueOf(request.getProperty(JSON_INPUT)) || (contentType != null && contentType.startsWith("application/json"))) { return new JsonFeeder(getDocumentTypeManager(), sender, getRequestInputStream(request)); } else { return new XMLFeeder(getDocumentTypeManager(), sender, getRequestInputStream(request)); } } }