From 4c230a023fccdc9308b7daa5c8a5f3861c5507d4 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Mon, 21 Sep 2020 07:42:58 +0000 Subject: Shutdown and join the rpc client transport threads. --- .../src/main/java/com/yahoo/search/dispatch/rpc/Client.java | 1 + .../src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java | 5 +++++ .../main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java | 8 ++++++-- .../src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java | 2 ++ .../java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java | 5 +++-- 5 files changed, 17 insertions(+), 4 deletions(-) (limited to 'container-search/src') diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java index f4536a7aa4e..53cbee114b9 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java @@ -16,6 +16,7 @@ interface Client { /** Creates a connection to a particular node in this */ NodeConnection createConnection(String hostname, int port); + void close(); interface ResponseReceiver { void receive(ResponseOrError response); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java index 9ee455c48d3..70d94c5a8a6 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java @@ -29,6 +29,11 @@ class RpcClient implements Client { supervisor = new Supervisor(new Transport(name, transportThreads)); } + @Override + public void close() { + supervisor.transport().shutdown().join(); + } + @Override public NodeConnection createConnection(String hostname, int port) { return new RpcNodeConnection(hostname, port, supervisor); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java index aada16eef9b..746461630dd 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java @@ -4,7 +4,6 @@ package com.yahoo.search.dispatch.rpc; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import com.yahoo.component.AbstractComponent; -import com.yahoo.component.ComponentId; import com.yahoo.compress.CompressionType; import com.yahoo.compress.Compressor; import com.yahoo.compress.Compressor.Compression; @@ -35,17 +34,19 @@ public class RpcResourcePool extends AbstractComponent { /** Connections to the search nodes this talks to, indexed by node id ("partid") */ private final ImmutableMap nodeConnectionPools; + private final RpcClient client; RpcResourcePool(Map nodeConnections) { var builder = new ImmutableMap.Builder(); nodeConnections.forEach((key, connection) -> builder.put(key, new NodeConnectionPool(Collections.singletonList(connection)))); this.nodeConnectionPools = builder.build(); + client = null; } @Inject public RpcResourcePool(DispatchConfig dispatchConfig) { super(); - var client = new RpcClient("dispatch-client", dispatchConfig.numJrtTransportThreads()); + client = new RpcClient("dispatch-client", dispatchConfig.numJrtTransportThreads()); // Create rpc node connection pools indexed by the node distribution key var builder = new ImmutableMap.Builder(); @@ -82,6 +83,9 @@ public class RpcResourcePool extends AbstractComponent { public void deconstruct() { super.deconstruct(); nodeConnectionPools.values().forEach(NodeConnectionPool::release); + if (client != null) { + client.close(); + } } private class NodeConnectionPool { diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java index 2fc8c0fd620..1a0037e4b8a 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java @@ -31,6 +31,8 @@ public class MockClient implements Client { /** Set to true to cause this to produce an error instead of a regular response */ public void setMalfunctioning(boolean malfunctioning) { this.malfunctioning = malfunctioning; } + @Override + public void close() { } @Override public NodeConnection createConnection(String hostname, int port) { return new MockNodeConnection(hostname, port); diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java index c421e9523ed..27fc3f85136 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java @@ -79,6 +79,8 @@ public class RpcSearchInvokerTest { private Client parameterCollectorClient(AtomicReference compressionTypeHolder, AtomicReference payloadHolder, AtomicInteger lengthHolder) { return new Client() { + @Override + public void close() { } @Override public NodeConnection createConnection(String hostname, int port) { return new NodeConnection() { @@ -97,8 +99,7 @@ public class RpcSearchInvokerTest { } @Override - public void close() { - } + public void close() { } }; } }; -- cgit v1.2.3