From af72753359ee72003d10d92e3c7355a156d3e6d7 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 4 Aug 2020 18:34:18 +0000 Subject: Name the transport threads to understand how things are interconnected. --- jrt/src/com/yahoo/jrt/Transport.java | 19 +++++++++++------ jrt/src/com/yahoo/jrt/TransportThread.java | 4 ++-- jrt/src/com/yahoo/jrt/Worker.java | 8 +++---- jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java | 26 +++++++++++++++++------ jrt/src/com/yahoo/jrt/tool/RpcInvoker.java | 2 +- 5 files changed, 40 insertions(+), 19 deletions(-) (limited to 'jrt/src') diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java index 02a6e3e05f7..003e40b8aa9 100644 --- a/jrt/src/com/yahoo/jrt/Transport.java +++ b/jrt/src/com/yahoo/jrt/Transport.java @@ -20,6 +20,7 @@ public class Transport { private static final Logger log = Logger.getLogger(Transport.class.getName()); + private final String name; private final FatalErrorHandler fatalHandler; // NB: this must be set first private final CryptoEngine cryptoEngine; private final Connector connector; @@ -37,11 +38,13 @@ public class Transport { * error handler is registered, the default action is to log the * error and exit with exit code 1. * + * @param name used for identifying threads * @param fatalHandler fatal error handler * @param cryptoEngine crypto engine to use * @param numThreads number of {@link TransportThread}s. **/ - public Transport(FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine, int numThreads, boolean tcpNoDelay) { + public Transport(String name, FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine, int numThreads, boolean tcpNoDelay) { + this.name = name; this.fatalHandler = fatalHandler; // NB: this must be set first this.cryptoEngine = cryptoEngine; this.tcpNoDelay = tcpNoDelay; @@ -49,13 +52,15 @@ public class Transport { worker = new Worker(this); runCnt = new AtomicInteger(numThreads); for (int i = 0; i < numThreads; ++i) { - threads.add(new TransportThread(this)); + threads.add(new TransportThread(this, i)); } } - public Transport(CryptoEngine cryptoEngine, int numThreads) { this(null, cryptoEngine, numThreads, true); } - public Transport(int numThreads) { this(null, CryptoEngine.createDefault(), numThreads, true); } - public Transport(int numThreads, boolean tcpNoDelay) { this(null, CryptoEngine.createDefault(), numThreads, tcpNoDelay); } - public Transport() { this(null, CryptoEngine.createDefault(), 1, true); } + public Transport(String name, CryptoEngine cryptoEngine, int numThreads) { this(name, null, cryptoEngine, numThreads, true); } + public Transport(String name, int numThreads) { this(name, null, CryptoEngine.createDefault(), numThreads, true); } + public Transport(String name, int numThreads, boolean tcpNoDelay) { this(name, null, CryptoEngine.createDefault(), numThreads, tcpNoDelay); } + public Transport(String name) { this(name, null, CryptoEngine.createDefault(), 1, true); } + // Only for testing + public Transport() { this("default"); } /** * Select a random transport thread @@ -68,6 +73,8 @@ public class Transport { boolean getTcpNoDelay() { return tcpNoDelay; } + String getName() { return name; } + /** * Use the underlying CryptoEngine to create a CryptoSocket for * the client side of a connection. diff --git a/jrt/src/com/yahoo/jrt/TransportThread.java b/jrt/src/com/yahoo/jrt/TransportThread.java index bb41e67c3f1..8f158161888 100644 --- a/jrt/src/com/yahoo/jrt/TransportThread.java +++ b/jrt/src/com/yahoo/jrt/TransportThread.java @@ -167,9 +167,9 @@ public class TransportThread { return true; } - TransportThread(Transport transport) { + TransportThread(Transport transport, int index) { parent = transport; - thread = new Thread(new Run(), ""); + thread = new Thread(new Run(), transport.getName() + ".jrt-transport." + index); queue = new Queue(); myQueue = new Queue(); scheduler = new Scheduler(System.currentTimeMillis()); diff --git a/jrt/src/com/yahoo/jrt/Worker.java b/jrt/src/com/yahoo/jrt/Worker.java index 39c0e6773b2..d20ebadf856 100644 --- a/jrt/src/com/yahoo/jrt/Worker.java +++ b/jrt/src/com/yahoo/jrt/Worker.java @@ -40,13 +40,13 @@ class Worker { private static void preloadClassRequiredAtShutDown() { new CloseSocket(null); } - - private Thread thread = new Thread(new Run(), ""); - private Transport parent; - private ThreadQueue workQueue = new ThreadQueue(); + private final Thread thread; + private final Transport parent; + private final ThreadQueue workQueue = new ThreadQueue(); public Worker(Transport parent) { preloadClassRequiredAtShutDown(); + thread = new Thread(new Run(), parent.getName() + ".jrt-worker"); this.parent = parent; thread.setDaemon(true); thread.start(); diff --git a/jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java b/jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java index 6ce8f3d1227..f19779732ba 100644 --- a/jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java +++ b/jrt/src/com/yahoo/jrt/slobrok/server/Slobrok.java @@ -1,7 +1,21 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jrt.slobrok.server; -import com.yahoo.jrt.*; +import com.yahoo.jrt.Acceptor; +import com.yahoo.jrt.ErrorCode; +import com.yahoo.jrt.Int32Value; +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.Method; +import com.yahoo.jrt.MethodHandler; +import com.yahoo.jrt.Request; +import com.yahoo.jrt.RequestWaiter; +import com.yahoo.jrt.Spec; +import com.yahoo.jrt.StringArray; +import com.yahoo.jrt.Supervisor; +import com.yahoo.jrt.Target; +import com.yahoo.jrt.TargetWatcher; +import com.yahoo.jrt.Task; +import com.yahoo.jrt.Transport; import java.util.ArrayList; import java.util.HashMap; @@ -10,11 +24,11 @@ import java.util.Map; public class Slobrok { - Supervisor orb; - Acceptor listener; + Supervisor orb; + Acceptor listener; private Map services = new HashMap<>(); List pendingFetch = new ArrayList<>(); - Map targets = new HashMap<>(); + Map targets = new HashMap<>(); TargetMonitor monitor = new TargetMonitor(); int gencnt = 1; @@ -25,7 +39,7 @@ public class Slobrok { public Slobrok(int port) throws ListenFailedException { // NB: rpc must be single-threaded - orb = new Supervisor(new Transport(1)); + orb = new Supervisor(new Transport("slobrok-" + port, 1)); registerMethods(); try { listener = orb.listen(new Spec(port)); @@ -241,7 +255,7 @@ public class Slobrok { private class FetchMirror implements Runnable { public final Request req; - public final Task task; + public final Task task; public FetchMirror(Request req, int timeout) { req.detach(); diff --git a/jrt/src/com/yahoo/jrt/tool/RpcInvoker.java b/jrt/src/com/yahoo/jrt/tool/RpcInvoker.java index 6c36e8f9604..8f0702a9ecc 100644 --- a/jrt/src/com/yahoo/jrt/tool/RpcInvoker.java +++ b/jrt/src/com/yahoo/jrt/tool/RpcInvoker.java @@ -77,7 +77,7 @@ public class RpcInvoker { if (connectspec.indexOf('/') < 0) connectspec = "tcp/" + connectspec; - supervisor = new Supervisor(new Transport()); + supervisor = new Supervisor(new Transport("invoker")); target = supervisor.connect(new Spec(connectspec)); Request request = createRequest(method,arguments); target.invokeSync(request,10.0); -- cgit v1.2.3