summaryrefslogtreecommitdiffstats
path: root/vespaclient-core
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-01-07 19:01:23 +0100
committerGitHub <noreply@github.com>2017-01-07 19:01:23 +0100
commit920f825d169bc05ff00ad7f2661305589cf70994 (patch)
tree146b648777bca6e21b66e997c778fd6e9cbf6b22 /vespaclient-core
parent9054e16f0233278f95bf22589e7e426563b786ac (diff)
Revert "Revert "Revert "Revert "Balder/avoid costly notifyall in sharedsender""""
Diffstat (limited to 'vespaclient-core')
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java194
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java4
2 files changed, 117 insertions, 81 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
index 48ff0eae36b..b0e52357a64 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
@@ -6,9 +6,8 @@ import com.yahoo.jdisc.Metric;
import com.yahoo.log.LogLevel;
import com.yahoo.messagebus.*;
import com.yahoo.clientmetrics.RouteMetricSet;
-
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
/**
@@ -26,8 +25,7 @@ public class SharedSender implements ReplyHandler {
private final Object monitor = new Object();
private RouteMetricSet metrics;
- // Maps from filename to number of pending requests
- private Map<ResultCallback, Integer> activeOwners = new HashMap<>();
+ private ConcurrentHashMap<ResultCallback, OwnerState> activeOwners = new ConcurrentHashMap<>();
/**
* Creates a new shared sender.
@@ -50,17 +48,13 @@ public class SharedSender implements ReplyHandler {
}
public void remove(ResultCallback owner) {
- synchronized (monitor) {
- activeOwners.remove(owner);
- }
+ activeOwners.remove(owner);
}
public void shutdown() {
try {
- synchronized (monitor) {
- while ( ! activeOwners.isEmpty()) {
- monitor.wait(180 * 1000);
- }
+ while ( ! activeOwners.isEmpty()) {
+ Thread.sleep(10);
}
} catch (InterruptedException e) {
}
@@ -76,36 +70,29 @@ public class SharedSender implements ReplyHandler {
* @return true if there were no more pending, or false if the timeout expired.
*/
public boolean waitForPending(ResultCallback owner, long timeoutMs) {
- long timeStart = SystemTimer.INSTANCE.milliTime();
- long timeLeft = timeoutMs;
-
- try {
- while (timeoutMs == -1 || timeLeft > 0) {
- synchronized (monitor) {
- Integer count = activeOwners.get(owner);
- if (count == null || count == 0) {
- return true;
- } else if (timeLeft > 0) {
- monitor.wait(timeLeft);
- } else {
- monitor.wait();
- }
- }
-
- timeLeft = timeoutMs - (SystemTimer.INSTANCE.milliTime() - timeStart);
+ OwnerState state = activeOwners.get(owner);
+ if (state != null) {
+ try {
+ return state.waitPending(timeoutMs);
+ } catch (InterruptedException e) {
+ return false;
}
- } catch (InterruptedException e) {
}
- return false;
+ return true;
}
- public int getPendingCount(ResultCallback owner) {
- Integer count = activeOwners.get(owner);
- if (count == null) {
- return 0;
+ private OwnerState getNonNullState(ResultCallback owner) {
+ OwnerState state = activeOwners.get(owner);
+ if (state == null) {
+ throw new IllegalStateException("No active callback : " + owner.toString());
}
- return count;
+ return state;
+ }
+
+ public int getPendingCount(ResultCallback owner) {
+ OwnerState state = activeOwners.get(owner);
+ return (state != null) ? state.getNumPending() : 0;
}
/**
@@ -125,7 +112,12 @@ public class SharedSender implements ReplyHandler {
* @param owner the file to check for pending documents
*/
public void waitForPending(ResultCallback owner) {
- waitForPending(owner, -1);
+ OwnerState state = activeOwners.get(owner);
+ if (state != null) {
+ try {
+ state.waitPending();
+ } catch (InterruptedException e) { }
+ }
}
/**
@@ -153,33 +145,16 @@ public class SharedSender implements ReplyHandler {
return;
}
- try {
- synchronized (monitor) {
- if (maxPendingPerOwner != -1 && blockingQueue) {
- while (true) {
- Integer count = activeOwners.get(owner);
-
- if (count != null && count >= maxPendingPerOwner) {
- log.log(LogLevel.INFO, "Owner " + owner + " already has " + count + " pending. Waiting for replies");
- monitor.wait(10000);
- } else {
- break;
- }
- }
- }
-
- Integer count = activeOwners.get(owner);
-
- if (count == null) {
- activeOwners.put(owner, 1);
- } else {
- activeOwners.put(owner, count + 1);
- }
- }
- } catch (InterruptedException e) {
- return;
+ OwnerState state = activeOwners.get(owner);
+ if (state == null) {
+ state = new OwnerState();
+ activeOwners.put(owner, state);
+ }
+ if (maxPendingPerOwner != -1 && blockingQueue) {
+ state.waitMaxPendingbelow(maxPendingPerOwner);
}
+ state.addPending(1);
msg.setContext(owner);
try {
@@ -203,37 +178,98 @@ public class SharedSender implements ReplyHandler {
*/
@Override
public void handleReply(Reply r) {
- synchronized (monitor) {
- ResultCallback owner = (ResultCallback) r.getContext();
+ ResultCallback owner = (ResultCallback) r.getContext();
- if (owner != null) {
- metrics.addReply(r);
+ if (owner != null) {
+ metrics.addReply(r);
+ boolean active = owner.handleReply(r);
+ OwnerState state = activeOwners.get(owner);
- Integer count = activeOwners.get(owner);
+ if (state != null) {
+ if (log.isLoggable(LogLevel.SPAM)) {
+ log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + ", count was " + state.getNumPending());
+ }
+ if (!active) {
+ state.clearPending();
+ activeOwners.remove(owner);
+ } else if ((state.decPending(1) <= 0)) {
+ activeOwners.remove(owner);
+ }
+ } else {
+ // TODO: should be debug level if at all.
+ log.log(LogLevel.WARNING, "Owner " + owner.toString() + " is not active");
+ }
+ } else {
+ log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context");
+ }
+ }
- if (count != null) {
- if (log.isLoggable(LogLevel.SPAM)) {
- log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + " count was " + count);
- }
- if ( ! owner.handleReply(r, count - 1)) {
- activeOwners.remove(owner);
- } else {
- activeOwners.put(owner, count - 1);
+ public static class OwnerState {
+
+ final AtomicInteger numPending = new AtomicInteger(0);
+
+ int addPending(int count) {
+ return numPending.addAndGet(count);
+ }
+
+ int decPending(int count) {
+ int newValue = numPending.addAndGet(-count);
+ if (newValue <= 0) {
+ synchronized (numPending) {
+ numPending.notify();
+ }
+ }
+ return newValue;
+ }
+
+ void waitMaxPendingbelow(int limit) {
+ try {
+ synchronized (numPending) {
+ while (numPending.get() > limit) {
+ numPending.wait(5);
}
}
- } else {
- log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context");
+ } catch (InterruptedException e) {
}
+ }
+
+ int getNumPending() {
+ return numPending.get();
+ }
+
+ void clearPending() {
+ numPending.set(0);
+ synchronized (numPending) {
+ numPending.notify();
+ }
+ }
- monitor.notifyAll();
+ boolean waitPending(long timeoutMS) throws InterruptedException {
+ long timeStart = SystemTimer.INSTANCE.milliTime();
+ long timeLeft = timeoutMS;
+ synchronized (numPending) {
+ while ((numPending.get() > 0) && (timeLeft > 0)) {
+ numPending.wait(timeLeft);
+ timeLeft = timeoutMS - (SystemTimer.INSTANCE.milliTime() - timeStart);
+ }
+ }
+ return numPending.get() <= 0;
+ }
+
+ void waitPending() throws InterruptedException {
+ synchronized (numPending) {
+ while (numPending.get() > 0) {
+ numPending.wait();
+ }
+ }
}
}
public interface ResultCallback {
/** Return true if we should continue waiting for replies for this sender. */
- boolean handleReply(Reply r, int numPending);
+ boolean handleReply(Reply r);
/**
* Returns true if feeding has been aborted. No more feeding is allowed with this
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java
index cdd4ac76c4c..f5d717d35e0 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java
@@ -101,7 +101,7 @@ public final class FeedResponse extends HttpResponse implements SharedSender.Res
return "";
}
- public boolean handleReply(Reply reply, int numPending) {
+ public boolean handleReply(Reply reply) {
metrics.addReply(reply);
if (reply.getTrace().getLevel() > 0) {
String str = reply.getTrace().toString();
@@ -126,7 +126,7 @@ public final class FeedResponse extends HttpResponse implements SharedSender.Res
isAborted = abortOnError;
return !abortOnError;
}
- return numPending > 0;
+ return true;
}
public void done() {