diff options
author | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2022-01-21 13:47:11 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-21 13:47:11 +0100 |
commit | 645a093e74c383d1632681e78b0d8f179dcc763e (patch) | |
tree | 06a8a1cf5367fc418bf66edf0d3da92d2177d53a | |
parent | ef75f25c9b7e2abbbc6d14fff7b9e5f640149551 (diff) | |
parent | a43a77e6c48daaffd796afa5c6ec51b6289bfed8 (diff) |
Merge pull request #20899 from vespa-engine/bjorncs/mbus
Don't retry after resender is destroyed
-rwxr-xr-x | messagebus/src/main/java/com/yahoo/messagebus/routing/Resender.java | 25 |
1 files changed, 15 insertions, 10 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/Resender.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/Resender.java index 753f8e28e36..5aa4296885b 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/routing/Resender.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/Resender.java @@ -20,8 +20,10 @@ import java.util.List; */ public class Resender { + private final Object monitor = new Object(); private final PriorityQueue<Entry> queue = new PriorityQueue<>(); private final RetryPolicy retryPolicy; + private boolean destroyed = false; /** * Constructs a new resender. @@ -59,7 +61,9 @@ public class Resender { return false; } } - return true; + synchronized (monitor) { + return !destroyed; + } } /** @@ -83,13 +87,13 @@ public class Resender { node.addError(ErrorCode.TIMEOUT, "Timeout exceeded by resender, giving up."); return false; } - node.prepareForRetry(); // consumes the reply - node.getTrace().trace(TraceLevel.COMPONENT, - "Message scheduled for retry " + retry + " in " + delay + " seconds."); - msg.setRetry(retry); - Entry entry = new Entry(node, SystemTimer.INSTANCE.milliTime() + (long) (delay * 1000)); - synchronized (queue) { - queue.add(entry); + synchronized (monitor) { + if (destroyed) return false; + node.prepareForRetry(); // consumes the reply + node.getTrace().trace(TraceLevel.COMPONENT, + "Message scheduled for retry " + retry + " in " + delay + " seconds."); + msg.setRetry(retry); + queue.add(new Entry(node, SystemTimer.INSTANCE.milliTime() + (long) (delay * 1000))); } return true; } @@ -101,7 +105,7 @@ public class Resender { List<RoutingNode> sendList; long now = SystemTimer.INSTANCE.milliTime(); - synchronized (queue) { + synchronized (monitor) { if (queue.isEmpty()) return; sendList = new LinkedList<>(); while (!queue.isEmpty() && queue.peek().time <= now) { @@ -119,10 +123,11 @@ public class Resender { * Discards all the routing nodes currently scheduled for resending. */ public void destroy() { - synchronized (queue) { + synchronized (monitor) { while (!queue.isEmpty()) { queue.poll().node.discard(); } + destroyed = true; } } |