diff options
Diffstat (limited to 'node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java')
-rw-r--r-- | node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java | 186 |
1 files changed, 40 insertions, 146 deletions
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java index efa1de6bb97..37e70e3539a 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java @@ -5,8 +5,6 @@ import com.google.inject.Inject; import com.yahoo.collections.ListMap; import com.yahoo.collections.Pair; import com.yahoo.component.AbstractComponent; -import com.yahoo.config.provision.ApplicationId; -import com.yahoo.config.provision.ClusterSpec; import com.yahoo.io.IOUtils; import com.yahoo.vespa.defaults.Defaults; import io.questdb.cairo.CairoConfiguration; @@ -32,7 +30,6 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; @@ -48,8 +45,7 @@ import java.util.stream.Collectors; public class QuestMetricsDb extends AbstractComponent implements MetricsDb { private static final Logger log = Logger.getLogger(QuestMetricsDb.class.getName()); - private static final String nodeTable = "metrics"; - private static final String clusterTable = "clusterMetrics"; + private static final String table = "metrics"; private final Clock clock; private final String dataDir; @@ -73,8 +69,7 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { } private void initializeDb() { - IOUtils.createDirectory(dataDir + "/" + nodeTable); - IOUtils.createDirectory(dataDir + "/" + clusterTable); + IOUtils.createDirectory(dataDir + "/" + table); // silence Questdb's custom logging system IOUtils.writeFile(new File(dataDir, "quest-log.conf"), new byte[0]); @@ -83,36 +78,32 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { CairoConfiguration configuration = new DefaultCairoConfiguration(dataDir); engine = new CairoEngine(configuration); - ensureTablesExist(); + ensureExists(table); } @Override - public Clock clock() { return clock; } - - @Override - public void addNodeMetrics(Collection<Pair<String, NodeMetricSnapshot>> snapshots) { - try (TableWriter writer = engine.getWriter(newContext().getCairoSecurityContext(), nodeTable)) { - addNodeMetrics(snapshots, writer); + public void add(Collection<Pair<String, MetricSnapshot>> snapshots) { + try (TableWriter writer = engine.getWriter(newContext().getCairoSecurityContext(), table)) { + add(snapshots, writer); } catch (CairoException e) { if (e.getMessage().contains("Cannot read offset")) { // This error seems non-recoverable repair(e); - try (TableWriter writer = engine.getWriter(newContext().getCairoSecurityContext(), nodeTable)) { - addNodeMetrics(snapshots, writer); + try (TableWriter writer = engine.getWriter(newContext().getCairoSecurityContext(), table)) { + add(snapshots, writer); } } } } - private void addNodeMetrics(Collection<Pair<String, NodeMetricSnapshot>> snapshots, TableWriter writer) { + private void add(Collection<Pair<String, MetricSnapshot>> snapshots, TableWriter writer) { for (var snapshot : snapshots) { long atMillis = adjustIfRecent(snapshot.getSecond().at().toEpochMilli(), highestTimestampAdded); if (atMillis < highestTimestampAdded) continue; // Ignore old data highestTimestampAdded = atMillis; TableWriter.Row row = writer.newRow(atMillis * 1000); // in microseconds row.putStr(0, snapshot.getFirst()); - // (1 is timestamp) row.putFloat(2, (float)snapshot.getSecond().cpu()); row.putFloat(3, (float)snapshot.getSecond().memory()); row.putFloat(4, (float)snapshot.getSecond().disk()); @@ -126,70 +117,23 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { } @Override - public void addClusterMetrics(ApplicationId application, Map<ClusterSpec.Id, ClusterMetricSnapshot> snapshots) { - try (TableWriter writer = engine.getWriter(newContext().getCairoSecurityContext(), clusterTable)) { - addClusterMetrics(application, snapshots, writer); - } - catch (CairoException e) { - if (e.getMessage().contains("Cannot read offset")) { - // This error seems non-recoverable - repair(e); - try (TableWriter writer = engine.getWriter(newContext().getCairoSecurityContext(), clusterTable)) { - addClusterMetrics(application, snapshots, writer); - } - } - } - } - - private void addClusterMetrics(ApplicationId applicationId, Map<ClusterSpec.Id, ClusterMetricSnapshot> snapshots, TableWriter writer) { - for (var snapshot : snapshots.entrySet()) { - long atMillis = adjustIfRecent(snapshot.getValue().at().toEpochMilli(), highestTimestampAdded); - if (atMillis < highestTimestampAdded) continue; // Ignore old data - highestTimestampAdded = atMillis; - TableWriter.Row row = writer.newRow(atMillis * 1000); // in microseconds - row.putStr(0, applicationId.serializedForm()); - row.putStr(1, snapshot.getKey().value()); - // (2 is timestamp) - row.putFloat(3, (float)snapshot.getValue().queryRate()); - row.append(); - } - writer.commit(); - } - - @Override public List<NodeTimeseries> getNodeTimeseries(Duration period, Set<String> hostnames) { try (SqlCompiler compiler = new SqlCompiler(engine)) { SqlExecutionContext context = newContext(); - var snapshots = getNodeSnapshots(clock.instant().minus(period), hostnames, compiler, context); + var snapshots = getSnapshots(clock.instant().minus(period), hostnames, compiler, context); return snapshots.entrySet().stream() .map(entry -> new NodeTimeseries(entry.getKey(), entry.getValue())) .collect(Collectors.toList()); } catch (SqlException e) { - throw new IllegalStateException("Could not read node timeseries data in Quest stored in " + dataDir, e); - } - } - - @Override - public ClusterTimeseries getClusterTimeseries(ApplicationId applicationId, ClusterSpec.Id clusterId) { - try (SqlCompiler compiler = new SqlCompiler(engine)) { - SqlExecutionContext context = newContext(); - return getClusterSnapshots(applicationId, clusterId, compiler, context); - } - catch (SqlException e) { - throw new IllegalStateException("Could not read cluster timeseries data in Quest stored in " + dataDir, e); + throw new IllegalStateException("Could not read timeseries data in Quest stored in " + dataDir, e); } } @Override public void gc() { - gc(nodeTable); - gc(clusterTable); - } - - private void gc(String table) { - // We remove full days at once and we want to see at least three days to not every only see weekend data - Instant oldestToKeep = clock.instant().minus(Duration.ofDays(4)); + // Since we remove full days at once we need to keep at least the scaling window + 1 day + Instant oldestToKeep = clock.instant().minus(Autoscaler.maxScalingWindow().plus(Duration.ofDays(1))); SqlExecutionContext context = newContext(); int partitions = 0; try (SqlCompiler compiler = new SqlCompiler(engine)) { @@ -213,7 +157,7 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { context); } catch (SqlException e) { - log.log(Level.WARNING, "Failed to gc old metrics data in " + dataDir + " table " + table, e); + log.log(Level.WARNING, "Failed to gc old metrics data in " + dataDir, e); } } @@ -237,26 +181,18 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { initializeDb(); } - private boolean exists(String table, SqlExecutionContext context) { - return 0 == engine.getStatus(context.getCairoSecurityContext(), new Path(), table); - } - - private void ensureTablesExist() { + private void ensureExists(String table) { SqlExecutionContext context = newContext(); - if (exists(nodeTable, context)) - ensureNodeTableIsUpdated(context); - else - createNodeTable(context); - - if (exists(clusterTable, context)) - ensureClusterTableIsUpdated(context); - else - createClusterTable(context); + if (0 == engine.getStatus(context.getCairoSecurityContext(), new Path(), table)) { // table exists + ensureTableIsUpdated(table, context); + } else { + createTable(table, context); + } } - private void createNodeTable(SqlExecutionContext context) { + private void createTable(String table, SqlExecutionContext context) { try (SqlCompiler compiler = new SqlCompiler(engine)) { - compiler.compile("create table " + nodeTable + + compiler.compile("create table " + table + " (hostname string, at timestamp, cpu_util float, mem_total_util float, disk_util float," + " application_generation long, inService boolean, stable boolean, queries_rate float)" + " timestamp(at)" + @@ -266,39 +202,20 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { // compiler.compile("alter table " + tableName + " alter column hostname add index", context); } catch (SqlException e) { - throw new IllegalStateException("Could not create Quest db table '" + nodeTable + "'", e); - } - } - - private void createClusterTable(SqlExecutionContext context) { - try (SqlCompiler compiler = new SqlCompiler(engine)) { - compiler.compile("create table " + clusterTable + - " (application string, cluster string, at timestamp, queries_rate float)" + - " timestamp(at)" + - "PARTITION BY DAY;", - context); - // We should do this if we get a version where selecting on strings work embedded, see below - // compiler.compile("alter table " + tableName + " alter column cluster add index", context); - } - catch (SqlException e) { - throw new IllegalStateException("Could not create Quest db table '" + clusterTable + "'", e); + throw new IllegalStateException("Could not create Quest db table '" + table + "'", e); } } - private void ensureNodeTableIsUpdated(SqlExecutionContext context) { + private void ensureTableIsUpdated(String table, SqlExecutionContext context) { try (SqlCompiler compiler = new SqlCompiler(engine)) { - if (0 == engine.getStatus(context.getCairoSecurityContext(), new Path(), nodeTable)) { - ensureColumnExists("queries_rate", "float", nodeTable, compiler, context); // TODO: Remove after March 2021 + if (0 == engine.getStatus(context.getCairoSecurityContext(), new Path(), table)) { + ensureColumnExists("queries_rate", "float", table, compiler, context); // TODO: Remove after March 2021 } } catch (SqlException e) { repair(e); } } - private void ensureClusterTableIsUpdated(SqlExecutionContext context) { - // Nothing to do for now - } - private void ensureColumnExists(String column, String columnType, String table, SqlCompiler compiler, SqlExecutionContext context) throws SqlException { if (columnNamesOf(table, compiler, context).contains(column)) return; @@ -329,34 +246,34 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { return timestamp; } - private ListMap<String, NodeMetricSnapshot> getNodeSnapshots(Instant startTime, - Set<String> hostnames, - SqlCompiler compiler, - SqlExecutionContext context) throws SqlException { + private ListMap<String, MetricSnapshot> getSnapshots(Instant startTime, + Set<String> hostnames, + SqlCompiler compiler, + SqlExecutionContext context) throws SqlException { DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneId.of("UTC")); String from = formatter.format(startTime).substring(0, 19) + ".000000Z"; String to = formatter.format(clock.instant()).substring(0, 19) + ".000000Z"; - String sql = "select * from " + nodeTable + " where at in('" + from + "', '" + to + "');"; + String sql = "select * from " + table + " where at in('" + from + "', '" + to + "');"; // WHERE clauses does not work: // String sql = "select * from " + tableName + " where hostname in('host1', 'host2', 'host3');"; try (RecordCursorFactory factory = compiler.compile(sql, context).getRecordCursorFactory()) { - ListMap<String, NodeMetricSnapshot> snapshots = new ListMap<>(); + ListMap<String, MetricSnapshot> snapshots = new ListMap<>(); try (RecordCursor cursor = factory.getCursor(context)) { Record record = cursor.getRecord(); while (cursor.hasNext()) { String hostname = record.getStr(0).toString(); if (hostnames.contains(hostname)) { snapshots.put(hostname, - new NodeMetricSnapshot(Instant.ofEpochMilli(record.getTimestamp(1) / 1000), - record.getFloat(2), - record.getFloat(3), - record.getFloat(4), - record.getLong(5), - record.getBool(6), - record.getBool(7), - record.getFloat(8))); + new MetricSnapshot(Instant.ofEpochMilli(record.getTimestamp(1) / 1000), + record.getFloat(2), + record.getFloat(3), + record.getFloat(4), + record.getLong(5), + record.getBool(6), + record.getBool(7), + record.getFloat(8))); } } } @@ -364,29 +281,6 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { } } - private ClusterTimeseries getClusterSnapshots(ApplicationId application, - ClusterSpec.Id cluster, - SqlCompiler compiler, - SqlExecutionContext context) throws SqlException { - String sql = "select * from " + clusterTable; - try (RecordCursorFactory factory = compiler.compile(sql, context).getRecordCursorFactory()) { - List<ClusterMetricSnapshot> snapshots = new ArrayList<>(); - try (RecordCursor cursor = factory.getCursor(context)) { - Record record = cursor.getRecord(); - while (cursor.hasNext()) { - String applicationIdString = record.getStr(0).toString(); - if ( ! application.serializedForm().equals(applicationIdString)) continue; - String clusterId = record.getStr(1).toString(); - if (cluster.value().equals(clusterId)) { - snapshots.add(new ClusterMetricSnapshot(Instant.ofEpochMilli(record.getTimestamp(2) / 1000), - record.getFloat(3))); - } - } - } - return new ClusterTimeseries(cluster, snapshots); - } - } - private SqlExecutionContext newContext() { return new SqlExecutionContextImpl(engine, 1); } |