diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2018-10-19 15:34:59 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2018-10-19 15:34:59 +0200 |
commit | bdbd8194ea11cd1f2c4bf794c9c94ab8fdccfd14 (patch) | |
tree | c74c421df5c26c7b10513d4c7e360644ea5e5e5f /messagebus | |
parent | d4c9474d7e5c9f681a6cc0dbddf5e94fc48525d6 (diff) |
To avoid replies being affected by send use 2 threads.
Diffstat (limited to 'messagebus')
-rwxr-xr-x | messagebus/src/main/java/com/yahoo/messagebus/Messenger.java | 26 |
1 files changed, 17 insertions, 9 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java index 6ef5e9adca7..34a640f3f83 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.messagebus; +import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.log.LogLevel; import java.util.ArrayDeque; @@ -8,6 +9,9 @@ import java.util.ArrayList; import java.util.List; import java.util.Queue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; @@ -23,7 +27,9 @@ public class Messenger implements Runnable { private static final Logger log = Logger.getLogger(Messenger.class.getName()); private final AtomicBoolean destroyed = new AtomicBoolean(false); private final List<Task> children = new ArrayList<>(); + private final ExecutorService sendExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("messenger.send")); private final Queue<Task> queue = new ArrayDeque<>(); + private final Thread thread = new Thread(this, "Messenger"); public Messenger() { @@ -61,7 +67,15 @@ public class Messenger implements Runnable { * @param handler The handler to send to. */ public void deliverMessage(final Message msg, final MessageHandler handler) { - enqueue(new MessageTask(msg, handler)); + if (destroyed.get()) { + msg.discard(); + } + try { + sendExecutor.execute(new MessageTask(msg, handler)); + } catch (RejectedExecutionException e) { + msg.discard(); + log.warning("Execution rejected " + e.getMessage()); + } } /** @@ -121,6 +135,7 @@ public class Messenger implements Runnable { boolean done = false; enqueue(Terminate.INSTANCE); if (!destroyed.getAndSet(true)) { + sendExecutor.shutdownNow().forEach((Runnable task) -> {((MessageTask) task).msg.discard();}); try { synchronized (this) { while (!queue.isEmpty()) { @@ -203,7 +218,7 @@ public class Messenger implements Runnable { public void destroy(); } - private static class MessageTask implements Task { + private static class MessageTask implements Runnable { final MessageHandler handler; Message msg; @@ -219,13 +234,6 @@ public class Messenger implements Runnable { this.msg = null; handler.handleMessage(msg); } - - @Override - public void destroy() { - if (msg != null) { - msg.discard(); - } - } } private static class ReplyTask implements Task { |