diff options
author | Jon Bratseth <bratseth@gmail.com> | 2020-10-20 17:47:32 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@gmail.com> | 2020-10-22 15:30:49 +0200 |
commit | 25bceedc5e24b2e27b34536b155119c70a673ecc (patch) | |
tree | ab8c79422c659a69dac281265bb3309e7492625e | |
parent | 64918c40aaf2d0ec632a91b699d4cd92886e9884 (diff) |
Clean up terminology
6 files changed, 210 insertions, 192 deletions
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java index 58175a459d9..5b26c53b937 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java @@ -65,11 +65,11 @@ public class Autoscaler { AllocatableClusterResources currentAllocation = new AllocatableClusterResources(clusterNodes, nodeRepository); - MetricSnapshot metricSnapshot = new MetricSnapshot(cluster, clusterNodes, metricsDb, nodeRepository); + ClusterTimeseries clusterTimeseries = new ClusterTimeseries(cluster, clusterNodes, metricsDb, nodeRepository); - Optional<Double> cpuLoad = metricSnapshot.averageLoad(Resource.cpu); - Optional<Double> memoryLoad = metricSnapshot.averageLoad(Resource.memory); - Optional<Double> diskLoad = metricSnapshot.averageLoad(Resource.disk); + Optional<Double> cpuLoad = clusterTimeseries.averageLoad(Resource.cpu); + Optional<Double> memoryLoad = clusterTimeseries.averageLoad(Resource.memory); + Optional<Double> diskLoad = clusterTimeseries.averageLoad(Resource.disk); if (cpuLoad.isEmpty() || memoryLoad.isEmpty() || diskLoad.isEmpty()) return Optional.empty(); var target = ResourceTarget.idealLoad(cpuLoad.get(), memoryLoad.get(), diskLoad.get(), currentAllocation); diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/ClusterTimeseries.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/ClusterTimeseries.java new file mode 100644 index 00000000000..49dea25f8a8 --- /dev/null +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/ClusterTimeseries.java @@ -0,0 +1,99 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.hosted.provision.autoscale; + +import com.yahoo.config.provision.ClusterSpec; +import com.yahoo.vespa.hosted.provision.Node; +import com.yahoo.vespa.hosted.provision.NodeRepository; +import com.yahoo.vespa.hosted.provision.applications.Cluster; + +import java.time.Instant; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * A series of metric snapshots for all nodes in a cluster + * + * @author bratseth + */ +public class ClusterTimeseries { + + private final List<Node> clusterNodes; + private final Map<String, Instant> startTimePerNode; + + /** The measurements for all hosts in this snapshot */ + private final List<NodeTimeseries> nodeTimeseries; + + public ClusterTimeseries(Cluster cluster, List<Node> clusterNodes, NodeMetricsDb db, NodeRepository nodeRepository) { + this.clusterNodes = clusterNodes; + ClusterSpec.Type clusterType = clusterNodes.get(0).allocation().get().membership().cluster().type(); + this.nodeTimeseries = db.getNodeTimeseries(nodeRepository.clock().instant().minus(Autoscaler.scalingWindow(clusterType)), + clusterNodes.stream().map(Node::hostname).collect(Collectors.toList())); + this.startTimePerNode = metricStartTimes(cluster, clusterNodes, nodeRepository); + } + + /** + * Returns the instant of the oldest metric to consider for each node, or an empty map if metrics from the + * entire (max) window should be considered. + */ + private Map<String, Instant> metricStartTimes(Cluster cluster, + List<Node> clusterNodes, + NodeRepository nodeRepository) { + Map<String, Instant> startTimePerHost = new HashMap<>(); + if ( ! cluster.scalingEvents().isEmpty()) { + var deployment = cluster.scalingEvents().get(cluster.scalingEvents().size() - 1); + for (Node node : clusterNodes) { + startTimePerHost.put(node.hostname(), nodeRepository.clock().instant()); // Discard all unless we can prove otherwise + var nodeGenerationMeasurements = + nodeTimeseries.stream().filter(m -> m.hostname().equals(node.hostname())).findAny(); + if (nodeGenerationMeasurements.isPresent()) { + var firstMeasurementOfCorrectGeneration = + nodeGenerationMeasurements.get().asList().stream() + .filter(m -> m.generation() >= deployment.generation()) + .findFirst(); + if (firstMeasurementOfCorrectGeneration.isPresent()) { + startTimePerHost.put(node.hostname(), firstMeasurementOfCorrectGeneration.get().at()); + } + } + } + } + return startTimePerHost; + } + + /** + * Returns the average load of this resource in the measurement window, + * or empty if we are not in a position to make decisions from these measurements at this time. + */ + public Optional<Double> averageLoad(Resource resource) { + ClusterSpec.Type clusterType = clusterNodes.get(0).allocation().get().membership().cluster().type(); + + List<NodeTimeseries> currentMeasurements = filterStale(nodeTimeseries, startTimePerNode); + + // Require a total number of measurements scaling with the number of nodes, + // but don't require that we have at least that many from every node + int measurementCount = currentMeasurements.stream().mapToInt(m -> m.size()).sum(); + if (measurementCount / clusterNodes.size() < Autoscaler.minimumMeasurementsPerNode(clusterType)) return Optional.empty(); + if (currentMeasurements.size() != clusterNodes.size()) return Optional.empty(); + + double measurementSum = currentMeasurements.stream().flatMap(m -> m.asList().stream()).mapToDouble(m -> value(resource, m)).sum(); + return Optional.of(measurementSum / measurementCount); + } + + private double value(Resource resource, MetricSnapshot snapshot) { + switch (resource) { + case cpu: return snapshot.cpu(); + case memory: return snapshot.memory(); + case disk: return snapshot.disk(); + default: throw new IllegalArgumentException("Got an unknown resource " + resource); + } + } + + private List<NodeTimeseries> filterStale(List<NodeTimeseries> timeseries, + Map<String, Instant> startTimePerHost) { + if (startTimePerHost.isEmpty()) return timeseries; // Map is either empty or complete + return timeseries.stream().map(m -> m.copyAfter(startTimePerHost.get(m.hostname()))).collect(Collectors.toList()); + } + +} diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MetricSnapshot.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MetricSnapshot.java index 52a8a3bd2ed..2a0661dab02 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MetricSnapshot.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MetricSnapshot.java @@ -1,100 +1,44 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hosted.provision.autoscale; -import com.yahoo.config.provision.ClusterSpec; -import com.yahoo.metrics.simple.Measurement; -import com.yahoo.vespa.hosted.provision.Node; -import com.yahoo.vespa.hosted.provision.NodeRepository; -import com.yahoo.vespa.hosted.provision.applications.Cluster; - import java.time.Instant; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; /** - * A snapshot which implements the questions we want to ask about metrics for one cluster at one point in time. + * A single measurement of all values we measure for one node. * * @author bratseth */ public class MetricSnapshot { - private final List<Node> clusterNodes; - private final Map<String, Instant> startTimePerHost; - - /** The measurements for all hosts in this snapshot */ - private final List<NodeMetricsDb.NodeMeasurements> measurements; - - public MetricSnapshot(Cluster cluster, List<Node> clusterNodes, NodeMetricsDb db, NodeRepository nodeRepository) { - this.clusterNodes = clusterNodes; - ClusterSpec.Type clusterType = clusterNodes.get(0).allocation().get().membership().cluster().type(); - this.measurements = db.getMeasurements(nodeRepository.clock().instant().minus(Autoscaler.scalingWindow(clusterType)), - clusterNodes.stream().map(Node::hostname).collect(Collectors.toList())); - this.startTimePerHost = metricStartTimes(cluster, clusterNodes, nodeRepository); - } - - /** - * Returns the instant of the oldest metric to consider for each node, or an empty map if metrics from the - * entire (max) window should be considered. - */ - private Map<String, Instant> metricStartTimes(Cluster cluster, - List<Node> clusterNodes, - NodeRepository nodeRepository) { - Map<String, Instant> startTimePerHost = new HashMap<>(); - if ( ! cluster.scalingEvents().isEmpty()) { - var deployment = cluster.scalingEvents().get(cluster.scalingEvents().size() - 1); - for (Node node : clusterNodes) { - startTimePerHost.put(node.hostname(), nodeRepository.clock().instant()); // Discard all unless we can prove otherwise - var nodeGenerationMeasurements = - measurements.stream().filter(m -> m.hostname().equals(node.hostname())).findAny(); - if (nodeGenerationMeasurements.isPresent()) { - var firstMeasurementOfCorrectGeneration = - nodeGenerationMeasurements.get().asList().stream() - .filter(m -> m.generation() >= deployment.generation()) - .findFirst(); - if (firstMeasurementOfCorrectGeneration.isPresent()) { - startTimePerHost.put(node.hostname(), firstMeasurementOfCorrectGeneration.get().at()); - } - } - } - } - return startTimePerHost; - } - - /** - * Returns the average load of this resource in the measurement window, - * or empty if we are not in a position to make decisions from these measurements at this time. - */ - public Optional<Double> averageLoad(Resource resource) { - ClusterSpec.Type clusterType = clusterNodes.get(0).allocation().get().membership().cluster().type(); + // TODO: Order by timestamp + /** The time of this measurement in epoch millis */ + private final long timestamp; - List<NodeMetricsDb.NodeMeasurements> currentMeasurements = filterStale(measurements, startTimePerHost); + private final double cpu; + private final double memory; + private final double disk; + private final long generation; - // Require a total number of measurements scaling with the number of nodes, - // but don't require that we have at least that many from every node - int measurementCount = currentMeasurements.stream().mapToInt(m -> m.size()).sum(); - if (measurementCount / clusterNodes.size() < Autoscaler.minimumMeasurementsPerNode(clusterType)) return Optional.empty(); - if (currentMeasurements.size() != clusterNodes.size()) return Optional.empty(); + public MetricSnapshot(MetricsFetcher.NodeMetrics metrics) { + this.timestamp = metrics.timestampSecond() * 1000; + this.cpu = Metric.cpu.measurementFromMetric(metrics.cpuUtil()); + this.memory = Metric.memory.measurementFromMetric(metrics.totalMemUtil()); + this.disk = Metric.disk.measurementFromMetric(metrics.diskUtil()); + this.generation = (long)Metric.generation.measurementFromMetric(metrics.applicationGeneration()); - double measurementSum = currentMeasurements.stream().flatMap(m -> m.asList().stream()).mapToDouble(m -> value(resource, m)).sum(); - return Optional.of(measurementSum / measurementCount); } - private double value(Resource resource, NodeMetricsDb.Measurements measurement) { - switch (resource) { - case cpu: return measurement.cpu(); - case memory: return measurement.memory(); - case disk: return measurement.disk(); - default: throw new IllegalArgumentException("Got an unknown resource " + resource); - } - } - - private List<NodeMetricsDb.NodeMeasurements> filterStale(List<NodeMetricsDb.NodeMeasurements> measurements, - Map<String, Instant> startTimePerHost) { - if (startTimePerHost.isEmpty()) return measurements; // Map is either empty or complete - return measurements.stream().map(m -> m.copyAfter(startTimePerHost.get(m.hostname()))).collect(Collectors.toList()); - } + public double cpu() { return cpu; } + public double memory() { return memory; } + public double disk() { return disk; } + public long generation() { return generation; } + public Instant at() { return Instant.ofEpochMilli(timestamp); } + + @Override + public String toString() { return "metrics at " + timestamp + ": " + + "cpu: " + cpu + + "memory: " + memory + + "disk: " + disk + + "generation: " + generation; } } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/NodeMetricsDb.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/NodeMetricsDb.java index ef9ddceeb75..50181e51f85 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/NodeMetricsDb.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/NodeMetricsDb.java @@ -1,7 +1,6 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hosted.provision.autoscale; -import com.yahoo.config.provision.ClusterSpec; import com.yahoo.vespa.hosted.provision.Node; import com.yahoo.vespa.hosted.provision.NodeRepository; @@ -9,13 +8,11 @@ import java.time.Clock; import java.time.Instant; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; /** * An in-memory time-series "database" of node metrics. @@ -27,8 +24,8 @@ public class NodeMetricsDb { private final NodeRepository nodeRepository; - /** Measurements by host. Each list of measurements is sorted by increasing timestamp */ - private final Map<String, NodeMeasurements> db = new HashMap<>(); + /** Metric time seriest by node (hostname). Each list of metric snapshots is sorted by increasing timestamp */ + private final Map<String, NodeTimeseries> db = new HashMap<>(); /** Lock all access for now since we modify lists inside a map */ private final Object lock = new Object(); @@ -37,27 +34,27 @@ public class NodeMetricsDb { this.nodeRepository = nodeRepository; } - /** Adds measurements to this. */ + /** Adds snapshots to this. */ public void add(Collection<MetricsFetcher.NodeMetrics> nodeMetrics) { synchronized (lock) { for (var value : nodeMetrics) { - add(value.hostname(), new Measurements(value)); + add(value.hostname(), new MetricSnapshot(value)); } } } - private void add(String hostname, Measurements measurement) { - NodeMeasurements measurements = db.get(hostname); - if (measurements == null) { // new node + private void add(String hostname, MetricSnapshot snapshot) { + NodeTimeseries timeseries = db.get(hostname); + if (timeseries == null) { // new node Optional<Node> node = nodeRepository.getNode(hostname); if (node.isEmpty()) return; if (node.get().allocation().isEmpty()) return; - measurements = new NodeMeasurements(hostname, - node.get().allocation().get().membership().cluster().type(), - new ArrayList<>()); - db.put(hostname, measurements); + timeseries = new NodeTimeseries(hostname, + node.get().allocation().get().membership().cluster().type(), + new ArrayList<>()); + db.put(hostname, timeseries); } - measurements.add(measurement); + timeseries.add(snapshot); } /** Must be called intermittently (as long as any add methods are called) to gc old data */ @@ -65,10 +62,10 @@ public class NodeMetricsDb { synchronized (lock) { // Each measurement is Object + long + float = 16 + 8 + 4 = 28 bytes // 12 hours with 1k nodes and 3 resources and 1 measurement/sec is about 5Gb - for (Iterator<NodeMeasurements> i = db.values().iterator(); i.hasNext(); ) { - var measurements = i.next(); - measurements.removeOlderThan(clock.instant().minus(Autoscaler.scalingWindow(measurements.type)).toEpochMilli()); - if (measurements.isEmpty()) + for (Iterator<NodeTimeseries> i = db.values().iterator(); i.hasNext(); ) { + var snapshot = i.next(); + snapshot.removeOlderThan(clock.instant().minus(Autoscaler.scalingWindow(snapshot.type())).toEpochMilli()); + if (snapshot.isEmpty()) i.remove(); } } @@ -78,11 +75,11 @@ public class NodeMetricsDb { * Returns a list of measurements with one entry for each of the given host names * which have any values after startTime, in the same order */ - public List<NodeMeasurements> getMeasurements(Instant startTime, List<String> hostnames) { + public List<NodeTimeseries> getNodeTimeseries(Instant startTime, List<String> hostnames) { synchronized (lock) { - List<NodeMeasurements> measurementsList = new ArrayList<>(hostnames.size()); + List<NodeTimeseries> measurementsList = new ArrayList<>(hostnames.size()); for (String hostname : hostnames) { - NodeMeasurements measurements = db.get(hostname); + NodeTimeseries measurements = db.get(hostname); if (measurements == null) continue; measurements = measurements.copyAfter(startTime); if (measurements.isEmpty()) continue; @@ -92,85 +89,4 @@ public class NodeMetricsDb { } } - /** A list of measurements from a host */ - public static class NodeMeasurements { - - private final String hostname; - private final ClusterSpec.Type type; - private final List<Measurements> measurements; - - // Note: This transfers ownership of the measurement list to this - private NodeMeasurements(String hostname, ClusterSpec.Type type, List<Measurements> measurements) { - this.hostname = hostname; - this.type = type; - this.measurements = measurements; - } - - // Public access - - public boolean isEmpty() { return measurements.isEmpty(); } - - public int size() { return measurements.size(); } - - public Measurements get(int index) { return measurements.get(index); } - - public List<Measurements> asList() { return Collections.unmodifiableList(measurements); } - - public String hostname() { return hostname; } - - public NodeMeasurements copyAfter(Instant oldestTime) { - long oldestTimestamp = oldestTime.toEpochMilli(); - return new NodeMeasurements(hostname, type, - measurements.stream() - .filter(measurement -> measurement.timestamp >= oldestTimestamp) - .collect(Collectors.toList())); - } - - // Private mutation - - private void add(Measurements measurement) { measurements.add(measurement); } - - private void removeOlderThan(long oldestTimestamp) { - while (!measurements.isEmpty() && measurements.get(0).timestamp < oldestTimestamp) - measurements.remove(0); - } - - } - - /** A single measurement of all values we measure, for one node */ - public static class Measurements { - - // TODO: Order by timestamp - /** The time of this measurement in epoch millis */ - private final long timestamp; - - private final double cpu; - private final double memory; - private final double disk; - private final long generation; - - public Measurements(MetricsFetcher.NodeMetrics metrics) { - this.timestamp = metrics.timestampSecond() * 1000; - this.cpu = Metric.cpu.measurementFromMetric(metrics.cpuUtil()); - this.memory = Metric.memory.measurementFromMetric(metrics.totalMemUtil()); - this.disk = Metric.disk.measurementFromMetric(metrics.diskUtil()); - this.generation = (long)Metric.generation.measurementFromMetric(metrics.applicationGeneration()); - - } - - public double cpu() { return cpu; } - public double memory() { return memory; } - public double disk() { return disk; } - public long generation() { return generation; } - public Instant at() { return Instant.ofEpochMilli(timestamp); } - - @Override - public String toString() { return "measurement at " + timestamp + ": " + - "cpu: " + cpu + - "memory: " + memory + - "disk: " + disk + - "generation: " + generation; } - - } - } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/NodeTimeseries.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/NodeTimeseries.java new file mode 100644 index 00000000000..e6f786f55aa --- /dev/null +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/NodeTimeseries.java @@ -0,0 +1,59 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.hosted.provision.autoscale; + +import com.yahoo.config.provision.ClusterSpec; + +import java.time.Instant; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * A list of metric snapshots from a host + * + * @author bratseth + */ +public class NodeTimeseries { + + private final String hostname; + private final ClusterSpec.Type type; + private final List<MetricSnapshot> snapshots; + + // Note: This transfers ownership of the snapshot list to this + NodeTimeseries(String hostname, ClusterSpec.Type type, List<MetricSnapshot> snapshots) { + this.hostname = hostname; + this.type = type; + this.snapshots = snapshots; + } + + // Public access + + public boolean isEmpty() { return snapshots.isEmpty(); } + + public int size() { return snapshots.size(); } + + public ClusterSpec.Type type() { return type; } + + public MetricSnapshot get(int index) { return snapshots.get(index); } + + public List<MetricSnapshot> asList() { return Collections.unmodifiableList(snapshots); } + + public String hostname() { return hostname; } + + public NodeTimeseries copyAfter(Instant oldestTime) { + return new NodeTimeseries(hostname, type, + snapshots.stream() + .filter(measurement -> measurement.at().equals(oldestTime) || measurement.at().isAfter(oldestTime)) + .collect(Collectors.toList())); + } + + // Restricted mutation + + void add(MetricSnapshot snapshot) { snapshots.add(snapshot); } + + void removeOlderThan(long oldestTimestamp) { + while (!snapshots.isEmpty() && snapshots.get(0).at().toEpochMilli() < oldestTimestamp) + snapshots.remove(0); + } + +} diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/MetricsFetcherDbTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/MetricsFetcherDbTest.java index 01d2a177ca7..3ae71b0b40c 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/MetricsFetcherDbTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/MetricsFetcherDbTest.java @@ -47,12 +47,12 @@ public class MetricsFetcherDbTest { // Avoid off-by-one bug when the below windows starts exactly on one of the above getEpochSecond() timestamps. clock.advance(Duration.ofMinutes(1)); - assertEquals(35, measurementCount(db.getMeasurements(clock.instant().minus(Duration.ofHours(6)), List.of(node0)))); + assertEquals(35, measurementCount(db.getNodeTimeseries(clock.instant().minus(Duration.ofHours(6)), List.of(node0)))); db.gc(clock); - assertEquals( 5, measurementCount(db.getMeasurements(clock.instant().minus(Duration.ofHours(6)), List.of(node0)))); + assertEquals( 5, measurementCount(db.getNodeTimeseries(clock.instant().minus(Duration.ofHours(6)), List.of(node0)))); } - private int measurementCount(List<NodeMetricsDb.NodeMeasurements> measurements) { + private int measurementCount(List<NodeTimeseries> measurements) { return measurements.stream().mapToInt(m -> m.size()).sum(); } |