diff options
author | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2022-01-21 13:39:31 +0100 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2022-01-21 13:40:17 +0100 |
commit | a43a77e6c48daaffd796afa5c6ec51b6289bfed8 (patch) | |
tree | e995636ecdc621305fd72b98d526e580eaca4ca7 /messagebus/src/main | |
parent | d61ce48c55f76080069974e3fcbe79e1757c0254 (diff) |
Don't retry after resender is destroyed
Diffstat (limited to 'messagebus/src/main')
-rwxr-xr-x | messagebus/src/main/java/com/yahoo/messagebus/routing/Resender.java | 25 |
1 files changed, 15 insertions, 10 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/Resender.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/Resender.java index 753f8e28e36..5aa4296885b 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/routing/Resender.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/Resender.java @@ -20,8 +20,10 @@ import java.util.List; */ public class Resender { + private final Object monitor = new Object(); private final PriorityQueue<Entry> queue = new PriorityQueue<>(); private final RetryPolicy retryPolicy; + private boolean destroyed = false; /** * Constructs a new resender. @@ -59,7 +61,9 @@ public class Resender { return false; } } - return true; + synchronized (monitor) { + return !destroyed; + } } /** @@ -83,13 +87,13 @@ public class Resender { node.addError(ErrorCode.TIMEOUT, "Timeout exceeded by resender, giving up."); return false; } - node.prepareForRetry(); // consumes the reply - node.getTrace().trace(TraceLevel.COMPONENT, - "Message scheduled for retry " + retry + " in " + delay + " seconds."); - msg.setRetry(retry); - Entry entry = new Entry(node, SystemTimer.INSTANCE.milliTime() + (long) (delay * 1000)); - synchronized (queue) { - queue.add(entry); + synchronized (monitor) { + if (destroyed) return false; + node.prepareForRetry(); // consumes the reply + node.getTrace().trace(TraceLevel.COMPONENT, + "Message scheduled for retry " + retry + " in " + delay + " seconds."); + msg.setRetry(retry); + queue.add(new Entry(node, SystemTimer.INSTANCE.milliTime() + (long) (delay * 1000))); } return true; } @@ -101,7 +105,7 @@ public class Resender { List<RoutingNode> sendList; long now = SystemTimer.INSTANCE.milliTime(); - synchronized (queue) { + synchronized (monitor) { if (queue.isEmpty()) return; sendList = new LinkedList<>(); while (!queue.isEmpty() && queue.peek().time <= now) { @@ -119,10 +123,11 @@ public class Resender { * Discards all the routing nodes currently scheduled for resending. */ public void destroy() { - synchronized (queue) { + synchronized (monitor) { while (!queue.isEmpty()) { queue.poll().node.discard(); } + destroyed = true; } } |