summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Jon Bratseth <bratseth@gmail.com> 2020-09-28 17:30:34 +0200
committer Jon Bratseth <bratseth@gmail.com> 2020-09-28 17:30:34 +0200
commit af1d10a394b55cc0d31575bf979cecbbc177c568 (patch)
tree ab9fdd4cbc3b0d19a8ee26585682930414ca53b7
parent 5cc5d898b69c8f9f4e6a31583bd69e45bfd6f04e (diff)
Extract metric query logic
-rw-r--r-- node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java 74
-rw-r--r-- node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MetricSnapshot.java 98
2 files changed, 104 insertions, 68 deletions
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java
index 351041693db..d359b86ae85 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java
@@ -69,10 +69,12 @@ public class Autoscaler {
if (unstable(clusterNodes)) return Optional.empty();
AllocatableClusterResources currentAllocation = new AllocatableClusterResources(clusterNodes, nodeRepository);
- var startTimePerHost = metricStartTimes(clusterNodes);
- Optional<Double> cpuLoad = averageLoad(Resource.cpu, clusterNodes, startTimePerHost);
- Optional<Double> memoryLoad = averageLoad(Resource.memory, clusterNodes, startTimePerHost);
- Optional<Double> diskLoad = averageLoad(Resource.disk, clusterNodes, startTimePerHost);
+
+ MetricSnapshot metricSnapshot = new MetricSnapshot(clusterNodes, metricsDb, nodeRepository);
+
+ Optional<Double> cpuLoad = metricSnapshot.averageLoad(Resource.cpu);
+ Optional<Double> memoryLoad = metricSnapshot.averageLoad(Resource.memory);
+ Optional<Double> diskLoad = metricSnapshot.averageLoad(Resource.disk);
if (cpuLoad.isEmpty() || memoryLoad.isEmpty() || diskLoad.isEmpty()) return Optional.empty();
var target = ResourceTarget.idealLoad(cpuLoad.get(), memoryLoad.get(), diskLoad.get(), currentAllocation);
@@ -98,70 +100,6 @@ public class Autoscaler {
return Math.abs(r1 - r2) / (( r1 + r2) / 2) < threshold;
}
- /**
- * Returns the instant of the oldest metric to consider for each node, or an empty map if metrics from the
- * entire (max) window should be considered.
- */
- private Map<String, Instant> metricStartTimes(List<Node> clusterNodes) {
- ApplicationId application = clusterNodes.get(0).allocation().get().owner();
- List<NodeMetricsDb.AutoscalingEvent> deployments = metricsDb.getEvents(application);
- Map<String, Instant> startTimePerHost = new HashMap<>();
- if (!deployments.isEmpty()) {
- var deployment = deployments.get(deployments.size() - 1);
- List<NodeMetricsDb.NodeMeasurements> generationMeasurements =
- metricsDb.getMeasurements(deployment.time(),
- Metric.generation,
- clusterNodes.stream().map(Node::hostname).collect(Collectors.toList()));
- for (Node node : clusterNodes) {
- startTimePerHost.put(node.hostname(), nodeRepository.clock().instant()); // Discard all unless we can prove otherwise
- var nodeGenerationMeasurements =
- generationMeasurements.stream().filter(m -> m.hostname().equals(node.hostname())).findAny();
- if (nodeGenerationMeasurements.isPresent()) {
- var firstMeasurementOfCorrectGeneration =
- nodeGenerationMeasurements.get().asList().stream()
- .filter(m -> m.value() >= deployment.generation())
- .findFirst();
- if (firstMeasurementOfCorrectGeneration.isPresent()) {
- startTimePerHost.put(node.hostname(), firstMeasurementOfCorrectGeneration.get().at());
- }
- }
- }
- }
- return startTimePerHost;
- }
-
- /**
- * Returns the average load of this resource in the measurement window,
- * or empty if we are not in a position to make decisions from these measurements at this time.
- */
- private Optional<Double> averageLoad(Resource resource,
- List<Node> clusterNodes,
- Map<String, Instant> startTimePerHost) {
- ClusterSpec.Type clusterType = clusterNodes.get(0).allocation().get().membership().cluster().type();
-
- List<NodeMetricsDb.NodeMeasurements> measurements =
- metricsDb.getMeasurements(nodeRepository.clock().instant().minus(scalingWindow(clusterType)),
- Metric.from(resource),
- clusterNodes.stream().map(Node::hostname).collect(Collectors.toList()));
- int beforeFilterStale = measurements.stream().mapToInt(m -> m.size()).sum();
- measurements = filterStale(measurements, startTimePerHost);
-
- // Require a total number of measurements scaling with the number of nodes,
- // but don't require that we have at least that many from every node
- int measurementCount = measurements.stream().mapToInt(m -> m.size()).sum();
- if (measurementCount / clusterNodes.size() < minimumMeasurementsPerNode(clusterType)) return Optional.empty();
- if (measurements.size() != clusterNodes.size()) return Optional.empty();
-
- double measurementSum = measurements.stream().flatMap(m -> m.asList().stream()).mapToDouble(m -> m.value()).sum();
- return Optional.of(measurementSum / measurementCount);
- }
-
- private List<NodeMetricsDb.NodeMeasurements> filterStale(List<NodeMetricsDb.NodeMeasurements> measurements,
- Map<String, Instant> startTimePerHost) {
- if (startTimePerHost.isEmpty()) return measurements; // Map is either empty or complete
- return measurements.stream().map(m -> m.copyAfter(startTimePerHost.get(m.hostname()))).collect(Collectors.toList());
- }
-
/** The duration of the window we need to consider to make a scaling decision. See also minimumMeasurementsPerNode */
static Duration scalingWindow(ClusterSpec.Type clusterType) {
if (clusterType.isContent()) return Duration.ofHours(12);
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MetricSnapshot.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MetricSnapshot.java
new file mode 100644
index 00000000000..46ba4351082
--- /dev/null
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MetricSnapshot.java
@@ -0,0 +1,98 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.hosted.provision.autoscale;
+
+import com.yahoo.config.provision.ApplicationId;
+import com.yahoo.config.provision.ClusterSpec;
+import com.yahoo.vespa.hosted.provision.Node;
+import com.yahoo.vespa.hosted.provision.NodeRepository;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * A snapshot of the metrics of one cluster at one point in time, answering the questions autoscaling asks of them.
+ *
+ * @author bratseth
+ */
+public class MetricSnapshot {
+
+ private final List<Node> clusterNodes;
+ private final NodeMetricsDb db;
+ private final NodeRepository nodeRepository;
+ private final Map<String, Instant> startTimePerHost; // Computed once at construction; empty means no cutoff
+
+ public MetricSnapshot(List<Node> clusterNodes, NodeMetricsDb db, NodeRepository nodeRepository) {
+ this.clusterNodes = clusterNodes;
+ this.db = db;
+ this.nodeRepository = nodeRepository;
+ this.startTimePerHost = metricStartTimes(clusterNodes, db, nodeRepository);
+ }
+
+ /**
+ * Returns, per hostname, the instant of the oldest metric to consider: the time of the node's first generation
+ * measurement at or above the last event's generation, else now. Empty if there are no events (use the entire window).
+ */
+ private static Map<String, Instant> metricStartTimes(List<Node> clusterNodes,
+ NodeMetricsDb db,
+ NodeRepository nodeRepository) {
+ ApplicationId application = clusterNodes.get(0).allocation().get().owner();
+ List<NodeMetricsDb.AutoscalingEvent> deployments = db.getEvents(application);
+ Map<String, Instant> startTimePerHost = new HashMap<>();
+ if (!deployments.isEmpty()) {
+ var deployment = deployments.get(deployments.size() - 1);
+ List<NodeMetricsDb.NodeMeasurements> generationMeasurements =
+ db.getMeasurements(deployment.time(),
+ Metric.generation,
+ clusterNodes.stream().map(Node::hostname).collect(Collectors.toList()));
+ for (Node node : clusterNodes) {
+ startTimePerHost.put(node.hostname(), nodeRepository.clock().instant()); // Default to now: discard all of this node's measurements unless we can prove otherwise
+ var nodeGenerationMeasurements =
+ generationMeasurements.stream().filter(m -> m.hostname().equals(node.hostname())).findAny();
+ if (nodeGenerationMeasurements.isPresent()) {
+ var firstMeasurementOfCorrectGeneration =
+ nodeGenerationMeasurements.get().asList().stream()
+ .filter(m -> m.value() >= deployment.generation())
+ .findFirst();
+ if (firstMeasurementOfCorrectGeneration.isPresent()) {
+ startTimePerHost.put(node.hostname(), firstMeasurementOfCorrectGeneration.get().at());
+ }
+ }
+ }
+ }
+ return startTimePerHost;
+ }
+
+ /**
+ * Returns the average of this resource's measurements in the scaling window, after stale ones are filtered out,
+ * or empty if there are too few measurements in total or the db did not return one entry per cluster node.
+ */
+ public Optional<Double> averageLoad(Resource resource) {
+ ClusterSpec.Type clusterType = clusterNodes.get(0).allocation().get().membership().cluster().type();
+
+ List<NodeMetricsDb.NodeMeasurements> measurements =
+ db.getMeasurements(nodeRepository.clock().instant().minus(Autoscaler.scalingWindow(clusterType)),
+ Metric.from(resource),
+ clusterNodes.stream().map(Node::hostname).collect(Collectors.toList()));
+ measurements = filterStale(measurements, startTimePerHost);
+
+ // Require a total number of measurements scaling with the number of nodes,
+ // but don't require that we have at least that many from every node
+ int measurementCount = measurements.stream().mapToInt(m -> m.size()).sum();
+ if (measurementCount / clusterNodes.size() < Autoscaler.minimumMeasurementsPerNode(clusterType)) return Optional.empty();
+ if (measurements.size() != clusterNodes.size()) return Optional.empty();
+
+ double measurementSum = measurements.stream().flatMap(m -> m.asList().stream()).mapToDouble(m -> m.value()).sum();
+ return Optional.of(measurementSum / measurementCount);
+ }
+
+ private List<NodeMetricsDb.NodeMeasurements> filterStale(List<NodeMetricsDb.NodeMeasurements> measurements,
+ Map<String, Instant> startTimePerHost) {
+ if (startTimePerHost.isEmpty()) return measurements; // Map is either empty (no cutoff) or has an entry for every cluster node
+ return measurements.stream().map(m -> m.copyAfter(startTimePerHost.get(m.hostname()))).collect(Collectors.toList());
+ }
+
+}