summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2018-10-22 10:23:48 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2018-10-22 22:59:46 +0200
commit647955cea1af32ac2b6a0daa2bc671bdc143c7f6 (patch)
treeb482ac18ea6655d959214a1bdedf9b5cdefe3d1f /messagebus
parentb50ad72316837ec1e36718f604519a379ea23801 (diff)
Just send in own thread.
Diffstat (limited to 'messagebus')
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/Messenger.java39
1 files changed, 5 insertions, 34 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java
index 4fb83386231..b9ea69cb116 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus;
-import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.log.LogLevel;
import java.util.ArrayDeque;
@@ -9,9 +8,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
@@ -27,7 +23,6 @@ public class Messenger implements Runnable {
private static final Logger log = Logger.getLogger(Messenger.class.getName());
private final AtomicBoolean destroyed = new AtomicBoolean(false);
private final List<Task> children = new ArrayList<>();
- private final ExecutorService sendExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("messenger.send"));
private final Queue<Task> queue = new ArrayDeque<>();
private final Thread thread = new Thread(this, "Messenger");
@@ -44,7 +39,7 @@ public class Messenger implements Runnable {
*
* @param task The task to add.
*/
- public void addRecurrentTask(final Task task) {
+ void addRecurrentTask(final Task task) {
children.add(task);
}
@@ -69,13 +64,8 @@ public class Messenger implements Runnable {
public void deliverMessage(final Message msg, final MessageHandler handler) {
if (destroyed.get()) {
msg.discard();
- return;
- }
- try {
- sendExecutor.execute(new MessageTask(msg, handler));
- } catch (RejectedExecutionException e) {
- msg.discard();
- log.warning("Execution rejected " + e.getMessage());
+ } else {
+ handler.handleMessage(msg);
}
}
@@ -136,7 +126,6 @@ public class Messenger implements Runnable {
boolean done = false;
enqueue(Terminate.INSTANCE);
if (!destroyed.getAndSet(true)) {
- sendExecutor.shutdownNow().forEach((Runnable task) -> {((MessageTask) task).msg.discard();});
try {
synchronized (this) {
while (!queue.isEmpty()) {
@@ -210,31 +199,13 @@ public class Messenger implements Runnable {
/**
* <p>This method is called when being executed.</p>
*/
- public void run();
+ void run();
/**
* <p>This method is called for all tasks, even if {@link #run()} was
* never called.</p>
*/
- public void destroy();
- }
-
- private static class MessageTask implements Runnable {
-
- final MessageHandler handler;
- Message msg;
-
- MessageTask(final Message msg, final MessageHandler handler) {
- this.msg = msg;
- this.handler = handler;
- }
-
- @Override
- public void run() {
- final Message msg = this.msg;
- this.msg = null;
- handler.handleMessage(msg);
- }
+ void destroy();
}
private static class ReplyTask implements Task {