aboutsummaryrefslogtreecommitdiffstats
path: root/docproc
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-04-23 15:50:43 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-04-23 15:50:43 +0000
commitb9ceb17637499dee9370c29443b0f0d85b09638b (patch)
tree9015ad5bb75372d6fa14bb076394abad24defbe6 /docproc
parent898e0f4a754305e3052952098627bb3dcb2aa193 (diff)
To avoid deadlock when doing synchrounous execution in other chains, we use one threadpool per chain.
Diffstat (limited to 'docproc')
-rw-r--r--docproc/abi-spec.json3
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/AbstractConcreteDocumentFactory.java6
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/DocprocService.java38
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java45
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java14
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingTaskPrioritizationTestCase.java2
6 files changed, 67 insertions, 41 deletions
diff --git a/docproc/abi-spec.json b/docproc/abi-spec.json
index dee2d2172e4..116e2c9edfb 100644
--- a/docproc/abi-spec.json
+++ b/docproc/abi-spec.json
@@ -157,12 +157,15 @@
],
"methods": [
"public void <init>(com.yahoo.component.ComponentId)",
+ "public void <init>(com.yahoo.component.ComponentId, com.yahoo.docproc.CallStack, com.yahoo.document.DocumentTypeManager, int)",
"public void <init>(com.yahoo.component.ComponentId, com.yahoo.docproc.CallStack, com.yahoo.document.DocumentTypeManager)",
"public void <init>(java.lang.String)",
+ "public void deconstruct()",
"public com.yahoo.document.DocumentTypeManager getDocumentTypeManager()",
"public void setDocumentTypeManager(com.yahoo.document.DocumentTypeManager)",
"public int getQueueSize()",
"public com.yahoo.docproc.DocprocExecutor getExecutor()",
+ "public java.util.concurrent.ThreadPoolExecutor getThreadPoolExecutor()",
"public void setInService(boolean)",
"public boolean isInService()",
"public boolean isAcceptingNewProcessings()",
diff --git a/docproc/src/main/java/com/yahoo/docproc/AbstractConcreteDocumentFactory.java b/docproc/src/main/java/com/yahoo/docproc/AbstractConcreteDocumentFactory.java
index 3e720f9e0aa..53ccbd4c325 100644
--- a/docproc/src/main/java/com/yahoo/docproc/AbstractConcreteDocumentFactory.java
+++ b/docproc/src/main/java/com/yahoo/docproc/AbstractConcreteDocumentFactory.java
@@ -35,15 +35,15 @@ public abstract class AbstractConcreteDocumentFactory extends com.yahoo.componen
/**
* If the FieldValue is a StructuredFieldValue it will upgrade to the concrete type
- * @param field
- * @param fv
+ * @param field to upgrade
+ * @param fv value to upgrade
* @return fv or upgraded fv
*/
public FieldValue optionallyUpgrade(Field field, FieldValue fv) {
return optionallyUpgrade(field.getDataType(), fv);
}
- @SuppressWarnings({"unchecked", "rawtypes"})
+ @SuppressWarnings({"unchecked"})
private FieldValue optionallyUpgrade(DataType dataType, FieldValue fv) {
if (fv instanceof StructuredFieldValue) {
try {
diff --git a/docproc/src/main/java/com/yahoo/docproc/DocprocService.java b/docproc/src/main/java/com/yahoo/docproc/DocprocService.java
index 766b433c4dd..b06e6bbdf40 100644
--- a/docproc/src/main/java/com/yahoo/docproc/DocprocService.java
+++ b/docproc/src/main/java/com/yahoo/docproc/DocprocService.java
@@ -3,6 +3,7 @@ package com.yahoo.docproc;
import com.yahoo.component.AbstractComponent;
import com.yahoo.component.ComponentId;
+import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.docproc.proxy.SchemaMap;
import com.yahoo.document.DocumentOperation;
import com.yahoo.document.DocumentTypeManager;
@@ -10,6 +11,8 @@ import com.yahoo.document.DocumentTypeManager;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -27,6 +30,7 @@ import java.util.logging.Logger;
*
* @author bratseth
*/
+//TODO Vespa 8 This class and a lot of other in this package should not be part of PublicAPI
public class DocprocService extends AbstractComponent {
private static Logger log = Logger.getLogger(DocprocService.class.getName());
@@ -34,6 +38,7 @@ public class DocprocService extends AbstractComponent {
/** The processings currently in progress at this service */
private final LinkedBlockingQueue<Processing> queue;
+ private final ThreadPoolExecutor threadPool;
/** The current state of this service */
private boolean inService = false;
/** The current state of this service */
@@ -41,9 +46,18 @@ public class DocprocService extends AbstractComponent {
public static SchemaMap schemaMap = new SchemaMap();
private DocumentTypeManager documentTypeManager = null;
- public DocprocService(ComponentId id) {
+ private DocprocService(ComponentId id, int numThreads) {
super(id);
queue = new LinkedBlockingQueue<>();
+ threadPool = new ThreadPoolExecutor(numThreads,
+ numThreads,
+ 0, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new DaemonThreadFactory("docproc-" + id.stringValue() + "-"));
+ }
+
+ public DocprocService(ComponentId id) {
+ this(id, Runtime.getRuntime().availableProcessors());
}
/**
@@ -52,20 +66,32 @@ public class DocprocService extends AbstractComponent {
* @param id the component id of the new service.
* @param stack the call stack to use.
* @param mgr the document type manager to use.
+ * @param numThreads to have in the thread pool
*/
- public DocprocService(ComponentId id, CallStack stack, DocumentTypeManager mgr) {
- this(id);
+ public DocprocService(ComponentId id, CallStack stack, DocumentTypeManager mgr, int numThreads) {
+ this(id, numThreads);
setCallStack(stack);
setDocumentTypeManager(mgr);
setInService(true);
}
+ @Deprecated
+ public DocprocService(ComponentId id, CallStack stack, DocumentTypeManager mgr) {
+ this(id, stack, mgr, Runtime.getRuntime().availableProcessors());
+ }
+
/**
* Creates a service with a name with an unbounded input queue. If the given name is null or the empty string,
* it will become the name "default".
+ * Testing only
*/
public DocprocService(String name) {
- this(new ComponentId(name, null));
+ this(new ComponentId(name, null), 1);
+ }
+
+ @Override
+ public void deconstruct() {
+ threadPool.shutdown();
}
public DocumentTypeManager getDocumentTypeManager() {
@@ -90,6 +116,10 @@ public class DocprocService extends AbstractComponent {
return executor;
}
+ public ThreadPoolExecutor getThreadPoolExecutor() {
+ return threadPool;
+ }
+
private void setExecutor(DocprocExecutor executor) {
this.executor = executor;
}
diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java
index d34c300b8d0..f93598058cf 100644
--- a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java
+++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java
@@ -30,12 +30,7 @@ import com.yahoo.processing.execution.chain.ChainRegistry;
import com.yahoo.statistics.Statistics;
import java.util.TimerTask;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
@@ -53,7 +48,6 @@ public class DocumentProcessingHandler extends AbstractRequestHandler {
private final ComponentRegistry<DocprocService> docprocServiceRegistry;
private final ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry;
private final ChainRegistry<DocumentProcessor> chainRegistry = new ChainRegistry<>();
- private ThreadPoolExecutor threadPool;
private final ScheduledThreadPoolExecutor laterExecutor =
new ScheduledThreadPoolExecutor(2, new DaemonThreadFactory("docproc-later-"));
private ContainerDocumentConfig containerDocConfig;
@@ -62,17 +56,16 @@ public class DocumentProcessingHandler extends AbstractRequestHandler {
public DocumentProcessingHandler(ComponentRegistry<DocprocService> docprocServiceRegistry,
ComponentRegistry<DocumentProcessor> documentProcessorComponentRegistry,
ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry,
- ThreadPoolExecutor threadPool, DocumentTypeManager documentTypeManager,
+ int numThreads,
+ DocumentTypeManager documentTypeManager,
ChainsModel chainsModel, SchemaMap schemaMap, Statistics statistics,
Metric metric,
ContainerDocumentConfig containerDocConfig) {
this.docprocServiceRegistry = docprocServiceRegistry;
this.docFactoryRegistry = docFactoryRegistry;
- this.threadPool = threadPool;
this.containerDocConfig = containerDocConfig;
this.documentTypeManager = documentTypeManager;
DocprocService.schemaMap = schemaMap;
- threadPool.prestartCoreThread();
laterExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
laterExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
@@ -81,12 +74,12 @@ public class DocumentProcessingHandler extends AbstractRequestHandler {
for (Chain<DocumentProcessor> chain : chainRegistry.allComponents()) {
log.config("Setting up call stack for chain " + chain.getId());
- DocprocService service =
- new DocprocService(chain.getId(), convertToCallStack(chain, statistics, metric), documentTypeManager);
+ DocprocService service = new DocprocService(chain.getId(), convertToCallStack(chain, statistics, metric), documentTypeManager, computeNumThreads(numThreads));
service.setInService(true);
docprocServiceRegistry.register(service.getId(), service);
}
}
+
}
private static int computeNumThreads(int maxThreads) {
@@ -98,12 +91,7 @@ public class DocumentProcessingHandler extends AbstractRequestHandler {
ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry,
DocumentProcessingHandlerParameters params) {
this(docprocServiceRegistry, documentProcessorComponentRegistry, docFactoryRegistry,
- new ThreadPoolExecutor(computeNumThreads(params.getMaxNumThreads()),
- computeNumThreads(params.getMaxNumThreads()),
- 0,TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(),
- new DaemonThreadFactory("docproc-")
- ),
+ params.getMaxNumThreads(),
params.getDocumentTypeManager(), params.getChainsModel(), params.getSchemaMap(),
params.getStatisticsManager(),
params.getMetric(),
@@ -133,7 +121,7 @@ public class DocumentProcessingHandler extends AbstractRequestHandler {
@Override
protected void destroy() {
- threadPool.shutdown(); //calling shutdownNow() seems like a bit of an overkill
+ //threadPoolMap.values().forEach( pool -> pool.shutdown()); //calling shutdownNow() seems like a bit of an overkill
}
public ComponentRegistry<DocprocService> getDocprocServiceRegistry() {
@@ -175,13 +163,14 @@ public class DocumentProcessingHandler extends AbstractRequestHandler {
return null;
}
- DocprocService service = docprocServiceRegistry.getComponent(requestContext.getServiceName());
+ String serviceName = requestContext.getServiceName();
+ DocprocService service = docprocServiceRegistry.getComponent(serviceName);
//No need to enqueue a task if the docproc chain is empty, just forward requestContext
if (service == null) {
- log.log(LogLevel.ERROR, "DocprocService for session '" + requestContext.getServiceName() +
+ log.log(LogLevel.ERROR, "DocprocService for session '" + serviceName +
"' not found, returning request '" + requestContext + "'.");
requestContext.processingFailed(RequestContext.ErrorCode.ERROR_PROCESSING_FAILURE,
- "DocprocService " + requestContext.getServiceName() + " not found.");
+ "DocprocService " + serviceName + " not found.");
return null;
} else if (service.getExecutor().getCallStack().size() == 0) {
//call stack was empty, just forward message
@@ -189,19 +178,11 @@ public class DocumentProcessingHandler extends AbstractRequestHandler {
return null;
}
- DocumentProcessingTask task = new DocumentProcessingTask(requestContext, this, service);
- submit(task);
+ DocumentProcessingTask task = new DocumentProcessingTask(requestContext, this, service, service.getThreadPoolExecutor());
+ task.submit();
return null;
}
- private void submit(DocumentProcessingTask task) {
- try {
- threadPool.execute(task);
- } catch (RejectedExecutionException ree) {
- task.queueFull();
- }
- }
-
void submit(DocumentProcessingTask task, long delay) {
LaterTimerTask timerTask = new LaterTimerTask(task, delay);
laterExecutor.schedule(timerTask, delay, TimeUnit.MILLISECONDS);
@@ -220,7 +201,7 @@ public class DocumentProcessingHandler extends AbstractRequestHandler {
@Override
public void run() {
log.log(LogLevel.DEBUG, "Submitting after having waited " + delay + " ms in LATER queue: " + processingTask);
- submit(processingTask);
+ processingTask.submit();
}
}
diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java
index ca4648678a5..4d887616093 100644
--- a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java
+++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java
@@ -17,6 +17,8 @@ import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -36,13 +38,23 @@ public class DocumentProcessingTask implements Comparable<DocumentProcessingTask
private final static AtomicLong seq = new AtomicLong();
private final long seqNum;
private final DocprocService service;
+ private final ThreadPoolExecutor executor;
public DocumentProcessingTask(RequestContext requestContext, DocumentProcessingHandler docprocHandler,
- DocprocService service) {
+ DocprocService service, ThreadPoolExecutor executor) {
seqNum = seq.getAndIncrement();
this.requestContext = requestContext;
this.docprocHandler = docprocHandler;
this.service = service;
+ this.executor = executor;
+ }
+
+ void submit() {
+ try {
+ executor.execute(this);
+ } catch (RejectedExecutionException ree) {
+ queueFull();
+ }
}
@Override
diff --git a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingTaskPrioritizationTestCase.java b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingTaskPrioritizationTestCase.java
index e012239c9e3..f050b6c5450 100644
--- a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingTaskPrioritizationTestCase.java
+++ b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingTaskPrioritizationTestCase.java
@@ -71,7 +71,7 @@ public class DocumentProcessingTaskPrioritizationTestCase {
private class TestDocumentProcessingTask extends DocumentProcessingTask {
private TestDocumentProcessingTask(DocumentProtocol.Priority priority) {
- super(new TestRequestContext(priority), null, null);
+ super(new TestRequestContext(priority), null, null, null);
}
}