// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch.searchcluster;

import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.yahoo.container.handler.VipStatus;
import com.yahoo.net.HostName;
import com.yahoo.prelude.Pong;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.cluster.NodeManager;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.vespa.config.search.DispatchConfig;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/**
 * A model of a search cluster we might want to dispatch queries to.
 *
 * @author bratseth
 */
public class SearchCluster implements NodeManager<Node> {

    private static final Logger log = Logger.getLogger(SearchCluster.class.getName());

    private final DispatchConfig dispatchConfig;
    private final int size;
    private final String clusterId;
    private final ImmutableMap<Integer, Group> groups;
    private final ImmutableMultimap<String, Node> nodesByHost;
    private final ImmutableList<Group> orderedGroups;
    private final ClusterMonitor<Node> clusterMonitor;
    private final VipStatus vipStatus;
    private PingFactory pingFactory;

    /**
     * A search node on this local machine having the entire corpus, which we therefore
     * should prefer to dispatch directly to, or empty if there is no such local search node.
     * If there is one, we also maintain the VIP status of this container based on the availability
     * of the corpus on this local node (up + has coverage), such that this node is taken out of rotation
     * if it only queries this cluster when the local node cannot be used, to avoid unnecessary
     * cross-node network traffic.
     */
    private final Optional<Node> localCorpusDispatchTarget;

    /**
     * Creates a search cluster model from the given dispatch config.
     *
     * @param clusterId            the id passed to the VIP status when adding/removing this cluster from rotation
     * @param dispatchConfig       the config listing the nodes of this cluster
     * @param containerClusterSize the number of nodes in the container cluster this runs in
     * @param vipStatus            the VIP status which is updated as node and coverage state changes
     */
    public SearchCluster(String clusterId, DispatchConfig dispatchConfig, int containerClusterSize, VipStatus vipStatus) {
        this.clusterId = clusterId;
        this.dispatchConfig = dispatchConfig;
        this.vipStatus = vipStatus;

        List<Node> nodes = toNodes(dispatchConfig);
        this.size = nodes.size();

        // Create groups
        ImmutableMap.Builder<Integer, Group> groupsBuilder = new ImmutableMap.Builder<>();
        for (Map.Entry<Integer, List<Node>> group : nodes.stream().collect(Collectors.groupingBy(Node::group)).entrySet()) {
            Group g = new Group(group.getKey(), group.getValue());
            groupsBuilder.put(group.getKey(), g);
        }
        this.groups = groupsBuilder.build();

        // Keep the groups in the order they are first seen in the node list
        LinkedHashMap<Integer, Group> groupIntroductionOrder = new LinkedHashMap<>();
        nodes.forEach(node -> groupIntroductionOrder.put(node.group(), groups.get(node.group())));
        this.orderedGroups = ImmutableList.<Group>builder().addAll(groupIntroductionOrder.values()).build();

        // Index nodes by host
        ImmutableMultimap.Builder<String, Node> nodesByHostBuilder = new ImmutableMultimap.Builder<>();
        for (Node node : nodes)
            nodesByHostBuilder.put(node.hostname(), node);
        this.nodesByHost = nodesByHostBuilder.build();

        this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(),
                                                                       size,
                                                                       containerClusterSize,
                                                                       nodesByHost,
                                                                       groups);

        this.clusterMonitor = new ClusterMonitor<>(this);
    }

    /**
     * Registers all nodes of this cluster with the cluster monitor, using the given factory to create pingers.
     *
     * @param pingFactory the factory used by {@link #ping(Node, Executor)} to create pingers
     */
    public void startClusterMonitoring(PingFactory pingFactory) {
        this.pingFactory = pingFactory;

        for (var group : orderedGroups) {
            for (var node : group.nodes()) {
                // cluster monitor will only call working() when the
                // node transitions from down to up, so we need to
                // register the initial (working) state here:
                working(node);
                clusterMonitor.add(node, true);
            }
        }
    }

    /**
     * Finds the local search node we may dispatch directly to, based on static configuration only.
     *
     * @return the single local node holding the entire corpus, or empty if direct dispatch should not be used
     */
    private static Optional<Node> findLocalCorpusDispatchTarget(String selfHostname,
                                                                int searchClusterSize,
                                                                int containerClusterSize,
                                                                ImmutableMultimap<String, Node> nodesByHost,
                                                                ImmutableMap<Integer, Group> groups) {
        // Direct dispatch to a local search node is only used when all of the following hold:
        // - Exactly one search node in the search cluster in question is configured on the same host
        //   as the currently running container.
        // - It has all the data <==> No other nodes in the search cluster have the same group id as this node.
        // - The container cluster we're running in has at least as many nodes as the search cluster.
        // Whether that local search node responds is checked at lookup time in localCorpusDispatchTarget().

        ImmutableCollection<Node> localSearchNodes = nodesByHost.get(selfHostname);
        // Only use direct dispatch if we have exactly 1 search node on the same machine:
        if (localSearchNodes.size() != 1) return Optional.empty();

        Node localSearchNode = localSearchNodes.iterator().next();
        Group localSearchGroup = groups.get(localSearchNode.group());

        // Only use direct dispatch if the local search node has the entire corpus
        if (localSearchGroup.nodes().size() != 1) return Optional.empty();

        // Only use direct dispatch if this container cluster has at least as many nodes as the search cluster
        // to avoid load skew/preserve fanout in the case where a subset of the search nodes are also containers.
        // This disregards the case where the search and container clusters are partially overlapping.
        // Such configurations produce skewed load in any case.
        if (containerClusterSize < searchClusterSize) return Optional.empty();

        return Optional.of(localSearchNode);
    }

    /**
     * Converts the configured nodes to {@link Node} instances.
     * If the config says to use the local node only, nodes on other hosts are left out.
     */
    private static ImmutableList<Node> toNodes(DispatchConfig dispatchConfig) {
        ImmutableList.Builder<Node> nodesBuilder = new ImmutableList.Builder<>();
        Predicate<DispatchConfig.Node> filter;
        if (dispatchConfig.useLocalNode()) {
            final String hostName = HostName.getLocalhost();
            filter = node -> node.host().equals(hostName);
        } else {
            filter = node -> true;
        }
        for (DispatchConfig.Node node : dispatchConfig.node()) {
            if (filter.test(node)) {
                nodesBuilder.add(new Node(node.key(), node.host(), node.fs4port(), node.group()));
            }
        }
        return nodesBuilder.build();
    }

    /** Returns the dispatch config this was created from */
    public DispatchConfig dispatchConfig() {
        return dispatchConfig;
    }

    /** Returns the number of nodes in this cluster (across all groups) */
    public int size() { return size; }

    /** Returns the groups of this cluster as an immutable map indexed by group id */
    public ImmutableMap<Integer, Group> groups() { return groups; }

    /** Returns the groups of this cluster as an immutable list in introduction order */
    public ImmutableList<Group> orderedGroups() { return orderedGroups; }

    /** Returns the n'th (zero-indexed) group in the cluster if possible, or empty if n is out of range */
    public Optional<Group> group(int n) {
        // A negative index is "not possible" just like a too large one: answer empty rather than throw
        if (n >= 0 && n < orderedGroups.size()) {
            return Optional.of(orderedGroups.get(n));
        } else {
            return Optional.empty();
        }
    }

    /** Returns the number of nodes per group - size()/groups.size() */
    public int groupSize() {
        if (groups.size() == 0) return size();
        return size() / groups.size();
    }

    /** Returns the number of groups currently marked as having sufficient coverage */
    public int groupsWithSufficientCoverage() {
        int covered = 0;
        for (Group g : orderedGroups) {
            if (g.hasSufficientCoverage()) {
                covered++;
            }
        }
        return covered;
    }

    /**
     * Returns the single, local node we should dispatch queries directly to,
     * or empty if we should not dispatch directly.
     */
    public Optional<Node> localCorpusDispatchTarget() {
        if ( localCorpusDispatchTarget.isEmpty()) return Optional.empty();

        // Only use direct dispatch if the local group has sufficient coverage
        Group localSearchGroup = groups.get(localCorpusDispatchTarget.get().group());
        if ( ! localSearchGroup.hasSufficientCoverage()) return Optional.empty();

        // Only use direct dispatch if the local search node is up
        if ( ! localCorpusDispatchTarget.get().isWorking()) return Optional.empty();

        return localCorpusDispatchTarget;
    }

    /** Called by the cluster monitor whenever we get information (positive or negative) about a node */
    @Override
    public void statusIsKnown(Node node) {
        node.setStatusIsKnown();
    }

    /** Called by the cluster monitor when node state changes to working */
    @Override
    public void working(Node node) {
        node.setWorking(true);
        updateVipStatusOnNodeChange(node, true);
    }

    /** Called by the cluster monitor when node state changes to failed */
    @Override
    public void failed(Node node) {
        node.setWorking(false);
        // Must report "not working" here: otherwise a failing local corpus node would put us into rotation
        updateVipStatusOnNodeChange(node, false);
    }

    /** Records a changed coverage status on the group and propagates the change to the VIP status */
    private void updateSufficientCoverage(Group group, boolean sufficientCoverage) {
        if (sufficientCoverage == group.hasSufficientCoverage()) return; // no change

        group.setHasSufficientCoverage(sufficientCoverage);
        updateVipStatusOnCoverageChange(group, sufficientCoverage);
    }

    /**
     * Updates the VIP status after the given node changed state.
     *
     * @param node    the node whose state changed
     * @param working the new state of the node; only consulted when the node is the local corpus dispatch target
     */
    private void updateVipStatusOnNodeChange(Node node, boolean working) {
        if (usesLocalCorpusIn(node)) { // follow the status of the local corpus
            if (working)
                vipStatus.addToRotation(clusterId);
            else
                vipStatus.removeFromRotation(clusterId);
        }
        else {
            // Do not decide anything until the status of every node is known
            if ( ! hasInformationAboutAllNodes()) return;

            if (hasWorkingNodes())
                vipStatus.addToRotation(clusterId);
            else
                vipStatus.removeFromRotation(clusterId);
        }
    }

    /**
     * Updates the VIP status after the coverage status of the given group changed.
     *
     * @param group              the group whose coverage status changed
     * @param sufficientCoverage the new coverage status of the group
     */
    private void updateVipStatusOnCoverageChange(Group group, boolean sufficientCoverage) {
        boolean isInRotation = vipStatus.isInRotation();
        if (usesLocalCorpusIn(group)) { // follow the status of the local corpus
            if (sufficientCoverage)
                vipStatus.addToRotation(clusterId);
            else
                vipStatus.removeFromRotation(clusterId);
        }
        else {
            // A non-local group can only bring us into rotation here, never take us out
            if ( ! isInRotation && sufficientCoverage)
                vipStatus.addToRotation(clusterId);
        }
    }

    /** Returns whether the status of every node in this cluster is known */
    private boolean hasInformationAboutAllNodes() {
        return nodesByHost.values().stream().allMatch(Node::getStatusIsKnown);
    }

    /** Returns whether at least one node in this cluster is working */
    private boolean hasWorkingNodes() {
        return nodesByHost.values().stream().anyMatch(Node::isWorking);
    }

    /** Returns whether the given node is the local corpus dispatch target */
    private boolean usesLocalCorpusIn(Node node) {
        return localCorpusDispatchTarget.isPresent() && localCorpusDispatchTarget.get().equals(node);
    }

    /** Returns whether the given group is the group of the local corpus dispatch target */
    private boolean usesLocalCorpusIn(Group group) {
        return localCorpusDispatchTarget.isPresent() && localCorpusDispatchTarget.get().group() == group.id();
    }

    /** Used by the cluster monitor to manage node status */
    @Override
    public void ping(Node node, Executor executor) {
        if (pingFactory == null) return; // not initialized yet

        FutureTask<Pong> futurePong = new FutureTask<>(pingFactory.createPinger(node, clusterMonitor));
        executor.execute(futurePong);
        Pong pong = getPong(futurePong, node);
        futurePong.cancel(true); // no effect if the ping completed; stops it if we gave up waiting

        if (pong.badResponse()) {
            clusterMonitor.failed(node, pong.getError(0));
        } else {
            if (pong.activeDocuments().isPresent()) {
                node.setActiveDocuments(pong.activeDocuments().get());
            }
            clusterMonitor.responded(node);
        }
    }

    /** Updates coverage status and tracking for a cluster consisting of exactly one group */
    private void pingIterationCompletedSingleGroup() {
        Group group = groups.values().iterator().next();
        group.aggregateActiveDocuments();
        // With just one group sufficient coverage may not be the same as full coverage, as the
        // group will always be marked sufficient for use.
        updateSufficientCoverage(group, true);
        boolean fullCoverage = isGroupCoverageSufficient(group.workingNodes(), group.nodes().size(),
                                                         group.getActiveDocuments(), group.getActiveDocuments());
        trackGroupCoverageChanges(0, group, fullCoverage, group.getActiveDocuments());
    }

    /** Updates coverage status and tracking for each group by comparing it to the average of the other groups */
    private void pingIterationCompletedMultipleGroups() {
        int numGroups = orderedGroups.size();
        // Update active documents per group and use it to decide if the group should be active
        long[] activeDocumentsInGroup = new long[numGroups];
        long sumOfActiveDocuments = 0;
        for (int i = 0; i < numGroups; i++) {
            Group group = orderedGroups.get(i);
            group.aggregateActiveDocuments();
            activeDocumentsInGroup[i] = group.getActiveDocuments();
            sumOfActiveDocuments += activeDocumentsInGroup[i];
        }

        for (int i = 0; i < numGroups; i++) {
            Group group = orderedGroups.get(i);
            long activeDocuments = activeDocumentsInGroup[i];
            long averageDocumentsInOtherGroups = (sumOfActiveDocuments - activeDocuments) / (numGroups - 1);
            boolean sufficientCoverage = isGroupCoverageSufficient(group.workingNodes(), group.nodes().size(),
                                                                   activeDocuments, averageDocumentsInOtherGroups);
            updateSufficientCoverage(group, sufficientCoverage);
            trackGroupCoverageChanges(i, group, sufficientCoverage, averageDocumentsInOtherGroups);
        }
    }

    /**
     * Update statistics after a round of issuing pings.
     * Note that this doesn't wait for pings to return, so it will typically accumulate data from
     * last rounds pinging, or potentially (although unlikely) some combination of new and old data.
     */
    @Override
    public void pingIterationCompleted() {
        int numGroups = orderedGroups.size();
        if (numGroups == 1) {
            pingIterationCompletedSingleGroup();
        } else {
            pingIterationCompletedMultipleGroups();
        }
    }

    /**
     * Returns whether a group has sufficient coverage, both in active documents relative to the other groups
     * (checked only when the other groups have documents) and in the number of working nodes.
     */
    private boolean isGroupCoverageSufficient(int workingNodes, int nodesInGroup, long activeDocuments, long averageDocumentsInOtherGroups) {
        boolean sufficientCoverage = true;

        if (averageDocumentsInOtherGroups > 0) {
            double coverage = 100.0 * (double) activeDocuments / averageDocumentsInOtherGroups;
            sufficientCoverage = coverage >= dispatchConfig.minActivedocsPercentage();
        }
        if (sufficientCoverage) {
            sufficientCoverage = isGroupNodeCoverageSufficient(workingNodes, nodesInGroup);
        }
        return sufficientCoverage;
    }

    /** Returns whether enough of the nodes in a group are working, given the configured allowance for nodes down */
    private boolean isGroupNodeCoverageSufficient(int workingNodes, int nodesInGroup) {
        int nodesAllowedDown = dispatchConfig.maxNodesDownPerGroup()
                + (int) (((double) nodesInGroup * (100.0 - dispatchConfig.minGroupCoverage())) / 100.0);
        return workingNodes + nodesAllowedDown >= nodesInGroup;
    }

    /**
     * Waits for the given ping to complete, for at most the fail limit of the cluster monitor configuration.
     *
     * @return the pong of the ping, or a pong carrying an error if the ping failed, timed out or was interrupted
     */
    private Pong getPong(FutureTask<Pong> futurePong, Node node) {
        try {
            return futurePong.get(clusterMonitor.getConfiguration().getFailLimit(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still observe the interruption
            Thread.currentThread().interrupt();
            log.log(Level.WARNING, "Exception pinging " + node, e);
            return new Pong(ErrorMessage.createUnspecifiedError("Ping was interrupted: " + node));
        } catch (ExecutionException e) {
            log.log(Level.WARNING, "Exception pinging " + node, e);
            return new Pong(ErrorMessage.createUnspecifiedError("Execution was interrupted: " + node));
        } catch (TimeoutException e) {
            return new Pong(ErrorMessage.createNoAnswerWhenPingingNode("Ping thread timed out"));
        }
    }

    /**
     * Calculate whether a subset of nodes in a group has enough coverage
     *
     * @param knownGroupId the id of the group the nodes belong to, if known; not consulted when there is only one group
     * @param nodes        the subset of nodes to evaluate
     * @return true if the given nodes provide sufficient coverage; false if not, or if the group is unknown
     */
    public boolean isPartialGroupCoverageSufficient(OptionalInt knownGroupId, List<Node> nodes) {
        if (orderedGroups.size() == 1) {
            return nodes.size() >= groupSize() - dispatchConfig.maxNodesDownPerGroup();
        }

        if (knownGroupId.isEmpty()) {
            return false;
        }
        int groupId = knownGroupId.getAsInt();
        Group group = groups.get(groupId);
        if (group == null) {
            return false;
        }
        int nodesInGroup = group.nodes().size();

        // Compare against the average number of active documents in all the other groups
        long sumOfActiveDocuments = 0;
        int otherGroups = 0;
        for (Group g : orderedGroups) {
            if (g.id() != groupId) {
                sumOfActiveDocuments += g.getActiveDocuments();
                otherGroups++;
            }
        }
        long activeDocuments = 0;
        for (Node n : nodes) {
            activeDocuments += n.getActiveDocuments();
        }
        long averageDocumentsInOtherGroups = sumOfActiveDocuments / otherGroups;
        return isGroupCoverageSufficient(nodes.size(), nodesInGroup, activeDocuments, averageDocumentsInOtherGroups);
    }

    /** Logs a message when the full coverage status of the given group changes */
    private void trackGroupCoverageChanges(int index, Group group, boolean fullCoverage, long averageDocuments) {
        boolean changed = group.isFullCoverageStatusChanged(fullCoverage);
        if (changed) {
            if (fullCoverage) {
                log.info(() -> String.format("Group %d is now good again (%d/%d active docs, coverage %d/%d)",
                                             index, group.getActiveDocuments(), averageDocuments, group.workingNodes(), groupSize()));
            } else {
                int requiredNodes = groupSize() - dispatchConfig.maxNodesDownPerGroup();
                log.warning(() -> String.format("Coverage of group %d is only %d/%d (requires %d)",
                                                index, group.workingNodes(), groupSize(), requiredNodes));
            }
        }
    }

}