diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2018-10-22 10:23:48 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2018-10-22 22:59:46 +0200 |
commit | 647955cea1af32ac2b6a0daa2bc671bdc143c7f6 (patch) | |
tree | b482ac18ea6655d959214a1bdedf9b5cdefe3d1f /messagebus | |
parent | b50ad72316837ec1e36718f604519a379ea23801 (diff) |
Just send in own thread.
Diffstat (limited to 'messagebus')
-rwxr-xr-x | messagebus/src/main/java/com/yahoo/messagebus/Messenger.java | 39 |
1 files changed, 5 insertions, 34 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java index 4fb83386231..b9ea69cb116 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.messagebus; -import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.log.LogLevel; import java.util.ArrayDeque; @@ -9,9 +8,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Queue; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; @@ -27,7 +23,6 @@ public class Messenger implements Runnable { private static final Logger log = Logger.getLogger(Messenger.class.getName()); private final AtomicBoolean destroyed = new AtomicBoolean(false); private final List<Task> children = new ArrayList<>(); - private final ExecutorService sendExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("messenger.send")); private final Queue<Task> queue = new ArrayDeque<>(); private final Thread thread = new Thread(this, "Messenger"); @@ -44,7 +39,7 @@ public class Messenger implements Runnable { * * @param task The task to add. */ - public void addRecurrentTask(final Task task) { + void addRecurrentTask(final Task task) { children.add(task); } @@ -69,13 +64,8 @@ public class Messenger implements Runnable { public void deliverMessage(final Message msg, final MessageHandler handler) { if (destroyed.get()) { msg.discard(); - return; - } - try { - sendExecutor.execute(new MessageTask(msg, handler)); - } catch (RejectedExecutionException e) { - msg.discard(); - log.warning("Execution rejected " + e.getMessage()); + } else { + handler.handleMessage(msg); } } @@ -136,7 +126,6 @@ public class Messenger implements Runnable { boolean done = false; enqueue(Terminate.INSTANCE); if (!destroyed.getAndSet(true)) { - sendExecutor.shutdownNow().forEach((Runnable task) -> {((MessageTask) task).msg.discard();}); try { synchronized (this) { while (!queue.isEmpty()) { @@ -210,31 +199,13 @@ public class Messenger implements Runnable { /** * <p>This method is called when being executed.</p> */ - public void run(); + void run(); /** * <p>This method is called for all tasks, even if {@link #run()} was * never called.</p> */ - public void destroy(); - } - - private static class MessageTask implements Runnable { - - final MessageHandler handler; - Message msg; - - MessageTask(final Message msg, final MessageHandler handler) { - this.msg = msg; - this.handler = handler; - } - - @Override - public void run() { - final Message msg = this.msg; - this.msg = null; - handler.handleMessage(msg); - } + void destroy(); } private static class ReplyTask implements Task { |