diff options
author | jonmv <venstad@gmail.com> | 2023-07-10 19:50:56 +0200 |
---|---|---|
committer | jonmv <venstad@gmail.com> | 2023-07-13 10:30:49 +0200 |
commit | 4e19b88dde87c8186bde9ae9def6c0f24a87f784 (patch) | |
tree | 819094071196405f421d50296d1aaee4dda01e7c /container-search | |
parent | e2e7fc4f5c49451a210ca681416c848cb9ed7377 (diff) |
Simplify a bit
Diffstat (limited to 'container-search')
6 files changed, 26 insertions, 40 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java index 0b627e91bc5..d66b6637cd3 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java @@ -96,20 +96,14 @@ public class ClusterMonitor<T> { * Ping all nodes which needs pinging to discover state changes */ public void ping(Executor executor) { - for (Iterator<BaseNodeMonitor<T>> i = nodeMonitorIterator(); i.hasNext() && !closed.get(); ) { - BaseNodeMonitor<T> monitor= i.next(); - nodeManager.ping(this, monitor.getNode(), executor); // Cause call to failed or responded + for (var monitor : nodeMonitors()) { + if (closed.get()) return; // Do nothing to change state if close has started. + nodeManager.ping(this, monitor.getNode(), executor); } - if (closed.get()) return; // Do nothing to change state if close has started. nodeManager.pingIterationCompleted(); } /** Returns a thread-safe snapshot of the NodeMonitors of all added nodes */ - public Iterator<BaseNodeMonitor<T>> nodeMonitorIterator() { - return nodeMonitors().iterator(); - } - - /** Returns a thread-safe snapshot of the NodeMonitors of all added nodes */ public List<BaseNodeMonitor<T>> nodeMonitors() { return List.copyOf(nodeMonitors.values()); } diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java index 4af6757db8c..1cf36d75fc5 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java @@ -48,7 +48,7 @@ public abstract class ClusterSearcher<T> extends PingableSearcher implements Nod * * @param id the id of this searcher * @param connections the connections of the cluster - * @param internal whether or not this cluster is internal (part of the same installation) + * @param internal whether this cluster is internal (part of the same installation) */ public ClusterSearcher(ComponentId id, List<T> connections, boolean internal) { this(id, connections, new Hasher<>(), internal); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index db7e80a95e5..41f02b1b580 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -108,7 +108,7 @@ public class Dispatcher extends AbstractComponent { (invokerFactory == null) ? new RpcInvokerFactory(rpcResourcePool, searchCluster.groupList(), dispatchConfig) : invokerFactory); - searchCluster.addMonitoring(clusterMonitor); + searchCluster.addMonitoring(clusterMonitor); // TODO: Update, rather than add ... as this creates a pinger for each node return items; } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java b/container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java index 1206277a103..6b134dc23a6 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java @@ -5,7 +5,7 @@ import java.time.Duration; import java.time.Instant; /** - * Contains start and and time. Exposes a duration, and lets you measure the time difference between 2 requests. + * Contains start and end time. Exposes a duration, and lets you measure the time difference between 2 requests. * It does use System.nanoTime to get a steady clock. * * @author baldersheim diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java index 53dc54f7bc5..a59097e5fff 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java @@ -37,7 +37,7 @@ public class RpcPing implements Pinger, Client.ResponseReceiver { this.clusterMonitor = clusterMonitor; this.pingSequenceId = node.createPingSequenceId(); this.pongHandler = pongHandler; - this. compressor = compressor; + this.compressor = compressor; } @Override diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java index 63530a7f650..993cab11cb5 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java @@ -3,12 +3,15 @@ package com.yahoo.search.dispatch.rpc; import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.dispatch.rpc.Client.NodeConnection; +import com.yahoo.search.dispatch.rpc.RpcClient.RpcNodeConnection; import com.yahoo.vespa.config.search.DispatchConfig; import com.yahoo.vespa.config.search.DispatchNodesConfig; +import com.yahoo.vespa.config.search.DispatchNodesConfig.Node; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; @@ -35,46 +38,35 @@ public class RpcResourcePool implements RpcConnectionPool, AutoCloseable { } public RpcResourcePool(DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig) { - super(); rpcClient = new RpcClient("dispatch-client", dispatchConfig.numJrtTransportThreads()); numConnections = dispatchConfig.numJrtConnectionsPerNode(); - updateNodes(nodesConfig).forEach(item -> { - try { - item.close(); - } catch (Exception e) {} + updateNodes(nodesConfig).forEach(pool -> { + try { pool.close(); } catch (Exception ignored) { } // Shouldn't throw. }); } - /** Will return a list of items that need a delayed close */ - public Collection<AutoCloseable> updateNodes(DispatchNodesConfig nodesConfig) { - List<AutoCloseable> toClose = new ArrayList<>(); - var builder = new HashMap<Integer, NodeConnectionPool>(); + /** Will return a list of items that need a delayed close. */ + public Collection<? extends AutoCloseable> updateNodes(DispatchNodesConfig nodesConfig) { + Map<Integer, NodeConnectionPool> currentPools = new HashMap<>(nodeConnectionPools); + Map<Integer, NodeConnectionPool> nextPools = new HashMap<>(); // Who can be reused - for (var node : nodesConfig.node()) { - var prev = nodeConnectionPools.get(node.key()); - NodeConnection nc = prev != null ? prev.nextConnection() : null; - if (nc instanceof RpcClient.RpcNodeConnection rpcNodeConnection - && rpcNodeConnection.getPort() == node.port() - && rpcNodeConnection.getHostname().equals(node.host())) + for (Node node : nodesConfig.node()) { + if ( currentPools.containsKey(node.key()) + && currentPools.get(node.key()).nextConnection() instanceof RpcNodeConnection rpcNodeConnection + && rpcNodeConnection.getPort() == node.port() + && rpcNodeConnection.getHostname().equals(node.host())) { - builder.put(node.key(), prev); + nextPools.put(node.key(), currentPools.remove(node.key())); } else { - var connections = new ArrayList<NodeConnection>(numConnections); + ArrayList<NodeConnection> connections = new ArrayList<>(numConnections); for (int i = 0; i < numConnections; i++) { connections.add(rpcClient.createConnection(node.host(), node.port())); } - builder.put(node.key(), new NodeConnectionPool(connections)); + nextPools.put(node.key(), new NodeConnectionPool(connections)); } } - // Who are not needed any more - nodeConnectionPools.forEach((key, pool) -> { - var survivor = builder.get(key); - if (survivor == null || pool != survivor) { - toClose.add(pool); - } - }); - this.nodeConnectionPools = Map.copyOf(builder); - return toClose; + this.nodeConnectionPools = Map.copyOf(nextPools); + return currentPools.values(); } @Override |