summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadManager.java9
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java24
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerParameters.java16
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/ThroughputLimitQueue.java164
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/ThroughputLimitQueueTestCase.java49
-rwxr-xr-xvespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java6
6 files changed, 7 insertions, 261 deletions
diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadManager.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadManager.java
index 4fb3c8f0913..6fd4beac056 100644
--- a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadManager.java
+++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadManager.java
@@ -1,11 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.docproc.jdisc;
-import com.yahoo.docproc.jdisc.metric.NullMetric;
import com.yahoo.document.DocumentUtil;
-import com.yahoo.jdisc.Metric;
import com.yahoo.log.LogLevel;
-import com.yahoo.statistics.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
@@ -22,12 +19,6 @@ class DocprocThreadManager {
private final AtomicLong bytesFinished = new AtomicLong(0);
DocprocThreadManager(double maxConcurrentFactor, double documentExpansionFactor, int containerCoreMemoryMb) {
- this(maxConcurrentFactor, documentExpansionFactor, containerCoreMemoryMb, Statistics.nullImplementation,
- new NullMetric());
- }
-
- DocprocThreadManager(double maxConcurrentFactor, double documentExpansionFactor, int containerCoreMemoryMb,
- Statistics statistics, Metric metric) {
this((long) (((double) DocumentUtil.calculateMaxPendingSize(maxConcurrentFactor, documentExpansionFactor,
containerCoreMemoryMb)) * maxConcurrentFactor));
}
diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java
index ec74fb9246d..5b7b9d85a91 100644
--- a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java
+++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java
@@ -19,7 +19,6 @@ import com.yahoo.docproc.jdisc.messagebus.MbusRequestContext;
import com.yahoo.docproc.proxy.SchemaMap;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.document.config.DocumentmanagerConfig;
-import com.yahoo.documentapi.ThroughputLimitQueue;
import com.yahoo.jdisc.Metric;
import com.yahoo.jdisc.Request;
import com.yahoo.jdisc.handler.AbstractRequestHandler;
@@ -33,7 +32,6 @@ import com.yahoo.statistics.Statistics;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
@@ -96,29 +94,20 @@ public class DocumentProcessingHandler extends AbstractRequestHandler {
DocumentProcessingHandlerParameters params) {
this(docprocServiceRegistry, documentProcessorComponentRegistry, docFactoryRegistry,
new DocprocThreadPoolExecutor(params.getMaxNumThreads(),
- chooseQueueType(params),
+ chooseQueueType(params.getMaxNumThreads()),
new DocprocThreadManager(params.getMaxConcurrentFactor(),
params.getDocumentExpansionFactor(),
- params.getContainerCoreMemoryMb(),
- params.getStatisticsManager(),
- params.getMetric())),
+ params.getContainerCoreMemoryMb())),
params.getDocumentTypeManager(), params.getChainsModel(), params.getSchemaMap(),
params.getStatisticsManager(),
params.getMetric(),
params.getContainerDocConfig());
}
- private static BlockingQueue<Runnable> chooseQueueType(DocumentProcessingHandlerParameters params) {
- if (params.getMaxQueueTimeMs() > 0) {
- return new ThroughputLimitQueue<>(params.getMaxQueueTimeMs());
- }
- if (params.getMaxQueueTimeMs() == 0) {
- return new PriorityBlockingQueue<>(); // Probably no need to bound this queue, see bug #4254537
- }
- if (params.getMaxNumThreads() > 0) {
- return new LinkedBlockingQueue<>();
- }
- return new SynchronousQueue<>();
+ private static BlockingQueue<Runnable> chooseQueueType(int maxNumThreads) {
+ return (maxNumThreads > 0)
+ ? new LinkedBlockingQueue<>()
+ : new SynchronousQueue<>();
}
@Inject
@@ -138,7 +127,6 @@ public class DocumentProcessingHandler extends AbstractRequestHandler {
.setMaxConcurrentFactor(containerMbusConfig.maxConcurrentFactor())
.setDocumentExpansionFactor(containerMbusConfig.documentExpansionFactor())
.setContainerCoreMemoryMb(containerMbusConfig.containerCoreMemory())
- .setMaxQueueTimeMs(docprocConfig.maxqueuetimems())
.setDocumentTypeManager(new DocumentTypeManager(docManConfig))
.setChainsModel(buildFromConfig(chainsConfig)).setSchemaMap(configureMapping(mappingConfig))
.setStatisticsManager(manager)
diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerParameters.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerParameters.java
index cec943306c8..bf308e39218 100644
--- a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerParameters.java
+++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerParameters.java
@@ -21,7 +21,6 @@ public class DocumentProcessingHandlerParameters {
private double maxConcurrentFactor = 0.2;
private double documentExpansionFactor = 20.0;
private int containerCoreMemoryMb = 50;
- private long maxQueueTimeMs = 0;
private DocumentTypeManager documentTypeManager = null;
private ChainsModel chainsModel = null;
private SchemaMap schemaMap = null;
@@ -86,21 +85,6 @@ public class DocumentProcessingHandlerParameters {
}
/**
- * Returns the maximum time (in milliseconds) that a document may stay in the input queue.&nbsp;The default value
- * of 0 disables this functionality.
- *
- * @return the maximum time (in milliseconds) that a document may stay in the input queue.
- */
- public long getMaxQueueTimeMs() {
- return maxQueueTimeMs;
- }
-
- public DocumentProcessingHandlerParameters setMaxQueueTimeMs(long maxQueueTimeMs) {
- this.maxQueueTimeMs = maxQueueTimeMs;
- return this;
- }
-
- /**
* Returns the maximum number of thread that the thread pool will ever attempt to run simultaneously.
*
* @return the maximum number of thread that the thread pool will ever attempt to run simultaneously.
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/ThroughputLimitQueue.java b/documentapi/src/main/java/com/yahoo/documentapi/ThroughputLimitQueue.java
deleted file mode 100644
index 579cdaced04..00000000000
--- a/documentapi/src/main/java/com/yahoo/documentapi/ThroughputLimitQueue.java
+++ /dev/null
@@ -1,164 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.documentapi;
-
-import com.yahoo.concurrent.SystemTimer;
-import com.yahoo.concurrent.Timer;
-
-import java.util.Collection;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
-
-/**
- * Queue that limits it's size based on the throughput. Allows the queue to keep a certain number of
- * seconds of processing in its queue.
- */
-public class ThroughputLimitQueue<M> extends LinkedBlockingQueue<M> {
- private static Logger log = Logger.getLogger(ThroughputLimitQueue.class.getName());
-
- double averageWaitTime = 0;
- long maxWaitTime = 0;
- long startTime;
- int capacity = 2;
- Timer timer;
-
- /**
- * Creates a new queue.
- *
- * @param queueSizeInMs The maximum time we wish to have objects waiting in the queue.
- */
- public ThroughputLimitQueue(long queueSizeInMs) {
- this(SystemTimer.INSTANCE, queueSizeInMs);
- }
-
- /**
- * Creates a new queue. Used for unit testing.
- *
- * @param t Used to measure time spent in the queue. Subclass for unit testing, or use SystemTimer for regular use.
- * @param queueSizeInMs The maximum time we wish to have objects waiting in the queue.
- */
- public ThroughputLimitQueue(Timer t, long queueSizeInMs) {
- maxWaitTime = queueSizeInMs;
- timer = t;
- }
-
- // Doc inherited from BlockingQueue
- public boolean add(M m) {
- if (!offer(m)) {
- throw new IllegalStateException("Queue full");
- }
- return true;
- }
-
- // Doc inherited from BlockingQueue
- public boolean offer(M m) {
- return remainingCapacity() > 0 && super.offer(m);
- }
-
- /**
- * Calculates the average waiting time and readjusts the queue capacity.
- *
- * @param m The last message that was read from queue, if any.
- * @return Returns m.
- */
- private M calculateAverage(M m) {
- if (m == null) {
- startTime = 0;
- return null;
- }
-
- if (startTime != 0) {
- long waitTime = timer.milliTime() - startTime;
-
- if (averageWaitTime == 0) {
- averageWaitTime = waitTime;
- } else {
- averageWaitTime = 0.99 * averageWaitTime + 0.01 * waitTime;
- }
-
- int newCapacity = Math.max(2, (int)Math.round(maxWaitTime / averageWaitTime));
- if (newCapacity != capacity) {
- log.fine("Capacity of throughput queue changed from " + capacity + " to " + newCapacity);
- capacity = newCapacity;
- }
- }
-
- if (!isEmpty()) {
- startTime = timer.milliTime();
- } else {
- startTime = 0;
- }
-
- return m;
- }
-
- // Doc inherited from BlockingQueue
- public M poll() {
- return calculateAverage(super.poll());
- }
-
- // Doc inherited from BlockingQueue
- public void put(M m) throws InterruptedException {
- offer(m, Long.MAX_VALUE, TimeUnit.SECONDS);
- }
-
- // Doc inherited from BlockingQueue
- public boolean offer(M m, long l, TimeUnit timeUnit) throws InterruptedException {
- long timeWaited = 0;
- while (timeWaited < timeUnit.toMillis(l)) {
- if (offer(m)) {
- return true;
- }
-
- Thread.sleep(10);
- timeWaited += 10;
- }
-
- return false;
- }
-
- // Doc inherited from BlockingQueue
- public M take() throws InterruptedException {
- return poll(Long.MAX_VALUE, TimeUnit.SECONDS);
- }
-
- // Doc inherited from BlockingQueue
- public M poll(long l, TimeUnit timeUnit) throws InterruptedException {
- long timeWaited = 0;
- while (timeWaited < timeUnit.toMillis(l)) {
- M elem = poll();
- if (elem != null) {
- return elem;
- }
-
- Thread.sleep(10);
- timeWaited += 10;
- }
-
- return null;
- }
-
- /**
- * @return Returns the maximum capacity of the queue
- */
- public int capacity() {
- return capacity;
- }
-
- // Doc inherited from BlockingQueue
- public int remainingCapacity() {
- int sz = capacity - size();
- return (sz > 0) ? sz : 0;
- }
-
- // Doc inherited from BlockingQueue
- public boolean addAll(Collection<? extends M> ms) {
- for (M m : ms) {
- if (!offer(m)) {
- return false;
- }
- }
-
- return true;
- }
-}
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/ThroughputLimitQueueTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/ThroughputLimitQueueTestCase.java
deleted file mode 100644
index 75e98bc895a..00000000000
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/ThroughputLimitQueueTestCase.java
+++ /dev/null
@@ -1,49 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.documentapi.messagebus.test;
-
-import com.yahoo.documentapi.ThroughputLimitQueue;
-import com.yahoo.concurrent.Timer;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * @author thomasg
- */
-public class ThroughputLimitQueueTestCase {
-
- class TestTimer implements Timer {
- public long milliTime = 0;
-
- public long milliTime() {
- return milliTime;
- }
- }
-
- @Test
- public void testCapacity() {
- TestTimer t = new TestTimer();
- t.milliTime = 10;
- ThroughputLimitQueue<Object> q = new ThroughputLimitQueue<Object>(t, 2000);
-
- q.add(new Object());
- q.add(new Object());
- q.remove();
- t.milliTime += 10;
- q.remove();
-
- assertEquals(200, q.capacity());
-
- for (int i = 0; i < 1000; i++) {
- q.add(new Object());
- q.add(new Object());
- t.milliTime += 100;
- q.remove();
- t.milliTime += 100;
- q.remove();
- }
-
- assertEquals(20, q.capacity());
- }
-
-}
diff --git a/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java b/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java
index b1fd8e8cffa..a32aa75d994 100755
--- a/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java
+++ b/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java
@@ -2,7 +2,6 @@
package com.yahoo.dummyreceiver;
import com.yahoo.concurrent.DaemonThreadFactory;
-import com.yahoo.documentapi.ThroughputLimitQueue;
import com.yahoo.documentapi.messagebus.MessageBusDocumentAccess;
import com.yahoo.documentapi.messagebus.MessageBusParams;
import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet;
@@ -42,7 +41,6 @@ public class DummyReceiver implements MessageHandler {
private boolean instant = false;
private ThreadPoolExecutor executor = null;
private int threads = 10;
- private long maxQueueTime = -1;
private BlockingQueue<Runnable> queue;
private boolean verbose = false;
private boolean helpOption = false;
@@ -76,7 +74,7 @@ public class DummyReceiver implements MessageHandler {
params.getMessageBusParams().setMaxPendingCount(0);
params.getMessageBusParams().setMaxPendingSize(0);
da = new MessageBusDocumentAccess(params);
- queue = (maxQueueTime < 0) ? new LinkedBlockingDeque<>() : new ThroughputLimitQueue<>(maxQueueTime);
+ queue = new LinkedBlockingDeque<>();
session = da.getMessageBus().createDestinationSession("default", true, this);
executor = new ThreadPoolExecutor(threads, threads, 5, TimeUnit.SECONDS, queue, new DaemonThreadFactory());
System.out.println("Registered listener at " + name + "/default with 0 max pending and sleep time of " + sleepTime);
@@ -156,8 +154,6 @@ public class DummyReceiver implements MessageHandler {
instant = true;
} else if ("--silent".equals(arg)) {
silentNum = Long.parseLong(getParam(args, arg));
- } else if ("--maxqueuetime".equals(arg)) {
- maxQueueTime = Long.parseLong(getParam(args, arg));
} else if ("--threads".equals(arg)) {
threads = Integer.parseInt(getParam(args, arg));
} else if ("--verbose".equals(arg)) {