diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-04-13 22:52:53 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-04-13 22:52:53 +0200 |
commit | eaabafa432ee58eceb1a45242fb7e3a4e6662d59 (patch) | |
tree | e606fb153bb433be2bfc57a98ed1a214765f0094 /messagebus | |
parent | 4c03e2dca94f391373dfae2aa2af7bd2aa748b29 (diff) |
Revert "Revert "Revert "Balder/send in own thread"""
Diffstat (limited to 'messagebus')
-rwxr-xr-x | messagebus/src/main/java/com/yahoo/messagebus/Messenger.java | 33 |
1 files changed, 31 insertions, 2 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java index 2cb2f0967e5..63e5dbb2d04 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.messagebus; +import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.log.LogLevel; import java.util.ArrayDeque; @@ -8,6 +9,9 @@ import java.util.ArrayList; import java.util.List; import java.util.Queue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; @@ -23,6 +27,7 @@ public class Messenger implements Runnable { private static final Logger log = Logger.getLogger(Messenger.class.getName()); private final AtomicBoolean destroyed = new AtomicBoolean(false); private final List<Task> children = new ArrayList<>(); + private final ExecutorService sendExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("messenger.send")); private final Queue<Task> queue = new ArrayDeque<>(); private final Thread thread = new Thread(this, "Messenger"); @@ -64,8 +69,13 @@ public class Messenger implements Runnable { public void deliverMessage(final Message msg, final MessageHandler handler) { if (destroyed.get()) { msg.discard(); - } else { - handler.handleMessage(msg); + return; + } + try { + sendExecutor.execute(new MessageTask(msg, handler)); + } catch (RejectedExecutionException e) { + msg.discard(); + log.warning("Execution rejected " + e.getMessage()); } } @@ -126,6 +136,7 @@ public class Messenger implements Runnable { boolean done = false; enqueue(Terminate.INSTANCE); if (!destroyed.getAndSet(true)) { + sendExecutor.shutdownNow().forEach((Runnable task) -> {((MessageTask) task).msg.discard();}); try { synchronized (this) { while (!queue.isEmpty()) { @@ -208,6 +219,24 @@ public class Messenger implements Runnable { void destroy(); } + private static class MessageTask implements Runnable { + + final MessageHandler handler; + Message msg; + + MessageTask(final Message msg, final MessageHandler handler) { + this.msg = msg; + this.handler = handler; + } + + @Override + public void run() { + final Message msg = this.msg; + this.msg = null; + handler.handleMessage(msg); + } + } + private static class ReplyTask implements Task { final ReplyHandler handler; |