summaryrefslogtreecommitdiffstats
path: root/vespaclient-core
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-01-09 13:46:52 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2017-01-09 13:46:52 +0100
commitc90866eff408818f0735dad286e8eb4b14990d41 (patch)
tree533124d458d5cdfb1a229b09bca59a1084731911 /vespaclient-core
parent27dfcb0237f8be5b806d3d17a8aa875b586d2854 (diff)
Simplify since there is no longer any race.
Diffstat (limited to 'vespaclient-core')
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java52
1 files changed, 21 insertions, 31 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
index 2a7fe3e61ad..85e4d700fce 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
@@ -7,6 +7,7 @@ import com.yahoo.log.LogLevel;
import com.yahoo.messagebus.*;
import com.yahoo.clientmetrics.RouteMetricSet;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.logging.Logger;
@@ -143,15 +144,15 @@ public class SharedSender implements ReplyHandler {
if (state == null) {
OwnerState newState = new OwnerState();
state = activeOwners.putIfAbsent(owner, newState);
- if (state == null) {
- state = newState;
- }
+
}
- if (maxPendingPerOwner != -1 && blockingQueue) {
- state.waitMaxPendingbelow(maxPendingPerOwner);
+ if (state != null) {
+ if (maxPendingPerOwner != -1 && blockingQueue) {
+ state.waitMaxPendingbelow(maxPendingPerOwner);
+ }
+ state.addPending(1);
}
- state.addPending(1);
msg.setContext(owner);
try {
@@ -202,8 +203,8 @@ public class SharedSender implements ReplyHandler {
}
private static final class Sync extends AbstractQueuedSynchronizer {
- Sync() {
- setState(0);
+ Sync(int count) {
+ setState(1);
}
int getCount() {
@@ -217,24 +218,23 @@ public class SharedSender implements ReplyHandler {
@Override
protected boolean tryReleaseShared(int releases) {
- // Decrement count; signal when transition to zero
- for (;;) {
+ // Increment/Decrement count; signal when transition downwards to zero.
+ while ( true ) {
int c = getState();
- if (c == 0)
- return false;
+ if ((c == 0) && (releases >= 0)) { return false; }
int nextc = (c > releases) ? c - releases : 0;
- if (compareAndSetState(c, nextc))
+ if (compareAndSetState(c, nextc)) {
return nextc == 0;
+ }
}
}
}
- public static class OwnerState {
- private static final long MS2NS = 1000000;
- private static final long REACT_LATENCY_ON_RACE = 5 *MS2NS;
- private static final long REACT_LATENCY_ON_WATERMARK = 5 *MS2NS;
+ private static final class OwnerState {
- private final Sync sync = new Sync();
+ private static final long REACT_LATENCY_ON_WATERMARK_MS = 5;
+
+ private final Sync sync = new Sync(1);
void addPending(int count) {
sync.releaseShared(-count);
@@ -247,7 +247,7 @@ public class SharedSender implements ReplyHandler {
void waitMaxPendingbelow(int limit) {
try {
while (getNumPending() > limit) {
- sync.tryAcquireSharedNanos(1, REACT_LATENCY_ON_WATERMARK);
+ sync.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(REACT_LATENCY_ON_WATERMARK_MS));
}
} catch (InterruptedException e) {
}
@@ -262,21 +262,11 @@ public class SharedSender implements ReplyHandler {
}
boolean waitPending(long timeoutMS) throws InterruptedException {
- long timeStart = SystemTimer.INSTANCE.milliTime();
- long timeLeft = timeoutMS;
-
- while ((getNumPending() > 0) && (timeLeft > 0)) {
- sync.tryAcquireSharedNanos(1, REACT_LATENCY_ON_RACE);
- timeLeft = timeoutMS - (SystemTimer.INSTANCE.milliTime() - timeStart);
- }
-
- return getNumPending() == 0;
+ return sync.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(timeoutMS));
}
void waitPending() throws InterruptedException {
- while (getNumPending() > 0) {
- sync.tryAcquireSharedNanos(1, REACT_LATENCY_ON_RACE);
- }
+ sync.tryAcquireShared(1);
}
}