// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;

import com.yahoo.component.AbstractComponent;
import com.yahoo.component.ComponentId;
import com.yahoo.component.annotation.Inject;
import com.yahoo.compress.Compressor;
import com.yahoo.container.handler.VipStatus;
import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.processing.request.CompoundName;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException;
import com.yahoo.search.dispatch.rpc.RpcConnectionPool;
import com.yahoo.search.dispatch.rpc.RpcInvokerFactory;
import com.yahoo.search.dispatch.rpc.RpcPingFactory;
import com.yahoo.search.dispatch.rpc.RpcResourcePool;
import com.yahoo.search.dispatch.searchcluster.Group;
import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
import com.yahoo.search.dispatch.searchcluster.SearchGroups;
import com.yahoo.search.query.profile.types.FieldDescription;
import com.yahoo.search.query.profile.types.FieldType;
import com.yahoo.search.query.profile.types.QueryProfileType;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.vespa.config.search.DispatchConfig;
import com.yahoo.vespa.config.search.DispatchNodesConfig;

import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * A dispatcher communicates with search nodes to perform queries and fill hits.
 *
 * This class allocates {@link SearchInvoker} and {@link FillInvoker} objects based
 * on query properties and general system status. The caller can then use the provided
 * invocation object to execute the search or fill.
 *
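 * An illustrative sketch of typical use, under assumptions not taken from this class:
 * the {@code dispatcher}, {@code query} and {@code searcher} instances are presumed to
 * exist in the caller's context, and error handling is elided.
 * <pre>{@code
 * // Obtain an invoker for the query; the dispatcher selects the group and nodes.
 * SearchInvoker invoker = dispatcher.getSearchInvoker(query, searcher);
 * // The caller then executes the search through the invoker; its teardown releases
 * // the snapshot of dispatch state the invoker was created from.
 * }</pre>
 *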
 * This class is multithread safe.
 *
 * @author bratseth
 * @author ollvir
 */
public class Dispatcher extends AbstractComponent {

    public static final String DISPATCH = "dispatch";
    private static final String TOP_K_PROBABILITY = "topKProbability";
    private static final int MAX_GROUP_SELECTION_ATTEMPTS = 3;

    /** If set, this controls the computation of how many hits will be fetched from each partition. */
    public static final CompoundName topKProbability = CompoundName.from(DISPATCH + "." + TOP_K_PROBABILITY);

    private final InvokerFactoryFactory invokerFactories;
    private final DispatchConfig dispatchConfig;
    private final RpcConnectionPool rpcResourcePool;
    private final SearchCluster searchCluster;
    private final ClusterMonitor<Node> clusterMonitor;
    private volatile VolatileItems volatileItems;

    private static class VolatileItems {

        final LoadBalancer loadBalancer;
        final InvokerFactory invokerFactory;
        final AtomicInteger inflight = new AtomicInteger(1); // Initial reference.
        Runnable cleanup = () -> { };

        VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory) {
            this.loadBalancer = loadBalancer;
            this.invokerFactory = invokerFactory;
        }

        private void countDown() { if (inflight.decrementAndGet() == 0) cleanup.run(); }

        private class Ref implements AutoCloseable {

            boolean handedOff = false;
            { inflight.incrementAndGet(); }

            VolatileItems get() { return VolatileItems.this; }

            /** Hands off the reference to the given invoker, which will decrement the counter when closed. */
            <T extends CloseableInvoker> T register(T invoker) {
                invoker.teardown((__, ___) -> countDown());
                handedOff = true;
                return invoker;
            }

            @Override public void close() { if ( ! handedOff) countDown(); }

        }

    }

    private static final QueryProfileType argumentType;

    static {
        argumentType = new QueryProfileType(DISPATCH);
        argumentType.setStrict(true);
        argumentType.setBuiltin(true);
        argumentType.addField(new FieldDescription(TOP_K_PROBABILITY, FieldType.doubleType));
        argumentType.freeze();
    }

    public static QueryProfileType getArgumentType() { return argumentType; }

    interface InvokerFactoryFactory {
        InvokerFactory create(RpcConnectionPool rpcConnectionPool, SearchGroups searchGroups, DispatchConfig dispatchConfig);
    }

    @Inject
    public Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig,
                      DispatchNodesConfig nodesConfig, VipStatus vipStatus) {
        this(clusterId, dispatchConfig, new RpcResourcePool(dispatchConfig, nodesConfig),
             nodesConfig, vipStatus, RpcInvokerFactory::new);
        initialWarmup(dispatchConfig.warmuptime());
    }

    Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool,
               DispatchNodesConfig nodesConfig, VipStatus vipStatus, InvokerFactoryFactory invokerFactories) {
        this(dispatchConfig, rpcConnectionPool,
             new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(),
                               toNodes(clusterId.stringValue(), nodesConfig), vipStatus, new RpcPingFactory(rpcConnectionPool)),
             invokerFactories);
    }

    Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool,
               SearchCluster searchCluster, InvokerFactoryFactory invokerFactories) {
        this(dispatchConfig, rpcConnectionPool, searchCluster, new ClusterMonitor<>(searchCluster, false), invokerFactories);
        this.clusterMonitor.start(); // Populate nodes to monitor before starting it.
    }

    Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool, SearchCluster searchCluster,
               ClusterMonitor<Node> clusterMonitor, InvokerFactoryFactory invokerFactories) {
        this.dispatchConfig = dispatchConfig;
        this.rpcResourcePool = rpcConnectionPool;
        this.searchCluster = searchCluster;
        this.invokerFactories = invokerFactories;
        this.clusterMonitor = clusterMonitor;
        this.volatileItems = update();
        searchCluster.addMonitoring(clusterMonitor);
    }

    /* For simple mocking in tests. Beware that searchCluster is shut down in deconstruct(). */
    Dispatcher(ClusterMonitor<Node> clusterMonitor, SearchCluster searchCluster,
               DispatchConfig dispatchConfig, InvokerFactory invokerFactory) {
        this(dispatchConfig, null, searchCluster, clusterMonitor, (__, ___, ____) -> invokerFactory);
    }

    /** Returns the snapshot of volatile items that need to be kept together, incrementing its reference counter. */
    private VolatileItems.Ref volatileItems() {
        return volatileItems.new Ref();
    }

    /**
     * This is called whenever we have new config for backend nodes.
     * Normally, we'd want to handle partial failure of the component graph by reinstating the old state;
     * however, in this case, such a failure would be local to this container, and we instead want to keep
     * the newest config, as that is what most accurately represents the actual backend.
     *
     * The flow of reconfiguration is:
     * 1. The volatile snapshot of disposable items is replaced with a new one that only references updated nodes.
     * 2. Dependencies of the items in 1., which must be configured, are updated, yielding a list of resources to close.
     * 3. When inflight operations against the old snapshot are done, all obsolete resources are cleaned up.
     *
     * Ownership details:
     * 1. The RPC resource pool is owned by the dispatcher, and is updated on node set changes;
     *    it contains the means by which the container talks to backend nodes, so cleanup must be delayed until safe.
     * 2. The invocation factory is owned by the volatile snapshot, and is swapped atomically with it;
     *    it is used by the dispatcher to create ephemeral invokers, which must complete before cleanup (above) can happen.
     * 3. The load balancer is owned by the volatile snapshot, and is swapped atomically with it;
     *    it is used internally by the dispatcher to select search nodes for queries, and is discarded with its snapshot.
     * 4. The cluster monitor is a subordinate to the search cluster, and does whatever that tells it to, at any time;
     *    it is technically owned by the dispatcher, but is updated by the search cluster, when that is updated.
     * 5. The search cluster is owned by the dispatcher, and is updated on node set changes;
     *    its responsibility is to keep track of the state of the backend, and to provide a view of it to the dispatcher,
     *    as well as to keep the container VIP status updated accordingly; it should therefore preserve as much as possible
     *    of its state across reconfigurations: with new node config, it will immediately forget obsolete nodes, and set
     *    coverage information as if the new nodes have zero documents, before even checking their status; this is fine
     *    under the assumption that this is the common case, i.e., new nodes have no documents yet.
     */
    void updateWithNewConfig(DispatchNodesConfig nodesConfig) {
        try (var items = volatileItems()) { // Mark a reference to the old snapshot, which we want to have cleaned up.
            items.get().countDown(); // Decrement for its initial creation reference, so it may reach 0.

            // Let the RPC pool know about the new nodes, and set up the delayed cleanup that we need to do.
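            // (A reading of the contract assumed here, consistent with the ownership notes above:
            //  updateNodes returns the connection pools made obsolete by the new node set. Inflight
            //  invokers may still be using them, so they are only closed once the old snapshot's
            //  reference count reaches zero.)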
            Collection<? extends AutoCloseable> connectionPoolsToClose = rpcResourcePool.updateNodes(nodesConfig);
            items.get().cleanup = () -> {
                for (AutoCloseable pool : connectionPoolsToClose) {
                    try { pool.close(); } catch (Exception ignored) { }
                }
            };

            // Update the nodes the search cluster keeps track of, and what nodes are monitored.
            searchCluster.updateNodes(toNodes(searchCluster.name(), nodesConfig), clusterMonitor, dispatchConfig.minActivedocsPercentage());

            // Update the snapshot to use the new node set in the search cluster; the RPC pool is ready for this.
            this.volatileItems = update();
        } // Closing the old snapshot may trigger the RPC cleanup now, or when the last invoker is closed, by a search thread.
    }

    private VolatileItems update() {
        return new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(),
                                                  toLoadBalancerPolicy(dispatchConfig.distributionPolicy())),
                                 invokerFactories.create(rpcResourcePool, searchCluster.groupList(), dispatchConfig));
    }

    private void initialWarmup(double warmupTime) {
        Thread warmup = new Thread(() -> warmup(warmupTime));
        warmup.start();
        try {
            while ( ! searchCluster.hasInformationAboutAllNodes()) {
                Thread.sleep(1);
            }
            warmup.join();
        } catch (InterruptedException e) { }

        // Now we have information from all nodes, and a ping iteration has completed.
        // Instead of waiting until the next ping interval to update coverage and group state,
        // we compute the state ourselves, so that when the dispatcher is ready, the state
        // of its groups is also known.
        searchCluster.pingIterationCompleted();
    }

    private static LoadBalancer.Policy toLoadBalancerPolicy(DispatchConfig.DistributionPolicy.Enum policy) {
        return switch (policy) {
            case ROUNDROBIN -> LoadBalancer.Policy.ROUNDROBIN;
            case BEST_OF_RANDOM_2 -> LoadBalancer.Policy.BEST_OF_RANDOM_2;
            case ADAPTIVE, LATENCY_AMORTIZED_OVER_REQUESTS -> LoadBalancer.Policy.LATENCY_AMORTIZED_OVER_REQUESTS;
            case LATENCY_AMORTIZED_OVER_TIME -> LoadBalancer.Policy.LATENCY_AMORTIZED_OVER_TIME;
        };
    }

    private static List<Node> toNodes(String clusterName, DispatchNodesConfig nodesConfig) {
        return nodesConfig.node().stream()
                          .map(n -> new Node(clusterName, n.key(), n.host(), n.group()))
                          .toList();
    }

    /**
     * Runs code whose first use would otherwise be on the query serving path, in order to trigger
     * JIT compilation and avoid cold-start issues. Currently warms up the lz4 compression code.
     */
    private static void warmup(double seconds) {
        new Compressor().warmup(seconds);
    }

    public boolean allGroupsHaveSize1() {
        return searchCluster.groupList().groups().stream().allMatch(g -> g.nodes().size() == 1);
    }

    @Override
    public void deconstruct() {
        // The cluster monitor must be shut down first, as it uses the invoker factory through the search cluster.
        clusterMonitor.shutdown();
        if (rpcResourcePool != null) {
            rpcResourcePool.close();
        }
    }

    public FillInvoker getFillInvoker(Result result, VespaBackEndSearcher searcher) {
        try (var items = volatileItems()) { // Take a snapshot, and release it when we're done.
            return items.register(items.get().invokerFactory.createFillInvoker(searcher, result));
        }
    }

    public SearchInvoker getSearchInvoker(Query query, VespaBackEndSearcher searcher) {
        try (var items = volatileItems()) { // Take a snapshot, and release it when we're done.
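            // Dispatch strategy, as implemented by the two helpers below: an explicit searchPath
            // set on the query wins; failing that, a local corpus dispatch target, if any;
            // failing that, a group chosen by the load balancer, retrying other groups on failure.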
            int maxHitsPerNode = dispatchConfig.maxHitsPerNode();
            SearchInvoker invoker = getSearchPathInvoker(query, searcher, searchCluster.groupList(), items.get().invokerFactory, maxHitsPerNode)
                    .orElseGet(() -> getInternalInvoker(query, searcher, searchCluster, items.get().loadBalancer, items.get().invokerFactory, maxHitsPerNode));

            if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) {
                query.setHits(0);
                query.setOffset(0);
            }
            return items.register(invoker);
        }
    }

    /** Builds an invoker based on the search path. */
    private static Optional<SearchInvoker> getSearchPathInvoker(Query query, VespaBackEndSearcher searcher, SearchGroups cluster,
                                                                InvokerFactory invokerFactory, int maxHitsPerNode) {
        String searchPath = query.getModel().getSearchPath();
        if (searchPath == null) return Optional.empty();

        try {
            List<Node> nodes = SearchPath.selectNodes(searchPath, cluster);
            if (nodes.isEmpty()) return Optional.empty();

            query.trace(false, 2, "Dispatching with search path ", searchPath);
            return invokerFactory.createSearchInvoker(searcher, query, nodes, true, maxHitsPerNode);
        } catch (InvalidSearchPathException e) {
            return Optional.of(new SearchErrorInvoker(ErrorMessage.createIllegalQuery(e.getMessage())));
        }
    }

    private static SearchInvoker getInternalInvoker(Query query, VespaBackEndSearcher searcher, SearchCluster cluster,
                                                    LoadBalancer loadBalancer, InvokerFactory invokerFactory, int maxHitsPerNode) {
        Optional<Node> directNode = cluster.localCorpusDispatchTarget();
        if (directNode.isPresent()) {
            Node node = directNode.get();
            query.trace(false, 2, "Dispatching to ", node);
            return invokerFactory.createSearchInvoker(searcher, query, List.of(node), true, maxHitsPerNode)
                                 .orElseThrow(() -> new IllegalStateException("Could not dispatch directly to " + node));
        }

        // Try up to one more group than currently has sufficient coverage, bounded by the number of
        // groups and by MAX_GROUP_SELECTION_ATTEMPTS; the last attempt accepts incomplete coverage.
        int covered = cluster.groupsWithSufficientCoverage();
        int groups = cluster.groupList().size();
        int max = Integer.min(Integer.min(covered + 1, groups), MAX_GROUP_SELECTION_ATTEMPTS);
        Set<Integer> rejected = rejectGroupBlockingFeed(cluster.groupList().groups());
        for (int i = 0; i < max; i++) {
            Optional<Group> groupInCluster = loadBalancer.takeGroup(rejected);
            if (groupInCluster.isEmpty()) break; // No groups available.

            Group group = groupInCluster.get();
            boolean acceptIncompleteCoverage = (i == max - 1);
            Optional<SearchInvoker> invoker = invokerFactory.createSearchInvoker(searcher, query, group.nodes(),
                                                                                 acceptIncompleteCoverage, maxHitsPerNode);
            if (invoker.isPresent()) {
                query.trace(false, 2, "Dispatching to group ", group.id(), " after retries = ", i);
                query.getModel().setSearchPath("/" + group.id());
                invoker.get().teardown((success, time) -> loadBalancer.releaseGroup(group, success, time));
                return invoker.get();
            } else {
                loadBalancer.releaseGroup(group, false, RequestDuration.of(Duration.ZERO));
                if (rejected == null) {
                    rejected = new HashSet<>();
                }
                rejected.add(group.id());
            }
        }
        throw new IllegalStateException("No suitable groups to dispatch query. Rejected: " + rejected);
    }

    /**
     * We want to avoid groups blocking feed because their data may be out of date.
     * If there is a single group blocking feed, we want to reject it.
     * If multiple groups are blocking feed, we should use them anyway, as we may not have
     * remaining capacity otherwise. The same applies if there are no other groups.
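     *
     * An illustrative example of the intent: with groups {0, 1, 2} where only group 1 is
     * blocking writes, {1} is returned; if two of the three groups block writes, or if the
     * blocking group is the only group, null is returned and all groups remain eligible.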
     *
     * @return a modifiable set containing the single group to reject, or null otherwise
     */
    private static Set<Integer> rejectGroupBlockingFeed(Collection<Group> groups) {
        if (groups.size() == 1) return null;
        List<Group> groupsRejectingFeed = groups.stream().filter(Group::isBlockingWrites).toList();
        if (groupsRejectingFeed.size() != 1) return null;
        Set<Integer> rejected = new HashSet<>();
        rejected.add(groupsRejectingFeed.get(0).id());
        return rejected;
    }

}