summaryrefslogtreecommitdiffstats
path: root/vespaclient-core
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-01-07 19:02:09 +0100
committerGitHub <noreply@github.com>2017-01-07 19:02:09 +0100
commitaff1c35130802ea17946c6e61576e73e40e70f6b (patch)
treee3134b9a2730bcff8579ff1e9e046d9b41fcbdbb /vespaclient-core
parenta24919194a205573200e7a8955f40384fbaedafb (diff)
Revert "Revert "Revert "Revert "Revert "Balder/avoid costly notifyall in sharedsender"""""
Diffstat (limited to 'vespaclient-core')
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java194
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java4
2 files changed, 81 insertions, 117 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
index b0e52357a64..48ff0eae36b 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
@@ -6,8 +6,9 @@ import com.yahoo.jdisc.Metric;
import com.yahoo.log.LogLevel;
import com.yahoo.messagebus.*;
import com.yahoo.clientmetrics.RouteMetricSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+
+import java.util.HashMap;
+import java.util.Map;
import java.util.logging.Logger;
/**
@@ -25,7 +26,8 @@ public class SharedSender implements ReplyHandler {
private final Object monitor = new Object();
private RouteMetricSet metrics;
- private ConcurrentHashMap<ResultCallback, OwnerState> activeOwners = new ConcurrentHashMap<>();
+ // Maps from filename to number of pending requests
+ private Map<ResultCallback, Integer> activeOwners = new HashMap<>();
/**
* Creates a new shared sender.
@@ -48,13 +50,17 @@ public class SharedSender implements ReplyHandler {
}
public void remove(ResultCallback owner) {
- activeOwners.remove(owner);
+ synchronized (monitor) {
+ activeOwners.remove(owner);
+ }
}
public void shutdown() {
try {
- while ( ! activeOwners.isEmpty()) {
- Thread.sleep(10);
+ synchronized (monitor) {
+ while ( ! activeOwners.isEmpty()) {
+ monitor.wait(180 * 1000);
+ }
}
} catch (InterruptedException e) {
}
@@ -70,29 +76,36 @@ public class SharedSender implements ReplyHandler {
* @return true if there were no more pending, or false if the timeout expired.
*/
public boolean waitForPending(ResultCallback owner, long timeoutMs) {
- OwnerState state = activeOwners.get(owner);
- if (state != null) {
- try {
- return state.waitPending(timeoutMs);
- } catch (InterruptedException e) {
- return false;
- }
- }
+ long timeStart = SystemTimer.INSTANCE.milliTime();
+ long timeLeft = timeoutMs;
- return true;
- }
+ try {
+ while (timeoutMs == -1 || timeLeft > 0) {
+ synchronized (monitor) {
+ Integer count = activeOwners.get(owner);
+ if (count == null || count == 0) {
+ return true;
+ } else if (timeLeft > 0) {
+ monitor.wait(timeLeft);
+ } else {
+ monitor.wait();
+ }
+ }
- private OwnerState getNonNullState(ResultCallback owner) {
- OwnerState state = activeOwners.get(owner);
- if (state == null) {
- throw new IllegalStateException("No active callback : " + owner.toString());
+ timeLeft = timeoutMs - (SystemTimer.INSTANCE.milliTime() - timeStart);
+ }
+ } catch (InterruptedException e) {
}
- return state;
+
+ return false;
}
public int getPendingCount(ResultCallback owner) {
- OwnerState state = activeOwners.get(owner);
- return (state != null) ? state.getNumPending() : 0;
+ Integer count = activeOwners.get(owner);
+ if (count == null) {
+ return 0;
+ }
+ return count;
}
/**
@@ -112,12 +125,7 @@ public class SharedSender implements ReplyHandler {
* @param owner the file to check for pending documents
*/
public void waitForPending(ResultCallback owner) {
- OwnerState state = activeOwners.get(owner);
- if (state != null) {
- try {
- state.waitPending();
- } catch (InterruptedException e) { }
- }
+ waitForPending(owner, -1);
}
/**
@@ -145,16 +153,33 @@ public class SharedSender implements ReplyHandler {
return;
}
- OwnerState state = activeOwners.get(owner);
- if (state == null) {
- state = new OwnerState();
- activeOwners.put(owner, state);
- }
- if (maxPendingPerOwner != -1 && blockingQueue) {
- state.waitMaxPendingbelow(maxPendingPerOwner);
+ try {
+ synchronized (monitor) {
+ if (maxPendingPerOwner != -1 && blockingQueue) {
+ while (true) {
+ Integer count = activeOwners.get(owner);
+
+ if (count != null && count >= maxPendingPerOwner) {
+ log.log(LogLevel.INFO, "Owner " + owner + " already has " + count + " pending. Waiting for replies");
+ monitor.wait(10000);
+ } else {
+ break;
+ }
+ }
+ }
+
+ Integer count = activeOwners.get(owner);
+
+ if (count == null) {
+ activeOwners.put(owner, 1);
+ } else {
+ activeOwners.put(owner, count + 1);
+ }
+ }
+ } catch (InterruptedException e) {
+ return;
}
- state.addPending(1);
msg.setContext(owner);
try {
@@ -178,98 +203,37 @@ public class SharedSender implements ReplyHandler {
*/
@Override
public void handleReply(Reply r) {
- ResultCallback owner = (ResultCallback) r.getContext();
+ synchronized (monitor) {
+ ResultCallback owner = (ResultCallback) r.getContext();
- if (owner != null) {
- metrics.addReply(r);
- boolean active = owner.handleReply(r);
- OwnerState state = activeOwners.get(owner);
+ if (owner != null) {
+ metrics.addReply(r);
- if (state != null) {
- if (log.isLoggable(LogLevel.SPAM)) {
- log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + ", count was " + state.getNumPending());
- }
- if (!active) {
- state.clearPending();
- activeOwners.remove(owner);
- } else if ((state.decPending(1) <= 0)) {
- activeOwners.remove(owner);
- }
- } else {
- // TODO: should be debug level if at all.
- log.log(LogLevel.WARNING, "Owner " + owner.toString() + " is not active");
- }
- } else {
- log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context");
- }
- }
-
-
- public static class OwnerState {
-
- final AtomicInteger numPending = new AtomicInteger(0);
-
- int addPending(int count) {
- return numPending.addAndGet(count);
- }
-
- int decPending(int count) {
- int newValue = numPending.addAndGet(-count);
- if (newValue <= 0) {
- synchronized (numPending) {
- numPending.notify();
- }
- }
- return newValue;
- }
+ Integer count = activeOwners.get(owner);
- void waitMaxPendingbelow(int limit) {
- try {
- synchronized (numPending) {
- while (numPending.get() > limit) {
- numPending.wait(5);
+ if (count != null) {
+ if (log.isLoggable(LogLevel.SPAM)) {
+ log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + " count was " + count);
}
- }
- } catch (InterruptedException e) {
- }
- }
- int getNumPending() {
- return numPending.get();
- }
-
- void clearPending() {
- numPending.set(0);
- synchronized (numPending) {
- numPending.notify();
- }
- }
-
- boolean waitPending(long timeoutMS) throws InterruptedException {
- long timeStart = SystemTimer.INSTANCE.milliTime();
- long timeLeft = timeoutMS;
- synchronized (numPending) {
- while ((numPending.get() > 0) && (timeLeft > 0)) {
- numPending.wait(timeLeft);
- timeLeft = timeoutMS - (SystemTimer.INSTANCE.milliTime() - timeStart);
+ if ( ! owner.handleReply(r, count - 1)) {
+ activeOwners.remove(owner);
+ } else {
+ activeOwners.put(owner, count - 1);
+ }
}
+ } else {
+ log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context");
}
- return numPending.get() <= 0;
- }
- void waitPending() throws InterruptedException {
- synchronized (numPending) {
- while (numPending.get() > 0) {
- numPending.wait();
- }
- }
+ monitor.notifyAll();
}
}
public interface ResultCallback {
/** Return true if we should continue waiting for replies for this sender. */
- boolean handleReply(Reply r);
+ boolean handleReply(Reply r, int numPending);
/**
* Returns true if feeding has been aborted. No more feeding is allowed with this
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java
index f5d717d35e0..cdd4ac76c4c 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java
@@ -101,7 +101,7 @@ public final class FeedResponse extends HttpResponse implements SharedSender.Res
return "";
}
- public boolean handleReply(Reply reply) {
+ public boolean handleReply(Reply reply, int numPending) {
metrics.addReply(reply);
if (reply.getTrace().getLevel() > 0) {
String str = reply.getTrace().toString();
@@ -126,7 +126,7 @@ public final class FeedResponse extends HttpResponse implements SharedSender.Res
isAborted = abortOnError;
return !abortOnError;
}
- return true;
+ return numPending > 0;
}
public void done() {