summaryrefslogtreecommitdiffstats
path: root/vespaclient-core/src
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-01-09 17:26:13 +0100
committerGitHub <noreply@github.com>2017-01-09 17:26:13 +0100
commitcc161c73c27a5c9e7ddad25102dd1a64910cabdf (patch)
tree33c3eb6a868c1ee4518c8add342457cfccf1c1a0 /vespaclient-core/src
parentb99468d847b444a3ad7f4aeba0f2eac1906c74cb (diff)
parente3f1192b39150b70372b508032dc1204b747eaf6 (diff)
Merge pull request #1457 from yahoo/revert-1456-revert-1453-revert-1452-revert-1451-revert-1450-revert-1449-revert-1448-revert-1447-balder/avoid-costly-notifyall-in-sharedsender
Revert "Revert "Revert "Revert "Revert "Revert "Revert "Revert "Balder/avoid costly notifyall in sharedsender""""""""
Diffstat (limited to 'vespaclient-core/src')
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java199
1 files changed, 122 insertions, 77 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
index 48ff0eae36b..08243c4e0ec 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
@@ -6,9 +6,9 @@ import com.yahoo.jdisc.Metric;
import com.yahoo.log.LogLevel;
import com.yahoo.messagebus.*;
import com.yahoo.clientmetrics.RouteMetricSet;
-
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.logging.Logger;
/**
@@ -23,11 +23,9 @@ public class SharedSender implements ReplyHandler {
public static final Logger log = Logger.getLogger(SharedSender.class.getName());
private SendSession sender;
- private final Object monitor = new Object();
private RouteMetricSet metrics;
- // Maps from filename to number of pending requests
- private Map<ResultCallback, Integer> activeOwners = new HashMap<>();
+ private ConcurrentHashMap<ResultCallback, OwnerState> activeOwners = new ConcurrentHashMap<>();
/**
* Creates a new shared sender.
@@ -50,17 +48,16 @@ public class SharedSender implements ReplyHandler {
}
public void remove(ResultCallback owner) {
- synchronized (monitor) {
- activeOwners.remove(owner);
+ OwnerState state = activeOwners.remove(owner);
+ if (state != null) {
+ state.clearPending();
}
}
public void shutdown() {
try {
- synchronized (monitor) {
- while ( ! activeOwners.isEmpty()) {
- monitor.wait(180 * 1000);
- }
+ while ( ! activeOwners.isEmpty()) {
+ Thread.sleep(10);
}
} catch (InterruptedException e) {
}
@@ -76,36 +73,21 @@ public class SharedSender implements ReplyHandler {
* @return true if there were no more pending, or false if the timeout expired.
*/
public boolean waitForPending(ResultCallback owner, long timeoutMs) {
- long timeStart = SystemTimer.INSTANCE.milliTime();
- long timeLeft = timeoutMs;
-
- try {
- while (timeoutMs == -1 || timeLeft > 0) {
- synchronized (monitor) {
- Integer count = activeOwners.get(owner);
- if (count == null || count == 0) {
- return true;
- } else if (timeLeft > 0) {
- monitor.wait(timeLeft);
- } else {
- monitor.wait();
- }
- }
-
- timeLeft = timeoutMs - (SystemTimer.INSTANCE.milliTime() - timeStart);
+ OwnerState state = activeOwners.get(owner);
+ if (state != null) {
+ try {
+ return state.waitPending(timeoutMs);
+ } catch (InterruptedException e) {
+ return false;
}
- } catch (InterruptedException e) {
}
- return false;
+ return true;
}
public int getPendingCount(ResultCallback owner) {
- Integer count = activeOwners.get(owner);
- if (count == null) {
- return 0;
- }
- return count;
+ OwnerState state = activeOwners.get(owner);
+ return (state != null) ? state.getNumPending() : 0;
}
/**
@@ -125,7 +107,12 @@ public class SharedSender implements ReplyHandler {
* @param owner the file to check for pending documents
*/
public void waitForPending(ResultCallback owner) {
- waitForPending(owner, -1);
+ OwnerState state = activeOwners.get(owner);
+ if (state != null) {
+ try {
+ state.waitPending();
+ } catch (InterruptedException e) { }
+ }
}
/**
@@ -153,31 +140,17 @@ public class SharedSender implements ReplyHandler {
return;
}
- try {
- synchronized (monitor) {
- if (maxPendingPerOwner != -1 && blockingQueue) {
- while (true) {
- Integer count = activeOwners.get(owner);
-
- if (count != null && count >= maxPendingPerOwner) {
- log.log(LogLevel.INFO, "Owner " + owner + " already has " + count + " pending. Waiting for replies");
- monitor.wait(10000);
- } else {
- break;
- }
- }
- }
-
- Integer count = activeOwners.get(owner);
+ OwnerState state = activeOwners.get(owner);
+ if (state == null) {
+ OwnerState newState = new OwnerState();
+ state = activeOwners.putIfAbsent(owner, newState);
- if (count == null) {
- activeOwners.put(owner, 1);
- } else {
- activeOwners.put(owner, count + 1);
- }
+ }
+ if (state != null) {
+ if (maxPendingPerOwner != -1 && blockingQueue) {
+ state.waitMaxPendingBelow(maxPendingPerOwner);
}
- } catch (InterruptedException e) {
- return;
+ state.addPending(1);
}
msg.setContext(owner);
@@ -203,30 +176,102 @@ public class SharedSender implements ReplyHandler {
*/
@Override
public void handleReply(Reply r) {
- synchronized (monitor) {
- ResultCallback owner = (ResultCallback) r.getContext();
+ ResultCallback owner = (ResultCallback) r.getContext();
+ if (owner == null) {
+ log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context");
+ return;
+ }
- if (owner != null) {
- metrics.addReply(r);
+ metrics.addReply(r);
+ OwnerState state = activeOwners.get(owner);
- Integer count = activeOwners.get(owner);
+ if (state == null) {
+ // TODO: should be debug level if at all
+ log.log(LogLevel.WARNING, "Owner " + owner.toString() + " is not active");
+ return;
+ }
- if (count != null) {
- if (log.isLoggable(LogLevel.SPAM)) {
- log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + " count was " + count);
- }
+ int numPending = state.getNumPending() - 1;
+ boolean noMorePending = state.decPending(1);
+ if (noMorePending) {
+ numPending = 0;
+ }
+ boolean active = owner.handleReply(r, numPending);
+ if (log.isLoggable(LogLevel.SPAM)) {
+ log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + ", count was " + state.getNumPending());
+ }
+ if (!active) {
+ state.clearPending();
+ activeOwners.remove(owner);
+ }
+ }
- if ( ! owner.handleReply(r, count - 1)) {
- activeOwners.remove(owner);
- } else {
- activeOwners.put(owner, count - 1);
- }
+ private static final class Sync extends AbstractQueuedSynchronizer {
+ Sync(int initialCount) {
+ setState(initialCount);
+ }
+
+ int getCount() {
+ return getState();
+ }
+
+ @Override
+ protected int tryAcquireShared(int acquires) {
+ return (getState() == 0) ? 1 : -1;
+ }
+
+ @Override
+ protected boolean tryReleaseShared(int releases) {
+ // Increment/Decrement count; signal when transition downwards to zero.
+ // releases == 0 means unblock all
+ while ( true ) {
+ int c = getState();
+ if ((c == 0) && (releases >= 0)) { return false; }
+ int nextc = (c > releases) ? ((releases != 0) ? c - releases : 0) : 0;
+ if (compareAndSetState(c, nextc)) {
+ return nextc == 0;
}
- } else {
- log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context");
}
+ }
+ }
+
+ private static final class OwnerState {
+
+ private static final long REACT_LATENCY_ON_WATERMARK_MS = 5;
+
+ private final Sync sync = new Sync(1);
+
+ void addPending(int count) {
+ sync.releaseShared(-count);
+ }
+
+ boolean decPending(int count) {
+ return sync.releaseShared(count);
+ }
+
+ void waitMaxPendingBelow(int limit) {
+ try {
+ while (getNumPending() > limit) {
+ sync.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(REACT_LATENCY_ON_WATERMARK_MS));
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+
+ int getNumPending() {
+ return sync.getCount();
+ }
+
+ void clearPending() {
+ sync.releaseShared(0);
+ }
+
+ boolean waitPending(long timeoutMS) throws InterruptedException {
+ return sync.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(timeoutMS));
+ }
- monitor.notifyAll();
+ void waitPending() throws InterruptedException {
+ sync.tryAcquireShared(1);
}
}