diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-08-04 18:34:18 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-08-04 18:34:18 +0000 |
commit | af72753359ee72003d10d92e3c7355a156d3e6d7 (patch) | |
tree | df01de4919739a944082c72a2d805482a625e9e9 /jrt | |
parent | bd3399a2677b32888ef2588adf1c976ed4cdb5cb (diff) |
Name the transport threads to understand how things are interconnected.
Diffstat (limited to 'jrt')
-rw-r--r-- | jrt/src/com/yahoo/jrt/Transport.java | 19 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/TransportThread.java | 4 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/Worker.java | 8 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java | 26 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/tool/RpcInvoker.java | 2 | ||||
-rw-r--r-- | jrt/tests/com/yahoo/jrt/EchoTest.java | 4 | ||||
-rw-r--r-- | jrt/tests/com/yahoo/jrt/LatencyTest.java | 4 | ||||
-rw-r--r-- | jrt/tests/com/yahoo/jrt/SessionTest.java | 7 |
8 files changed, 47 insertions, 27 deletions
diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java index 02a6e3e05f7..003e40b8aa9 100644 --- a/jrt/src/com/yahoo/jrt/Transport.java +++ b/jrt/src/com/yahoo/jrt/Transport.java @@ -20,6 +20,7 @@ public class Transport { private static final Logger log = Logger.getLogger(Transport.class.getName()); + private final String name; private final FatalErrorHandler fatalHandler; // NB: this must be set first private final CryptoEngine cryptoEngine; private final Connector connector; @@ -37,11 +38,13 @@ public class Transport { * error handler is registered, the default action is to log the * error and exit with exit code 1. * + * @param name used for identifying threads * @param fatalHandler fatal error handler * @param cryptoEngine crypto engine to use * @param numThreads number of {@link TransportThread}s. **/ - public Transport(FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine, int numThreads, boolean tcpNoDelay) { + public Transport(String name, FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine, int numThreads, boolean tcpNoDelay) { + this.name = name; this.fatalHandler = fatalHandler; // NB: this must be set first this.cryptoEngine = cryptoEngine; this.tcpNoDelay = tcpNoDelay; @@ -49,13 +52,15 @@ public class Transport { worker = new Worker(this); runCnt = new AtomicInteger(numThreads); for (int i = 0; i < numThreads; ++i) { - threads.add(new TransportThread(this)); + threads.add(new TransportThread(this, i)); } } - public Transport(CryptoEngine cryptoEngine, int numThreads) { this(null, cryptoEngine, numThreads, true); } - public Transport(int numThreads) { this(null, CryptoEngine.createDefault(), numThreads, true); } - public Transport(int numThreads, boolean tcpNoDelay) { this(null, CryptoEngine.createDefault(), numThreads, tcpNoDelay); } - public Transport() { this(null, CryptoEngine.createDefault(), 1, true); } + public Transport(String name, CryptoEngine cryptoEngine, int numThreads) { this(name, null, cryptoEngine, numThreads, true); } + public Transport(String name, int numThreads) { this(name, null, CryptoEngine.createDefault(), numThreads, true); } + public Transport(String name, int numThreads, boolean tcpNoDelay) { this(name, null, CryptoEngine.createDefault(), numThreads, tcpNoDelay); } + public Transport(String name) { this(name, null, CryptoEngine.createDefault(), 1, true); } + // Only for testing + public Transport() { this("default"); } /** * Select a random transport thread @@ -68,6 +73,8 @@ public class Transport { boolean getTcpNoDelay() { return tcpNoDelay; } + String getName() { return name; } + /** * Use the underlying CryptoEngine to create a CryptoSocket for * the client side of a connection. diff --git a/jrt/src/com/yahoo/jrt/TransportThread.java b/jrt/src/com/yahoo/jrt/TransportThread.java index bb41e67c3f1..8f158161888 100644 --- a/jrt/src/com/yahoo/jrt/TransportThread.java +++ b/jrt/src/com/yahoo/jrt/TransportThread.java @@ -167,9 +167,9 @@ public class TransportThread { return true; } - TransportThread(Transport transport) { + TransportThread(Transport transport, int index) { parent = transport; - thread = new Thread(new Run(), "<jrt-transport>"); + thread = new Thread(new Run(), transport.getName() + ".jrt-transport." + index); queue = new Queue(); myQueue = new Queue(); scheduler = new Scheduler(System.currentTimeMillis()); diff --git a/jrt/src/com/yahoo/jrt/Worker.java b/jrt/src/com/yahoo/jrt/Worker.java index 39c0e6773b2..d20ebadf856 100644 --- a/jrt/src/com/yahoo/jrt/Worker.java +++ b/jrt/src/com/yahoo/jrt/Worker.java @@ -40,13 +40,13 @@ class Worker { private static void preloadClassRequiredAtShutDown() { new CloseSocket(null); } - - private Thread thread = new Thread(new Run(), "<jrt-worker>"); - private Transport parent; - private ThreadQueue workQueue = new ThreadQueue(); + private final Thread thread; + private final Transport parent; + private final ThreadQueue workQueue = new ThreadQueue(); public Worker(Transport parent) { preloadClassRequiredAtShutDown(); + thread = new Thread(new Run(), parent.getName() + ".jrt-worker"); this.parent = parent; thread.setDaemon(true); thread.start(); diff --git a/jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java b/jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java index 6ce8f3d1227..f19779732ba 100644 --- a/jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java +++ b/jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java @@ -1,7 +1,21 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jrt.slobrok.server; -import com.yahoo.jrt.*; +import com.yahoo.jrt.Acceptor; +import com.yahoo.jrt.ErrorCode; +import com.yahoo.jrt.Int32Value; +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.Method; +import com.yahoo.jrt.MethodHandler; +import com.yahoo.jrt.Request; +import com.yahoo.jrt.RequestWaiter; +import com.yahoo.jrt.Spec; +import com.yahoo.jrt.StringArray; +import com.yahoo.jrt.Supervisor; +import com.yahoo.jrt.Target; +import com.yahoo.jrt.TargetWatcher; +import com.yahoo.jrt.Task; +import com.yahoo.jrt.Transport; import java.util.ArrayList; import java.util.HashMap; @@ -10,11 +24,11 @@ import java.util.Map; public class Slobrok { - Supervisor orb; - Acceptor listener; + Supervisor orb; + Acceptor listener; private Map<String,String> services = new HashMap<>(); List<FetchMirror> pendingFetch = new ArrayList<>(); - Map<String,Target> targets = new HashMap<>(); + Map<String, Target> targets = new HashMap<>(); TargetMonitor monitor = new TargetMonitor(); int gencnt = 1; @@ -25,7 +39,7 @@ public class Slobrok { public Slobrok(int port) throws ListenFailedException { // NB: rpc must be single-threaded - orb = new Supervisor(new Transport(1)); + orb = new Supervisor(new Transport("slobrok-" + port, 1)); registerMethods(); try { listener = orb.listen(new Spec(port)); @@ -241,7 +255,7 @@ public class Slobrok { private class FetchMirror implements Runnable { public final Request req; - public final Task task; + public final Task task; public FetchMirror(Request req, int timeout) { req.detach(); diff --git a/jrt/src/com/yahoo/jrt/tool/RpcInvoker.java b/jrt/src/com/yahoo/jrt/tool/RpcInvoker.java index 6c36e8f9604..8f0702a9ecc 100644 --- a/jrt/src/com/yahoo/jrt/tool/RpcInvoker.java +++ b/jrt/src/com/yahoo/jrt/tool/RpcInvoker.java @@ -77,7 +77,7 @@ public class RpcInvoker { if (connectspec.indexOf('/') < 0) connectspec = "tcp/" + connectspec; - supervisor = new Supervisor(new Transport()); + supervisor = new Supervisor(new Transport("invoker")); target = supervisor.connect(new Spec(connectspec)); Request request = createRequest(method,arguments); target.invokeSync(request,10.0); diff --git a/jrt/tests/com/yahoo/jrt/EchoTest.java b/jrt/tests/com/yahoo/jrt/EchoTest.java index 97139fd60ab..c71eae78ad9 100644 --- a/jrt/tests/com/yahoo/jrt/EchoTest.java +++ b/jrt/tests/com/yahoo/jrt/EchoTest.java @@ -91,8 +91,8 @@ public class EchoTest { public void setUp() throws ListenFailedException { metrics = TransportMetrics.getInstance(); startSnapshot = metrics.snapshot(); - server = new Supervisor(new Transport(crypto, 1)); - client = new Supervisor(new Transport(crypto, 1)); + server = new Supervisor(new Transport("server", crypto, 1)); + client = new Supervisor(new Transport("client", crypto, 1)); acceptor = server.listen(new Spec(0)); target = client.connect(new Spec("localhost", acceptor.port())); server.addMethod(new Method("echo", "*", "*", this::rpc_echo)); diff --git a/jrt/tests/com/yahoo/jrt/LatencyTest.java b/jrt/tests/com/yahoo/jrt/LatencyTest.java index 97d7affd6ea..c8ead8ebf77 100644 --- a/jrt/tests/com/yahoo/jrt/LatencyTest.java +++ b/jrt/tests/com/yahoo/jrt/LatencyTest.java @@ -18,8 +18,8 @@ public class LatencyTest { private final Supervisor client; private final Acceptor acceptor; public Network(CryptoEngine crypto, int threads) throws ListenFailedException { - server = new Supervisor(new Transport(crypto, threads)); - client = new Supervisor(new Transport(crypto, threads)); + server = new Supervisor(new Transport("server", crypto, threads)); + client = new Supervisor(new Transport("client", crypto, threads)); server.addMethod(new Method("inc", "i", "i", this::rpc_inc)); acceptor = server.listen(new Spec(0)); } diff --git a/jrt/tests/com/yahoo/jrt/SessionTest.java b/jrt/tests/com/yahoo/jrt/SessionTest.java index 29d6bb21d5f..b6568c2f283 100644 --- a/jrt/tests/com/yahoo/jrt/SessionTest.java +++ b/jrt/tests/com/yahoo/jrt/SessionTest.java @@ -122,13 +122,12 @@ public class SessionTest implements SessionHandler { @Before public void setUp() throws ListenFailedException { Session.reset(); - server = new Test.Orb(new Transport(crypto, 1)); + server = new Test.Orb(new Transport("server", crypto, 1)); server.setSessionHandler(this); - client = new Test.Orb(new Transport(crypto, 1)); + client = new Test.Orb(new Transport("client", crypto, 1)); client.setSessionHandler(this); acceptor = server.listen(new Spec(0)); - target = client.connect(new Spec("localhost", acceptor.port()), - new Session()); + target = client.connect(new Spec("localhost", acceptor.port()), new Session()); server.addMethod(new Method("set", "i", "", this::rpc_set)); server.addMethod(new Method("get", "", "i", this::rpc_get)); |