diff options
author | Jon Bratseth <bratseth@gmail.com> | 2020-09-28 17:30:34 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@gmail.com> | 2020-09-28 17:30:34 +0200 |
commit | af1d10a394b55cc0d31575bf979cecbbc177c568 (patch) | |
tree | ab9fdd4cbc3b0d19a8ee26585682930414ca53b7 | |
parent | 5cc5d898b69c8f9f4e6a31583bd69e45bfd6f04e (diff) |
Extract metric query logic
2 files changed, 104 insertions, 68 deletions
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java index 351041693db..d359b86ae85 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java @@ -69,10 +69,12 @@ public class Autoscaler { if (unstable(clusterNodes)) return Optional.empty(); AllocatableClusterResources currentAllocation = new AllocatableClusterResources(clusterNodes, nodeRepository); - var startTimePerHost = metricStartTimes(clusterNodes); - Optional<Double> cpuLoad = averageLoad(Resource.cpu, clusterNodes, startTimePerHost); - Optional<Double> memoryLoad = averageLoad(Resource.memory, clusterNodes, startTimePerHost); - Optional<Double> diskLoad = averageLoad(Resource.disk, clusterNodes, startTimePerHost); + + MetricSnapshot metricSnapshot = new MetricSnapshot(clusterNodes, metricsDb, nodeRepository); + + Optional<Double> cpuLoad = metricSnapshot.averageLoad(Resource.cpu); + Optional<Double> memoryLoad = metricSnapshot.averageLoad(Resource.memory); + Optional<Double> diskLoad = metricSnapshot.averageLoad(Resource.disk); if (cpuLoad.isEmpty() || memoryLoad.isEmpty() || diskLoad.isEmpty()) return Optional.empty(); var target = ResourceTarget.idealLoad(cpuLoad.get(), memoryLoad.get(), diskLoad.get(), currentAllocation); @@ -98,70 +100,6 @@ public class Autoscaler { return Math.abs(r1 - r2) / (( r1 + r2) / 2) < threshold; } - /** - * Returns the instant of the oldest metric to consider for each node, or an empty map if metrics from the - * entire (max) window should be considered. 
- */ - private Map<String, Instant> metricStartTimes(List<Node> clusterNodes) { - ApplicationId application = clusterNodes.get(0).allocation().get().owner(); - List<NodeMetricsDb.AutoscalingEvent> deployments = metricsDb.getEvents(application); - Map<String, Instant> startTimePerHost = new HashMap<>(); - if (!deployments.isEmpty()) { - var deployment = deployments.get(deployments.size() - 1); - List<NodeMetricsDb.NodeMeasurements> generationMeasurements = - metricsDb.getMeasurements(deployment.time(), - Metric.generation, - clusterNodes.stream().map(Node::hostname).collect(Collectors.toList())); - for (Node node : clusterNodes) { - startTimePerHost.put(node.hostname(), nodeRepository.clock().instant()); // Discard all unless we can prove otherwise - var nodeGenerationMeasurements = - generationMeasurements.stream().filter(m -> m.hostname().equals(node.hostname())).findAny(); - if (nodeGenerationMeasurements.isPresent()) { - var firstMeasurementOfCorrectGeneration = - nodeGenerationMeasurements.get().asList().stream() - .filter(m -> m.value() >= deployment.generation()) - .findFirst(); - if (firstMeasurementOfCorrectGeneration.isPresent()) { - startTimePerHost.put(node.hostname(), firstMeasurementOfCorrectGeneration.get().at()); - } - } - } - } - return startTimePerHost; - } - - /** - * Returns the average load of this resource in the measurement window, - * or empty if we are not in a position to make decisions from these measurements at this time. 
- */ - private Optional<Double> averageLoad(Resource resource, - List<Node> clusterNodes, - Map<String, Instant> startTimePerHost) { - ClusterSpec.Type clusterType = clusterNodes.get(0).allocation().get().membership().cluster().type(); - - List<NodeMetricsDb.NodeMeasurements> measurements = - metricsDb.getMeasurements(nodeRepository.clock().instant().minus(scalingWindow(clusterType)), - Metric.from(resource), - clusterNodes.stream().map(Node::hostname).collect(Collectors.toList())); - int beforeFilterStale = measurements.stream().mapToInt(m -> m.size()).sum(); - measurements = filterStale(measurements, startTimePerHost); - - // Require a total number of measurements scaling with the number of nodes, - // but don't require that we have at least that many from every node - int measurementCount = measurements.stream().mapToInt(m -> m.size()).sum(); - if (measurementCount / clusterNodes.size() < minimumMeasurementsPerNode(clusterType)) return Optional.empty(); - if (measurements.size() != clusterNodes.size()) return Optional.empty(); - - double measurementSum = measurements.stream().flatMap(m -> m.asList().stream()).mapToDouble(m -> m.value()).sum(); - return Optional.of(measurementSum / measurementCount); - } - - private List<NodeMetricsDb.NodeMeasurements> filterStale(List<NodeMetricsDb.NodeMeasurements> measurements, - Map<String, Instant> startTimePerHost) { - if (startTimePerHost.isEmpty()) return measurements; // Map is either empty or complete - return measurements.stream().map(m -> m.copyAfter(startTimePerHost.get(m.hostname()))).collect(Collectors.toList()); - } - /** The duration of the window we need to consider to make a scaling decision. 
See also minimumMeasurementsPerNode */ static Duration scalingWindow(ClusterSpec.Type clusterType) { if (clusterType.isContent()) return Duration.ofHours(12); diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MetricSnapshot.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MetricSnapshot.java new file mode 100644 index 00000000000..46ba4351082 --- /dev/null +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MetricSnapshot.java @@ -0,0 +1,98 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.hosted.provision.autoscale; + +import com.yahoo.config.provision.ApplicationId; +import com.yahoo.config.provision.ClusterSpec; +import com.yahoo.vespa.hosted.provision.Node; +import com.yahoo.vespa.hosted.provision.NodeRepository; + +import java.time.Instant; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * A snapshot which implements the questions we want to ask about metrics for one cluster at one point in time. + * + * The oldest measurement to consider for each node is resolved once, when the snapshot is constructed. + * The measurements themselves are read from the metrics db each time averageLoad is called. + * + * @author bratseth + */ +public class MetricSnapshot { + + private final List<Node> clusterNodes; + private final NodeMetricsDb db; + private final NodeRepository nodeRepository; + private final Map<String, Instant> startTimePerHost; + + /** + * Creates a snapshot of the metrics of the given nodes and resolves the start time per host immediately. + * + * @param clusterNodes the nodes of the cluster: Must be nonempty, and the first node must have an allocation, + * as the owning application and the cluster type are read from it + * @param db the metrics db to read events and measurements from + * @param nodeRepository the node repository, used here for its clock + */ + public MetricSnapshot(List<Node> clusterNodes, NodeMetricsDb db, NodeRepository nodeRepository) { + this.clusterNodes = clusterNodes; + this.db = db; + this.nodeRepository = nodeRepository; + this.startTimePerHost = metricStartTimes(clusterNodes, db, nodeRepository); + } + + /** + * Returns the instant of the oldest metric to consider for each node, or an empty map if metrics from the + * entire (max) window should be considered. + * + * If the db has events for the application owning the first node, the last event in the returned list is used + * (presumably the most recent deployment - verify against NodeMetricsDb): Each node starts at its first + * generation measurement whose value is at least the generation of that event, or at the current time + * if it has no such measurement, such that all its existing measurements are discarded. 
+ */ + private static Map<String, Instant> metricStartTimes(List<Node> clusterNodes, + NodeMetricsDb db, + NodeRepository nodeRepository) { + ApplicationId application = clusterNodes.get(0).allocation().get().owner(); + List<NodeMetricsDb.AutoscalingEvent> deployments = db.getEvents(application); + Map<String, Instant> startTimePerHost = new HashMap<>(); + if (!deployments.isEmpty()) { + var deployment = deployments.get(deployments.size() - 1); + List<NodeMetricsDb.NodeMeasurements> generationMeasurements = + db.getMeasurements(deployment.time(), + Metric.generation, + clusterNodes.stream().map(Node::hostname).collect(Collectors.toList())); + for (Node node : clusterNodes) { + startTimePerHost.put(node.hostname(), nodeRepository.clock().instant()); // Discard all unless we can prove otherwise + var nodeGenerationMeasurements = + generationMeasurements.stream().filter(m -> m.hostname().equals(node.hostname())).findAny(); + if (nodeGenerationMeasurements.isPresent()) { + var firstMeasurementOfCorrectGeneration = + nodeGenerationMeasurements.get().asList().stream() + .filter(m -> m.value() >= deployment.generation()) + .findFirst(); + if (firstMeasurementOfCorrectGeneration.isPresent()) { + startTimePerHost.put(node.hostname(), firstMeasurementOfCorrectGeneration.get().at()); + } + } + } + } + return startTimePerHost; + } + + /** + * Returns the average load of this resource in the measurement window, + * or empty if we are not in a position to make decisions from these measurements at this time. + * + * The average is taken over all measurements of all nodes, so nodes with more measurements weigh more. + * Empty is returned if the total number of measurements divided by the number of nodes (integer division) + * is below Autoscaler.minimumMeasurementsPerNode for the cluster type, or if the number of per-node + * measurement entries differs from the number of nodes. Both are evaluated after filterStale is applied. + * + * @param resource the resource to return the average load of
+ */ + public Optional<Double> averageLoad(Resource resource) { + ClusterSpec.Type clusterType = clusterNodes.get(0).allocation().get().membership().cluster().type(); + + List<NodeMetricsDb.NodeMeasurements> measurements = + db.getMeasurements(nodeRepository.clock().instant().minus(Autoscaler.scalingWindow(clusterType)), + Metric.from(resource), + clusterNodes.stream().map(Node::hostname).collect(Collectors.toList())); + measurements = filterStale(measurements, startTimePerHost); + + // Require a total number of measurements scaling with the number of nodes, + // but don't require that we have at least that many from every node + int measurementCount = measurements.stream().mapToInt(m -> m.size()).sum(); + if (measurementCount / clusterNodes.size() < Autoscaler.minimumMeasurementsPerNode(clusterType)) return Optional.empty(); + if (measurements.size() != clusterNodes.size()) return Optional.empty(); + + double measurementSum = measurements.stream().flatMap(m -> m.asList().stream()).mapToDouble(m -> m.value()).sum(); + return Optional.of(measurementSum / measurementCount); + } + + /** + * Returns the given measurements with copyAfter applied to each entry, using the start time of its host + * (presumably dropping measurements older than that instant - verify against NodeMeasurements.copyAfter), + * or the given measurements unchanged if there are no start times. + */ + private List<NodeMetricsDb.NodeMeasurements> filterStale(List<NodeMetricsDb.NodeMeasurements> measurements, + Map<String, Instant> startTimePerHost) { + if (startTimePerHost.isEmpty()) return measurements; // Map is either empty or complete + return measurements.stream().map(m -> m.copyAfter(startTimePerHost.get(m.hostname()))).collect(Collectors.toList()); + } + +} |