From b9ceb17637499dee9370c29443b0f0d85b09638b Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Thu, 23 Apr 2020 15:50:43 +0000 Subject: To avoid deadlock when doing synchrounous execution in other chains, we use one threadpool per chain. --- docproc/abi-spec.json | 3 ++ .../docproc/AbstractConcreteDocumentFactory.java | 6 +-- .../java/com/yahoo/docproc/DocprocService.java | 38 ++++++++++++++++-- .../docproc/jdisc/DocumentProcessingHandler.java | 45 +++++++--------------- .../docproc/jdisc/DocumentProcessingTask.java | 14 ++++++- ...cumentProcessingTaskPrioritizationTestCase.java | 2 +- 6 files changed, 67 insertions(+), 41 deletions(-) diff --git a/docproc/abi-spec.json b/docproc/abi-spec.json index dee2d2172e4..116e2c9edfb 100644 --- a/docproc/abi-spec.json +++ b/docproc/abi-spec.json @@ -157,12 +157,15 @@ ], "methods": [ "public void (com.yahoo.component.ComponentId)", + "public void (com.yahoo.component.ComponentId, com.yahoo.docproc.CallStack, com.yahoo.document.DocumentTypeManager, int)", "public void (com.yahoo.component.ComponentId, com.yahoo.docproc.CallStack, com.yahoo.document.DocumentTypeManager)", "public void (java.lang.String)", + "public void deconstruct()", "public com.yahoo.document.DocumentTypeManager getDocumentTypeManager()", "public void setDocumentTypeManager(com.yahoo.document.DocumentTypeManager)", "public int getQueueSize()", "public com.yahoo.docproc.DocprocExecutor getExecutor()", + "public java.util.concurrent.ThreadPoolExecutor getThreadPoolExecutor()", "public void setInService(boolean)", "public boolean isInService()", "public boolean isAcceptingNewProcessings()", diff --git a/docproc/src/main/java/com/yahoo/docproc/AbstractConcreteDocumentFactory.java b/docproc/src/main/java/com/yahoo/docproc/AbstractConcreteDocumentFactory.java index 3e720f9e0aa..53ccbd4c325 100644 --- a/docproc/src/main/java/com/yahoo/docproc/AbstractConcreteDocumentFactory.java +++ b/docproc/src/main/java/com/yahoo/docproc/AbstractConcreteDocumentFactory.java @@ -35,15 +35,15 @@ public abstract class AbstractConcreteDocumentFactory extends com.yahoo.componen /** * If the FieldValue is a StructuredFieldValue it will upgrade to the concrete type - * @param field - * @param fv + * @param field to upgrade + * @param fv value to upgrade * @return fv or upgraded fv */ public FieldValue optionallyUpgrade(Field field, FieldValue fv) { return optionallyUpgrade(field.getDataType(), fv); } - @SuppressWarnings({"unchecked", "rawtypes"}) + @SuppressWarnings({"unchecked"}) private FieldValue optionallyUpgrade(DataType dataType, FieldValue fv) { if (fv instanceof StructuredFieldValue) { try { diff --git a/docproc/src/main/java/com/yahoo/docproc/DocprocService.java b/docproc/src/main/java/com/yahoo/docproc/DocprocService.java index 766b433c4dd..b06e6bbdf40 100644 --- a/docproc/src/main/java/com/yahoo/docproc/DocprocService.java +++ b/docproc/src/main/java/com/yahoo/docproc/DocprocService.java @@ -3,6 +3,7 @@ package com.yahoo.docproc; import com.yahoo.component.AbstractComponent; import com.yahoo.component.ComponentId; +import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.docproc.proxy.SchemaMap; import com.yahoo.document.DocumentOperation; import com.yahoo.document.DocumentTypeManager; @@ -10,6 +11,8 @@ import com.yahoo.document.DocumentTypeManager; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -27,6 +30,7 @@ import java.util.logging.Logger; * * @author bratseth */ +//TODO Vespa 8 This class and a lot of other in this package should not be part of PublicAPI public class DocprocService extends AbstractComponent { private static Logger log = Logger.getLogger(DocprocService.class.getName()); @@ -34,6 +38,7 @@ public class DocprocService extends AbstractComponent { /** The processings currently in progress at this service */ private final LinkedBlockingQueue queue; + private final ThreadPoolExecutor threadPool; /** The current state of this service */ private boolean inService = false; /** The current state of this service */ @@ -41,9 +46,18 @@ public class DocprocService extends AbstractComponent { public static SchemaMap schemaMap = new SchemaMap(); private DocumentTypeManager documentTypeManager = null; - public DocprocService(ComponentId id) { + private DocprocService(ComponentId id, int numThreads) { super(id); queue = new LinkedBlockingQueue<>(); + threadPool = new ThreadPoolExecutor(numThreads, + numThreads, + 0, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new DaemonThreadFactory("docproc-" + id.stringValue() + "-")); + } + + public DocprocService(ComponentId id) { + this(id, Runtime.getRuntime().availableProcessors()); } /** @@ -52,20 +66,32 @@ public class DocprocService extends AbstractComponent { * @param id the component id of the new service. * @param stack the call stack to use. * @param mgr the document type manager to use. + * @param numThreads to have in the thread pool */ - public DocprocService(ComponentId id, CallStack stack, DocumentTypeManager mgr) { - this(id); + public DocprocService(ComponentId id, CallStack stack, DocumentTypeManager mgr, int numThreads) { + this(id, numThreads); setCallStack(stack); setDocumentTypeManager(mgr); setInService(true); } + @Deprecated + public DocprocService(ComponentId id, CallStack stack, DocumentTypeManager mgr) { + this(id, stack, mgr, Runtime.getRuntime().availableProcessors()); + } + /** * Creates a service with a name with an unbounded input queue. If the given name is null or the empty string, * it will become the name "default". + * Testing only */ public DocprocService(String name) { - this(new ComponentId(name, null)); + this(new ComponentId(name, null), 1); + } + + @Override + public void deconstruct() { + threadPool.shutdown(); } public DocumentTypeManager getDocumentTypeManager() { @@ -90,6 +116,10 @@ public class DocprocService extends AbstractComponent { return executor; } + public ThreadPoolExecutor getThreadPoolExecutor() { + return threadPool; + } + private void setExecutor(DocprocExecutor executor) { this.executor = executor; } diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java index d34c300b8d0..f93598058cf 100644 --- a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java @@ -30,12 +30,7 @@ import com.yahoo.processing.execution.chain.ChainRegistry; import com.yahoo.statistics.Statistics; import java.util.TimerTask; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; @@ -53,7 +48,6 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { private final ComponentRegistry docprocServiceRegistry; private final ComponentRegistry docFactoryRegistry; private final ChainRegistry chainRegistry = new ChainRegistry<>(); - private ThreadPoolExecutor threadPool; private final ScheduledThreadPoolExecutor laterExecutor = new ScheduledThreadPoolExecutor(2, new DaemonThreadFactory("docproc-later-")); private ContainerDocumentConfig containerDocConfig; @@ -62,17 +56,16 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { public DocumentProcessingHandler(ComponentRegistry docprocServiceRegistry, ComponentRegistry documentProcessorComponentRegistry, ComponentRegistry docFactoryRegistry, - ThreadPoolExecutor threadPool, DocumentTypeManager documentTypeManager, + int numThreads, + DocumentTypeManager documentTypeManager, ChainsModel chainsModel, SchemaMap schemaMap, Statistics statistics, Metric metric, ContainerDocumentConfig containerDocConfig) { this.docprocServiceRegistry = docprocServiceRegistry; this.docFactoryRegistry = docFactoryRegistry; - this.threadPool = threadPool; this.containerDocConfig = containerDocConfig; this.documentTypeManager = documentTypeManager; DocprocService.schemaMap = schemaMap; - threadPool.prestartCoreThread(); laterExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); laterExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); @@ -81,12 +74,12 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { for (Chain chain : chainRegistry.allComponents()) { log.config("Setting up call stack for chain " + chain.getId()); - DocprocService service = - new DocprocService(chain.getId(), convertToCallStack(chain, statistics, metric), documentTypeManager); + DocprocService service = new DocprocService(chain.getId(), convertToCallStack(chain, statistics, metric), documentTypeManager, computeNumThreads(numThreads)); service.setInService(true); docprocServiceRegistry.register(service.getId(), service); } } + } private static int computeNumThreads(int maxThreads) { @@ -98,12 +91,7 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { ComponentRegistry docFactoryRegistry, DocumentProcessingHandlerParameters params) { this(docprocServiceRegistry, documentProcessorComponentRegistry, docFactoryRegistry, - new ThreadPoolExecutor(computeNumThreads(params.getMaxNumThreads()), - computeNumThreads(params.getMaxNumThreads()), - 0,TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), - new DaemonThreadFactory("docproc-") - ), + params.getMaxNumThreads(), params.getDocumentTypeManager(), params.getChainsModel(), params.getSchemaMap(), params.getStatisticsManager(), params.getMetric(), @@ -133,7 +121,7 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { @Override protected void destroy() { - threadPool.shutdown(); //calling shutdownNow() seems like a bit of an overkill + //threadPoolMap.values().forEach( pool -> pool.shutdown()); //calling shutdownNow() seems like a bit of an overkill } public ComponentRegistry getDocprocServiceRegistry() { @@ -175,13 +163,14 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { return null; } - DocprocService service = docprocServiceRegistry.getComponent(requestContext.getServiceName()); + String serviceName = requestContext.getServiceName(); + DocprocService service = docprocServiceRegistry.getComponent(serviceName); //No need to enqueue a task if the docproc chain is empty, just forward requestContext if (service == null) { - log.log(LogLevel.ERROR, "DocprocService for session '" + requestContext.getServiceName() + + log.log(LogLevel.ERROR, "DocprocService for session '" + serviceName + "' not found, returning request '" + requestContext + "'."); requestContext.processingFailed(RequestContext.ErrorCode.ERROR_PROCESSING_FAILURE, - "DocprocService " + requestContext.getServiceName() + " not found."); + "DocprocService " + serviceName + " not found."); return null; } else if (service.getExecutor().getCallStack().size() == 0) { //call stack was empty, just forward message @@ -189,19 +178,11 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { return null; } - DocumentProcessingTask task = new DocumentProcessingTask(requestContext, this, service); - submit(task); + DocumentProcessingTask task = new DocumentProcessingTask(requestContext, this, service, service.getThreadPoolExecutor()); + task.submit(); return null; } - private void submit(DocumentProcessingTask task) { - try { - threadPool.execute(task); - } catch (RejectedExecutionException ree) { - task.queueFull(); - } - } - void submit(DocumentProcessingTask task, long delay) { LaterTimerTask timerTask = new LaterTimerTask(task, delay); laterExecutor.schedule(timerTask, delay, TimeUnit.MILLISECONDS); @@ -220,7 +201,7 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { @Override public void run() { log.log(LogLevel.DEBUG, "Submitting after having waited " + delay + " ms in LATER queue: " + processingTask); - submit(processingTask); + processingTask.submit(); } } diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java index ca4648678a5..4d887616093 100644 --- a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java @@ -17,6 +17,8 @@ import java.io.StringWriter; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; @@ -36,13 +38,23 @@ public class DocumentProcessingTask implements Comparable