diff options
-rwxr-xr-x | messagebus/src/main/java/com/yahoo/messagebus/Messenger.java | 33 |
1 files changed, 2 insertions, 31 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java index 63e5dbb2d04..2cb2f0967e5 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.messagebus; -import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.log.LogLevel; import java.util.ArrayDeque; @@ -9,9 +8,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Queue; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; @@ -27,7 +23,6 @@ public class Messenger implements Runnable { private static final Logger log = Logger.getLogger(Messenger.class.getName()); private final AtomicBoolean destroyed = new AtomicBoolean(false); private final List<Task> children = new ArrayList<>(); - private final ExecutorService sendExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("messenger.send")); private final Queue<Task> queue = new ArrayDeque<>(); private final Thread thread = new Thread(this, "Messenger"); @@ -69,13 +64,8 @@ public class Messenger implements Runnable { public void deliverMessage(final Message msg, final MessageHandler handler) { if (destroyed.get()) { msg.discard(); - return; - } - try { - sendExecutor.execute(new MessageTask(msg, handler)); - } catch (RejectedExecutionException e) { - msg.discard(); - log.warning("Execution rejected " + e.getMessage()); + } else { + handler.handleMessage(msg); } } @@ -136,7 +126,6 @@ public class Messenger implements Runnable { boolean done = false; enqueue(Terminate.INSTANCE); if (!destroyed.getAndSet(true)) { - sendExecutor.shutdownNow().forEach((Runnable task) -> {((MessageTask) task).msg.discard();}); try { synchronized (this) { while (!queue.isEmpty()) { @@ -219,24 +208,6 @@ public class Messenger implements Runnable { void destroy(); } - private static class MessageTask implements Runnable { - - final MessageHandler handler; - Message msg; - - MessageTask(final Message msg, final MessageHandler handler) { - this.msg = msg; - this.handler = handler; - } - - @Override - public void run() { - final Message msg = this.msg; - this.msg = null; - handler.handleMessage(msg); - } - } - private static class ReplyTask implements Task { final ReplyHandler handler; |