aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
Diffstat (limited to 'messagebus')
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/routing/Resender.java37
1 files changed, 23 insertions, 14 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/Resender.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/Resender.java
index 2813498babc..49419052f8f 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/routing/Resender.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/Resender.java
@@ -2,7 +2,10 @@
package com.yahoo.messagebus.routing;
import com.yahoo.concurrent.SystemTimer;
-import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.ErrorCode;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.TraceLevel;
import java.util.PriorityQueue;
import java.util.LinkedList;
@@ -17,7 +20,7 @@ import java.util.List;
*/
public class Resender {
- private final PriorityQueue<Entry> queue = new PriorityQueue<Entry>();
+ private final PriorityQueue<Entry> queue = new PriorityQueue<>();
private final RetryPolicy retryPolicy;
/**
@@ -46,7 +49,7 @@ public class Resender {
* @param reply The reply to check.
* @return True if retry is required.
*/
- public boolean shouldRetry(Reply reply) {
+ boolean shouldRetry(Reply reply) {
int numErrors = reply.getNumErrors();
if (numErrors == 0) {
return false;
@@ -66,7 +69,7 @@ public class Resender {
* @param node The node to resend.
* @return True if the node was queued.
*/
- public boolean scheduleRetry(RoutingNode node) {
+ boolean scheduleRetry(RoutingNode node) {
Message msg = node.getMessage();
if (!msg.getRetryEnabled()) {
return false;
@@ -84,7 +87,9 @@ public class Resender {
node.getTrace().trace(TraceLevel.COMPONENT,
"Message scheduled for retry " + retry + " in " + delay + " seconds.");
msg.setRetry(retry);
- queue.add(new Entry(node, SystemTimer.INSTANCE.milliTime() + (long)(delay * 1000)));
+ synchronized (queue) {
+ queue.add(new Entry(node, SystemTimer.INSTANCE.milliTime() + (long) (delay * 1000)));
+ }
return true;
}
@@ -92,12 +97,14 @@ public class Resender {
* Invokes {@link RoutingNode#send()} on all routing nodes that are applicable for sending at the current time.
*/
public void resendScheduled() {
- if (queue.isEmpty()) return;
-
- List<RoutingNode> sendList = new LinkedList<RoutingNode>();
- long now = SystemTimer.INSTANCE.milliTime();
- while (!queue.isEmpty() && queue.peek().time <= now) {
- sendList.add(queue.poll().node);
+ List<RoutingNode> sendList;
+ synchronized (queue) {
+ if (queue.isEmpty()) return;
+ sendList = new LinkedList<>();
+ long now = SystemTimer.INSTANCE.milliTime();
+ while (!queue.isEmpty() && queue.peek().time <= now) {
+ sendList.add(queue.poll().node);
+ }
}
for (RoutingNode node : sendList) {
@@ -110,8 +117,10 @@ public class Resender {
* Discards all the routing nodes currently scheduled for resending.
*/
public void destroy() {
- while (!queue.isEmpty()) {
- queue.poll().node.discard();
+ synchronized (queue) {
+ while (!queue.isEmpty()) {
+ queue.poll().node.discard();
+ }
}
}
@@ -130,7 +139,7 @@ public class Resender {
* @param node The routing node being scheduled.
* @param time The time of this schedule.
*/
- public Entry(RoutingNode node, long time) {
+ Entry(RoutingNode node, long time) {
this.node = node;
this.time = time;
}