summaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-09-30 17:08:41 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2019-09-30 17:08:41 +0200
commit34dd2f7153f4a6cd1ea0b802f28cde8abdd48707 (patch)
tree6c6d950bc1bf7c0b1421a78ecaeb07f147580b2e /documentapi
parentc725da6bc08a2caf0767b8ae4d63fc98c367cd67 (diff)
Less unusable options.
Diffstat (limited to 'documentapi')
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/ThroughputLimitQueue.java164
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/ThroughputLimitQueueTestCase.java49
2 files changed, 0 insertions, 213 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/ThroughputLimitQueue.java b/documentapi/src/main/java/com/yahoo/documentapi/ThroughputLimitQueue.java
deleted file mode 100644
index 579cdaced04..00000000000
--- a/documentapi/src/main/java/com/yahoo/documentapi/ThroughputLimitQueue.java
+++ /dev/null
@@ -1,164 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.documentapi;
-
-import com.yahoo.concurrent.SystemTimer;
-import com.yahoo.concurrent.Timer;
-
-import java.util.Collection;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
-
-/**
- * Queue that limits it's size based on the throughput. Allows the queue to keep a certain number of
- * seconds of processing in its queue.
- */
-public class ThroughputLimitQueue<M> extends LinkedBlockingQueue<M> {
- private static Logger log = Logger.getLogger(ThroughputLimitQueue.class.getName());
-
- double averageWaitTime = 0;
- long maxWaitTime = 0;
- long startTime;
- int capacity = 2;
- Timer timer;
-
- /**
- * Creates a new queue.
- *
- * @param queueSizeInMs The maximum time we wish to have objects waiting in the queue.
- */
- public ThroughputLimitQueue(long queueSizeInMs) {
- this(SystemTimer.INSTANCE, queueSizeInMs);
- }
-
- /**
- * Creates a new queue. Used for unit testing.
- *
- * @param t Used to measure time spent in the queue. Subclass for unit testing, or use SystemTimer for regular use.
- * @param queueSizeInMs The maximum time we wish to have objects waiting in the queue.
- */
- public ThroughputLimitQueue(Timer t, long queueSizeInMs) {
- maxWaitTime = queueSizeInMs;
- timer = t;
- }
-
- // Doc inherited from BlockingQueue
- public boolean add(M m) {
- if (!offer(m)) {
- throw new IllegalStateException("Queue full");
- }
- return true;
- }
-
- // Doc inherited from BlockingQueue
- public boolean offer(M m) {
- return remainingCapacity() > 0 && super.offer(m);
- }
-
- /**
- * Calculates the average waiting time and readjusts the queue capacity.
- *
- * @param m The last message that was read from queue, if any.
- * @return Returns m.
- */
- private M calculateAverage(M m) {
- if (m == null) {
- startTime = 0;
- return null;
- }
-
- if (startTime != 0) {
- long waitTime = timer.milliTime() - startTime;
-
- if (averageWaitTime == 0) {
- averageWaitTime = waitTime;
- } else {
- averageWaitTime = 0.99 * averageWaitTime + 0.01 * waitTime;
- }
-
- int newCapacity = Math.max(2, (int)Math.round(maxWaitTime / averageWaitTime));
- if (newCapacity != capacity) {
- log.fine("Capacity of throughput queue changed from " + capacity + " to " + newCapacity);
- capacity = newCapacity;
- }
- }
-
- if (!isEmpty()) {
- startTime = timer.milliTime();
- } else {
- startTime = 0;
- }
-
- return m;
- }
-
- // Doc inherited from BlockingQueue
- public M poll() {
- return calculateAverage(super.poll());
- }
-
- // Doc inherited from BlockingQueue
- public void put(M m) throws InterruptedException {
- offer(m, Long.MAX_VALUE, TimeUnit.SECONDS);
- }
-
- // Doc inherited from BlockingQueue
- public boolean offer(M m, long l, TimeUnit timeUnit) throws InterruptedException {
- long timeWaited = 0;
- while (timeWaited < timeUnit.toMillis(l)) {
- if (offer(m)) {
- return true;
- }
-
- Thread.sleep(10);
- timeWaited += 10;
- }
-
- return false;
- }
-
- // Doc inherited from BlockingQueue
- public M take() throws InterruptedException {
- return poll(Long.MAX_VALUE, TimeUnit.SECONDS);
- }
-
- // Doc inherited from BlockingQueue
- public M poll(long l, TimeUnit timeUnit) throws InterruptedException {
- long timeWaited = 0;
- while (timeWaited < timeUnit.toMillis(l)) {
- M elem = poll();
- if (elem != null) {
- return elem;
- }
-
- Thread.sleep(10);
- timeWaited += 10;
- }
-
- return null;
- }
-
- /**
- * @return Returns the maximum capacity of the queue
- */
- public int capacity() {
- return capacity;
- }
-
- // Doc inherited from BlockingQueue
- public int remainingCapacity() {
- int sz = capacity - size();
- return (sz > 0) ? sz : 0;
- }
-
- // Doc inherited from BlockingQueue
- public boolean addAll(Collection<? extends M> ms) {
- for (M m : ms) {
- if (!offer(m)) {
- return false;
- }
- }
-
- return true;
- }
-}
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/ThroughputLimitQueueTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/ThroughputLimitQueueTestCase.java
deleted file mode 100644
index 75e98bc895a..00000000000
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/ThroughputLimitQueueTestCase.java
+++ /dev/null
@@ -1,49 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.documentapi.messagebus.test;
-
-import com.yahoo.documentapi.ThroughputLimitQueue;
-import com.yahoo.concurrent.Timer;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * @author thomasg
- */
-public class ThroughputLimitQueueTestCase {
-
- class TestTimer implements Timer {
- public long milliTime = 0;
-
- public long milliTime() {
- return milliTime;
- }
- }
-
- @Test
- public void testCapacity() {
- TestTimer t = new TestTimer();
- t.milliTime = 10;
- ThroughputLimitQueue<Object> q = new ThroughputLimitQueue<Object>(t, 2000);
-
- q.add(new Object());
- q.add(new Object());
- q.remove();
- t.milliTime += 10;
- q.remove();
-
- assertEquals(200, q.capacity());
-
- for (int i = 0; i < 1000; i++) {
- q.add(new Object());
- q.add(new Object());
- t.milliTime += 100;
- q.remove();
- t.milliTime += 100;
- q.remove();
- }
-
- assertEquals(20, q.capacity());
- }
-
-}