diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/rpc')
5 files changed, 39 insertions, 32 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java index 22ed8b6d9fa..6c1f666835c 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java @@ -12,7 +12,7 @@ import java.util.Optional; * * @author bratseth */ -interface Client { +public interface Client { /** Creates a connection to a particular node in this */ NodeConnection createConnection(String hostname, int port); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java index fd8e0e4f81a..a93ddb0b360 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java @@ -1,11 +1,27 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch.rpc; +import com.yahoo.vespa.config.search.DispatchNodesConfig; + +import java.util.Collection; +import java.util.List; + /** * Interface for getting a connection given a node id. * * @author balderersheim */ -public interface RpcConnectionPool { +public interface RpcConnectionPool extends AutoCloseable { + + /** Returns a connection to the given node id. */ Client.NodeConnection getConnection(int nodeId); + + + /** Will return a list of items that need a delayed close when updating node set. */ + default Collection<? extends AutoCloseable> updateNodes(DispatchNodesConfig nodesConfig) { return List.of(); } + + /** Shuts down all connections in the pool, and the underlying RPC client. */ + @Override + void close(); + } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java index 154002c4f77..b6228994ac8 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java @@ -33,7 +33,7 @@ public class RpcInvokerFactory extends InvokerFactory { super(cluster, dispatchConfig); this.rpcResourcePool = rpcResourcePool; this.compressor = new CompressService(); - decodeType = convert(dispatchConfig.summaryDecodePolicy()); + this.decodeType = convert(dispatchConfig.summaryDecodePolicy()); } @Override diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java index 53dc54f7bc5..a59097e5fff 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java @@ -37,7 +37,7 @@ public class RpcPing implements Pinger, Client.ResponseReceiver { this.clusterMonitor = clusterMonitor; this.pingSequenceId = node.createPingSequenceId(); this.pongHandler = pongHandler; - this. compressor = compressor; + this.compressor = compressor; } @Override diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java index 63530a7f650..d1f22514481 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java @@ -3,8 +3,10 @@ package com.yahoo.search.dispatch.rpc; import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.dispatch.rpc.Client.NodeConnection; +import com.yahoo.search.dispatch.rpc.RpcClient.RpcNodeConnection; import com.yahoo.vespa.config.search.DispatchConfig; import com.yahoo.vespa.config.search.DispatchNodesConfig; +import com.yahoo.vespa.config.search.DispatchNodesConfig.Node; import java.util.ArrayList; import java.util.Collection; @@ -19,7 +21,7 @@ import java.util.concurrent.ThreadLocalRandom; * * @author ollivir */ -public class RpcResourcePool implements RpcConnectionPool, AutoCloseable { +public class RpcResourcePool implements RpcConnectionPool { /** Connections to the search nodes this talks to, indexed by node id ("partid") */ private volatile Map<Integer, NodeConnectionPool> nodeConnectionPools = Map.of(); @@ -35,46 +37,35 @@ public class RpcResourcePool implements RpcConnectionPool, AutoCloseable { } public RpcResourcePool(DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig) { - super(); rpcClient = new RpcClient("dispatch-client", dispatchConfig.numJrtTransportThreads()); numConnections = dispatchConfig.numJrtConnectionsPerNode(); - updateNodes(nodesConfig).forEach(item -> { - try { - item.close(); - } catch (Exception e) {} + updateNodes(nodesConfig).forEach(pool -> { + try { pool.close(); } catch (Exception ignored) { } // Shouldn't throw. }); } - /** Will return a list of items that need a delayed close */ - public Collection<AutoCloseable> updateNodes(DispatchNodesConfig nodesConfig) { - List<AutoCloseable> toClose = new ArrayList<>(); - var builder = new HashMap<Integer, NodeConnectionPool>(); + @Override + public Collection<? extends AutoCloseable> updateNodes(DispatchNodesConfig nodesConfig) { + Map<Integer, NodeConnectionPool> currentPools = new HashMap<>(nodeConnectionPools); + Map<Integer, NodeConnectionPool> nextPools = new HashMap<>(); // Who can be reused - for (var node : nodesConfig.node()) { - var prev = nodeConnectionPools.get(node.key()); - NodeConnection nc = prev != null ? prev.nextConnection() : null; - if (nc instanceof RpcClient.RpcNodeConnection rpcNodeConnection - && rpcNodeConnection.getPort() == node.port() - && rpcNodeConnection.getHostname().equals(node.host())) + for (Node node : nodesConfig.node()) { + if ( currentPools.containsKey(node.key()) + && currentPools.get(node.key()).nextConnection() instanceof RpcNodeConnection rpcNodeConnection + && rpcNodeConnection.getPort() == node.port() + && rpcNodeConnection.getHostname().equals(node.host())) { - builder.put(node.key(), prev); + nextPools.put(node.key(), currentPools.remove(node.key())); } else { - var connections = new ArrayList<NodeConnection>(numConnections); + ArrayList<NodeConnection> connections = new ArrayList<>(numConnections); for (int i = 0; i < numConnections; i++) { connections.add(rpcClient.createConnection(node.host(), node.port())); } - builder.put(node.key(), new NodeConnectionPool(connections)); + nextPools.put(node.key(), new NodeConnectionPool(connections)); } } - // Who are not needed any more - nodeConnectionPools.forEach((key, pool) -> { - var survivor = builder.get(key); - if (survivor == null || pool != survivor) { - toClose.add(pool); - } - }); - this.nodeConnectionPools = Map.copyOf(builder); - return toClose; + this.nodeConnectionPools = Map.copyOf(nextPools); + return currentPools.values(); } @Override |