diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-11-24 14:44:16 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-24 14:44:16 +0100 |
commit | 9e60ba29bcdc287aaba158dbac7523547ae6d232 (patch) | |
tree | 3e6835a3eeb4490d6254f368bb06a8fc5f33efdd /container-search/src/main/java | |
parent | 2edd631a79e96791d9638f7b3a80b15c9f94d1f7 (diff) | |
parent | d87842d60c12289fc8968e00f0d1f8ae944ce2ae (diff) |
Merge pull request #24972 from vespa-engine/balder/gc-ordered-groups
Cleanup the concept of orderedGroups. Just use a single way of access…
Diffstat (limited to 'container-search/src/main/java')
9 files changed, 201 insertions, 193 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 41f3f5bdead..92a522e9970 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -16,6 +16,7 @@ import com.yahoo.search.dispatch.rpc.RpcInvokerFactory; import com.yahoo.search.dispatch.rpc.RpcPingFactory; import com.yahoo.search.dispatch.rpc.RpcResourcePool; import com.yahoo.search.dispatch.searchcluster.Group; +import com.yahoo.search.dispatch.searchcluster.SearchGroups; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.SearchCluster; import com.yahoo.search.query.profile.types.FieldDescription; @@ -26,6 +27,7 @@ import com.yahoo.vespa.config.search.DispatchConfig; import com.yahoo.vespa.config.search.DispatchNodesConfig; import java.time.Duration; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Optional; @@ -52,12 +54,20 @@ public class Dispatcher extends AbstractComponent { /** If set will control computation of how many hits will be fetched from each partition.*/ public static final CompoundName topKProbability = CompoundName.fromComponents(DISPATCH, TOP_K_PROBABILITY); - private final ClusterMonitor<Node> clusterMonitor; - private final LoadBalancer loadBalancer; - private final SearchCluster searchCluster; - private final InvokerFactory invokerFactory; - private final int maxHitsPerNode; + private final DispatchConfig dispatchConfig; private final RpcResourcePool rpcResourcePool; + private final SearchCluster searchCluster; + private final ClusterMonitor<Node> clusterMonitor; + private volatile VolatileItems volatileItems; + + private static class VolatileItems { + final LoadBalancer loadBalancer; + final InvokerFactory invokerFactory; + VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory) { + this.loadBalancer = loadBalancer; + this.invokerFactory = invokerFactory; + } + } private static final QueryProfileType argumentType; @@ -72,53 +82,36 @@ public class Dispatcher extends AbstractComponent { public static QueryProfileType getArgumentType() { return argumentType; } @Inject - public Dispatcher(ComponentId clusterId, - DispatchConfig dispatchConfig, - DispatchNodesConfig nodesConfig, - VipStatus vipStatus) { - this(clusterId, dispatchConfig, nodesConfig, vipStatus, - new RpcResourcePool(dispatchConfig, nodesConfig)); - - } - private Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, - DispatchNodesConfig nodesConfig, VipStatus vipStatus, - RpcResourcePool resourcePool) { - this(new SearchCluster(clusterId.stringValue(), dispatchConfig, nodesConfig, - vipStatus, new RpcPingFactory(resourcePool)), - dispatchConfig, resourcePool); - - } - - private Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, - RpcResourcePool rpcResourcePool) { - this(new ClusterMonitor<>(searchCluster, true), - searchCluster, dispatchConfig, - new RpcInvokerFactory(rpcResourcePool, searchCluster, dispatchConfig), - rpcResourcePool); - } - - /* Protected for simple mocking in tests. Beware that searchCluster is shutdown on in deconstruct() */ - protected Dispatcher(ClusterMonitor<Node> clusterMonitor, - SearchCluster searchCluster, - DispatchConfig dispatchConfig, - InvokerFactory invokerFactory) { - this(clusterMonitor, searchCluster, dispatchConfig, invokerFactory, null); + public Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, + DispatchNodesConfig nodesConfig, VipStatus vipStatus) { + this.dispatchConfig = dispatchConfig; + rpcResourcePool = new RpcResourcePool(dispatchConfig, nodesConfig); + searchCluster = new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(), + nodesConfig, vipStatus, new RpcPingFactory(rpcResourcePool)); + clusterMonitor = new ClusterMonitor<>(searchCluster, true); + volatileItems = update(null); + initialWarmup(dispatchConfig.warmuptime()); } - private Dispatcher(ClusterMonitor<Node> clusterMonitor, - SearchCluster searchCluster, - DispatchConfig dispatchConfig, - InvokerFactory invokerFactory, - RpcResourcePool rpcResourcePool) { - + /* For simple mocking in tests. Beware that searchCluster is shutdown on in deconstruct() */ + Dispatcher(ClusterMonitor<Node> clusterMonitor, SearchCluster searchCluster, + DispatchConfig dispatchConfig, InvokerFactory invokerFactory) { + this.dispatchConfig = dispatchConfig; + this.rpcResourcePool = null; this.searchCluster = searchCluster; this.clusterMonitor = clusterMonitor; - this.loadBalancer = new LoadBalancer(searchCluster.orderedGroups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())); - this.invokerFactory = invokerFactory; - this.rpcResourcePool = rpcResourcePool; - this.maxHitsPerNode = dispatchConfig.maxHitsPerNode(); + this.volatileItems = update(invokerFactory); + } + private VolatileItems update(InvokerFactory invokerFactory) { + var items = new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())), + (invokerFactory == null) + ? new RpcInvokerFactory(rpcResourcePool, searchCluster.groupList(), dispatchConfig) + : invokerFactory); searchCluster.addMonitoring(clusterMonitor); - Thread warmup = new Thread(() -> warmup(dispatchConfig.warmuptime())); + return items; + } + private void initialWarmup(double warmupTime) { + Thread warmup = new Thread(() -> warmup(warmupTime)); warmup.start(); try { while ( ! searchCluster.hasInformationAboutAllNodes()) { @@ -152,7 +145,7 @@ public class Dispatcher extends AbstractComponent { } public boolean allGroupsHaveSize1() { - return searchCluster.allGroupsHaveSize1(); + return searchCluster.groupList().groups().stream().allMatch(g -> g.nodes().size() == 1); } @Override @@ -165,13 +158,14 @@ public class Dispatcher extends AbstractComponent { } public FillInvoker getFillInvoker(Result result, VespaBackEndSearcher searcher) { - return invokerFactory.createFillInvoker(searcher, result); + return volatileItems.invokerFactory.createFillInvoker(searcher, result); } public SearchInvoker getSearchInvoker(Query query, VespaBackEndSearcher searcher) { - SearchCluster cluster = searchCluster; // Take a snapshot - InvokerFactory factory = invokerFactory; - SearchInvoker invoker = getSearchPathInvoker(query, searcher, cluster, factory, maxHitsPerNode).orElseGet(() -> getInternalInvoker(query, searcher, cluster, loadBalancer, factory, maxHitsPerNode)); + VolatileItems items = volatileItems; // Take a snapshot + int maxHitsPerNode = dispatchConfig.maxHitsPerNode(); + SearchInvoker invoker = getSearchPathInvoker(query, searcher, searchCluster.groupList(), items.invokerFactory, maxHitsPerNode) + .orElseGet(() -> getInternalInvoker(query, searcher, searchCluster, items.loadBalancer, items.invokerFactory, maxHitsPerNode)); if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) { query.setHits(0); @@ -181,7 +175,7 @@ public class Dispatcher extends AbstractComponent { } /** Builds an invoker based on searchpath */ - private static Optional<SearchInvoker> getSearchPathInvoker(Query query, VespaBackEndSearcher searcher, SearchCluster cluster, + private static Optional<SearchInvoker> getSearchPathInvoker(Query query, VespaBackEndSearcher searcher, SearchGroups cluster, InvokerFactory invokerFactory, int maxHitsPerNode) { String searchPath = query.getModel().getSearchPath(); if (searchPath == null) return Optional.empty(); @@ -216,9 +210,9 @@ public class Dispatcher extends AbstractComponent { } int covered = cluster.groupsWithSufficientCoverage(); - int groups = cluster.orderedGroups().size(); + int groups = cluster.groupList().size(); int max = Integer.min(Integer.min(covered + 1, groups), MAX_GROUP_SELECTION_ATTEMPTS); - Set<Integer> rejected = rejectGroupBlockingFeed(cluster.orderedGroups()); + Set<Integer> rejected = rejectGroupBlockingFeed(cluster.groupList().groups()); for (int i = 0; i < max; i++) { Optional<Group> groupInCluster = loadBalancer.takeGroup(rejected); if (groupInCluster.isEmpty()) break; // No groups available @@ -254,7 +248,7 @@ public class Dispatcher extends AbstractComponent { * * @return a modifiable set containing the single group to reject, or null otherwise */ - private static Set<Integer> rejectGroupBlockingFeed(List<Group> groups) { + private static Set<Integer> rejectGroupBlockingFeed(Collection<Group> groups) { if (groups.size() == 1) return null; List<Group> groupsRejectingFeed = groups.stream().filter(Group::isBlockingWrites).toList(); if (groupsRejectingFeed.size() != 1) return null; diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java index 4b40dcf6c68..d6fb6de6354 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java @@ -6,8 +6,8 @@ import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.dispatch.searchcluster.Group; +import com.yahoo.search.dispatch.searchcluster.SearchGroups; import com.yahoo.search.dispatch.searchcluster.Node; -import com.yahoo.search.dispatch.searchcluster.SearchCluster; import com.yahoo.search.result.Coverage; import com.yahoo.search.result.ErrorMessage; import com.yahoo.vespa.config.search.DispatchConfig; @@ -24,12 +24,12 @@ import java.util.Set; public abstract class InvokerFactory { private static final double SKEW_FACTOR = 0.05; - private final SearchCluster searchCluster; + private final SearchGroups cluster; private final DispatchConfig dispatchConfig; private final TopKEstimator hitEstimator; - public InvokerFactory(SearchCluster searchCluster, DispatchConfig dispatchConfig) { - this.searchCluster = searchCluster; + public InvokerFactory(SearchGroups searchCluster, DispatchConfig dispatchConfig) { + this.cluster = searchCluster; this.dispatchConfig = dispatchConfig; this.hitEstimator = new TopKEstimator(30.0, dispatchConfig.topKProbability(), SKEW_FACTOR); } @@ -57,7 +57,7 @@ public abstract class InvokerFactory { List<Node> nodes, boolean acceptIncompleteCoverage, int maxHits) { - Group group = searchCluster.group(nodes.get(0).group()).get(); // Nodes must be of the same group + Group group = cluster.get(nodes.get(0).group()); // Nodes must be of the same group List<SearchInvoker> invokers = new ArrayList<>(nodes.size()); Set<Integer> failed = null; for (Node node : nodes) { @@ -85,7 +85,7 @@ public abstract class InvokerFactory { success.add(node); } } - if ( ! searchCluster.isPartialGroupCoverageSufficient(success) && !acceptIncompleteCoverage) { + if ( ! cluster.isPartialGroupCoverageSufficient(success) && !acceptIncompleteCoverage) { return Optional.empty(); } if (invokers.isEmpty()) { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java index 52cc6ad7711..edf9de4f135 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java @@ -5,7 +5,10 @@ import com.yahoo.search.dispatch.searchcluster.Group; import java.time.Duration; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Random; import java.util.Set; @@ -28,15 +31,15 @@ public class LoadBalancer { private static final Duration INITIAL_QUERY_TIME = Duration.ofMillis(1); private static final double MIN_QUERY_TIME = Duration.ofMillis(1).toMillis()/1000.0; - private final List<GroupStatus> scoreboard; + private final Map<Integer, GroupStatus> scoreboard; private final GroupScheduler scheduler; public enum Policy { ROUNDROBIN, LATENCY_AMORTIZED_OVER_REQUESTS, LATENCY_AMORTIZED_OVER_TIME, BEST_OF_RANDOM_2} - public LoadBalancer(List<Group> groups, Policy policy) { - this.scoreboard = new ArrayList<>(groups.size()); + public LoadBalancer(Collection<Group> groups, Policy policy) { + this.scoreboard = new HashMap<>(); for (Group group : groups) { - scoreboard.add(new GroupStatus(group)); + scoreboard.put(group.id(), new GroupStatus(group)); } if (scoreboard.size() == 1) policy = Policy.ROUNDROBIN; @@ -81,12 +84,8 @@ public class LoadBalancer { */ public void releaseGroup(Group group, boolean success, RequestDuration searchTime) { synchronized (this) { - for (GroupStatus sched : scoreboard) { - if (sched.group.id() == group.id()) { - sched.release(success, searchTime); - break; - } - } + GroupStatus sched = scoreboard.get(group.id()); + sched.release(success, searchTime); } } @@ -146,31 +145,29 @@ public class LoadBalancer { private static class RoundRobinScheduler implements GroupScheduler { private int needle = 0; - private final List<GroupStatus> scoreboard; + private final Map<Integer, GroupStatus> scoreboard; - public RoundRobinScheduler(List<GroupStatus> scoreboard) { + public RoundRobinScheduler(Map<Integer, GroupStatus> scoreboard) { this.scoreboard = scoreboard; } @Override public Optional<GroupStatus> takeNextGroup(Set<Integer> rejectedGroups) { GroupStatus bestCandidate = null; - int bestIndex = needle; - int index = needle; + int groupId = needle; for (int i = 0; i < scoreboard.size(); i++) { - GroupStatus candidate = scoreboard.get(index); - if (rejectedGroups == null || !rejectedGroups.contains(candidate.group.id())) { + GroupStatus candidate = scoreboard.get(groupId); + if (rejectedGroups == null || !rejectedGroups.contains(candidate.groupId())) { GroupStatus better = betterGroup(bestCandidate, candidate); if (better == candidate) { bestCandidate = candidate; - bestIndex = index; } } - index = nextScoreboardIndex(index); + groupId = nextScoreboardIndex(groupId); } - needle = nextScoreboardIndex(bestIndex); - return Optional.ofNullable(bestCandidate); + needle = nextScoreboardIndex(bestCandidate.groupId()); + return Optional.of(bestCandidate); } /** @@ -203,7 +200,7 @@ public class LoadBalancer { static class AdaptiveScheduler implements GroupScheduler { enum Type {TIME, REQUESTS} private final Random random; - private final List<GroupStatus> scoreboard; + private final Map<Integer, GroupStatus> scoreboard; private static double toDouble(Duration duration) { return duration.toNanos()/1_000_000_000.0; @@ -250,18 +247,16 @@ public class LoadBalancer { Duration averageSearchTime() { return fromDouble(averageSearchTime);} } - public AdaptiveScheduler(Type type, Random random, List<GroupStatus> scoreboard) { + public AdaptiveScheduler(Type type, Random random, Map<Integer, GroupStatus> scoreboard) { this.random = random; this.scoreboard = scoreboard; - for (GroupStatus gs : scoreboard) { - gs.setDecayer(type == Type.REQUESTS ? new DecayByRequests() : new DecayByTime()); - } + scoreboard.forEach((id, gs) -> gs.setDecayer(type == Type.REQUESTS ? new DecayByRequests() : new DecayByTime())); } private Optional<GroupStatus> selectGroup(double needle, boolean requireCoverage, Set<Integer> rejected) { double sum = 0; int n = 0; - for (GroupStatus gs : scoreboard) { + for (GroupStatus gs : scoreboard.values()) { if (rejected == null || !rejected.contains(gs.group.id())) { if (!requireCoverage || gs.group.hasSufficientCoverage()) { sum += gs.weight(); @@ -273,7 +268,7 @@ public class LoadBalancer { return Optional.empty(); } double accum = 0; - for (GroupStatus gs : scoreboard) { + for (GroupStatus gs : scoreboard.values()) { if (rejected == null || !rejected.contains(gs.group.id())) { if (!requireCoverage || gs.group.hasSufficientCoverage()) { accum += gs.weight(); @@ -297,8 +292,8 @@ public class LoadBalancer { static class BestOfRandom2 implements GroupScheduler { private final Random random; - private final List<GroupStatus> scoreboard; - public BestOfRandom2(Random random, List<GroupStatus> scoreboard) { + private final Map<Integer, GroupStatus> scoreboard; + public BestOfRandom2(Random random, Map<Integer, GroupStatus> scoreboard) { this.random = random; this.scoreboard = scoreboard; } @@ -312,11 +307,10 @@ public class LoadBalancer { private GroupStatus selectBestOf2(Set<Integer> rejectedGroups, boolean requireCoverage) { List<Integer> candidates = new ArrayList<>(scoreboard.size()); - for (int i=0; i < scoreboard.size(); i++) { - GroupStatus gs = scoreboard.get(i); + for (GroupStatus gs : scoreboard.values()) { if (rejectedGroups == null || !rejectedGroups.contains(gs.group.id())) { if (!requireCoverage || gs.group.hasSufficientCoverage()) { - candidates.add(i); + candidates.add(gs.groupId()); } } } @@ -330,8 +324,7 @@ public class LoadBalancer { private GroupStatus selectRandom(List<Integer> candidates) { if ( ! candidates.isEmpty()) { int index = random.nextInt(candidates.size()); - Integer groupIndex = candidates.remove(index); - return scoreboard.get(groupIndex); + return scoreboard.get(candidates.remove(index)); } return null; } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java index fc6d5ed01b1..568c9f117d7 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java @@ -3,8 +3,8 @@ package com.yahoo.search.dispatch; import com.yahoo.collections.Pair; import com.yahoo.search.dispatch.searchcluster.Group; +import com.yahoo.search.dispatch.searchcluster.SearchGroups; import com.yahoo.search.dispatch.searchcluster.Node; -import com.yahoo.search.dispatch.searchcluster.SearchCluster; import java.util.ArrayList; import java.util.Collection; @@ -36,7 +36,7 @@ public class SearchPath { * @return list of nodes chosen with the search path, or an empty list in which * case some other node selection logic should be used */ - public static List<Node> selectNodes(String searchPath, SearchCluster cluster) { + public static List<Node> selectNodes(String searchPath, SearchGroups cluster) { Optional<SearchPath> sp = SearchPath.fromString(searchPath); if (sp.isPresent()) { return sp.get().mapToNodes(cluster); @@ -73,8 +73,8 @@ public class SearchPath { this.groups = groups; } - private List<Node> mapToNodes(SearchCluster cluster) { - if (cluster.groups().isEmpty()) { + private List<Node> mapToNodes(SearchGroups cluster) { + if (cluster.isEmpty()) { return List.of(); } @@ -100,14 +100,14 @@ public class SearchPath { return nodes.isEmpty() && groups.isEmpty(); } - private Group selectRandomGroupWithSufficientCoverage(SearchCluster cluster, List<Integer> groupIds) { + private Group selectRandomGroupWithSufficientCoverage(SearchGroups cluster, List<Integer> groupIds) { while ( groupIds.size() > 1 ) { int index = random.nextInt(groupIds.size()); int groupId = groupIds.get(index); - Optional<Group> group = cluster.group(groupId); - if (group.isPresent()) { - if (group.get().hasSufficientCoverage()) { - return group.get(); + Group group = cluster.get(groupId); + if (group != null) { + if (group.hasSufficientCoverage()) { + return group; } else { groupIds.remove(index); } @@ -115,10 +115,10 @@ public class SearchPath { throw new InvalidSearchPathException("Invalid searchPath, cluster does not have " + (groupId + 1) + " groups"); } } - return cluster.group(groupIds.get(0)).get(); + return cluster.get(groupIds.get(0)); } - private Group selectGroup(SearchCluster cluster) { + private Group selectGroup(SearchGroups cluster) { if ( ! groups.isEmpty()) { List<Integer> potentialGroups = new ArrayList<>(); for (Selection groupSelection : groups) { @@ -130,7 +130,7 @@ public class SearchPath { } // pick any working group - return selectRandomGroupWithSufficientCoverage(cluster, new ArrayList<>(cluster.groups().keySet())); + return selectRandomGroupWithSufficientCoverage(cluster, new ArrayList<>(cluster.keys())); } private static SearchPath parseElement(String element) { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java index d5b1b876540..4466b03a713 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java @@ -7,8 +7,8 @@ import com.yahoo.search.Result; import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.dispatch.InvokerFactory; import com.yahoo.search.dispatch.SearchInvoker; +import com.yahoo.search.dispatch.searchcluster.SearchGroups; import com.yahoo.search.dispatch.searchcluster.Node; -import com.yahoo.search.dispatch.searchcluster.SearchCluster; import com.yahoo.vespa.config.search.DispatchConfig; import java.util.Optional; @@ -21,8 +21,8 @@ public class RpcInvokerFactory extends InvokerFactory { private final RpcConnectionPool rpcResourcePool; private final CompressPayload compressor; - public RpcInvokerFactory(RpcConnectionPool rpcResourcePool, SearchCluster searchCluster, DispatchConfig dispatchConfig) { - super(searchCluster, dispatchConfig); + public RpcInvokerFactory(RpcConnectionPool rpcResourcePool, SearchGroups cluster, DispatchConfig dispatchConfig) { + super(cluster, dispatchConfig); this.rpcResourcePool = rpcResourcePool; this.compressor = new CompressService(); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java index cf161638104..fbea58054da 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java @@ -1,8 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch.searchcluster; -import com.google.common.collect.ImmutableList; - import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -22,7 +20,7 @@ public class Group { private final static int minDocsPerNodeToRequireLowSkew = 100; private final int id; - private final ImmutableList<Node> nodes; + private final List<Node> nodes; private final AtomicBoolean hasSufficientCoverage = new AtomicBoolean(true); private final AtomicBoolean hasFullCoverage = new AtomicBoolean(true); private final AtomicLong activeDocuments = new AtomicLong(0); @@ -32,7 +30,7 @@ public class Group { public Group(int id, List<Node> nodes) { this.id = id; - this.nodes = ImmutableList.copyOf(nodes); + this.nodes = List.copyOf(nodes); int idx = 0; for(var node: nodes) { @@ -48,7 +46,7 @@ public class Group { public int id() { return id; } /** Returns the nodes in this group as an immutable list */ - public ImmutableList<Node> nodes() { return nodes; } + public List<Node> nodes() { return nodes; } /** * Returns whether this group has sufficient active documents diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index ca2fce0b32b..6c48b00d175 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -1,17 +1,15 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch.searchcluster; -import com.google.common.collect.ImmutableMap; -import com.google.common.math.Quantiles; import com.yahoo.container.handler.VipStatus; import com.yahoo.net.HostName; import com.yahoo.prelude.Pong; import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.cluster.NodeManager; -import com.yahoo.vespa.config.search.DispatchConfig; import com.yahoo.vespa.config.search.DispatchNodesConfig; -import java.util.LinkedHashMap; +import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -28,14 +26,11 @@ public class SearchCluster implements NodeManager<Node> { private static final Logger log = Logger.getLogger(SearchCluster.class.getName()); - private final double minActivedocsPercentage; private final String clusterId; private final VipStatus vipStatus; private final PingFactory pingFactory; - private final Map<Integer, Group> groups; - private final List<Group> orderedGroups; - private final List<Node> nodes; - private long nextLogTime = 0; + private final SearchGroupsImpl groups; + private volatile long nextLogTime = 0; /** * A search node on this local machine having the entire corpus, which we therefore @@ -47,48 +42,39 @@ public class SearchCluster implements NodeManager<Node> { */ private final Node localCorpusDispatchTarget; - public SearchCluster(String clusterId, DispatchConfig dispatchConfig, + public SearchCluster(String clusterId, double minActivedocsPercentage, DispatchNodesConfig nodesConfig, VipStatus vipStatus, PingFactory pingFactory) { + this(clusterId, minActivedocsPercentage, toNodes(nodesConfig), vipStatus, pingFactory); + } + public SearchCluster(String clusterId, double minActivedocsPercentage, List<Node> nodes, + VipStatus vipStatus, PingFactory pingFactory) { + this(clusterId, toGroups(nodes, minActivedocsPercentage), vipStatus, pingFactory); + } + public SearchCluster(String clusterId, SearchGroupsImpl groups, VipStatus vipStatus, PingFactory pingFactory) { this.clusterId = clusterId; - this.minActivedocsPercentage = dispatchConfig.minActivedocsPercentage(); this.vipStatus = vipStatus; this.pingFactory = pingFactory; - - this.nodes = toNodes(nodesConfig); - - // Create groups - ImmutableMap.Builder<Integer, Group> groupsBuilder = new ImmutableMap.Builder<>(); - for (Map.Entry<Integer, List<Node>> group : nodes.stream().collect(Collectors.groupingBy(Node::group)).entrySet()) { - Group g = new Group(group.getKey(), group.getValue()); - groupsBuilder.put(group.getKey(), g); - } - this.groups = groupsBuilder.build(); - LinkedHashMap<Integer, Group> groupIntroductionOrder = new LinkedHashMap<>(); - nodes.forEach(node -> groupIntroductionOrder.put(node.group(), groups.get(node.group()))); - this.orderedGroups = List.copyOf(groupIntroductionOrder.values()); - - this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), nodes, groups); + this.groups = groups; + this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), groups); } @Override public String name() { return clusterId; } public void addMonitoring(ClusterMonitor<Node> clusterMonitor) { - for (var group : orderedGroups()) { + for (var group : groups()) { for (var node : group.nodes()) clusterMonitor.add(node, true); } } - private static Node findLocalCorpusDispatchTarget(String selfHostname, - List<Node> nodes, - Map<Integer, Group> groups) { + private static Node findLocalCorpusDispatchTarget(String selfHostname, SearchGroups groups) { // A search node in the search cluster in question is configured on the same host as the currently running container. // It has all the data <==> No other nodes in the search cluster have the same group id as this node. // That local search node responds. // The search cluster to be searched has at least as many nodes as the container cluster we're running in. - List<Node> localSearchNodes = nodes.stream() + List<Node> localSearchNodes = groups.groups().stream().flatMap(g -> g.nodes().stream()) .filter(node -> node.hostname().equals(selfHostname)) .toList(); // Only use direct dispatch if we have exactly 1 search node on the same machine: @@ -109,27 +95,22 @@ public class SearchCluster implements NodeManager<Node> { .toList(); } - /** Returns the groups of this cluster as an immutable map indexed by group id */ - public Map<Integer, Group> groups() { return groups; } - - /** Returns the groups of this cluster as an immutable list in introduction order */ - public List<Group> orderedGroups() { return orderedGroups; } - - /** Returns the n'th (zero-indexed) group in the cluster if possible */ - public Optional<Group> group(int n) { - if (orderedGroups().size() > n) { - return Optional.of(orderedGroups().get(n)); - } else { - return Optional.empty(); + private static SearchGroupsImpl toGroups(Collection<Node> nodes, double minActivedocsPercentage) { + Map<Integer, Group> groups = new HashMap<>(); + for (Map.Entry<Integer, List<Node>> group : nodes.stream().collect(Collectors.groupingBy(Node::group)).entrySet()) { + Group g = new Group(group.getKey(), group.getValue()); + groups.put(group.getKey(), g); } + return new SearchGroupsImpl(Map.copyOf(groups), minActivedocsPercentage); } - public boolean allGroupsHaveSize1() { - return nodes.size() == groups.size(); - } + public SearchGroups groupList() { return groups; } + public Group group(int id) { return groups.get(id); } + + private Collection<Group> groups() { return groups.groups(); } public int groupsWithSufficientCoverage() { - return (int)groups.values().stream().filter(g -> g.hasSufficientCoverage()).count(); + return (int)groups().stream().filter(Group::hasSufficientCoverage).count(); } /** @@ -140,7 +121,7 @@ public class SearchCluster implements NodeManager<Node> { if ( localCorpusDispatchTarget == null) return Optional.empty(); // Only use direct dispatch if the local group has sufficient coverage - Group localSearchGroup = groups().get(localCorpusDispatchTarget.group()); + Group localSearchGroup = groups.get(localCorpusDispatchTarget.group()); if ( ! localSearchGroup.hasSufficientCoverage()) return Optional.empty(); // Only use direct dispatch if the local search node is not down @@ -181,7 +162,7 @@ public class SearchCluster implements NodeManager<Node> { else if (usesLocalCorpusIn(node)) { // follow the status of this node // Do not take this out of rotation if we're a combined cluster of size 1, // as that can't be helpful, and leads to a deadlock where this node is never set back in service - if (nodeIsWorking || nodes.size() > 1) + if (nodeIsWorking || groups().stream().map(Group::nodes).count() > 1) setInRotationOnlyIf(nodeIsWorking); } } @@ -203,11 +184,11 @@ public class SearchCluster implements NodeManager<Node> { } public boolean hasInformationAboutAllNodes() { - return nodes.stream().allMatch(node -> node.isWorking() != null); + return groups().stream().allMatch(g -> g.nodes().stream().allMatch(node -> node.isWorking() != null)); } private boolean hasWorkingNodes() { - return nodes.stream().anyMatch(node -> node.isWorking() != Boolean.FALSE ); + return groups().stream().anyMatch(g -> g.nodes().stream().anyMatch(node -> node.isWorking() != Boolean.FALSE)); } private boolean usesLocalCorpusIn(Node node) { @@ -226,31 +207,25 @@ public class SearchCluster implements NodeManager<Node> { } private void pingIterationCompletedSingleGroup() { - Group group = groups().values().iterator().next(); + Group group = groups().iterator().next(); group.aggregateNodeValues(); // With just one group sufficient coverage may not be the same as full coverage, as the // group will always be marked sufficient for use. updateSufficientCoverage(group, true); - boolean sufficientCoverage = isGroupCoverageSufficient(group.activeDocuments(), group.activeDocuments()); + boolean sufficientCoverage = groups.isGroupCoverageSufficient(group.activeDocuments(), group.activeDocuments()); trackGroupCoverageChanges(group, sufficientCoverage, group.activeDocuments()); } private void pingIterationCompletedMultipleGroups() { - orderedGroups().forEach(Group::aggregateNodeValues); - long medianDocuments = medianDocumentsPerGroup(); - for (Group group : orderedGroups()) { - boolean sufficientCoverage = isGroupCoverageSufficient(group.activeDocuments(), medianDocuments); + groups().forEach(Group::aggregateNodeValues); + long medianDocuments = groups.medianDocumentsPerGroup(); + for (Group group : groups()) { + boolean sufficientCoverage = groups.isGroupCoverageSufficient(group.activeDocuments(), medianDocuments); updateSufficientCoverage(group, sufficientCoverage); trackGroupCoverageChanges(group, sufficientCoverage, medianDocuments); } } - private long medianDocumentsPerGroup() { - if (orderedGroups().isEmpty()) return 0; - var activeDocuments = orderedGroups().stream().map(Group::activeDocuments).collect(Collectors.toList()); - return (long)Quantiles.median().compute(activeDocuments); - } - /** * Update statistics after a round of issuing pings. * Note that this doesn't wait for pings to return, so it will typically accumulate data from @@ -258,30 +233,19 @@ public class SearchCluster implements NodeManager<Node> { */ @Override public void pingIterationCompleted() { - int numGroups = orderedGroups().size(); - if (numGroups == 1) { + if (groups.size() == 1) { pingIterationCompletedSingleGroup(); } else { pingIterationCompletedMultipleGroups(); } } - private boolean isGroupCoverageSufficient(long activeDocuments, long medianDocuments) { - double documentCoverage = 100.0 * (double) activeDocuments / medianDocuments; - if (medianDocuments > 0 && documentCoverage < minActivedocsPercentage) - return false; - return true; - } + /** * Calculate whether a subset of nodes in a group has enough coverage */ - public boolean isPartialGroupCoverageSufficient(List<Node> nodes) { - if (orderedGroups().size() == 1) - return true; - long activeDocuments = nodes.stream().mapToLong(Node::getActiveDocuments).sum(); - return isGroupCoverageSufficient(activeDocuments, medianDocumentsPerGroup()); - } + private void trackGroupCoverageChanges(Group group, boolean fullCoverage, long medianDocuments) { if ( ! hasInformationAboutAllNodes()) return; // Be silent until we know what we are talking about. diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java new file mode 100644 index 00000000000..b041ba28db9 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java @@ -0,0 +1,19 @@ +package com.yahoo.search.dispatch.searchcluster; + +import java.util.Collection; +import java.util.Set; + +/** + * Simple interface for groups and their nodes in the content cluster + * @author baldersheim + */ +public interface SearchGroups { + Group get(int id); + Set<Integer> keys(); + Collection<Group> groups(); + default boolean isEmpty() { + return size() == 0; + } + int size(); + boolean isPartialGroupCoverageSufficient(Collection<Node> nodes); +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java new file mode 100644 index 00000000000..f5ce987c64c --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java @@ -0,0 +1,40 @@ +package com.yahoo.search.dispatch.searchcluster; + +import com.google.common.math.Quantiles; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public class SearchGroupsImpl implements SearchGroups { + private final Map<Integer, Group> groups; + private final double minActivedocsPercentage; + public SearchGroupsImpl(Map<Integer, Group> groups, double minActivedocsPercentage) { + this.groups = Map.copyOf(groups); + this.minActivedocsPercentage = minActivedocsPercentage; + } + @Override public Group get(int id) { return groups.get(id); } + @Override public Set<Integer> keys() { return groups.keySet();} + @Override public Collection<Group> groups() { return groups.values(); } + @Override public int size() { return groups.size(); } + @Override + public boolean isPartialGroupCoverageSufficient(Collection<Node> nodes) { + if (size() == 1) + return true; + long activeDocuments = nodes.stream().mapToLong(Node::getActiveDocuments).sum(); + return isGroupCoverageSufficient(activeDocuments, medianDocumentsPerGroup()); + } + + public boolean isGroupCoverageSufficient(long activeDocuments, long medianDocuments) { + if (medianDocuments <= 0) return true; + double documentCoverage = 100.0 * (double) activeDocuments / medianDocuments; + return documentCoverage >= minActivedocsPercentage; + } + + public long medianDocumentsPerGroup() { + if (isEmpty()) return 0; + var activeDocuments = groups().stream().map(Group::activeDocuments).collect(Collectors.toList()); + return (long) Quantiles.median().compute(activeDocuments); + } +} |