summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2022-01-21 13:47:11 +0100
committerGitHub <noreply@github.com>2022-01-21 13:47:11 +0100
commit645a093e74c383d1632681e78b0d8f179dcc763e (patch)
tree06a8a1cf5367fc418bf66edf0d3da92d2177d53a
parentef75f25c9b7e2abbbc6d14fff7b9e5f640149551 (diff)
parenta43a77e6c48daaffd796afa5c6ec51b6289bfed8 (diff)
Merge pull request #20899 from vespa-engine/bjorncs/mbus
Don't retry after resender is destroyed
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/routing/Resender.java25
1 files changed, 15 insertions, 10 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/Resender.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/Resender.java
index 753f8e28e36..5aa4296885b 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/routing/Resender.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/Resender.java
@@ -20,8 +20,10 @@ import java.util.List;
*/
public class Resender {
+ private final Object monitor = new Object();
private final PriorityQueue<Entry> queue = new PriorityQueue<>();
private final RetryPolicy retryPolicy;
+ private boolean destroyed = false;
/**
* Constructs a new resender.
@@ -59,7 +61,9 @@ public class Resender {
return false;
}
}
- return true;
+ synchronized (monitor) {
+ return !destroyed;
+ }
}
/**
@@ -83,13 +87,13 @@ public class Resender {
node.addError(ErrorCode.TIMEOUT, "Timeout exceeded by resender, giving up.");
return false;
}
- node.prepareForRetry(); // consumes the reply
- node.getTrace().trace(TraceLevel.COMPONENT,
- "Message scheduled for retry " + retry + " in " + delay + " seconds.");
- msg.setRetry(retry);
- Entry entry = new Entry(node, SystemTimer.INSTANCE.milliTime() + (long) (delay * 1000));
- synchronized (queue) {
- queue.add(entry);
+ synchronized (monitor) {
+ if (destroyed) return false;
+ node.prepareForRetry(); // consumes the reply
+ node.getTrace().trace(TraceLevel.COMPONENT,
+ "Message scheduled for retry " + retry + " in " + delay + " seconds.");
+ msg.setRetry(retry);
+ queue.add(new Entry(node, SystemTimer.INSTANCE.milliTime() + (long) (delay * 1000)));
}
return true;
}
@@ -101,7 +105,7 @@ public class Resender {
List<RoutingNode> sendList;
long now = SystemTimer.INSTANCE.milliTime();
- synchronized (queue) {
+ synchronized (monitor) {
if (queue.isEmpty()) return;
sendList = new LinkedList<>();
while (!queue.isEmpty() && queue.peek().time <= now) {
@@ -119,10 +123,11 @@ public class Resender {
* Discards all the routing nodes currently scheduled for resending.
*/
public void destroy() {
- synchronized (queue) {
+ synchronized (monitor) {
while (!queue.isEmpty()) {
queue.poll().node.discard();
}
+ destroyed = true;
}
}