summaryrefslogtreecommitdiffstats
path: root/vespaclient-core
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-01-08 20:57:30 +0100
committerGitHub <noreply@github.com>2017-01-08 20:57:30 +0100
commit1264780883ef73482d7b386b30a542dd07f10c6e (patch)
tree234a30d62d027917d47cb53e59189707af22d64c /vespaclient-core
parent3cbce5fb9a8100dd931aaeb9ed04ec47d2de9dbd (diff)
Revert "Revert "Revert "Revert "Revert "Revert "Revert "Balder/avoid costly notifyall in sharedsender"""""""
Diffstat (limited to 'vespaclient-core')
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java192
1 files changed, 78 insertions, 114 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
index 00f5606b93a..48ff0eae36b 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
@@ -6,8 +6,9 @@ import com.yahoo.jdisc.Metric;
import com.yahoo.log.LogLevel;
import com.yahoo.messagebus.*;
import com.yahoo.clientmetrics.RouteMetricSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+
+import java.util.HashMap;
+import java.util.Map;
import java.util.logging.Logger;
/**
@@ -25,7 +26,8 @@ public class SharedSender implements ReplyHandler {
private final Object monitor = new Object();
private RouteMetricSet metrics;
- private ConcurrentHashMap<ResultCallback, OwnerState> activeOwners = new ConcurrentHashMap<>();
+ // Maps from filename to number of pending requests
+ private Map<ResultCallback, Integer> activeOwners = new HashMap<>();
/**
* Creates a new shared sender.
@@ -48,13 +50,17 @@ public class SharedSender implements ReplyHandler {
}
public void remove(ResultCallback owner) {
- activeOwners.remove(owner);
+ synchronized (monitor) {
+ activeOwners.remove(owner);
+ }
}
public void shutdown() {
try {
- while ( ! activeOwners.isEmpty()) {
- Thread.sleep(10);
+ synchronized (monitor) {
+ while ( ! activeOwners.isEmpty()) {
+ monitor.wait(180 * 1000);
+ }
}
} catch (InterruptedException e) {
}
@@ -70,29 +76,36 @@ public class SharedSender implements ReplyHandler {
* @return true if there were no more pending, or false if the timeout expired.
*/
public boolean waitForPending(ResultCallback owner, long timeoutMs) {
- OwnerState state = activeOwners.get(owner);
- if (state != null) {
- try {
- return state.waitPending(timeoutMs);
- } catch (InterruptedException e) {
- return false;
- }
- }
+ long timeStart = SystemTimer.INSTANCE.milliTime();
+ long timeLeft = timeoutMs;
- return true;
- }
+ try {
+ while (timeoutMs == -1 || timeLeft > 0) {
+ synchronized (monitor) {
+ Integer count = activeOwners.get(owner);
+ if (count == null || count == 0) {
+ return true;
+ } else if (timeLeft > 0) {
+ monitor.wait(timeLeft);
+ } else {
+ monitor.wait();
+ }
+ }
- private OwnerState getNonNullState(ResultCallback owner) {
- OwnerState state = activeOwners.get(owner);
- if (state == null) {
- throw new IllegalStateException("No active callback : " + owner.toString());
+ timeLeft = timeoutMs - (SystemTimer.INSTANCE.milliTime() - timeStart);
+ }
+ } catch (InterruptedException e) {
}
- return state;
+
+ return false;
}
public int getPendingCount(ResultCallback owner) {
- OwnerState state = activeOwners.get(owner);
- return (state != null) ? state.getNumPending() : 0;
+ Integer count = activeOwners.get(owner);
+ if (count == null) {
+ return 0;
+ }
+ return count;
}
/**
@@ -112,12 +125,7 @@ public class SharedSender implements ReplyHandler {
* @param owner the file to check for pending documents
*/
public void waitForPending(ResultCallback owner) {
- OwnerState state = activeOwners.get(owner);
- if (state != null) {
- try {
- state.waitPending();
- } catch (InterruptedException e) { }
- }
+ waitForPending(owner, -1);
}
/**
@@ -145,16 +153,33 @@ public class SharedSender implements ReplyHandler {
return;
}
- OwnerState state = activeOwners.get(owner);
- if (state == null) {
- state = new OwnerState();
- activeOwners.put(owner, state);
- }
- if (maxPendingPerOwner != -1 && blockingQueue) {
- state.waitMaxPendingbelow(maxPendingPerOwner);
+ try {
+ synchronized (monitor) {
+ if (maxPendingPerOwner != -1 && blockingQueue) {
+ while (true) {
+ Integer count = activeOwners.get(owner);
+
+ if (count != null && count >= maxPendingPerOwner) {
+ log.log(LogLevel.INFO, "Owner " + owner + " already has " + count + " pending. Waiting for replies");
+ monitor.wait(10000);
+ } else {
+ break;
+ }
+ }
+ }
+
+ Integer count = activeOwners.get(owner);
+
+ if (count == null) {
+ activeOwners.put(owner, 1);
+ } else {
+ activeOwners.put(owner, count + 1);
+ }
+ }
+ } catch (InterruptedException e) {
+ return;
}
- state.addPending(1);
msg.setContext(owner);
try {
@@ -178,91 +203,30 @@ public class SharedSender implements ReplyHandler {
*/
@Override
public void handleReply(Reply r) {
- ResultCallback owner = (ResultCallback) r.getContext();
+ synchronized (monitor) {
+ ResultCallback owner = (ResultCallback) r.getContext();
- if (owner != null) {
- metrics.addReply(r);
- OwnerState state = activeOwners.get(owner);
- boolean active = owner.handleReply(r, state.getNumPending() - 1);
+ if (owner != null) {
+ metrics.addReply(r);
- if (state != null) {
- if (log.isLoggable(LogLevel.SPAM)) {
- log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + ", count was " + state.getNumPending());
- }
- if (!active) {
- state.clearPending();
- activeOwners.remove(owner);
- } else {
- state.decPending(1);
- }
- } else {
- // TODO: should be debug level if at all.
- log.log(LogLevel.WARNING, "Owner " + owner.toString() + " is not active");
- }
- } else {
- log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context");
- }
- }
-
-
- public static class OwnerState {
-
- final AtomicInteger numPending = new AtomicInteger(0);
-
- int addPending(int count) {
- return numPending.addAndGet(count);
- }
-
- int decPending(int count) {
- int newValue = numPending.addAndGet(-count);
- if (newValue <= 0) {
- synchronized (numPending) {
- numPending.notify();
- }
- }
- return newValue;
- }
+ Integer count = activeOwners.get(owner);
- void waitMaxPendingbelow(int limit) {
- try {
- synchronized (numPending) {
- while (numPending.get() > limit) {
- numPending.wait(5);
+ if (count != null) {
+ if (log.isLoggable(LogLevel.SPAM)) {
+ log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + " count was " + count);
}
- }
- } catch (InterruptedException e) {
- }
- }
- int getNumPending() {
- return numPending.get();
- }
-
- void clearPending() {
- numPending.set(0);
- synchronized (numPending) {
- numPending.notify();
- }
- }
-
- boolean waitPending(long timeoutMS) throws InterruptedException {
- long timeStart = SystemTimer.INSTANCE.milliTime();
- long timeLeft = timeoutMS;
- synchronized (numPending) {
- while ((numPending.get() > 0) && (timeLeft > 0)) {
- numPending.wait(timeLeft);
- timeLeft = timeoutMS - (SystemTimer.INSTANCE.milliTime() - timeStart);
+ if ( ! owner.handleReply(r, count - 1)) {
+ activeOwners.remove(owner);
+ } else {
+ activeOwners.put(owner, count - 1);
+ }
}
+ } else {
+ log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context");
}
- return numPending.get() <= 0;
- }
- void waitPending() throws InterruptedException {
- synchronized (numPending) {
- while (numPending.get() > 0) {
- numPending.wait();
- }
- }
+ monitor.notifyAll();
}
}