diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-17 09:36:21 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-17 09:36:21 +0000 |
commit | b38e3a7791a46949c775e2cf00c8d6c43f82a59e (patch) | |
tree | a456203ca3d23964888314bb7165e6d089dfb579 /docproc | |
parent | 6003ad410e2fa74bb6b4532af50ff3ffa2475211 (diff) |
Since throttling now is on the outside, and has been for 12 years
we can remove this hack.
Diffstat (limited to 'docproc')
5 files changed, 16 insertions, 263 deletions
diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadManager.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadManager.java deleted file mode 100644 index 6fd4beac056..00000000000 --- a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadManager.java +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.docproc.jdisc; - -import com.yahoo.document.DocumentUtil; -import com.yahoo.log.LogLevel; - -import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Logger; - -/** - * @author Einar M R Rosenvinge - */ -class DocprocThreadManager { - - private static Logger log = Logger.getLogger(DocprocThreadManager.class.getName()); - - private final long maxConcurrentByteSize; - private final AtomicLong bytesStarted = new AtomicLong(0); - private final AtomicLong bytesFinished = new AtomicLong(0); - - DocprocThreadManager(double maxConcurrentFactor, double documentExpansionFactor, int containerCoreMemoryMb) { - this((long) (((double) DocumentUtil.calculateMaxPendingSize(maxConcurrentFactor, documentExpansionFactor, - containerCoreMemoryMb)) * maxConcurrentFactor)); - } - - DocprocThreadManager(long maxConcurrentByteSize) { - final int MINCONCURRENTBYTES=256*1024*1024; //256M - if (maxConcurrentByteSize < MINCONCURRENTBYTES) { - maxConcurrentByteSize = MINCONCURRENTBYTES; - } - - this.maxConcurrentByteSize = maxConcurrentByteSize; - log.log(LogLevel.CONFIG, "Docproc service allowed to concurrently process " - + (((double) maxConcurrentByteSize) / 1024.0d / 1024.0d) + " megabytes of input data."); - } - - boolean isAboveLimit() { - return (bytesFinished.get() - bytesStarted.get() > maxConcurrentByteSize); - } - void beforeExecute(DocumentProcessingTask task) { - bytesStarted.getAndAdd(task.getApproxSize()); - } - - void afterExecute(DocumentProcessingTask task) { - bytesFinished.getAndAdd(task.getApproxSize()); - } - void shutdown() { - } - -} diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutor.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutor.java deleted file mode 100644 index e1a902c8d5c..00000000000 --- a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutor.java +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.docproc.jdisc; - -import com.yahoo.concurrent.DaemonThreadFactory; -import com.yahoo.log.LogLevel; - -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.logging.Logger; - -/** - * @author Einar M R Rosenvinge - */ -public class DocprocThreadPoolExecutor extends ThreadPoolExecutor { - - private static Logger log = Logger.getLogger(DocprocThreadPoolExecutor.class.getName()); - private DocprocThreadManager threadManager; - - public DocprocThreadPoolExecutor(int maxNumThreads, BlockingQueue<Runnable> queue, DocprocThreadManager threadMgr) { - super((maxNumThreads > 0) ? maxNumThreads : Runtime.getRuntime().availableProcessors(), - (maxNumThreads > 0) ? maxNumThreads : 2048, - 1, TimeUnit.SECONDS, - queue, - new DaemonThreadFactory("docproc-")); - this.threadManager = threadMgr; - allowCoreThreadTimeOut(false); - log.log(LogLevel.DEBUG, "Created docproc thread pool with " + super.getCorePoolSize() + " worker threads."); - } - - @Override - protected void beforeExecute(Thread thread, Runnable runnable) { - threadManager.beforeExecute((DocumentProcessingTask) runnable); - } - - @Override - protected void afterExecute(Runnable runnable, Throwable throwable) { - threadManager.afterExecute((DocumentProcessingTask) runnable); - } - - @Override - public void shutdown() { - super.shutdown(); - threadManager.shutdown(); - } - - @Override - public List<Runnable> shutdownNow() { - List<Runnable> list = super.shutdownNow(); - threadManager.shutdown(); - return list; - } - - boolean isAboveLimit() { - return threadManager.isAboveLimit(); - } - -} diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java index 5b7b9d85a91..d34c300b8d0 100644 --- a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java @@ -35,6 +35,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; @@ -52,7 +53,7 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { private final ComponentRegistry<DocprocService> docprocServiceRegistry; private final ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry; private final ChainRegistry<DocumentProcessor> chainRegistry = new ChainRegistry<>(); - private DocprocThreadPoolExecutor threadPool; + private ThreadPoolExecutor threadPool; private final ScheduledThreadPoolExecutor laterExecutor = new ScheduledThreadPoolExecutor(2, new DaemonThreadFactory("docproc-later-")); private ContainerDocumentConfig containerDocConfig; @@ -61,7 +62,7 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { public DocumentProcessingHandler(ComponentRegistry<DocprocService> docprocServiceRegistry, ComponentRegistry<DocumentProcessor> documentProcessorComponentRegistry, ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry, - DocprocThreadPoolExecutor threadPool, DocumentTypeManager documentTypeManager, + ThreadPoolExecutor threadPool, DocumentTypeManager documentTypeManager, ChainsModel chainsModel, SchemaMap schemaMap, Statistics statistics, Metric metric, ContainerDocumentConfig containerDocConfig) { @@ -88,28 +89,27 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { } } + private static int computeNumThreads(int maxThreads) { + return (maxThreads > 0) ? maxThreads : Runtime.getRuntime().availableProcessors(); + } + public DocumentProcessingHandler(ComponentRegistry<DocprocService> docprocServiceRegistry, ComponentRegistry<DocumentProcessor> documentProcessorComponentRegistry, ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry, DocumentProcessingHandlerParameters params) { this(docprocServiceRegistry, documentProcessorComponentRegistry, docFactoryRegistry, - new DocprocThreadPoolExecutor(params.getMaxNumThreads(), - chooseQueueType(params.getMaxNumThreads()), - new DocprocThreadManager(params.getMaxConcurrentFactor(), - params.getDocumentExpansionFactor(), - params.getContainerCoreMemoryMb())), + new ThreadPoolExecutor(computeNumThreads(params.getMaxNumThreads()), + computeNumThreads(params.getMaxNumThreads()), + 0,TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new DaemonThreadFactory("docproc-") + ), params.getDocumentTypeManager(), params.getChainsModel(), params.getSchemaMap(), params.getStatisticsManager(), params.getMetric(), params.getContainerDocConfig()); } - private static BlockingQueue<Runnable> chooseQueueType(int maxNumThreads) { - return (maxNumThreads > 0) - ? new LinkedBlockingQueue<>() - : new SynchronousQueue<>(); - } - @Inject public DocumentProcessingHandler(ComponentRegistry<DocumentProcessor> documentProcessorComponentRegistry, ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry, @@ -124,9 +124,6 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { this(new ComponentRegistry<>(), documentProcessorComponentRegistry, docFactoryRegistry, new DocumentProcessingHandlerParameters().setMaxNumThreads (docprocConfig.numthreads()) - .setMaxConcurrentFactor(containerMbusConfig.maxConcurrentFactor()) - .setDocumentExpansionFactor(containerMbusConfig.documentExpansionFactor()) - .setContainerCoreMemoryMb(containerMbusConfig.containerCoreMemory()) .setDocumentTypeManager(new DocumentTypeManager(docManConfig)) .setChainsModel(buildFromConfig(chainsConfig)).setSchemaMap(configureMapping(mappingConfig)) .setStatisticsManager(manager) @@ -198,14 +195,10 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { } private void submit(DocumentProcessingTask task) { - if (threadPool.isAboveLimit()) { + try { + threadPool.execute(task); + } catch (RejectedExecutionException ree) { task.queueFull(); - } else { - try { - threadPool.execute(task); - } catch (RejectedExecutionException ree) { - task.queueFull(); - } } } diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerParameters.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerParameters.java index bf308e39218..b8a6aa9c105 100644 --- a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerParameters.java +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerParameters.java @@ -18,9 +18,6 @@ import com.yahoo.statistics.Statistics; public class DocumentProcessingHandlerParameters { private int maxNumThreads = 0; - private double maxConcurrentFactor = 0.2; - private double documentExpansionFactor = 20.0; - private int containerCoreMemoryMb = 50; private DocumentTypeManager documentTypeManager = null; private ChainsModel chainsModel = null; private SchemaMap schemaMap = null; @@ -28,22 +25,7 @@ public class DocumentProcessingHandlerParameters { private Metric metric = new NullMetric(); private ContainerDocumentConfig containerDocConfig; - public DocumentProcessingHandlerParameters() { - } - - /** - * Returns the number of megabytes of memory reserved for container core classes and data. - * - * @return the number of megabytes of memory reserved for container core classes and data. - */ - public int getContainerCoreMemoryMb() { - return containerCoreMemoryMb; - } - public DocumentProcessingHandlerParameters setContainerCoreMemoryMb(int containerCoreMemoryMb) { - this.containerCoreMemoryMb = containerCoreMemoryMb; - return this; - } public Metric getMetric() { return metric; @@ -55,36 +37,6 @@ public class DocumentProcessingHandlerParameters { } /** - * Returns the document expansion factor, i.e. by what factor a serialized and possibly compressed - * input document is expected to expand during deserialization, including any temporary memory needed - * when processing it. - * - * @return the document expansion factor. - */ - public double getDocumentExpansionFactor() { - return documentExpansionFactor; - } - - public DocumentProcessingHandlerParameters setDocumentExpansionFactor(double documentExpansionFactor) { - this.documentExpansionFactor = documentExpansionFactor; - return this; - } - - /** - * Returns the max concurrent factor. - * - * @return the max concurrent factor. - */ - public double getMaxConcurrentFactor() { - return maxConcurrentFactor; - } - - public DocumentProcessingHandlerParameters setMaxConcurrentFactor(double maxConcurrentFactor) { - this.maxConcurrentFactor = maxConcurrentFactor; - return this; - } - - /** * Returns the maximum number of thread that the thread pool will ever attempt to run simultaneously. * * @return the maximum number of thread that the thread pool will ever attempt to run simultaneously. diff --git a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutorTestCase.java b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutorTestCase.java deleted file mode 100644 index a74fb9a3edf..00000000000 --- a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutorTestCase.java +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.docproc.jdisc; - -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> - */ -public class DocprocThreadPoolExecutorTestCase { - private final Set<Long> threadIds = Collections.synchronizedSet(new HashSet<Long>()); - - @Test - public void threadPool() throws InterruptedException { - int numThreads = 8; - int numTasks = 200; - - LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<>(); - DocprocThreadManager mgr = new DocprocThreadManager(1000l); - DocprocThreadPoolExecutor pool = new DocprocThreadPoolExecutor(numThreads, q, mgr); - - List<MockedDocumentProcessingTask> tasks = new ArrayList<>(numTasks); - for (int i = 0; i < numTasks; i++) { - tasks.add(new MockedDocumentProcessingTask()); - } - - for (int i = 0; i < numTasks; i++) { - pool.execute(tasks.get(i)); - } - pool.shutdown(); - pool.awaitTermination(120L, TimeUnit.SECONDS); - - for (int i = 0; i < numTasks; i++) { - assertTrue(tasks.get(i).hasBeenRun()); - } - - System.err.println(threadIds); - assertEquals(numThreads, threadIds.size()); - } - - private class MockedDocumentProcessingTask extends DocumentProcessingTask { - private boolean hasBeenRun = false; - - public MockedDocumentProcessingTask() { - super(null, null, null); - } - - @Override - public void run() { - threadIds.add(Thread.currentThread().getId()); - System.err.println(System.currentTimeMillis() + " MOCK Thread " + Thread.currentThread().getId() + " running task " + this); - for (int i = 0; i < 100000; i++) { - Math.sin((double) (System.currentTimeMillis() / 10000L)); - } - System.err.println(System.currentTimeMillis() + " MOCK Thread " + Thread.currentThread().getId() + " DONE task " + this); - hasBeenRun = true; - } - - @Override - public int getApproxSize() { - return 333; - } - - @Override - public String toString() { - return "seqNum " + getSeqNum(); - } - - public boolean hasBeenRun() { - return hasBeenRun; - } - } -} |