summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2018-10-24 14:44:58 +0200
committerGitHub <noreply@github.com>2018-10-24 14:44:58 +0200
commit99312e4f6fff510ce50ac0479ca47beacba083a5 (patch)
treee8b0796c8e6a06ebfeb17c35d0f3a292c9c24ae4
parent62a74d8034b80ba25390168b6e938d73535cdaee (diff)
parent76315cb74e289c9a27e60a7441a2ee2360e9a58f (diff)
Merge pull request #7439 from vespa-engine/revert-7408-balder/send-in-own-thread
Revert "Balder/send in own thread"
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java10
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/Messenger.java39
2 files changed, 39 insertions, 10 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
index 0f0b704bba7..26e61e8917b 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
@@ -58,13 +58,13 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
private static Logger log = Logger.getLogger(MessageBus.class.getName());
private final AtomicBoolean destroyed = new AtomicBoolean(false);
private final ProtocolRepository protocolRepository = new ProtocolRepository();
- private final AtomicReference<Map<String, RoutingTable>> tablesRef = new AtomicReference<>(null);
- private final CopyOnWriteHashMap<String, MessageHandler> sessions = new CopyOnWriteHashMap<>();
+ private final AtomicReference<Map<String, RoutingTable>> tablesRef = new AtomicReference<Map<String, RoutingTable>>(null);
+ private final CopyOnWriteHashMap<String, MessageHandler> sessions = new CopyOnWriteHashMap<String, MessageHandler>();
private final Network net;
private final Messenger msn;
private final Resender resender;
- private int maxPendingCount;
- private int maxPendingSize;
+ private int maxPendingCount = 0;
+ private int maxPendingSize = 0;
private int pendingCount = 0;
private int pendingSize = 0;
private final Thread careTaker = new Thread(this::sendBlockedMessages);
@@ -440,7 +440,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
@Override
public void setupRouting(RoutingSpec spec) {
- Map<String, RoutingTable> tables = new HashMap<>();
+ Map<String, RoutingTable> tables = new HashMap<String, RoutingTable>();
for (int i = 0, len = spec.getNumTables(); i < len; ++i) {
RoutingTableSpec table = spec.getTable(i);
String name = table.getProtocol();
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java
index b9ea69cb116..4fb83386231 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus;
+import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.log.LogLevel;
import java.util.ArrayDeque;
@@ -8,6 +9,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
@@ -23,6 +27,7 @@ public class Messenger implements Runnable {
private static final Logger log = Logger.getLogger(Messenger.class.getName());
private final AtomicBoolean destroyed = new AtomicBoolean(false);
private final List<Task> children = new ArrayList<>();
+ private final ExecutorService sendExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("messenger.send"));
private final Queue<Task> queue = new ArrayDeque<>();
private final Thread thread = new Thread(this, "Messenger");
@@ -39,7 +44,7 @@ public class Messenger implements Runnable {
*
* @param task The task to add.
*/
- void addRecurrentTask(final Task task) {
+ public void addRecurrentTask(final Task task) {
children.add(task);
}
@@ -64,8 +69,13 @@ public class Messenger implements Runnable {
public void deliverMessage(final Message msg, final MessageHandler handler) {
if (destroyed.get()) {
msg.discard();
- } else {
- handler.handleMessage(msg);
+ return;
+ }
+ try {
+ sendExecutor.execute(new MessageTask(msg, handler));
+ } catch (RejectedExecutionException e) {
+ msg.discard();
+ log.warning("Execution rejected " + e.getMessage());
}
}
@@ -126,6 +136,7 @@ public class Messenger implements Runnable {
boolean done = false;
enqueue(Terminate.INSTANCE);
if (!destroyed.getAndSet(true)) {
+ sendExecutor.shutdownNow().forEach((Runnable task) -> {((MessageTask) task).msg.discard();});
try {
synchronized (this) {
while (!queue.isEmpty()) {
@@ -199,13 +210,31 @@ public class Messenger implements Runnable {
/**
* <p>This method is called when being executed.</p>
*/
- void run();
+ public void run();
/**
* <p>This method is called for all tasks, even if {@link #run()} was
* never called.</p>
*/
- void destroy();
+ public void destroy();
+ }
+
+ private static class MessageTask implements Runnable {
+
+ final MessageHandler handler;
+ Message msg;
+
+ MessageTask(final Message msg, final MessageHandler handler) {
+ this.msg = msg;
+ this.handler = handler;
+ }
+
+ @Override
+ public void run() {
+ final Message msg = this.msg;
+ this.msg = null;
+ handler.handleMessage(msg);
+ }
}
private static class ReplyTask implements Task {