diff options
Diffstat (limited to 'container-search/src/main/java/com')
9 files changed, 127 insertions, 98 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java index 59674d25402..dd01d895963 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java @@ -26,25 +26,25 @@ public abstract class BaseNodeMonitor<T> { /** The object representing the monitored node */ protected T node; - protected boolean isWorking=true; + protected boolean isWorking = true; /** Whether this node is quarantined for unstability */ - protected boolean isQuarantined=false; + protected boolean isQuarantined = false; /** The last time this node failed, in ms */ - protected long failedAt=0; + protected long failedAt = 0; /** The last time this node responded (failed or succeeded), in ms */ - protected long respondedAt=0; + protected long respondedAt = 0; /** The last time this node responded successfully */ - protected long succeededAt=0; + protected long succeededAt = 0; /** The configuration of this monitor */ protected MonitorConfiguration configuration; /** Is the node we monitor part of an internal Vespa cluster or not */ - private boolean internal=false; + private boolean internal; public BaseNodeMonitor(boolean internal) { this.internal=internal; @@ -54,10 +54,12 @@ public abstract class BaseNodeMonitor<T> { /** * Returns whether this node is currently in a state suitable - * for receiving traffic. As far as we know, that is + * for receiving traffic (default true) */ public boolean isWorking() { return isWorking; } + /** @deprecated Not used */ + @Deprecated // TODO: Remove on Vespa 8 public boolean isQuarantined() { return isQuarantined; } /** diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java index ac0c8375f04..22c7f59872c 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java @@ -22,9 +22,9 @@ import java.util.logging.Logger; */ public class ClusterMonitor<T> { - private MonitorConfiguration configuration = new MonitorConfiguration(); + private final MonitorConfiguration configuration = new MonitorConfiguration(); - private static Logger log=Logger.getLogger(ClusterMonitor.class.getName()); + private static Logger log = Logger.getLogger(ClusterMonitor.class.getName()); private NodeManager<T> nodeManager; @@ -33,7 +33,7 @@ public class ClusterMonitor<T> { private volatile boolean shutdown = false; /** A map from Node to corresponding MonitoredNode */ - private final Map<T, BaseNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new java.util.LinkedHashMap<>()); + private final Map<T, TrafficNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new java.util.LinkedHashMap<>()); public ClusterMonitor(NodeManager<T> manager) { nodeManager = manager; @@ -56,8 +56,7 @@ public class ClusterMonitor<T> { * @param internal whether or not this node is internal to this cluster */ public void add(T node, boolean internal) { - BaseNodeMonitor<T> monitor = new TrafficNodeMonitor<>(node, configuration, internal); - nodeMonitors.put(node, monitor); + nodeMonitors.put(node, new TrafficNodeMonitor<>(node, configuration, internal)); } /** @@ -69,22 +68,20 @@ public class ClusterMonitor<T> { /** Called from ClusterSearcher/NodeManager when a node failed */ public synchronized void failed(T node, ErrorMessage error) { - BaseNodeMonitor<T> monitor = nodeMonitors.get(node); - boolean wasWorking = monitor.isWorking(); + TrafficNodeMonitor<T> monitor = nodeMonitors.get(node); + Boolean wasWorking = monitor.isKnownWorking(); monitor.failed(error); - if (wasWorking && !monitor.isWorking()) { + if (wasWorking != monitor.isKnownWorking()) nodeManager.failed(node); - } } /** Called when a node responded */ public synchronized void responded(T node) { - BaseNodeMonitor<T> monitor = nodeMonitors.get(node); - boolean wasFailing =! monitor.isWorking(); + TrafficNodeMonitor<T> monitor = nodeMonitors.get(node); + Boolean wasWorking = monitor.isKnownWorking(); monitor.responded(); - if (wasFailing && monitor.isWorking()) { + if (wasWorking != monitor.isKnownWorking()) nodeManager.working(monitor.getNode()); - } } /** diff --git a/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java index ea881ad8b48..830014bca46 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java @@ -36,6 +36,7 @@ public class TrafficNodeMonitor<T> extends BaseNodeMonitor<T> { @Override public void failed(ErrorMessage error) { respondedAt = now(); + atStartUp = false; if (error.getCode() == Error.BACKEND_COMMUNICATION_ERROR.code) { setWorking(false, "Connection failure: " + error.toString()); @@ -60,9 +61,14 @@ public class TrafficNodeMonitor<T> extends BaseNodeMonitor<T> { setWorking(true,"Responds correctly"); } + /** + * Returns whether this node is currently is a state suitable for receiving traffic, or null if not known + */ + public Boolean isKnownWorking() { return atStartUp ? null : isWorking; } + /** Thread-safely changes the state of this node if required */ - protected synchronized void setWorking(boolean working,String explanation) { - if (this.isWorking==working) return; // Old news + protected synchronized void setWorking(boolean working, String explanation) { + if (this.isWorking == working) return; // Old news if (explanation==null) { explanation=""; diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 58f73ea52cc..af34fc3e106 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -139,7 +139,7 @@ public class Dispatcher extends AbstractComponent { return invoker; } - // build invoker based on searchpath + /** Builds an invoker based on searchpath */ private Optional<SearchInvoker> getSearchPathInvoker(Query query, VespaBackEndSearcher searcher) { String searchPath = query.getModel().getSearchPath(); if (searchPath == null) return Optional.empty(); @@ -156,7 +156,7 @@ public class Dispatcher extends AbstractComponent { } private Optional<SearchInvoker> getInternalInvoker(Query query, VespaBackEndSearcher searcher) { - Optional<Node> directNode = searchCluster.directDispatchTarget(); + Optional<Node> directNode = searchCluster.localCorpusDispatchTarget(); if (directNode.isPresent()) { Node node = directNode.get(); query.trace(false, 2, "Dispatching directly to ", node); @@ -202,4 +202,5 @@ public class Dispatcher extends AbstractComponent { metric.add(INTERNAL_METRIC, 1, metricContext); } } + } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java index 31af74a39b2..78c50254a84 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java @@ -48,21 +48,24 @@ public abstract class InvokerFactory { * @return Optional containing the SearchInvoker or <i>empty</i> if some node in the * list is invalid and the remaining coverage is not sufficient */ - public Optional<SearchInvoker> createSearchInvoker(VespaBackEndSearcher searcher, Query query, OptionalInt groupId, List<Node> nodes, - boolean acceptIncompleteCoverage) { + public Optional<SearchInvoker> createSearchInvoker(VespaBackEndSearcher searcher, + Query query, + OptionalInt groupId, + List<Node> nodes, + boolean acceptIncompleteCoverage) { List<SearchInvoker> invokers = new ArrayList<>(nodes.size()); Set<Integer> failed = null; for (Node node : nodes) { boolean nodeAdded = false; - if (node.isWorking()) { + if (node.isWorking() != Boolean.FALSE) { Optional<SearchInvoker> invoker = createNodeSearchInvoker(searcher, query, node); - if(invoker.isPresent()) { + if (invoker.isPresent()) { invokers.add(invoker.get()); nodeAdded = true; } } - if (!nodeAdded) { + if ( ! nodeAdded) { if (failed == null) { failed = new HashSet<>(); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java index bafc72b9b43..0e4e87b9a6a 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java @@ -54,7 +54,7 @@ public class Group { public int workingNodes() { int nodesUp = 0; for (Node node : nodes) { - if (node.isWorking()) { + if (node.isWorking() == Boolean.TRUE) { nodesUp++; } } @@ -64,7 +64,7 @@ public class Group { void aggregateActiveDocuments() { long activeDocumentsInGroup = 0; for (Node node : nodes) { - if (node.isWorking()) { + if (node.isWorking() == Boolean.TRUE) { activeDocumentsInGroup += node.getActiveDocuments(); } } @@ -77,6 +77,11 @@ public class Group { return this.activeDocuments.get(); } + public boolean isFullCoverageStatusChanged(boolean hasFullCoverageNow) { + boolean previousState = hasFullCoverage.getAndSet(hasFullCoverageNow); + return previousState != hasFullCoverageNow; + } + @Override public String toString() { return "search group " + id; } @@ -90,8 +95,4 @@ public class Group { return ((Group) other).id == this.id; } - public boolean isFullCoverageStatusChanged(boolean hasFullCoverageNow) { - boolean previousState = hasFullCoverage.getAndSet(hasFullCoverageNow); - return previousState != hasFullCoverageNow; - } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java index a71ce0354f9..b47f2fefa5b 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java @@ -19,6 +19,7 @@ public class Node { private final int fs4port; final int group; + private final AtomicBoolean statusIsKnown = new AtomicBoolean(false); private final AtomicBoolean working = new AtomicBoolean(true); private final AtomicLong activeDocuments = new AtomicLong(0); @@ -46,11 +47,14 @@ public class Node { public int group() { return group; } public void setWorking(boolean working) { + this.statusIsKnown.lazySet(true); this.working.lazySet(working); } - /** Returns whether this node is currently responding to requests */ - public boolean isWorking() { return working.get(); } + /** Returns whether this node is currently responding to requests, or null if status is not known */ + public Boolean isWorking() { + return statusIsKnown.get() ? working.get() : null; + } /** Updates the active documents on this node */ public void setActiveDocuments(long activeDocuments) { @@ -77,4 +81,5 @@ public class Node { @Override public String toString() { return "search node " + hostname + ":" + fs4port + " in group " + group; } + } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/PingFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/PingFactory.java index c9f722ef79b..285a1fcd57e 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/PingFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/PingFactory.java @@ -6,5 +6,7 @@ import com.yahoo.search.cluster.ClusterMonitor; import java.util.concurrent.Callable; public interface PingFactory { + Callable<Pong> createPinger(Node node, ClusterMonitor<Node> monitor); + } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index c091e211aca..c17212b2481 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -55,7 +55,7 @@ public class SearchCluster implements NodeManager<Node> { * if it only queries this cluster when the local node cannot be used, to avoid unnecessary * cross-node network traffic. */ - private final Optional<Node> directDispatchTarget; + private final Optional<Node> localCorpusDispatchTarget; public SearchCluster(String clusterId, DispatchConfig dispatchConfig, int containerClusterSize, VipStatus vipStatus) { this.clusterId = clusterId; @@ -82,8 +82,11 @@ public class SearchCluster implements NodeManager<Node> { nodesByHostBuilder.put(node.hostname(), node); this.nodesByHost = nodesByHostBuilder.build(); - this.directDispatchTarget = findDirectDispatchTarget(HostName.getLocalhost(), size, containerClusterSize, - nodesByHost, groups); + this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), + size, + containerClusterSize, + nodesByHost, + groups); this.clusterMonitor = new ClusterMonitor<>(this); } @@ -92,21 +95,18 @@ public class SearchCluster implements NodeManager<Node> { this.pingFactory = pingFactory; for (var group : orderedGroups) { - for (var node : group.nodes()) { - // cluster monitor will only call working() when the - // node transitions from down to up, so we need to - // register the initial (working) state here: - working(node); + for (var node : group.nodes()) clusterMonitor.add(node, true); - } } } - private static Optional<Node> findDirectDispatchTarget(String selfHostname, - int searchClusterSize, - int containerClusterSize, - ImmutableMultimap<String, Node> nodesByHost, - ImmutableMap<Integer, Group> groups) { + ClusterMonitor<Node> clusterMonitor() { return clusterMonitor; } + + private static Optional<Node> findLocalCorpusDispatchTarget(String selfHostname, + int searchClusterSize, + int containerClusterSize, + ImmutableMultimap<String, Node> nodesByHost, + ImmutableMap<Integer, Group> groups) { // A search node in the search cluster in question is configured on the same host as the currently running container. // It has all the data <==> No other nodes in the search cluster have the same group id as this node. // That local search node responds. @@ -186,70 +186,90 @@ public class SearchCluster implements NodeManager<Node> { } /** - * Returns the recipient we should dispatch queries directly to (bypassing fdispatch), + * Returns the single, local node we should dispatch queries directly to, * or empty if we should not dispatch directly. */ - public Optional<Node> directDispatchTarget() { - if ( directDispatchTarget.isEmpty()) return Optional.empty(); + public Optional<Node> localCorpusDispatchTarget() { + if ( localCorpusDispatchTarget.isEmpty()) return Optional.empty(); // Only use direct dispatch if the local group has sufficient coverage - Group localSearchGroup = groups.get(directDispatchTarget.get().group()); + Group localSearchGroup = groups.get(localCorpusDispatchTarget.get().group()); if ( ! localSearchGroup.hasSufficientCoverage()) return Optional.empty(); - // Only use direct dispatch if the local search node is up - if ( ! directDispatchTarget.get().isWorking()) return Optional.empty(); + // Only use direct dispatch if the local search node is not down + if ( localCorpusDispatchTarget.get().isWorking() == Boolean.FALSE) return Optional.empty(); - return directDispatchTarget; + return localCorpusDispatchTarget; } - /** Used by the cluster monitor to manage node status */ + /** Called by the cluster monitor when node state changes to working */ @Override public void working(Node node) { node.setWorking(true); - - if (usesDirectDispatchTo(node)) - vipStatus.addToRotation(clusterId); + updateVipStatusOnNodeChange(node, true); } - /** Used by the cluster monitor to manage node status */ + /** Called by the cluster monitor when node state changes to failed */ @Override public void failed(Node node) { node.setWorking(false); - - // Take ourselves out if we usually dispatch only to our own host - if (usesDirectDispatchTo(node)) - vipStatus.removeFromRotation(clusterId); + updateVipStatusOnNodeChange(node, true); } private void updateSufficientCoverage(Group group, boolean sufficientCoverage) { - // update VIP status if we direct dispatch to this group and coverage status changed - boolean isInRotation = vipStatus.isInRotation(); - boolean hasChanged = sufficientCoverage != group.hasSufficientCoverage(); - boolean isDirectDispatchGroupAndChange = usesDirectDispatchTo(group) && hasChanged; + if (sufficientCoverage == group.hasSufficientCoverage()) return; // no change + group.setHasSufficientCoverage(sufficientCoverage); - if ((!isInRotation || isDirectDispatchGroupAndChange) && sufficientCoverage) { - // We will set this cluster in rotation if - // - not already in rotation and one group has sufficient coverage. + updateVipStatusOnCoverageChange(group, sufficientCoverage); + } + + private void updateVipStatusOnNodeChange(Node node, boolean nodeIsWorking) { + if (localCorpusDispatchTarget.isEmpty()) { // consider entire cluster + if (hasInformationAboutAllNodes()) + setInRotationOnlyIf(hasWorkingNodes()); + } + else if (usesLocalCorpusIn(node)) { // follow the status of this node + setInRotationOnlyIf(nodeIsWorking); + } + } + + private void updateVipStatusOnCoverageChange(Group group, boolean sufficientCoverage) { + if ( localCorpusDispatchTarget.isEmpty()) { // consider entire cluster + // VIP status does not depend on coverage + } + else if (usesLocalCorpusIn(group)) { // follow the status of this group + setInRotationOnlyIf(sufficientCoverage); + } + } + + private void setInRotationOnlyIf(boolean inRotation) { + if (inRotation) vipStatus.addToRotation(clusterId); - } else if (isDirectDispatchGroupAndChange) { - // We will take it out of rotation if the group is mandatory (direct dispatch to this group) + else vipStatus.removeFromRotation(clusterId); - } } - private boolean usesDirectDispatchTo(Node node) { - return directDispatchTarget.isPresent() && directDispatchTarget.get().equals(node); + private boolean hasInformationAboutAllNodes() { + return nodesByHost.values().stream().allMatch(node -> node.isWorking() != null); + } + + private boolean hasWorkingNodes() { + return nodesByHost.values().stream().anyMatch(node -> node.isWorking() != Boolean.FALSE ); + } + + private boolean usesLocalCorpusIn(Node node) { + return localCorpusDispatchTarget.isPresent() && localCorpusDispatchTarget.get().equals(node); } - private boolean usesDirectDispatchTo(Group group) { - return directDispatchTarget.isPresent() && directDispatchTarget.get().group() == group.id(); + private boolean usesLocalCorpusIn(Group group) { + return localCorpusDispatchTarget.isPresent() && localCorpusDispatchTarget.get().group() == group.id(); } /** Used by the cluster monitor to manage node status */ @Override public void ping(Node node, Executor executor) { - if (pingFactory == null) // not initialized yet - return; + if (pingFactory == null) return; // not initialized yet + FutureTask<Pong> futurePong = new FutureTask<>(pingFactory.createPinger(node, clusterMonitor)); executor.execute(futurePong); Pong pong = getPong(futurePong, node); @@ -272,9 +292,10 @@ public class SearchCluster implements NodeManager<Node> { // group will always be marked sufficient for use. updateSufficientCoverage(group, true); boolean fullCoverage = isGroupCoverageSufficient(group.workingNodes(), group.nodes().size(), group.getActiveDocuments(), - group.getActiveDocuments()); + group.getActiveDocuments()); trackGroupCoverageChanges(0, group, fullCoverage, group.getActiveDocuments()); } + private void pingIterationCompletedMultipleGroups() { int numGroups = orderedGroups.size(); // Update active documents per group and use it to decide if the group should be active @@ -299,13 +320,7 @@ public class SearchCluster implements NodeManager<Node> { trackGroupCoverageChanges(i, group, sufficientCoverage, averageDocumentsInOtherGroups); } } - private boolean areAllNodesDownInAllgroups() { - for(int i = 0; i < groups.size(); i++) { - Group group = orderedGroups.get(i); - if (group.workingNodes() > 0) return false; - } - return true; - } + /** * Update statistics after a round of issuing pings. * Note that this doesn't wait for pings to return, so it will typically accumulate data from @@ -319,9 +334,6 @@ public class SearchCluster implements NodeManager<Node> { } else { pingIterationCompletedMultipleGroups(); } - if ( areAllNodesDownInAllgroups() ) { - vipStatus.removeFromRotation(clusterId); - } } private boolean isGroupCoverageSufficient(int workingNodes, int nodesInGroup, long activeDocuments, long averageDocumentsInOtherGroups) { @@ -397,11 +409,11 @@ public class SearchCluster implements NodeManager<Node> { if (changed) { int requiredNodes = groupSize() - dispatchConfig.maxNodesDownPerGroup(); if (fullCoverage) { - log.info(() -> String.format("Group %d is now good again (%d/%d active docs, coverage %d/%d)", index, - group.getActiveDocuments(), averageDocuments, group.workingNodes(), groupSize())); + log.info(() -> String.format("Group %d is now good again (%d/%d active docs, coverage %d/%d)", + index, group.getActiveDocuments(), averageDocuments, group.workingNodes(), groupSize())); } else { - log.warning(() -> String.format("Coverage of group %d is only %d/%d (requires %d)", index, group.workingNodes(), groupSize(), - requiredNodes)); + log.warning(() -> String.format("Coverage of group %d is only %d/%d (requires %d)", + index, group.workingNodes(), groupSize(), requiredNodes)); } } } |