diff options
-rwxr-xr-x | vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java | 43 |
1 files changed, 22 insertions, 21 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java index 1cf1b46dac8..67581438956 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java @@ -148,7 +148,7 @@ public class SharedSender implements ReplyHandler { } if (state != null) { if (maxPendingPerOwner != -1 && blockingQueue) { - state.waitMaxPendingbelow(maxPendingPerOwner); + state.waitMaxPendingBelow(maxPendingPerOwner); } state.addPending(1); } @@ -177,28 +177,29 @@ public class SharedSender implements ReplyHandler { @Override public void handleReply(Reply r) { ResultCallback owner = (ResultCallback) r.getContext(); + if (owner == null) { + log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context"); + return; + } - if (owner != null) { - metrics.addReply(r); - OwnerState state = activeOwners.get(owner); + metrics.addReply(r); + OwnerState state = activeOwners.get(owner); - if (state != null) { - boolean active = owner.handleReply(r, state.getNumPending() - 1); - if (log.isLoggable(LogLevel.SPAM)) { - log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + ", count was " + state.getNumPending()); - } - if (!active) { - state.clearPending(); - activeOwners.remove(owner); - } else { - state.decPending(1); - } - } else { - // TODO: should be debug level if at all. - log.log(LogLevel.WARNING, "Owner " + owner.toString() + " is not active"); - } + if (state == null) { + // TODO: should be debug level if at all + log.log(LogLevel.WARNING, "Owner " + owner.toString() + " is not active"); + return; + } + + boolean active = owner.handleReply(r, state.getNumPending() - 1); + if (log.isLoggable(LogLevel.SPAM)) { + log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + ", count was " + state.getNumPending()); + } + if (!active) { + state.clearPending(); + activeOwners.remove(owner); } else { - log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context"); + state.decPending(1); } } @@ -244,7 +245,7 @@ public class SharedSender implements ReplyHandler { sync.releaseShared(count); } - void waitMaxPendingbelow(int limit) { + void waitMaxPendingBelow(int limit) { try { while (getNumPending() > limit) { sync.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(REACT_LATENCY_ON_WATERMARK_MS)); |