summaryrefslogtreecommitdiffstats
path: root/vespaclient-core
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-01-09 11:57:46 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2017-01-09 11:57:46 +0100
commit27dfcb0237f8be5b806d3d17a8aa875b586d2854 (patch)
treef8d9f3afc6c8ca60e36e516b111d49c2d26d4fd8 /vespaclient-core
parentef63ebdf34df4b97d8cc54afedd32c55a3d9fabf (diff)
Git commit -a -m "Implment an AbstractQueuedSynchronizer do handle up/down counting.
Diffstat (limited to 'vespaclient-core')
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java82
1 files changed, 49 insertions, 33 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
index cb2fe2dee83..2a7fe3e61ad 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
@@ -7,7 +7,7 @@ import com.yahoo.log.LogLevel;
import com.yahoo.messagebus.*;
import com.yahoo.clientmetrics.RouteMetricSet;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.logging.Logger;
/**
@@ -23,8 +23,6 @@ public class SharedSender implements ReplyHandler {
private SendSession sender;
private RouteMetricSet metrics;
- private static final int REACT_LATENCY_ON_RACE = 5;
- private static final int REACT_LATENCY_ON_WATERMARK = 5;
private ConcurrentHashMap<ResultCallback, OwnerState> activeOwners = new ConcurrentHashMap<>();
@@ -203,63 +201,81 @@ public class SharedSender implements ReplyHandler {
}
}
- public static class OwnerState {
+ private static final class Sync extends AbstractQueuedSynchronizer {
+ Sync() {
+ setState(0);
+ }
- final AtomicInteger numPending = new AtomicInteger(0);
+ int getCount() {
+ return getState();
+ }
- int addPending(int count) {
- return numPending.addAndGet(count);
+ @Override
+ protected int tryAcquireShared(int acquires) {
+ return (getState() == 0) ? 1 : -1;
}
- int decPending(int count) {
- int newValue = numPending.addAndGet(-count);
- if (newValue <= 0) {
- synchronized (numPending) {
- numPending.notify();
- }
+ @Override
+ protected boolean tryReleaseShared(int releases) {
+ // Decrement count; signal when transition to zero
+ for (;;) {
+ int c = getState();
+ if (c == 0)
+ return false;
+ int nextc = (c > releases) ? c - releases : 0;
+ if (compareAndSetState(c, nextc))
+ return nextc == 0;
}
- return newValue;
+ }
+ }
+
+ public static class OwnerState {
+ private static final long MS2NS = 1000000;
+ private static final long REACT_LATENCY_ON_RACE = 5 *MS2NS;
+ private static final long REACT_LATENCY_ON_WATERMARK = 5 *MS2NS;
+
+ private final Sync sync = new Sync();
+
+ void addPending(int count) {
+ sync.releaseShared(-count);
+ }
+
+ void decPending(int count) {
+ sync.releaseShared(count);
}
void waitMaxPendingbelow(int limit) {
try {
- synchronized (numPending) {
- while (numPending.get() > limit) {
- numPending.wait(REACT_LATENCY_ON_WATERMARK);
- }
+ while (getNumPending() > limit) {
+ sync.tryAcquireSharedNanos(1, REACT_LATENCY_ON_WATERMARK);
}
} catch (InterruptedException e) {
}
}
int getNumPending() {
- return numPending.get();
+ return sync.getCount();
}
void clearPending() {
- numPending.set(0);
- synchronized (numPending) {
- numPending.notify();
- }
+ sync.releaseShared(0);
}
boolean waitPending(long timeoutMS) throws InterruptedException {
long timeStart = SystemTimer.INSTANCE.milliTime();
long timeLeft = timeoutMS;
- synchronized (numPending) {
- while ((numPending.get() > 0) && (timeLeft > 0)) {
- numPending.wait(REACT_LATENCY_ON_RACE);
- timeLeft = timeoutMS - (SystemTimer.INSTANCE.milliTime() - timeStart);
- }
+
+ while ((getNumPending() > 0) && (timeLeft > 0)) {
+ sync.tryAcquireSharedNanos(1, REACT_LATENCY_ON_RACE);
+ timeLeft = timeoutMS - (SystemTimer.INSTANCE.milliTime() - timeStart);
}
- return numPending.get() <= 0;
+
+ return getNumPending() == 0;
}
void waitPending() throws InterruptedException {
- synchronized (numPending) {
- while (numPending.get() > 0) {
- numPending.wait(REACT_LATENCY_ON_RACE);
- }
+ while (getNumPending() > 0) {
+ sync.tryAcquireSharedNanos(1, REACT_LATENCY_ON_RACE);
}
}
}