summaryrefslogtreecommitdiffstats
path: root/vespaclient-core
diff options
context:
space:
mode:
authorArnstein Ressem <aressem@gmail.com>2017-01-09 20:48:11 +0100
committerGitHub <noreply@github.com>2017-01-09 20:48:11 +0100
commit97a3166a34b4fc343bffc9f0f36eed6ff2c806d2 (patch)
tree06ea532997fb5c6b8e399bbbc7027ee6aa2a24e8 /vespaclient-core
parent94dbe38db0cf670ff7ab159627d8a29bd89d4cc9 (diff)
Revert "Revert "Revert "Revert "Revert "Revert "Revert "Revert "Revert "Balder/avoid costly notifyall in sharedsender"""""""""
Diffstat (limited to 'vespaclient-core')
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java199
1 files changed, 77 insertions, 122 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
index 08243c4e0ec..48ff0eae36b 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
@@ -6,9 +6,9 @@ import com.yahoo.jdisc.Metric;
import com.yahoo.log.LogLevel;
import com.yahoo.messagebus.*;
import com.yahoo.clientmetrics.RouteMetricSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+import java.util.HashMap;
+import java.util.Map;
import java.util.logging.Logger;
/**
@@ -23,9 +23,11 @@ public class SharedSender implements ReplyHandler {
public static final Logger log = Logger.getLogger(SharedSender.class.getName());
private SendSession sender;
+ private final Object monitor = new Object();
private RouteMetricSet metrics;
- private ConcurrentHashMap<ResultCallback, OwnerState> activeOwners = new ConcurrentHashMap<>();
+ // Maps from filename to number of pending requests
+ private Map<ResultCallback, Integer> activeOwners = new HashMap<>();
/**
* Creates a new shared sender.
@@ -48,16 +50,17 @@ public class SharedSender implements ReplyHandler {
}
public void remove(ResultCallback owner) {
- OwnerState state = activeOwners.remove(owner);
- if (state != null) {
- state.clearPending();
+ synchronized (monitor) {
+ activeOwners.remove(owner);
}
}
public void shutdown() {
try {
- while ( ! activeOwners.isEmpty()) {
- Thread.sleep(10);
+ synchronized (monitor) {
+ while ( ! activeOwners.isEmpty()) {
+ monitor.wait(180 * 1000);
+ }
}
} catch (InterruptedException e) {
}
@@ -73,21 +76,36 @@ public class SharedSender implements ReplyHandler {
* @return true if there were no more pending, or false if the timeout expired.
*/
public boolean waitForPending(ResultCallback owner, long timeoutMs) {
- OwnerState state = activeOwners.get(owner);
- if (state != null) {
- try {
- return state.waitPending(timeoutMs);
- } catch (InterruptedException e) {
- return false;
+ long timeStart = SystemTimer.INSTANCE.milliTime();
+ long timeLeft = timeoutMs;
+
+ try {
+ while (timeoutMs == -1 || timeLeft > 0) {
+ synchronized (monitor) {
+ Integer count = activeOwners.get(owner);
+ if (count == null || count == 0) {
+ return true;
+ } else if (timeLeft > 0) {
+ monitor.wait(timeLeft);
+ } else {
+ monitor.wait();
+ }
+ }
+
+ timeLeft = timeoutMs - (SystemTimer.INSTANCE.milliTime() - timeStart);
}
+ } catch (InterruptedException e) {
}
- return true;
+ return false;
}
public int getPendingCount(ResultCallback owner) {
- OwnerState state = activeOwners.get(owner);
- return (state != null) ? state.getNumPending() : 0;
+ Integer count = activeOwners.get(owner);
+ if (count == null) {
+ return 0;
+ }
+ return count;
}
/**
@@ -107,12 +125,7 @@ public class SharedSender implements ReplyHandler {
* @param owner the file to check for pending documents
*/
public void waitForPending(ResultCallback owner) {
- OwnerState state = activeOwners.get(owner);
- if (state != null) {
- try {
- state.waitPending();
- } catch (InterruptedException e) { }
- }
+ waitForPending(owner, -1);
}
/**
@@ -140,17 +153,31 @@ public class SharedSender implements ReplyHandler {
return;
}
- OwnerState state = activeOwners.get(owner);
- if (state == null) {
- OwnerState newState = new OwnerState();
- state = activeOwners.putIfAbsent(owner, newState);
+ try {
+ synchronized (monitor) {
+ if (maxPendingPerOwner != -1 && blockingQueue) {
+ while (true) {
+ Integer count = activeOwners.get(owner);
+
+ if (count != null && count >= maxPendingPerOwner) {
+ log.log(LogLevel.INFO, "Owner " + owner + " already has " + count + " pending. Waiting for replies");
+ monitor.wait(10000);
+ } else {
+ break;
+ }
+ }
+ }
- }
- if (state != null) {
- if (maxPendingPerOwner != -1 && blockingQueue) {
- state.waitMaxPendingBelow(maxPendingPerOwner);
+ Integer count = activeOwners.get(owner);
+
+ if (count == null) {
+ activeOwners.put(owner, 1);
+ } else {
+ activeOwners.put(owner, count + 1);
+ }
}
- state.addPending(1);
+ } catch (InterruptedException e) {
+ return;
}
msg.setContext(owner);
@@ -176,102 +203,30 @@ public class SharedSender implements ReplyHandler {
*/
@Override
public void handleReply(Reply r) {
- ResultCallback owner = (ResultCallback) r.getContext();
- if (owner == null) {
- log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context");
- return;
- }
+ synchronized (monitor) {
+ ResultCallback owner = (ResultCallback) r.getContext();
- metrics.addReply(r);
- OwnerState state = activeOwners.get(owner);
+ if (owner != null) {
+ metrics.addReply(r);
- if (state == null) {
- // TODO: should be debug level if at all
- log.log(LogLevel.WARNING, "Owner " + owner.toString() + " is not active");
- return;
- }
+ Integer count = activeOwners.get(owner);
- int numPending = state.getNumPending() - 1;
- boolean noMorePending = state.decPending(1);
- if (noMorePending) {
- numPending = 0;
- }
- boolean active = owner.handleReply(r, numPending);
- if (log.isLoggable(LogLevel.SPAM)) {
- log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + ", count was " + state.getNumPending());
- }
- if (!active) {
- state.clearPending();
- activeOwners.remove(owner);
- }
- }
+ if (count != null) {
+ if (log.isLoggable(LogLevel.SPAM)) {
+ log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + " count was " + count);
+ }
- private static final class Sync extends AbstractQueuedSynchronizer {
- Sync(int initialCount) {
- setState(initialCount);
- }
-
- int getCount() {
- return getState();
- }
-
- @Override
- protected int tryAcquireShared(int acquires) {
- return (getState() == 0) ? 1 : -1;
- }
-
- @Override
- protected boolean tryReleaseShared(int releases) {
- // Increment/Decrement count; signal when transition downwards to zero.
- // releases == 0 means unblock all
- while ( true ) {
- int c = getState();
- if ((c == 0) && (releases >= 0)) { return false; }
- int nextc = (c > releases) ? ((releases != 0) ? c - releases : 0) : 0;
- if (compareAndSetState(c, nextc)) {
- return nextc == 0;
+ if ( ! owner.handleReply(r, count - 1)) {
+ activeOwners.remove(owner);
+ } else {
+ activeOwners.put(owner, count - 1);
+ }
}
+ } else {
+ log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context");
}
- }
- }
-
- private static final class OwnerState {
-
- private static final long REACT_LATENCY_ON_WATERMARK_MS = 5;
-
- private final Sync sync = new Sync(1);
-
- void addPending(int count) {
- sync.releaseShared(-count);
- }
-
- boolean decPending(int count) {
- return sync.releaseShared(count);
- }
-
- void waitMaxPendingBelow(int limit) {
- try {
- while (getNumPending() > limit) {
- sync.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(REACT_LATENCY_ON_WATERMARK_MS));
- }
- } catch (InterruptedException e) {
- }
- }
-
- int getNumPending() {
- return sync.getCount();
- }
-
- void clearPending() {
- sync.releaseShared(0);
- }
-
- boolean waitPending(long timeoutMS) throws InterruptedException {
- return sync.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(timeoutMS));
- }
- void waitPending() throws InterruptedException {
- sync.tryAcquireShared(1);
+ monitor.notifyAll();
}
}