diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-02-19 13:57:41 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-02-19 13:57:41 +0100 |
commit | 3f2fbe171532a5f84821dc94f3d30800d981ecc7 (patch) | |
tree | 1c1d66265e58cbabe66b52b3e9de6a4d1b02d0cb /messagebus | |
parent | f1efffa53192374ad37d80b0aa81db1298d6234d (diff) |
Avoid race conditions around the resender Q.
Diffstat (limited to 'messagebus')
-rwxr-xr-x | messagebus/src/main/java/com/yahoo/messagebus/routing/Resender.java | 37 |
1 files changed, 23 insertions, 14 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/Resender.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/Resender.java index 2813498babc..49419052f8f 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/routing/Resender.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/Resender.java @@ -2,7 +2,10 @@ package com.yahoo.messagebus.routing; import com.yahoo.concurrent.SystemTimer; -import com.yahoo.messagebus.*; +import com.yahoo.messagebus.ErrorCode; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.TraceLevel; import java.util.PriorityQueue; import java.util.LinkedList; @@ -17,7 +20,7 @@ import java.util.List; */ public class Resender { - private final PriorityQueue<Entry> queue = new PriorityQueue<Entry>(); + private final PriorityQueue<Entry> queue = new PriorityQueue<>(); private final RetryPolicy retryPolicy; /** @@ -46,7 +49,7 @@ public class Resender { * @param reply The reply to check. * @return True if retry is required. */ - public boolean shouldRetry(Reply reply) { + boolean shouldRetry(Reply reply) { int numErrors = reply.getNumErrors(); if (numErrors == 0) { return false; @@ -66,7 +69,7 @@ public class Resender { * @param node The node to resend. * @return True if the node was queued. */ - public boolean scheduleRetry(RoutingNode node) { + boolean scheduleRetry(RoutingNode node) { Message msg = node.getMessage(); if (!msg.getRetryEnabled()) { return false; @@ -84,7 +87,9 @@ public class Resender { node.getTrace().trace(TraceLevel.COMPONENT, "Message scheduled for retry " + retry + " in " + delay + " seconds."); msg.setRetry(retry); - queue.add(new Entry(node, SystemTimer.INSTANCE.milliTime() + (long)(delay * 1000))); + synchronized (queue) { + queue.add(new Entry(node, SystemTimer.INSTANCE.milliTime() + (long) (delay * 1000))); + } return true; } @@ -92,12 +97,14 @@ public class Resender { * Invokes {@link RoutingNode#send()} on all routing nodes that are applicable for sending at the current time. */ public void resendScheduled() { - if (queue.isEmpty()) return; - - List<RoutingNode> sendList = new LinkedList<RoutingNode>(); - long now = SystemTimer.INSTANCE.milliTime(); - while (!queue.isEmpty() && queue.peek().time <= now) { - sendList.add(queue.poll().node); + List<RoutingNode> sendList; + synchronized (queue) { + if (queue.isEmpty()) return; + sendList = new LinkedList<>(); + long now = SystemTimer.INSTANCE.milliTime(); + while (!queue.isEmpty() && queue.peek().time <= now) { + sendList.add(queue.poll().node); + } } for (RoutingNode node : sendList) { @@ -110,8 +117,10 @@ public class Resender { * Discards all the routing nodes currently scheduled for resending. */ public void destroy() { - while (!queue.isEmpty()) { - queue.poll().node.discard(); + synchronized (queue) { + while (!queue.isEmpty()) { + queue.poll().node.discard(); + } } } @@ -130,7 +139,7 @@ public class Resender { * @param node The routing node being scheduled. * @param time The time of this schedule. */ - public Entry(RoutingNode node, long time) { + Entry(RoutingNode node, long time) { this.node = node; this.time = time; } |