aboutsummaryrefslogtreecommitdiffstats
path: root/jrt
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-08-04 18:34:18 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-08-04 18:34:18 +0000
commitaf72753359ee72003d10d92e3c7355a156d3e6d7 (patch)
treedf01de4919739a944082c72a2d805482a625e9e9 /jrt
parentbd3399a2677b32888ef2588adf1c976ed4cdb5cb (diff)
Name the transport threads to understand how things are interconnected.
Diffstat (limited to 'jrt')
-rw-r--r--jrt/src/com/yahoo/jrt/Transport.java19
-rw-r--r--jrt/src/com/yahoo/jrt/TransportThread.java4
-rw-r--r--jrt/src/com/yahoo/jrt/Worker.java8
-rw-r--r--jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java26
-rw-r--r--jrt/src/com/yahoo/jrt/tool/RpcInvoker.java2
-rw-r--r--jrt/tests/com/yahoo/jrt/EchoTest.java4
-rw-r--r--jrt/tests/com/yahoo/jrt/LatencyTest.java4
-rw-r--r--jrt/tests/com/yahoo/jrt/SessionTest.java7
8 files changed, 47 insertions, 27 deletions
diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java
index 02a6e3e05f7..003e40b8aa9 100644
--- a/jrt/src/com/yahoo/jrt/Transport.java
+++ b/jrt/src/com/yahoo/jrt/Transport.java
@@ -20,6 +20,7 @@ public class Transport {
private static final Logger log = Logger.getLogger(Transport.class.getName());
+ private final String name;
private final FatalErrorHandler fatalHandler; // NB: this must be set first
private final CryptoEngine cryptoEngine;
private final Connector connector;
@@ -37,11 +38,13 @@ public class Transport {
* error handler is registered, the default action is to log the
* error and exit with exit code 1.
*
+ * @param name used for identifying threads
* @param fatalHandler fatal error handler
* @param cryptoEngine crypto engine to use
* @param numThreads number of {@link TransportThread}s.
**/
- public Transport(FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine, int numThreads, boolean tcpNoDelay) {
+ public Transport(String name, FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine, int numThreads, boolean tcpNoDelay) {
+ this.name = name;
this.fatalHandler = fatalHandler; // NB: this must be set first
this.cryptoEngine = cryptoEngine;
this.tcpNoDelay = tcpNoDelay;
@@ -49,13 +52,15 @@ public class Transport {
worker = new Worker(this);
runCnt = new AtomicInteger(numThreads);
for (int i = 0; i < numThreads; ++i) {
- threads.add(new TransportThread(this));
+ threads.add(new TransportThread(this, i));
}
}
- public Transport(CryptoEngine cryptoEngine, int numThreads) { this(null, cryptoEngine, numThreads, true); }
- public Transport(int numThreads) { this(null, CryptoEngine.createDefault(), numThreads, true); }
- public Transport(int numThreads, boolean tcpNoDelay) { this(null, CryptoEngine.createDefault(), numThreads, tcpNoDelay); }
- public Transport() { this(null, CryptoEngine.createDefault(), 1, true); }
+ public Transport(String name, CryptoEngine cryptoEngine, int numThreads) { this(name, null, cryptoEngine, numThreads, true); }
+ public Transport(String name, int numThreads) { this(name, null, CryptoEngine.createDefault(), numThreads, true); }
+ public Transport(String name, int numThreads, boolean tcpNoDelay) { this(name, null, CryptoEngine.createDefault(), numThreads, tcpNoDelay); }
+ public Transport(String name) { this(name, null, CryptoEngine.createDefault(), 1, true); }
+ // Only for testing
+ public Transport() { this("default"); }
/**
* Select a random transport thread
@@ -68,6 +73,8 @@ public class Transport {
boolean getTcpNoDelay() { return tcpNoDelay; }
+ String getName() { return name; }
+
/**
* Use the underlying CryptoEngine to create a CryptoSocket for
* the client side of a connection.
diff --git a/jrt/src/com/yahoo/jrt/TransportThread.java b/jrt/src/com/yahoo/jrt/TransportThread.java
index bb41e67c3f1..8f158161888 100644
--- a/jrt/src/com/yahoo/jrt/TransportThread.java
+++ b/jrt/src/com/yahoo/jrt/TransportThread.java
@@ -167,9 +167,9 @@ public class TransportThread {
return true;
}
- TransportThread(Transport transport) {
+ TransportThread(Transport transport, int index) {
parent = transport;
- thread = new Thread(new Run(), "<jrt-transport>");
+ thread = new Thread(new Run(), transport.getName() + ".jrt-transport." + index);
queue = new Queue();
myQueue = new Queue();
scheduler = new Scheduler(System.currentTimeMillis());
diff --git a/jrt/src/com/yahoo/jrt/Worker.java b/jrt/src/com/yahoo/jrt/Worker.java
index 39c0e6773b2..d20ebadf856 100644
--- a/jrt/src/com/yahoo/jrt/Worker.java
+++ b/jrt/src/com/yahoo/jrt/Worker.java
@@ -40,13 +40,13 @@ class Worker {
private static void preloadClassRequiredAtShutDown() {
new CloseSocket(null);
}
-
- private Thread thread = new Thread(new Run(), "<jrt-worker>");
- private Transport parent;
- private ThreadQueue workQueue = new ThreadQueue();
+ private final Thread thread;
+ private final Transport parent;
+ private final ThreadQueue workQueue = new ThreadQueue();
public Worker(Transport parent) {
preloadClassRequiredAtShutDown();
+ thread = new Thread(new Run(), parent.getName() + ".jrt-worker");
this.parent = parent;
thread.setDaemon(true);
thread.start();
diff --git a/jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java b/jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java
index 6ce8f3d1227..f19779732ba 100644
--- a/jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java
+++ b/jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java
@@ -1,7 +1,21 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.jrt.slobrok.server;
-import com.yahoo.jrt.*;
+import com.yahoo.jrt.Acceptor;
+import com.yahoo.jrt.ErrorCode;
+import com.yahoo.jrt.Int32Value;
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.Method;
+import com.yahoo.jrt.MethodHandler;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.RequestWaiter;
+import com.yahoo.jrt.Spec;
+import com.yahoo.jrt.StringArray;
+import com.yahoo.jrt.Supervisor;
+import com.yahoo.jrt.Target;
+import com.yahoo.jrt.TargetWatcher;
+import com.yahoo.jrt.Task;
+import com.yahoo.jrt.Transport;
import java.util.ArrayList;
import java.util.HashMap;
@@ -10,11 +24,11 @@ import java.util.Map;
public class Slobrok {
- Supervisor orb;
- Acceptor listener;
+ Supervisor orb;
+ Acceptor listener;
private Map<String,String> services = new HashMap<>();
List<FetchMirror> pendingFetch = new ArrayList<>();
- Map<String,Target> targets = new HashMap<>();
+ Map<String, Target> targets = new HashMap<>();
TargetMonitor monitor = new TargetMonitor();
int gencnt = 1;
@@ -25,7 +39,7 @@ public class Slobrok {
public Slobrok(int port) throws ListenFailedException {
// NB: rpc must be single-threaded
- orb = new Supervisor(new Transport(1));
+ orb = new Supervisor(new Transport("slobrok-" + port, 1));
registerMethods();
try {
listener = orb.listen(new Spec(port));
@@ -241,7 +255,7 @@ public class Slobrok {
private class FetchMirror implements Runnable {
public final Request req;
- public final Task task;
+ public final Task task;
public FetchMirror(Request req, int timeout) {
req.detach();
diff --git a/jrt/src/com/yahoo/jrt/tool/RpcInvoker.java b/jrt/src/com/yahoo/jrt/tool/RpcInvoker.java
index 6c36e8f9604..8f0702a9ecc 100644
--- a/jrt/src/com/yahoo/jrt/tool/RpcInvoker.java
+++ b/jrt/src/com/yahoo/jrt/tool/RpcInvoker.java
@@ -77,7 +77,7 @@ public class RpcInvoker {
if (connectspec.indexOf('/') < 0)
connectspec = "tcp/" + connectspec;
- supervisor = new Supervisor(new Transport());
+ supervisor = new Supervisor(new Transport("invoker"));
target = supervisor.connect(new Spec(connectspec));
Request request = createRequest(method,arguments);
target.invokeSync(request,10.0);
diff --git a/jrt/tests/com/yahoo/jrt/EchoTest.java b/jrt/tests/com/yahoo/jrt/EchoTest.java
index 97139fd60ab..c71eae78ad9 100644
--- a/jrt/tests/com/yahoo/jrt/EchoTest.java
+++ b/jrt/tests/com/yahoo/jrt/EchoTest.java
@@ -91,8 +91,8 @@ public class EchoTest {
public void setUp() throws ListenFailedException {
metrics = TransportMetrics.getInstance();
startSnapshot = metrics.snapshot();
- server = new Supervisor(new Transport(crypto, 1));
- client = new Supervisor(new Transport(crypto, 1));
+ server = new Supervisor(new Transport("server", crypto, 1));
+ client = new Supervisor(new Transport("client", crypto, 1));
acceptor = server.listen(new Spec(0));
target = client.connect(new Spec("localhost", acceptor.port()));
server.addMethod(new Method("echo", "*", "*", this::rpc_echo));
diff --git a/jrt/tests/com/yahoo/jrt/LatencyTest.java b/jrt/tests/com/yahoo/jrt/LatencyTest.java
index 97d7affd6ea..c8ead8ebf77 100644
--- a/jrt/tests/com/yahoo/jrt/LatencyTest.java
+++ b/jrt/tests/com/yahoo/jrt/LatencyTest.java
@@ -18,8 +18,8 @@ public class LatencyTest {
private final Supervisor client;
private final Acceptor acceptor;
public Network(CryptoEngine crypto, int threads) throws ListenFailedException {
- server = new Supervisor(new Transport(crypto, threads));
- client = new Supervisor(new Transport(crypto, threads));
+ server = new Supervisor(new Transport("server", crypto, threads));
+ client = new Supervisor(new Transport("client", crypto, threads));
server.addMethod(new Method("inc", "i", "i", this::rpc_inc));
acceptor = server.listen(new Spec(0));
}
diff --git a/jrt/tests/com/yahoo/jrt/SessionTest.java b/jrt/tests/com/yahoo/jrt/SessionTest.java
index 29d6bb21d5f..b6568c2f283 100644
--- a/jrt/tests/com/yahoo/jrt/SessionTest.java
+++ b/jrt/tests/com/yahoo/jrt/SessionTest.java
@@ -122,13 +122,12 @@ public class SessionTest implements SessionHandler {
@Before
public void setUp() throws ListenFailedException {
Session.reset();
- server = new Test.Orb(new Transport(crypto, 1));
+ server = new Test.Orb(new Transport("server", crypto, 1));
server.setSessionHandler(this);
- client = new Test.Orb(new Transport(crypto, 1));
+ client = new Test.Orb(new Transport("client", crypto, 1));
client.setSessionHandler(this);
acceptor = server.listen(new Spec(0));
- target = client.connect(new Spec("localhost", acceptor.port()),
- new Session());
+ target = client.connect(new Spec("localhost", acceptor.port()), new Session());
server.addMethod(new Method("set", "i", "", this::rpc_set));
server.addMethod(new Method("get", "", "i", this::rpc_get));