summaryrefslogtreecommitdiffstats
path: root/docproc
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-04-17 09:36:21 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-04-17 09:36:21 +0000
commitb38e3a7791a46949c775e2cf00c8d6c43f82a59e (patch)
treea456203ca3d23964888314bb7165e6d089dfb579 /docproc
parent6003ad410e2fa74bb6b4532af50ff3ffa2475211 (diff)
Since throttling now is on the outside, and has been for 12 years
we can remove this hack.
Diffstat (limited to 'docproc')
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadManager.java50
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutor.java59
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java39
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerParameters.java48
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutorTestCase.java83
5 files changed, 16 insertions, 263 deletions
diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadManager.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadManager.java
deleted file mode 100644
index 6fd4beac056..00000000000
--- a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadManager.java
+++ /dev/null
@@ -1,50 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.docproc.jdisc;
-
-import com.yahoo.document.DocumentUtil;
-import com.yahoo.log.LogLevel;
-
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Logger;
-
-/**
- * @author Einar M R Rosenvinge
- */
-class DocprocThreadManager {
-
- private static Logger log = Logger.getLogger(DocprocThreadManager.class.getName());
-
- private final long maxConcurrentByteSize;
- private final AtomicLong bytesStarted = new AtomicLong(0);
- private final AtomicLong bytesFinished = new AtomicLong(0);
-
- DocprocThreadManager(double maxConcurrentFactor, double documentExpansionFactor, int containerCoreMemoryMb) {
- this((long) (((double) DocumentUtil.calculateMaxPendingSize(maxConcurrentFactor, documentExpansionFactor,
- containerCoreMemoryMb)) * maxConcurrentFactor));
- }
-
- DocprocThreadManager(long maxConcurrentByteSize) {
- final int MINCONCURRENTBYTES=256*1024*1024; //256M
- if (maxConcurrentByteSize < MINCONCURRENTBYTES) {
- maxConcurrentByteSize = MINCONCURRENTBYTES;
- }
-
- this.maxConcurrentByteSize = maxConcurrentByteSize;
- log.log(LogLevel.CONFIG, "Docproc service allowed to concurrently process "
- + (((double) maxConcurrentByteSize) / 1024.0d / 1024.0d) + " megabytes of input data.");
- }
-
- boolean isAboveLimit() {
- return (bytesFinished.get() - bytesStarted.get() > maxConcurrentByteSize);
- }
- void beforeExecute(DocumentProcessingTask task) {
- bytesStarted.getAndAdd(task.getApproxSize());
- }
-
- void afterExecute(DocumentProcessingTask task) {
- bytesFinished.getAndAdd(task.getApproxSize());
- }
- void shutdown() {
- }
-
-}
diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutor.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutor.java
deleted file mode 100644
index e1a902c8d5c..00000000000
--- a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.docproc.jdisc;
-
-import com.yahoo.concurrent.DaemonThreadFactory;
-import com.yahoo.log.LogLevel;
-
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
-
-/**
- * @author Einar M R Rosenvinge
- */
-public class DocprocThreadPoolExecutor extends ThreadPoolExecutor {
-
- private static Logger log = Logger.getLogger(DocprocThreadPoolExecutor.class.getName());
- private DocprocThreadManager threadManager;
-
- public DocprocThreadPoolExecutor(int maxNumThreads, BlockingQueue<Runnable> queue, DocprocThreadManager threadMgr) {
- super((maxNumThreads > 0) ? maxNumThreads : Runtime.getRuntime().availableProcessors(),
- (maxNumThreads > 0) ? maxNumThreads : 2048,
- 1, TimeUnit.SECONDS,
- queue,
- new DaemonThreadFactory("docproc-"));
- this.threadManager = threadMgr;
- allowCoreThreadTimeOut(false);
- log.log(LogLevel.DEBUG, "Created docproc thread pool with " + super.getCorePoolSize() + " worker threads.");
- }
-
- @Override
- protected void beforeExecute(Thread thread, Runnable runnable) {
- threadManager.beforeExecute((DocumentProcessingTask) runnable);
- }
-
- @Override
- protected void afterExecute(Runnable runnable, Throwable throwable) {
- threadManager.afterExecute((DocumentProcessingTask) runnable);
- }
-
- @Override
- public void shutdown() {
- super.shutdown();
- threadManager.shutdown();
- }
-
- @Override
- public List<Runnable> shutdownNow() {
- List<Runnable> list = super.shutdownNow();
- threadManager.shutdown();
- return list;
- }
-
- boolean isAboveLimit() {
- return threadManager.isAboveLimit();
- }
-
-}
diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java
index 5b7b9d85a91..d34c300b8d0 100644
--- a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java
+++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java
@@ -35,6 +35,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
@@ -52,7 +53,7 @@ public class DocumentProcessingHandler extends AbstractRequestHandler {
private final ComponentRegistry<DocprocService> docprocServiceRegistry;
private final ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry;
private final ChainRegistry<DocumentProcessor> chainRegistry = new ChainRegistry<>();
- private DocprocThreadPoolExecutor threadPool;
+ private ThreadPoolExecutor threadPool;
private final ScheduledThreadPoolExecutor laterExecutor =
new ScheduledThreadPoolExecutor(2, new DaemonThreadFactory("docproc-later-"));
private ContainerDocumentConfig containerDocConfig;
@@ -61,7 +62,7 @@ public class DocumentProcessingHandler extends AbstractRequestHandler {
public DocumentProcessingHandler(ComponentRegistry<DocprocService> docprocServiceRegistry,
ComponentRegistry<DocumentProcessor> documentProcessorComponentRegistry,
ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry,
- DocprocThreadPoolExecutor threadPool, DocumentTypeManager documentTypeManager,
+ ThreadPoolExecutor threadPool, DocumentTypeManager documentTypeManager,
ChainsModel chainsModel, SchemaMap schemaMap, Statistics statistics,
Metric metric,
ContainerDocumentConfig containerDocConfig) {
@@ -88,28 +89,27 @@ public class DocumentProcessingHandler extends AbstractRequestHandler {
}
}
+ private static int computeNumThreads(int maxThreads) {
+ return (maxThreads > 0) ? maxThreads : Runtime.getRuntime().availableProcessors();
+ }
+
public DocumentProcessingHandler(ComponentRegistry<DocprocService> docprocServiceRegistry,
ComponentRegistry<DocumentProcessor> documentProcessorComponentRegistry,
ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry,
DocumentProcessingHandlerParameters params) {
this(docprocServiceRegistry, documentProcessorComponentRegistry, docFactoryRegistry,
- new DocprocThreadPoolExecutor(params.getMaxNumThreads(),
- chooseQueueType(params.getMaxNumThreads()),
- new DocprocThreadManager(params.getMaxConcurrentFactor(),
- params.getDocumentExpansionFactor(),
- params.getContainerCoreMemoryMb())),
+ new ThreadPoolExecutor(computeNumThreads(params.getMaxNumThreads()),
+ computeNumThreads(params.getMaxNumThreads()),
+ 0,TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new DaemonThreadFactory("docproc-")
+ ),
params.getDocumentTypeManager(), params.getChainsModel(), params.getSchemaMap(),
params.getStatisticsManager(),
params.getMetric(),
params.getContainerDocConfig());
}
- private static BlockingQueue<Runnable> chooseQueueType(int maxNumThreads) {
- return (maxNumThreads > 0)
- ? new LinkedBlockingQueue<>()
- : new SynchronousQueue<>();
- }
-
@Inject
public DocumentProcessingHandler(ComponentRegistry<DocumentProcessor> documentProcessorComponentRegistry,
ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry,
@@ -124,9 +124,6 @@ public class DocumentProcessingHandler extends AbstractRequestHandler {
this(new ComponentRegistry<>(),
documentProcessorComponentRegistry, docFactoryRegistry, new DocumentProcessingHandlerParameters().setMaxNumThreads
(docprocConfig.numthreads())
- .setMaxConcurrentFactor(containerMbusConfig.maxConcurrentFactor())
- .setDocumentExpansionFactor(containerMbusConfig.documentExpansionFactor())
- .setContainerCoreMemoryMb(containerMbusConfig.containerCoreMemory())
.setDocumentTypeManager(new DocumentTypeManager(docManConfig))
.setChainsModel(buildFromConfig(chainsConfig)).setSchemaMap(configureMapping(mappingConfig))
.setStatisticsManager(manager)
@@ -198,14 +195,10 @@ public class DocumentProcessingHandler extends AbstractRequestHandler {
}
private void submit(DocumentProcessingTask task) {
- if (threadPool.isAboveLimit()) {
+ try {
+ threadPool.execute(task);
+ } catch (RejectedExecutionException ree) {
task.queueFull();
- } else {
- try {
- threadPool.execute(task);
- } catch (RejectedExecutionException ree) {
- task.queueFull();
- }
}
}
diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerParameters.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerParameters.java
index bf308e39218..b8a6aa9c105 100644
--- a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerParameters.java
+++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerParameters.java
@@ -18,9 +18,6 @@ import com.yahoo.statistics.Statistics;
public class DocumentProcessingHandlerParameters {
private int maxNumThreads = 0;
- private double maxConcurrentFactor = 0.2;
- private double documentExpansionFactor = 20.0;
- private int containerCoreMemoryMb = 50;
private DocumentTypeManager documentTypeManager = null;
private ChainsModel chainsModel = null;
private SchemaMap schemaMap = null;
@@ -28,22 +25,7 @@ public class DocumentProcessingHandlerParameters {
private Metric metric = new NullMetric();
private ContainerDocumentConfig containerDocConfig;
- public DocumentProcessingHandlerParameters() {
- }
-
- /**
- * Returns the number of megabytes of memory reserved for container core classes and data.
- *
- * @return the number of megabytes of memory reserved for container core classes and data.
- */
- public int getContainerCoreMemoryMb() {
- return containerCoreMemoryMb;
- }
- public DocumentProcessingHandlerParameters setContainerCoreMemoryMb(int containerCoreMemoryMb) {
- this.containerCoreMemoryMb = containerCoreMemoryMb;
- return this;
- }
public Metric getMetric() {
return metric;
@@ -55,36 +37,6 @@ public class DocumentProcessingHandlerParameters {
}
/**
- * Returns the document expansion factor, i.e.&nbsp;by what factor a serialized and possibly compressed
- * input document is expected to expand during deserialization, including any temporary memory needed
- * when processing it.
- *
- * @return the document expansion factor.
- */
- public double getDocumentExpansionFactor() {
- return documentExpansionFactor;
- }
-
- public DocumentProcessingHandlerParameters setDocumentExpansionFactor(double documentExpansionFactor) {
- this.documentExpansionFactor = documentExpansionFactor;
- return this;
- }
-
- /**
- * Returns the max concurrent factor.
- *
- * @return the max concurrent factor.
- */
- public double getMaxConcurrentFactor() {
- return maxConcurrentFactor;
- }
-
- public DocumentProcessingHandlerParameters setMaxConcurrentFactor(double maxConcurrentFactor) {
- this.maxConcurrentFactor = maxConcurrentFactor;
- return this;
- }
-
- /**
* Returns the maximum number of thread that the thread pool will ever attempt to run simultaneously.
*
* @return the maximum number of thread that the thread pool will ever attempt to run simultaneously.
diff --git a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutorTestCase.java b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutorTestCase.java
deleted file mode 100644
index a74fb9a3edf..00000000000
--- a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutorTestCase.java
+++ /dev/null
@@ -1,83 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.docproc.jdisc;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
- */
-public class DocprocThreadPoolExecutorTestCase {
- private final Set<Long> threadIds = Collections.synchronizedSet(new HashSet<Long>());
-
- @Test
- public void threadPool() throws InterruptedException {
- int numThreads = 8;
- int numTasks = 200;
-
- LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<>();
- DocprocThreadManager mgr = new DocprocThreadManager(1000l);
- DocprocThreadPoolExecutor pool = new DocprocThreadPoolExecutor(numThreads, q, mgr);
-
- List<MockedDocumentProcessingTask> tasks = new ArrayList<>(numTasks);
- for (int i = 0; i < numTasks; i++) {
- tasks.add(new MockedDocumentProcessingTask());
- }
-
- for (int i = 0; i < numTasks; i++) {
- pool.execute(tasks.get(i));
- }
- pool.shutdown();
- pool.awaitTermination(120L, TimeUnit.SECONDS);
-
- for (int i = 0; i < numTasks; i++) {
- assertTrue(tasks.get(i).hasBeenRun());
- }
-
- System.err.println(threadIds);
- assertEquals(numThreads, threadIds.size());
- }
-
- private class MockedDocumentProcessingTask extends DocumentProcessingTask {
- private boolean hasBeenRun = false;
-
- public MockedDocumentProcessingTask() {
- super(null, null, null);
- }
-
- @Override
- public void run() {
- threadIds.add(Thread.currentThread().getId());
- System.err.println(System.currentTimeMillis() + " MOCK Thread " + Thread.currentThread().getId() + " running task " + this);
- for (int i = 0; i < 100000; i++) {
- Math.sin((double) (System.currentTimeMillis() / 10000L));
- }
- System.err.println(System.currentTimeMillis() + " MOCK Thread " + Thread.currentThread().getId() + " DONE task " + this);
- hasBeenRun = true;
- }
-
- @Override
- public int getApproxSize() {
- return 333;
- }
-
- @Override
- public String toString() {
- return "seqNum " + getSeqNum();
- }
-
- public boolean hasBeenRun() {
- return hasBeenRun;
- }
- }
-}