diff options
author | Jon Bratseth <bratseth@gmail.com> | 2020-10-22 14:04:42 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@gmail.com> | 2020-10-22 15:35:55 +0200 |
commit | 5c54905d8bf15e8e0d467ead1c4b919103055e0f (patch) | |
tree | f06985ec3f27e45234bb968c0d7c16c1ac5d4a36 | |
parent | 081c3fb60c09797e64f317563901c3aefa35f3f0 (diff) |
Quest MetricsDb implementation
13 files changed, 227 insertions, 200 deletions
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java index f1835302b18..c73a19bd9e2 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java @@ -101,6 +101,10 @@ public class Autoscaler { return Duration.ofHours(1); } + static Duration maxScalingWindow() { + return Duration.ofHours(12); + } + /** Measurements are currently taken once a minute. See also scalingWindow */ static int minimumMeasurementsPerNode(ClusterSpec.Type clusterType) { if (clusterType.isContent()) return 60; diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/ClusterTimeseries.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/ClusterTimeseries.java index 58bc08165d0..2123ecd0224 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/ClusterTimeseries.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/ClusterTimeseries.java @@ -30,7 +30,7 @@ public class ClusterTimeseries { this.clusterNodes = clusterNodes; ClusterSpec.Type clusterType = clusterNodes.get(0).allocation().get().membership().cluster().type(); this.nodeTimeseries = db.getNodeTimeseries(nodeRepository.clock().instant().minus(Autoscaler.scalingWindow(clusterType)), - clusterNodes.stream().map(Node::hostname).collect(Collectors.toList())); + clusterNodes.stream().map(Node::hostname).collect(Collectors.toSet())); this.startTimePerNode = metricStartTimes(cluster, clusterNodes, nodeRepository); } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MemoryMetricsDb.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MemoryMetricsDb.java index c00cd60dfcf..022a565e7f2 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MemoryMetricsDb.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MemoryMetricsDb.java @@ -5,6 +5,7 @@ import com.yahoo.collections.Pair; import com.yahoo.vespa.hosted.provision.Node; import com.yahoo.vespa.hosted.provision.NodeRepository; +import javax.inject.Inject; import java.time.Clock; import java.time.Instant; import java.util.ArrayList; @@ -13,6 +14,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; /** * An in-memory implementation of the metrics Db. @@ -44,13 +47,22 @@ public class MemoryMetricsDb implements MetricsDb { } @Override - public void gc(Clock clock) { + public List<NodeTimeseries> getNodeTimeseries(Instant startTime, Set<String> hostnames) { + synchronized (lock) { + return hostnames.stream() + .map(hostname -> db.getOrDefault(hostname, new NodeTimeseries(hostname, List.of())).justAfter(startTime)) + .collect(Collectors.toList()); + } + } + + @Override + public void gc() { synchronized (lock) { // Each measurement is Object + long + float = 16 + 8 + 4 = 28 bytes // 12 hours with 1k nodes and 3 resources and 1 measurement/sec is about 5Gb for (String hostname : db.keySet()) { var timeseries = db.get(hostname); - timeseries = timeseries.justAfter(clock.instant().minus(Autoscaler.scalingWindow(timeseries.type()))); + timeseries = timeseries.justAfter(nodeRepository.clock().instant().minus(Autoscaler.maxScalingWindow())); if (timeseries.isEmpty()) db.remove(hostname); else @@ -60,21 +72,6 @@ public class MemoryMetricsDb implements MetricsDb { } @Override - public List<NodeTimeseries> getNodeTimeseries(Instant startTime, List<String> hostnames) { - synchronized (lock) { - List<NodeTimeseries> measurementsList = new ArrayList<>(hostnames.size()); - for (String hostname : hostnames) { - NodeTimeseries measurements = db.get(hostname); - if (measurements == null) continue; - measurements = measurements.justAfter(startTime); - if (measurements.isEmpty()) continue; - measurementsList.add(measurements); - } - return measurementsList; - } - } - - @Override public void close() {} private void add(String hostname, MetricSnapshot snapshot) { @@ -83,9 +80,7 @@ public class MemoryMetricsDb implements MetricsDb { Optional<Node> node = nodeRepository.getNode(hostname); if (node.isEmpty()) return; if (node.get().allocation().isEmpty()) return; - timeseries = new NodeTimeseries(hostname, - node.get().allocation().get().membership().cluster().type(), - new ArrayList<>()); + timeseries = new NodeTimeseries(hostname, new ArrayList<>()); db.put(hostname, timeseries); } db.put(hostname, timeseries.add(snapshot)); diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MetricSnapshot.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MetricSnapshot.java index 8e0f103d57b..7861cf4698d 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MetricSnapshot.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MetricSnapshot.java @@ -26,17 +26,17 @@ public class MetricSnapshot { this.generation = generation; } + public Instant at() { return at; } public double cpu() { return cpu; } public double memory() { return memory; } public double disk() { return disk; } public long generation() { return generation; } - public Instant at() { return at; } @Override - public String toString() { return "metrics at " + at + ": " + - "cpu: " + cpu + - "memory: " + memory + - "disk: " + disk + - "generation: " + generation; } + public String toString() { return "metrics at " + at + ":" + + " cpu: " + cpu + + " memory: " + memory + + " disk: " + disk + + " generation: " + generation; } } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MetricsDb.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MetricsDb.java index c9cf295cade..ea4ce4b44de 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MetricsDb.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MetricsDb.java @@ -8,6 +8,7 @@ import java.time.Clock; import java.time.Instant; import java.util.Collection; import java.util.List; +import java.util.Set; /** * An in-memory time-series database of node metrics. @@ -19,14 +20,14 @@ public interface MetricsDb { /** Adds snapshots to this. */ void add(Collection<Pair<String, MetricSnapshot>> nodeMetrics); - /** Must be called intermittently (as long as add is called) to gc old data */ - void gc(Clock clock); - /** - * Returns a list of time series for each of the given host names - * which have any values after startTime. + * Returns a list with one entry for each hostname containing + * the snapshots recorded after the given time (or an empty snapshot if none). */ - List<NodeTimeseries> getNodeTimeseries(Instant startTime, List<String> hostnames); + List<NodeTimeseries> getNodeTimeseries(Instant startTime, Set<String> hostnames); + + /** Must be called intermittently (as long as add is called) to gc old data */ + void gc(); void close(); diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/NodeTimeseries.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/NodeTimeseries.java index 778a2110d2c..6cba3928b8f 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/NodeTimeseries.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/NodeTimeseries.java @@ -17,13 +17,11 @@ import java.util.stream.Collectors; public class NodeTimeseries { private final String hostname; - private final ClusterSpec.Type type; private final List<MetricSnapshot> snapshots; // Note: This transfers ownership of the snapshot list to this - NodeTimeseries(String hostname, ClusterSpec.Type type, List<MetricSnapshot> snapshots) { + NodeTimeseries(String hostname, List<MetricSnapshot> snapshots) { this.hostname = hostname; - this.type = type; this.snapshots = snapshots; } @@ -31,8 +29,6 @@ public class NodeTimeseries { public int size() { return snapshots.size(); } - public ClusterSpec.Type type() { return type; } - public MetricSnapshot get(int index) { return snapshots.get(index); } public List<MetricSnapshot> asList() { return Collections.unmodifiableList(snapshots); } @@ -42,11 +38,11 @@ public class NodeTimeseries { public NodeTimeseries add(MetricSnapshot snapshot) { List<MetricSnapshot> list = new ArrayList<>(snapshots); list.add(snapshot); - return new NodeTimeseries(hostname(), type(), list); + return new NodeTimeseries(hostname(), list); } public NodeTimeseries justAfter(Instant oldestTime) { - return new NodeTimeseries(hostname, type, + return new NodeTimeseries(hostname, snapshots.stream() .filter(snapshot -> snapshot.at().equals(oldestTime) || snapshot.at().isAfter(oldestTime)) .collect(Collectors.toList())); diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java index 244aee3c117..d81b40c2473 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java @@ -1,6 +1,8 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hosted.provision.autoscale; +import com.yahoo.collections.ListMap; +import com.yahoo.collections.Pair; import com.yahoo.io.IOUtils; import io.questdb.cairo.CairoConfiguration; import io.questdb.cairo.CairoEngine; @@ -13,28 +15,48 @@ import io.questdb.griffin.SqlCompiler; import io.questdb.griffin.SqlException; import io.questdb.griffin.SqlExecutionContext; import io.questdb.griffin.SqlExecutionContextImpl; -import io.questdb.std.Os; import io.questdb.std.str.Path; +import javax.inject.Inject; import java.io.File; +import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; import java.util.stream.Collectors; -public class QuestMetricsDb implements AutoCloseable { +/** + * An implementation of the metrics Db backed by Quest: + * This provides local persistent storage of metrics with fast, multi-threaded lookup and write, + * suitable for production. + * + * @author bratseth + */ +public class QuestMetricsDb implements MetricsDb { + private static final Logger log = Logger.getLogger(QuestMetricsDb.class.getName()); private static final String tableName = "metrics"; + private final Clock clock; private final String dataDir; private final CairoEngine engine; + @Inject public QuestMetricsDb() { + this("data", Clock.systemUTC()); + } + + public QuestMetricsDb(String dataDir, Clock clock) { + this.clock = clock; + this.dataDir = dataDir; System.setProperty("questdbLog", "etc/quest-log.conf"); // silence Questdb's custom logging system - dataDir = "data"; IOUtils.createDirectory(dataDir + "/" + tableName); CairoConfiguration configuration = new DefaultCairoConfiguration(dataDir); engine = new CairoEngine(configuration); @@ -42,61 +64,40 @@ public class QuestMetricsDb implements AutoCloseable { } @Override - public void close() { - if (engine != null) - engine.close(); - } - - public void addMetrics() { + public void add(Collection<Pair<String, MetricSnapshot>> snapshots) { try (TableWriter writer = engine.getWriter(newContext().getCairoSecurityContext(), tableName)) { - for (int i = 0; i < 10; i++) { - TableWriter.Row row = writer.newRow(Os.currentTimeMicros()); - row.putStr(0, "host" + i); - row.putTimestamp(1, Instant.now().toEpochMilli()); - row.putFloat(2, i * 1.1F); - row.putFloat(3, i * 2.2F); - row.putFloat(4, i * 3.3F); - row.putFloat(5, i); // really a long, but keep this uniform? + for (var snapshot : snapshots) { + long atMillis = snapshot.getSecond().at().toEpochMilli(); + TableWriter.Row row = writer.newRow(atMillis * 1000); // in microseconds + row.putStr(0, snapshot.getFirst()); + row.putFloat(2, (float)snapshot.getSecond().cpu()); + row.putFloat(3, (float)snapshot.getSecond().memory()); + row.putFloat(4, (float)snapshot.getSecond().disk()); + row.putLong(5, snapshot.getSecond().generation()); row.append(); } writer.commit(); } } - private void ensureExists(String tableName) { - SqlExecutionContext context = newContext(); - if (0 == engine.getStatus(context.getCairoSecurityContext(), new Path(), tableName)) return; - + @Override + public List<NodeTimeseries> getNodeTimeseries(Instant startTime, Set<String> hostnames) { try (SqlCompiler compiler = new SqlCompiler(engine)) { - compiler.compile("create table " + tableName + - " (host string, at timestamp, cpu_util float, mem_total_util float, disk_util float, application_generation float)" + - " timestamp(at)" + - "PARTITION BY DAY;", - context); + SqlExecutionContext context = newContext(); + var snapshots = getSnapshots(startTime, hostnames, compiler, context); + return snapshots.entrySet().stream() + .map(entry -> new NodeTimeseries(entry.getKey(), entry.getValue())) + .collect(Collectors.toList()); } catch (SqlException e) { - throw new IllegalStateException("Could not create Quest db table '" + tableName + "'", e); - } - } - - private void readData(String tableName, CairoEngine engine, SqlExecutionContextImpl context) throws SqlException { - try (SqlCompiler compiler = new SqlCompiler(engine)) { - try (RecordCursorFactory factory = compiler.compile(tableName, context).getRecordCursorFactory()) { - try (RecordCursor cursor = factory.getCursor(context)) { - Record record = cursor.getRecord(); - double cpuUtilSum = 0; - int rowCount = 0; - while (cursor.hasNext()) { - cpuUtilSum += record.getFloat(2); - rowCount++; - } - } - } + throw new IllegalStateException("Could not read timeseries data in Quest stored in " + dataDir, e); } } - private void gc() throws SqlException { - int maxAgeDays = 3; + @Override + public void gc() { + // Since we remove full days at once we need to keep at least the scaling window + 1 day + Instant oldestToKeep = clock.instant().minus(Autoscaler.maxScalingWindow().plus(Duration.ofDays(1))); SqlExecutionContext context = newContext(); try (SqlCompiler compiler = new SqlCompiler(engine)) { File tableRoot = new File(dataDir, tableName); @@ -106,12 +107,72 @@ public class QuestMetricsDb implements AutoCloseable { if ( ! partitionDir.isDirectory()) continue; DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneId.of("UTC")); Instant partitionDay = Instant.from(formatter.parse(dirEntry + "T00:00:00")); - if (partitionDay.isBefore(Instant.now().minus(Duration.ofDays(maxAgeDays)))) + if (partitionDay.isBefore(oldestToKeep)) removeList.add(dirEntry); } - compiler.compile("alter table " + tableName + " drop partition " + - removeList.stream().map(dir -> "'" + dir + "'").collect(Collectors.joining(",")), + if ( ! removeList.isEmpty()) + compiler.compile("alter table " + tableName + " drop partition " + + removeList.stream().map(dir -> "'" + dir + "'").collect(Collectors.joining(",")), + context); + } + catch (SqlException e) { + log.log(Level.WARNING, "Failed to gc old metrics data in " + dataDir, e); + } + } + + @Override + public void close() { + if (engine != null) + engine.close(); + } + + private void ensureExists(String tableName) { + SqlExecutionContext context = newContext(); + if (0 == engine.getStatus(context.getCairoSecurityContext(), new Path(), tableName)) return; + + try (SqlCompiler compiler = new SqlCompiler(engine)) { + compiler.compile("create table " + tableName + + " (hostname string, at timestamp, cpu_util float, mem_total_util float, disk_util float, application_generation long)" + + " timestamp(at)" + + "PARTITION BY DAY;", context); + // We should do this if we get a version where selecting on stringhs work embedded, see below + // compiler.compile("alter table " + tableName + " alter column hostname add index", context); + } + catch (SqlException e) { + throw new IllegalStateException("Could not create Quest db table '" + tableName + "'", e); + } + } + + private ListMap<String, MetricSnapshot> getSnapshots(Instant startTime, + Set<String> hostnames, + SqlCompiler compiler, + SqlExecutionContext context) throws SqlException { + DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneId.of("UTC")); + String from = formatter.format(startTime).substring(0, 19) + ".000000Z"; + String to = formatter.format(clock.instant()).substring(0, 19) + ".000000Z"; + String sql = "select * from " + tableName + " where at in('" + from + "', '" + to + "');"; + + // WHERE clauses does not work: + // String sql = "select * from " + tableName + " where hostname in('host1', 'host2', 'host3');"; + + try (RecordCursorFactory factory = compiler.compile(sql, context).getRecordCursorFactory()) { + ListMap<String, MetricSnapshot> snapshots = new ListMap<>(); + try (RecordCursor cursor = factory.getCursor(context)) { + Record record = cursor.getRecord(); + while (cursor.hasNext()) { + String hostname = record.getStr(0).toString(); + if (hostnames.contains(hostname)) { + snapshots.put(hostname, + new MetricSnapshot(Instant.ofEpochMilli(record.getTimestamp(1) / 1000), + record.getFloat(2), + record.getFloat(3), + record.getFloat(4), + record.getLong(5))); + } + } + } + return snapshots; } } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeMetricsDbMaintainer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeMetricsDbMaintainer.java index de8065fe32b..5770564d23a 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeMetricsDbMaintainer.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeMetricsDbMaintainer.java @@ -47,7 +47,7 @@ public class NodeMetricsDbMaintainer extends NodeRepositoryMaintainer { log.log(Level.WARNING, "Could not update metrics for " + application + ": " + Exceptions.toMessageString(e)); } } - metricsDb.gc(nodeRepository().clock()); + metricsDb.gc(); // Suppress failures for manual zones for now to avoid noise if (nodeRepository().zone().environment().isManuallyDeployed()) return true; diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/GroupPreparer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/GroupPreparer.java index 7f62eb8632c..99557cb0908 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/GroupPreparer.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/GroupPreparer.java @@ -67,7 +67,7 @@ public class GroupPreparer { { MutableInteger probePrepareHighestIndex = new MutableInteger(highestIndex.get()); NodeAllocation probeAllocation = prepareAllocation(application, cluster, requestedNodes, surplusActiveNodes, - probePrepareHighestIndex, wantedGroups, allocateFully, PROBE_LOCK); + probePrepareHighestIndex, wantedGroups, allocateFully, PROBE_LOCK); if (probeAllocation.fulfilledAndNoChanges()) { List<Node> acceptedNodes = probeAllocation.finalNodes(); surplusActiveNodes.removeAll(acceptedNodes); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/AutoscalingIntegrationTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/AutoscalingIntegrationTest.java index eac27a5e21a..9332eb79f20 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/AutoscalingIntegrationTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/AutoscalingIntegrationTest.java @@ -47,7 +47,7 @@ public class AutoscalingIntegrationTest { tester.clock().advance(Duration.ofSeconds(10)); tester.nodeMetricsDb().add(fetcher.fetchMetrics(application1)); tester.clock().advance(Duration.ofSeconds(10)); - tester.nodeMetricsDb().gc(tester.clock()); + tester.nodeMetricsDb().gc(); } ClusterResources min = new ClusterResources(2, 1, nodes); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/MetricsFetcherDbTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/MetricsFetcherDbTest.java index 9dad8ed9df8..bc5303e14b8 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/MetricsFetcherDbTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/MetricsFetcherDbTest.java @@ -15,6 +15,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Set; import static org.junit.Assert.assertEquals; @@ -40,16 +41,16 @@ public class MetricsFetcherDbTest { Collection<Pair<String, MetricSnapshot>> values = new ArrayList<>(); for (int i = 0; i < 40; i++) { values.add(new Pair<>(node0, new MetricSnapshot(clock.instant(), 0.9f, 0.6f, 0.6f, 0))); - clock.advance(Duration.ofMinutes(10)); + clock.advance(Duration.ofMinutes(120)); } db.add(values); // Avoid off-by-one bug when the below windows starts exactly on one of the above getEpochSecond() timestamps. clock.advance(Duration.ofMinutes(1)); - assertEquals(35, measurementCount(db.getNodeTimeseries(clock.instant().minus(Duration.ofHours(6)), List.of(node0)))); - db.gc(clock); - assertEquals( 5, measurementCount(db.getNodeTimeseries(clock.instant().minus(Duration.ofHours(6)), List.of(node0)))); + assertEquals(35, measurementCount(db.getNodeTimeseries(clock.instant().minus(Duration.ofHours(72)), Set.of(node0)))); + db.gc(); + assertEquals( 5, measurementCount(db.getNodeTimeseries(clock.instant().minus(Duration.ofHours(72)), Set.of(node0)))); } private int measurementCount(List<NodeTimeseries> measurements) { diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDbTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDbTest.java index 1f3cc11fb5e..a1cc66ffa28 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDbTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDbTest.java @@ -1,136 +1,105 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hosted.provision.autoscale; +import com.yahoo.collections.Pair; import com.yahoo.io.IOUtils; -import io.questdb.cairo.CairoConfiguration; -import io.questdb.cairo.CairoEngine; -import io.questdb.cairo.DefaultCairoConfiguration; -import io.questdb.cairo.TableWriter; -import io.questdb.cairo.sql.Record; -import io.questdb.cairo.sql.RecordCursor; -import io.questdb.cairo.sql.RecordCursorFactory; -import io.questdb.griffin.SqlCompiler; -import io.questdb.griffin.SqlException; -import io.questdb.griffin.SqlExecutionContextImpl; -import io.questdb.std.str.Path; +import com.yahoo.test.ManualClock; import org.junit.Test; import java.io.File; import java.time.Duration; import java.time.Instant; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; /** - * Standalone test of setting up a Quest Db partitioned by days, - * writing data over the days and then removing old entries. + * Tests the Quest metrics db. * * @author bratseth */ public class QuestMetricsDbTest { - private final Instant now = Instant.from(DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneId.of("UTC")) - .parse("2020-10-05T00:00:00")); - + private static final double delta = 0.0000001; @Test - public void testQuestMetricsDb() throws SqlException { - System.setProperty("questdbLog", "etc/quest-log.conf"); // silence Questdb's custom logging system - String dataDir = "data/QuestMetricsDbTest"; - IOUtils.recursiveDeleteDir(new File(dataDir)); // Without this, dropping partitions sometimes fail + public void testReadWrite() { + String dataDir = "data/QuestMetricsDbReadWrite"; + IOUtils.recursiveDeleteDir(new File(dataDir)); IOUtils.createDirectory(dataDir + "/metrics"); - CairoConfiguration configuration = new DefaultCairoConfiguration(dataDir); - try (CairoEngine engine = new CairoEngine(configuration)) { // process-wide singleton - try (SqlCompiler compiler = new SqlCompiler(engine)) { - SqlExecutionContextImpl context = new SqlExecutionContextImpl(engine, 1); // for single thread - initDb("metrics", engine, context, compiler); - - assertEquals(0, readRows("metrics", context, compiler)); - - writeRows(1000, 10, "metrics", engine, context); - assertEquals(1000, readRows("metrics", context, compiler)); - - deleteData(3, "metrics", dataDir, context, compiler); - assertEquals(300, readRows("metrics", context, compiler)); - } - } - } - - private void initDb(String tableName, CairoEngine engine, SqlExecutionContextImpl context, SqlCompiler compiler) throws SqlException { - if ( ! exists(tableName, engine, context)) - create(tableName, context, compiler); - else - clear(tableName, context, compiler); - } - - private void writeRows(int rows, int days, String tableName, CairoEngine engine, SqlExecutionContextImpl context) { - long oldest = now.minus(Duration.ofDays(days)).toEpochMilli(); - long timeStep = (now.toEpochMilli() - oldest) / rows; - - try (TableWriter writer = engine.getWriter(context.getCairoSecurityContext(), tableName)) { - for (int i = 0; i < rows; i++) { - long time = oldest + i * timeStep; - TableWriter.Row row = writer.newRow(time * 1000); // in microseconds - row.putStr(0, "host" + i); - row.putTimestamp(1, time); - row.putFloat(2, i * 1.1F); - row.putFloat(3, i * 2.2F); - row.putFloat(4, i * 3.3F); - row.putFloat(5, i); // really a long, but keep this uniform? - row.append(); - } - writer.commit(); - } - } - - private boolean exists(String tableName, CairoEngine engine, SqlExecutionContextImpl context) { - return 0 == engine.getStatus(context.getCairoSecurityContext(), new Path(), tableName); - } - - private void create(String tableName, SqlExecutionContextImpl context, SqlCompiler compiler) throws SqlException { - compiler.compile("create table " + tableName + - " (host string, at timestamp, cpu_util float, mem_total_util float, disk_util float, application_generation float)" + - " timestamp(at)" + - "PARTITION BY DAY;", - context); + ManualClock clock = new ManualClock("2020-10-01T00:00:00"); + QuestMetricsDb db = new QuestMetricsDb(dataDir, clock); + Instant startTime = clock.instant(); + clock.advance(Duration.ofSeconds(1)); + db.add(timeseries(1000, Duration.ofSeconds(1), clock, "host1", "host2", "host3")); + + clock.advance(Duration.ofSeconds(1)); + + // Read all of one host + List<NodeTimeseries> nodeTimeSeries1 = db.getNodeTimeseries(startTime, Set.of("host1")); + assertEquals(1, nodeTimeSeries1.size()); + assertEquals("host1", nodeTimeSeries1.get(0).hostname()); + assertEquals(1000, nodeTimeSeries1.get(0).size()); + MetricSnapshot snapshot = nodeTimeSeries1.get(0).asList().get(0); + assertEquals(startTime.plus(Duration.ofSeconds(1)), snapshot.at()); + assertEquals(0.1, snapshot.cpu(), delta); + assertEquals(0.2, snapshot.memory(), delta); + assertEquals(0.4, snapshot.disk(), delta); + assertEquals(1, snapshot.generation(), delta); + + // Read all from 2 hosts + List<NodeTimeseries> nodeTimeSeries2 = db.getNodeTimeseries(startTime, Set.of("host2", "host3")); + assertEquals(2, nodeTimeSeries2.size()); + assertEquals(Set.of("host2", "host3"), nodeTimeSeries2.stream().map(ts -> ts.hostname()).collect(Collectors.toSet())); + assertEquals(1000, nodeTimeSeries2.get(0).size()); + assertEquals(1000, nodeTimeSeries2.get(1).size()); + + // Read a short interval from 3 hosts + List<NodeTimeseries> nodeTimeSeries3 = db.getNodeTimeseries(clock.instant().minus(Duration.ofSeconds(3)), + Set.of("host1", "host2", "host3")); + assertEquals(3, nodeTimeSeries3.size()); + assertEquals(Set.of("host1", "host2", "host3"), nodeTimeSeries3.stream().map(ts -> ts.hostname()).collect(Collectors.toSet())); + assertEquals(2, nodeTimeSeries3.get(0).size()); + assertEquals(2, nodeTimeSeries3.get(1).size()); + assertEquals(2, nodeTimeSeries3.get(2).size()); } - private void clear(String tableName, SqlExecutionContextImpl context, SqlCompiler compiler) throws SqlException { - compiler.compile("truncate table " + tableName, context); - } - - private int readRows(String tableName, SqlExecutionContextImpl context, SqlCompiler compiler) throws SqlException { - try (RecordCursorFactory factory = compiler.compile(tableName, context).getRecordCursorFactory()) { - try (RecordCursor cursor = factory.getCursor(context)) { - Record record = cursor.getRecord(); - int rowCount = 0; - while (cursor.hasNext()) { - rowCount++; - } - return rowCount; - } - } + @Test + public void testGc() { + String dataDir = "data/QuestMetricsDbGc"; + IOUtils.recursiveDeleteDir(new File(dataDir)); + IOUtils.createDirectory(dataDir + "/metrics"); + ManualClock clock = new ManualClock("2020-10-01T00:00:00"); + QuestMetricsDb db = new QuestMetricsDb(dataDir, clock); + Instant startTime = clock.instant(); + int dayOffset = 3; + clock.advance(Duration.ofHours(dayOffset)); + db.add(timeseries(24 * 10, Duration.ofHours(1), clock, "host1", "host2", "host3")); + + assertEquals(24 * 10, db.getNodeTimeseries(startTime, Set.of("host1")).get(0).size()); + db.gc(); + assertEquals(24 * 1 + dayOffset, db.getNodeTimeseries(startTime, Set.of("host1")).get(0).size()); + db.gc(); // no-op + assertEquals(24 * 1 + dayOffset, db.getNodeTimeseries(startTime, Set.of("host1")).get(0).size()); } - private void deleteData(int maxAgeDays, String tableName, String dataDir, SqlExecutionContextImpl context, SqlCompiler compiler) throws SqlException { - File tableRoot = new File(dataDir, tableName); - List<String> removeList = new ArrayList<>(); - for (String dirEntry : tableRoot.list()) { - File partitionDir = new File(tableRoot, dirEntry); - if ( ! partitionDir.isDirectory()) continue; - DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneId.of("UTC")); - Instant partitionDay = Instant.from(formatter.parse(dirEntry + "T00:00:00")); - if (partitionDay.isBefore(now.minus(Duration.ofDays(maxAgeDays)))) - removeList.add(dirEntry); + private Collection<Pair<String, MetricSnapshot>> timeseries(int countPerHost, Duration sampleRate, ManualClock clock, + String ... hosts) { + Collection<Pair<String, MetricSnapshot>> timeseries = new ArrayList<>(); + for (int i = 1; i <= countPerHost; i++) { + for (String host : hosts) + timeseries.add(new Pair<>(host, new MetricSnapshot(clock.instant(), + i * 0.1, + i * 0.2, + i * 0.4, + i % 100))); + clock.advance(sampleRate); } - compiler.compile("alter table " + tableName + " drop partition " + - removeList.stream().map(dir -> "'" + dir + "'").collect(Collectors.joining(",")), - context); + return timeseries; } } diff --git a/vespajlib/src/main/java/com/yahoo/collections/Pair.java b/vespajlib/src/main/java/com/yahoo/collections/Pair.java index 6587d1804f9..3c593ab0fe9 100644 --- a/vespajlib/src/main/java/com/yahoo/collections/Pair.java +++ b/vespajlib/src/main/java/com/yahoo/collections/Pair.java @@ -46,7 +46,7 @@ public class Pair<F, S> { @Override public String toString() { - return "(" + first + "," + second + ")"; + return "(" + first + ", " + second + ")"; } } |