diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2018-10-24 14:44:58 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-10-24 14:44:58 +0200 |
commit | 99312e4f6fff510ce50ac0479ca47beacba083a5 (patch) | |
tree | e8b0796c8e6a06ebfeb17c35d0f3a292c9c24ae4 | |
parent | 62a74d8034b80ba25390168b6e938d73535cdaee (diff) | |
parent | 76315cb74e289c9a27e60a7441a2ee2360e9a58f (diff) |
Merge pull request #7439 from vespa-engine/revert-7408-balder/send-in-own-thread
Revert "Balder/send in own thread"
-rw-r--r-- | messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java | 10 | ||||
-rwxr-xr-x | messagebus/src/main/java/com/yahoo/messagebus/Messenger.java | 39 |
2 files changed, 39 insertions, 10 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java index 0f0b704bba7..26e61e8917b 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java @@ -58,13 +58,13 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, private static Logger log = Logger.getLogger(MessageBus.class.getName()); private final AtomicBoolean destroyed = new AtomicBoolean(false); private final ProtocolRepository protocolRepository = new ProtocolRepository(); - private final AtomicReference<Map<String, RoutingTable>> tablesRef = new AtomicReference<>(null); - private final CopyOnWriteHashMap<String, MessageHandler> sessions = new CopyOnWriteHashMap<>(); + private final AtomicReference<Map<String, RoutingTable>> tablesRef = new AtomicReference<Map<String, RoutingTable>>(null); + private final CopyOnWriteHashMap<String, MessageHandler> sessions = new CopyOnWriteHashMap<String, MessageHandler>(); private final Network net; private final Messenger msn; private final Resender resender; - private int maxPendingCount; - private int maxPendingSize; + private int maxPendingCount = 0; + private int maxPendingSize = 0; private int pendingCount = 0; private int pendingSize = 0; private final Thread careTaker = new Thread(this::sendBlockedMessages); @@ -440,7 +440,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, @Override public void setupRouting(RoutingSpec spec) { - Map<String, RoutingTable> tables = new HashMap<>(); + Map<String, RoutingTable> tables = new HashMap<String, RoutingTable>(); for (int i = 0, len = spec.getNumTables(); i < len; ++i) { RoutingTableSpec table = spec.getTable(i); String name = table.getProtocol(); diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java index b9ea69cb116..4fb83386231 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.messagebus; +import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.log.LogLevel; import java.util.ArrayDeque; @@ -8,6 +9,9 @@ import java.util.ArrayList; import java.util.List; import java.util.Queue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; @@ -23,6 +27,7 @@ public class Messenger implements Runnable { private static final Logger log = Logger.getLogger(Messenger.class.getName()); private final AtomicBoolean destroyed = new AtomicBoolean(false); private final List<Task> children = new ArrayList<>(); + private final ExecutorService sendExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("messenger.send")); private final Queue<Task> queue = new ArrayDeque<>(); private final Thread thread = new Thread(this, "Messenger"); @@ -39,7 +44,7 @@ public class Messenger implements Runnable { * * @param task The task to add. */ - void addRecurrentTask(final Task task) { + public void addRecurrentTask(final Task task) { children.add(task); } @@ -64,8 +69,13 @@ public class Messenger implements Runnable { public void deliverMessage(final Message msg, final MessageHandler handler) { if (destroyed.get()) { msg.discard(); - } else { - handler.handleMessage(msg); + return; + } + try { + sendExecutor.execute(new MessageTask(msg, handler)); + } catch (RejectedExecutionException e) { + msg.discard(); + log.warning("Execution rejected " + e.getMessage()); } } @@ -126,6 +136,7 @@ public class Messenger implements Runnable { boolean done = false; enqueue(Terminate.INSTANCE); if (!destroyed.getAndSet(true)) { + sendExecutor.shutdownNow().forEach((Runnable task) -> {((MessageTask) task).msg.discard();}); try { synchronized (this) { while (!queue.isEmpty()) { @@ -199,13 +210,31 @@ public class Messenger implements Runnable { /** * <p>This method is called when being executed.</p> */ - void run(); + public void run(); /** * <p>This method is called for all tasks, even if {@link #run()} was * never called.</p> */ - void destroy(); + public void destroy(); + } + + private static class MessageTask implements Runnable { + + final MessageHandler handler; + Message msg; + + MessageTask(final Message msg, final MessageHandler handler) { + this.msg = msg; + this.handler = handler; + } + + @Override + public void run() { + final Message msg = this.msg; + this.msg = null; + handler.handleMessage(msg); + } } private static class ReplyTask implements Task { |