diff options
author | Jon Bratseth <bratseth@oath.com> | 2020-04-23 18:34:41 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-23 18:34:41 +0200 |
commit | 5ab026fa9ddd6912800c136775ebe60faba8f0e1 (patch) | |
tree | 9015ad5bb75372d6fa14bb076394abad24defbe6 | |
parent | 898e0f4a754305e3052952098627bb3dcb2aa193 (diff) | |
parent | b9ceb17637499dee9370c29443b0f0d85b09638b (diff) |
Merge pull request #13037 from vespa-engine/balder/one-threadpool-per-chain
To avoid deadlock when doing synchrounous execution in other chains, …
6 files changed, 67 insertions, 41 deletions
diff --git a/docproc/abi-spec.json b/docproc/abi-spec.json index dee2d2172e4..116e2c9edfb 100644 --- a/docproc/abi-spec.json +++ b/docproc/abi-spec.json @@ -157,12 +157,15 @@ ], "methods": [ "public void <init>(com.yahoo.component.ComponentId)", + "public void <init>(com.yahoo.component.ComponentId, com.yahoo.docproc.CallStack, com.yahoo.document.DocumentTypeManager, int)", "public void <init>(com.yahoo.component.ComponentId, com.yahoo.docproc.CallStack, com.yahoo.document.DocumentTypeManager)", "public void <init>(java.lang.String)", + "public void deconstruct()", "public com.yahoo.document.DocumentTypeManager getDocumentTypeManager()", "public void setDocumentTypeManager(com.yahoo.document.DocumentTypeManager)", "public int getQueueSize()", "public com.yahoo.docproc.DocprocExecutor getExecutor()", + "public java.util.concurrent.ThreadPoolExecutor getThreadPoolExecutor()", "public void setInService(boolean)", "public boolean isInService()", "public boolean isAcceptingNewProcessings()", diff --git a/docproc/src/main/java/com/yahoo/docproc/AbstractConcreteDocumentFactory.java b/docproc/src/main/java/com/yahoo/docproc/AbstractConcreteDocumentFactory.java index 3e720f9e0aa..53ccbd4c325 100644 --- a/docproc/src/main/java/com/yahoo/docproc/AbstractConcreteDocumentFactory.java +++ b/docproc/src/main/java/com/yahoo/docproc/AbstractConcreteDocumentFactory.java @@ -35,15 +35,15 @@ public abstract class AbstractConcreteDocumentFactory extends com.yahoo.componen /** * If the FieldValue is a StructuredFieldValue it will upgrade to the concrete type - * @param field - * @param fv + * @param field to upgrade + * @param fv value to upgrade * @return fv or upgraded fv */ public FieldValue optionallyUpgrade(Field field, FieldValue fv) { return optionallyUpgrade(field.getDataType(), fv); } - @SuppressWarnings({"unchecked", "rawtypes"}) + @SuppressWarnings({"unchecked"}) private FieldValue optionallyUpgrade(DataType dataType, FieldValue fv) { if (fv instanceof StructuredFieldValue) { try { diff --git a/docproc/src/main/java/com/yahoo/docproc/DocprocService.java b/docproc/src/main/java/com/yahoo/docproc/DocprocService.java index 766b433c4dd..b06e6bbdf40 100644 --- a/docproc/src/main/java/com/yahoo/docproc/DocprocService.java +++ b/docproc/src/main/java/com/yahoo/docproc/DocprocService.java @@ -3,6 +3,7 @@ package com.yahoo.docproc; import com.yahoo.component.AbstractComponent; import com.yahoo.component.ComponentId; +import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.docproc.proxy.SchemaMap; import com.yahoo.document.DocumentOperation; import com.yahoo.document.DocumentTypeManager; @@ -10,6 +11,8 @@ import com.yahoo.document.DocumentTypeManager; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -27,6 +30,7 @@ import java.util.logging.Logger; * * @author bratseth */ +//TODO Vespa 8 This class and a lot of other in this package should not be part of PublicAPI public class DocprocService extends AbstractComponent { private static Logger log = Logger.getLogger(DocprocService.class.getName()); @@ -34,6 +38,7 @@ public class DocprocService extends AbstractComponent { /** The processings currently in progress at this service */ private final LinkedBlockingQueue<Processing> queue; + private final ThreadPoolExecutor threadPool; /** The current state of this service */ private boolean inService = false; /** The current state of this service */ @@ -41,9 +46,18 @@ public class DocprocService extends AbstractComponent { public static SchemaMap schemaMap = new SchemaMap(); private DocumentTypeManager documentTypeManager = null; - public DocprocService(ComponentId id) { + private DocprocService(ComponentId id, int numThreads) { super(id); queue = new LinkedBlockingQueue<>(); + threadPool = new ThreadPoolExecutor(numThreads, + numThreads, + 0, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new DaemonThreadFactory("docproc-" + id.stringValue() + "-")); + } + + public DocprocService(ComponentId id) { + this(id, Runtime.getRuntime().availableProcessors()); } /** @@ -52,20 +66,32 @@ public class DocprocService extends AbstractComponent { * @param id the component id of the new service. * @param stack the call stack to use. * @param mgr the document type manager to use. + * @param numThreads to have in the thread pool */ - public DocprocService(ComponentId id, CallStack stack, DocumentTypeManager mgr) { - this(id); + public DocprocService(ComponentId id, CallStack stack, DocumentTypeManager mgr, int numThreads) { + this(id, numThreads); setCallStack(stack); setDocumentTypeManager(mgr); setInService(true); } + @Deprecated + public DocprocService(ComponentId id, CallStack stack, DocumentTypeManager mgr) { + this(id, stack, mgr, Runtime.getRuntime().availableProcessors()); + } + /** * Creates a service with a name with an unbounded input queue. If the given name is null or the empty string, * it will become the name "default". + * Testing only */ public DocprocService(String name) { - this(new ComponentId(name, null)); + this(new ComponentId(name, null), 1); + } + + @Override + public void deconstruct() { + threadPool.shutdown(); } public DocumentTypeManager getDocumentTypeManager() { @@ -90,6 +116,10 @@ public class DocprocService extends AbstractComponent { return executor; } + public ThreadPoolExecutor getThreadPoolExecutor() { + return threadPool; + } + private void setExecutor(DocprocExecutor executor) { this.executor = executor; } diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java index d34c300b8d0..f93598058cf 100644 --- a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java @@ -30,12 +30,7 @@ import com.yahoo.processing.execution.chain.ChainRegistry; import com.yahoo.statistics.Statistics; import java.util.TimerTask; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; @@ -53,7 +48,6 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { private final ComponentRegistry<DocprocService> docprocServiceRegistry; private final ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry; private final ChainRegistry<DocumentProcessor> chainRegistry = new ChainRegistry<>(); - private ThreadPoolExecutor threadPool; private final ScheduledThreadPoolExecutor laterExecutor = new ScheduledThreadPoolExecutor(2, new DaemonThreadFactory("docproc-later-")); private ContainerDocumentConfig containerDocConfig; @@ -62,17 +56,16 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { public DocumentProcessingHandler(ComponentRegistry<DocprocService> docprocServiceRegistry, ComponentRegistry<DocumentProcessor> documentProcessorComponentRegistry, ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry, - ThreadPoolExecutor threadPool, DocumentTypeManager documentTypeManager, + int numThreads, + DocumentTypeManager documentTypeManager, ChainsModel chainsModel, SchemaMap schemaMap, Statistics statistics, Metric metric, ContainerDocumentConfig containerDocConfig) { this.docprocServiceRegistry = docprocServiceRegistry; this.docFactoryRegistry = docFactoryRegistry; - this.threadPool = threadPool; this.containerDocConfig = containerDocConfig; this.documentTypeManager = documentTypeManager; DocprocService.schemaMap = schemaMap; - threadPool.prestartCoreThread(); laterExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); laterExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); @@ -81,12 +74,12 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { for (Chain<DocumentProcessor> chain : chainRegistry.allComponents()) { log.config("Setting up call stack for chain " + chain.getId()); - DocprocService service = - new DocprocService(chain.getId(), convertToCallStack(chain, statistics, metric), documentTypeManager); + DocprocService service = new DocprocService(chain.getId(), convertToCallStack(chain, statistics, metric), documentTypeManager, computeNumThreads(numThreads)); service.setInService(true); docprocServiceRegistry.register(service.getId(), service); } } + } private static int computeNumThreads(int maxThreads) { @@ -98,12 +91,7 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry, DocumentProcessingHandlerParameters params) { this(docprocServiceRegistry, documentProcessorComponentRegistry, docFactoryRegistry, - new ThreadPoolExecutor(computeNumThreads(params.getMaxNumThreads()), - computeNumThreads(params.getMaxNumThreads()), - 0,TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), - new DaemonThreadFactory("docproc-") - ), + params.getMaxNumThreads(), params.getDocumentTypeManager(), params.getChainsModel(), params.getSchemaMap(), params.getStatisticsManager(), params.getMetric(), @@ -133,7 +121,7 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { @Override protected void destroy() { - threadPool.shutdown(); //calling shutdownNow() seems like a bit of an overkill + //threadPoolMap.values().forEach( pool -> pool.shutdown()); //calling shutdownNow() seems like a bit of an overkill } public ComponentRegistry<DocprocService> getDocprocServiceRegistry() { @@ -175,13 +163,14 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { return null; } - DocprocService service = docprocServiceRegistry.getComponent(requestContext.getServiceName()); + String serviceName = requestContext.getServiceName(); + DocprocService service = docprocServiceRegistry.getComponent(serviceName); //No need to enqueue a task if the docproc chain is empty, just forward requestContext if (service == null) { - log.log(LogLevel.ERROR, "DocprocService for session '" + requestContext.getServiceName() + + log.log(LogLevel.ERROR, "DocprocService for session '" + serviceName + "' not found, returning request '" + requestContext + "'."); requestContext.processingFailed(RequestContext.ErrorCode.ERROR_PROCESSING_FAILURE, - "DocprocService " + requestContext.getServiceName() + " not found."); + "DocprocService " + serviceName + " not found."); return null; } else if (service.getExecutor().getCallStack().size() == 0) { //call stack was empty, just forward message @@ -189,19 +178,11 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { return null; } - DocumentProcessingTask task = new DocumentProcessingTask(requestContext, this, service); - submit(task); + DocumentProcessingTask task = new DocumentProcessingTask(requestContext, this, service, service.getThreadPoolExecutor()); + task.submit(); return null; } - private void submit(DocumentProcessingTask task) { - try { - threadPool.execute(task); - } catch (RejectedExecutionException ree) { - task.queueFull(); - } - } - void submit(DocumentProcessingTask task, long delay) { LaterTimerTask timerTask = new LaterTimerTask(task, delay); laterExecutor.schedule(timerTask, delay, TimeUnit.MILLISECONDS); @@ -220,7 +201,7 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { @Override public void run() { log.log(LogLevel.DEBUG, "Submitting after having waited " + delay + " ms in LATER queue: " + processingTask); - submit(processingTask); + processingTask.submit(); } } diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java index ca4648678a5..4d887616093 100644 --- a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java @@ -17,6 +17,8 @@ import java.io.StringWriter; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; @@ -36,13 +38,23 @@ public class DocumentProcessingTask implements Comparable<DocumentProcessingTask private final static AtomicLong seq = new AtomicLong(); private final long seqNum; private final DocprocService service; + private final ThreadPoolExecutor executor; public DocumentProcessingTask(RequestContext requestContext, DocumentProcessingHandler docprocHandler, - DocprocService service) { + DocprocService service, ThreadPoolExecutor executor) { seqNum = seq.getAndIncrement(); this.requestContext = requestContext; this.docprocHandler = docprocHandler; this.service = service; + this.executor = executor; + } + + void submit() { + try { + executor.execute(this); + } catch (RejectedExecutionException ree) { + queueFull(); + } } @Override diff --git a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingTaskPrioritizationTestCase.java b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingTaskPrioritizationTestCase.java index e012239c9e3..f050b6c5450 100644 --- a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingTaskPrioritizationTestCase.java +++ b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingTaskPrioritizationTestCase.java @@ -71,7 +71,7 @@ public class DocumentProcessingTaskPrioritizationTestCase { private class TestDocumentProcessingTask extends DocumentProcessingTask { private TestDocumentProcessingTask(DocumentProtocol.Priority priority) { - super(new TestRequestContext(priority), null, null); + super(new TestRequestContext(priority), null, null, null); } } |