diff options
6 files changed, 7 insertions, 261 deletions
diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadManager.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadManager.java index 4fb3c8f0913..6fd4beac056 100644 --- a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadManager.java +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadManager.java @@ -1,11 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.docproc.jdisc; -import com.yahoo.docproc.jdisc.metric.NullMetric; import com.yahoo.document.DocumentUtil; -import com.yahoo.jdisc.Metric; import com.yahoo.log.LogLevel; -import com.yahoo.statistics.*; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; @@ -22,12 +19,6 @@ class DocprocThreadManager { private final AtomicLong bytesFinished = new AtomicLong(0); DocprocThreadManager(double maxConcurrentFactor, double documentExpansionFactor, int containerCoreMemoryMb) { - this(maxConcurrentFactor, documentExpansionFactor, containerCoreMemoryMb, Statistics.nullImplementation, - new NullMetric()); - } - - DocprocThreadManager(double maxConcurrentFactor, double documentExpansionFactor, int containerCoreMemoryMb, - Statistics statistics, Metric metric) { this((long) (((double) DocumentUtil.calculateMaxPendingSize(maxConcurrentFactor, documentExpansionFactor, containerCoreMemoryMb)) * maxConcurrentFactor)); } diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java index ec74fb9246d..5b7b9d85a91 100644 --- a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java @@ -19,7 +19,6 @@ import com.yahoo.docproc.jdisc.messagebus.MbusRequestContext; import com.yahoo.docproc.proxy.SchemaMap; import com.yahoo.document.DocumentTypeManager; import com.yahoo.document.config.DocumentmanagerConfig; -import com.yahoo.documentapi.ThroughputLimitQueue; import com.yahoo.jdisc.Metric; import com.yahoo.jdisc.Request; import com.yahoo.jdisc.handler.AbstractRequestHandler; @@ -33,7 +32,6 @@ import com.yahoo.statistics.Statistics; import java.util.TimerTask; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.SynchronousQueue; @@ -96,29 +94,20 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { DocumentProcessingHandlerParameters params) { this(docprocServiceRegistry, documentProcessorComponentRegistry, docFactoryRegistry, new DocprocThreadPoolExecutor(params.getMaxNumThreads(), - chooseQueueType(params), + chooseQueueType(params.getMaxNumThreads()), new DocprocThreadManager(params.getMaxConcurrentFactor(), params.getDocumentExpansionFactor(), - params.getContainerCoreMemoryMb(), - params.getStatisticsManager(), - params.getMetric())), + params.getContainerCoreMemoryMb())), params.getDocumentTypeManager(), params.getChainsModel(), params.getSchemaMap(), params.getStatisticsManager(), params.getMetric(), params.getContainerDocConfig()); } - private static BlockingQueue<Runnable> chooseQueueType(DocumentProcessingHandlerParameters params) { - if (params.getMaxQueueTimeMs() > 0) { - return new ThroughputLimitQueue<>(params.getMaxQueueTimeMs()); - } - if (params.getMaxQueueTimeMs() == 0) { - return new PriorityBlockingQueue<>(); // Probably no need to bound this queue, see bug #4254537 - } - if (params.getMaxNumThreads() > 0) { - return new LinkedBlockingQueue<>(); - } - return new SynchronousQueue<>(); + private static BlockingQueue<Runnable> chooseQueueType(int maxNumThreads) { + return (maxNumThreads > 0) + ? new LinkedBlockingQueue<>() + : new SynchronousQueue<>(); } @Inject @@ -138,7 +127,6 @@ public class DocumentProcessingHandler extends AbstractRequestHandler { .setMaxConcurrentFactor(containerMbusConfig.maxConcurrentFactor()) .setDocumentExpansionFactor(containerMbusConfig.documentExpansionFactor()) .setContainerCoreMemoryMb(containerMbusConfig.containerCoreMemory()) - .setMaxQueueTimeMs(docprocConfig.maxqueuetimems()) .setDocumentTypeManager(new DocumentTypeManager(docManConfig)) .setChainsModel(buildFromConfig(chainsConfig)).setSchemaMap(configureMapping(mappingConfig)) .setStatisticsManager(manager) diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerParameters.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerParameters.java index cec943306c8..bf308e39218 100644 --- a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerParameters.java +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerParameters.java @@ -21,7 +21,6 @@ public class DocumentProcessingHandlerParameters { private double maxConcurrentFactor = 0.2; private double documentExpansionFactor = 20.0; private int containerCoreMemoryMb = 50; - private long maxQueueTimeMs = 0; private DocumentTypeManager documentTypeManager = null; private ChainsModel chainsModel = null; private SchemaMap schemaMap = null; @@ -86,21 +85,6 @@ public class DocumentProcessingHandlerParameters { } /** - * Returns the maximum time (in milliseconds) that a document may stay in the input queue. The default value - * of 0 disables this functionality. - * - * @return the maximum time (in milliseconds) that a document may stay in the input queue. - */ - public long getMaxQueueTimeMs() { - return maxQueueTimeMs; - } - - public DocumentProcessingHandlerParameters setMaxQueueTimeMs(long maxQueueTimeMs) { - this.maxQueueTimeMs = maxQueueTimeMs; - return this; - } - - /** * Returns the maximum number of thread that the thread pool will ever attempt to run simultaneously. * * @return the maximum number of thread that the thread pool will ever attempt to run simultaneously. diff --git a/documentapi/src/main/java/com/yahoo/documentapi/ThroughputLimitQueue.java b/documentapi/src/main/java/com/yahoo/documentapi/ThroughputLimitQueue.java deleted file mode 100644 index 579cdaced04..00000000000 --- a/documentapi/src/main/java/com/yahoo/documentapi/ThroughputLimitQueue.java +++ /dev/null @@ -1,164 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi; - -import com.yahoo.concurrent.SystemTimer; -import com.yahoo.concurrent.Timer; - -import java.util.Collection; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.logging.Logger; - -/** - * Queue that limits it's size based on the throughput. Allows the queue to keep a certain number of - * seconds of processing in its queue. - */ -public class ThroughputLimitQueue<M> extends LinkedBlockingQueue<M> { - private static Logger log = Logger.getLogger(ThroughputLimitQueue.class.getName()); - - double averageWaitTime = 0; - long maxWaitTime = 0; - long startTime; - int capacity = 2; - Timer timer; - - /** - * Creates a new queue. - * - * @param queueSizeInMs The maximum time we wish to have objects waiting in the queue. - */ - public ThroughputLimitQueue(long queueSizeInMs) { - this(SystemTimer.INSTANCE, queueSizeInMs); - } - - /** - * Creates a new queue. Used for unit testing. - * - * @param t Used to measure time spent in the queue. Subclass for unit testing, or use SystemTimer for regular use. - * @param queueSizeInMs The maximum time we wish to have objects waiting in the queue. - */ - public ThroughputLimitQueue(Timer t, long queueSizeInMs) { - maxWaitTime = queueSizeInMs; - timer = t; - } - - // Doc inherited from BlockingQueue - public boolean add(M m) { - if (!offer(m)) { - throw new IllegalStateException("Queue full"); - } - return true; - } - - // Doc inherited from BlockingQueue - public boolean offer(M m) { - return remainingCapacity() > 0 && super.offer(m); - } - - /** - * Calculates the average waiting time and readjusts the queue capacity. - * - * @param m The last message that was read from queue, if any. - * @return Returns m. - */ - private M calculateAverage(M m) { - if (m == null) { - startTime = 0; - return null; - } - - if (startTime != 0) { - long waitTime = timer.milliTime() - startTime; - - if (averageWaitTime == 0) { - averageWaitTime = waitTime; - } else { - averageWaitTime = 0.99 * averageWaitTime + 0.01 * waitTime; - } - - int newCapacity = Math.max(2, (int)Math.round(maxWaitTime / averageWaitTime)); - if (newCapacity != capacity) { - log.fine("Capacity of throughput queue changed from " + capacity + " to " + newCapacity); - capacity = newCapacity; - } - } - - if (!isEmpty()) { - startTime = timer.milliTime(); - } else { - startTime = 0; - } - - return m; - } - - // Doc inherited from BlockingQueue - public M poll() { - return calculateAverage(super.poll()); - } - - // Doc inherited from BlockingQueue - public void put(M m) throws InterruptedException { - offer(m, Long.MAX_VALUE, TimeUnit.SECONDS); - } - - // Doc inherited from BlockingQueue - public boolean offer(M m, long l, TimeUnit timeUnit) throws InterruptedException { - long timeWaited = 0; - while (timeWaited < timeUnit.toMillis(l)) { - if (offer(m)) { - return true; - } - - Thread.sleep(10); - timeWaited += 10; - } - - return false; - } - - // Doc inherited from BlockingQueue - public M take() throws InterruptedException { - return poll(Long.MAX_VALUE, TimeUnit.SECONDS); - } - - // Doc inherited from BlockingQueue - public M poll(long l, TimeUnit timeUnit) throws InterruptedException { - long timeWaited = 0; - while (timeWaited < timeUnit.toMillis(l)) { - M elem = poll(); - if (elem != null) { - return elem; - } - - Thread.sleep(10); - timeWaited += 10; - } - - return null; - } - - /** - * @return Returns the maximum capacity of the queue - */ - public int capacity() { - return capacity; - } - - // Doc inherited from BlockingQueue - public int remainingCapacity() { - int sz = capacity - size(); - return (sz > 0) ? sz : 0; - } - - // Doc inherited from BlockingQueue - public boolean addAll(Collection<? extends M> ms) { - for (M m : ms) { - if (!offer(m)) { - return false; - } - } - - return true; - } -} diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/ThroughputLimitQueueTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/ThroughputLimitQueueTestCase.java deleted file mode 100644 index 75e98bc895a..00000000000 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/ThroughputLimitQueueTestCase.java +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.test; - -import com.yahoo.documentapi.ThroughputLimitQueue; -import com.yahoo.concurrent.Timer; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -/** - * @author thomasg - */ -public class ThroughputLimitQueueTestCase { - - class TestTimer implements Timer { - public long milliTime = 0; - - public long milliTime() { - return milliTime; - } - } - - @Test - public void testCapacity() { - TestTimer t = new TestTimer(); - t.milliTime = 10; - ThroughputLimitQueue<Object> q = new ThroughputLimitQueue<Object>(t, 2000); - - q.add(new Object()); - q.add(new Object()); - q.remove(); - t.milliTime += 10; - q.remove(); - - assertEquals(200, q.capacity()); - - for (int i = 0; i < 1000; i++) { - q.add(new Object()); - q.add(new Object()); - t.milliTime += 100; - q.remove(); - t.milliTime += 100; - q.remove(); - } - - assertEquals(20, q.capacity()); - } - -} diff --git a/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java b/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java index b1fd8e8cffa..a32aa75d994 100755 --- a/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java +++ b/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java @@ -2,7 +2,6 @@ package com.yahoo.dummyreceiver; import com.yahoo.concurrent.DaemonThreadFactory; -import com.yahoo.documentapi.ThroughputLimitQueue; import com.yahoo.documentapi.messagebus.MessageBusDocumentAccess; import com.yahoo.documentapi.messagebus.MessageBusParams; import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet; @@ -42,7 +41,6 @@ public class DummyReceiver implements MessageHandler { private boolean instant = false; private ThreadPoolExecutor executor = null; private int threads = 10; - private long maxQueueTime = -1; private BlockingQueue<Runnable> queue; private boolean verbose = false; private boolean helpOption = false; @@ -76,7 +74,7 @@ public class DummyReceiver implements MessageHandler { params.getMessageBusParams().setMaxPendingCount(0); params.getMessageBusParams().setMaxPendingSize(0); da = new MessageBusDocumentAccess(params); - queue = (maxQueueTime < 0) ? new LinkedBlockingDeque<>() : new ThroughputLimitQueue<>(maxQueueTime); + queue = new LinkedBlockingDeque<>(); session = da.getMessageBus().createDestinationSession("default", true, this); executor = new ThreadPoolExecutor(threads, threads, 5, TimeUnit.SECONDS, queue, new DaemonThreadFactory()); System.out.println("Registered listener at " + name + "/default with 0 max pending and sleep time of " + sleepTime); @@ -156,8 +154,6 @@ public class DummyReceiver implements MessageHandler { instant = true; } else if ("--silent".equals(arg)) { silentNum = Long.parseLong(getParam(args, arg)); - } else if ("--maxqueuetime".equals(arg)) { - maxQueueTime = Long.parseLong(getParam(args, arg)); } else if ("--threads".equals(arg)) { threads = Integer.parseInt(getParam(args, arg)); } else if ("--verbose".equals(arg)) { |