From 4f18bc8a12eebfff2854536f562f6941b2cd3254 Mon Sep 17 00:00:00 2001 From: Jon Bratseth Date: Tue, 22 Jun 2021 19:33:34 +0200 Subject: Split into tables --- .../hosted/provision/autoscale/QuestMetricsDb.java | 305 +++++++++++---------- 1 file changed, 160 insertions(+), 145 deletions(-) (limited to 'node-repository') diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java index dc9d5b22f5a..4726993e298 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java @@ -34,6 +34,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; @@ -49,15 +50,14 @@ import java.util.stream.Collectors; public class QuestMetricsDb extends AbstractComponent implements MetricsDb { private static final Logger log = Logger.getLogger(QuestMetricsDb.class.getName()); - private static final String nodeTable = "metrics"; - private static final String clusterTable = "clusterMetrics"; + + private final Table nodeTable; + private final Table clusterTable; private final Clock clock; private final String dataDir; - private CairoEngine engine; - private ThreadLocal sqlCompiler; - - private long highestTimestampAdded = 0; + private final CairoEngine engine; + private final ThreadLocal sqlCompiler; private volatile int nullRecords = 0; @@ -73,16 +73,6 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { && ! new File(Defaults.getDefaults().vespaHome()).exists()) dataDir = "data"; // We're injected, but not on a node with Vespa installed this.dataDir = dataDir; - initializeDb(); - } - - private void initializeDb() { - IOUtils.createDirectory(dataDir + "/" + nodeTable); - IOUtils.createDirectory(dataDir + "/" + clusterTable); - - // https://stackoverflow.com/questions/67785629/what-does-max-txn-txn-inflight-limit-reached-in-questdb-and-how-to-i-avoid-it - new File(dataDir + "/" + nodeTable + "/_txn_scoreboard").delete(); - new File(dataDir + "/" + clusterTable + "/_txn_scoreboard").delete(); // silence Questdb's custom logging system IOUtils.writeFile(new File(dataDir, "quest-log.conf"), new byte[0]); @@ -90,6 +80,8 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { CairoConfiguration configuration = new DefaultCairoConfiguration(dataDir); engine = new CairoEngine(configuration); sqlCompiler = ThreadLocal.withInitial(() -> new SqlCompiler(engine)); + nodeTable = new Table(dataDir, "metrics", clock); + clusterTable = new Table(dataDir, "clusterMetrics", clock); ensureTablesExist(); } @@ -98,14 +90,14 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { @Override public void addNodeMetrics(Collection> snapshots) { - try (TableWriter writer = engine.getWriter(newContext().getCairoSecurityContext(), nodeTable)) { + try (TableWriter writer = nodeTable.getWriter()) { addNodeMetrics(snapshots, writer); } catch (CairoException e) { if (e.getMessage().contains("Cannot read offset")) { // This error seems non-recoverable - repair(e); - try (TableWriter writer = engine.getWriter(newContext().getCairoSecurityContext(), nodeTable)) { + nodeTable.repair(e); + try (TableWriter writer = nodeTable.getWriter()) { addNodeMetrics(snapshots, writer); } } @@ -114,10 +106,9 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { private void addNodeMetrics(Collection> snapshots, TableWriter writer) { for (var snapshot : snapshots) { - long atMillis = adjustIfRecent(snapshot.getSecond().at().toEpochMilli(), highestTimestampAdded); - if (atMillis < highestTimestampAdded) continue; // Ignore old data - highestTimestampAdded = atMillis; - TableWriter.Row row = writer.newRow(atMillis * 1000); // in microseconds + Optional atMillis = nodeTable.adjustOrDiscard(snapshot.getSecond().at()); + if (atMillis.isEmpty()) return; + TableWriter.Row row = writer.newRow(atMillis.get() * 1000); // in microseconds row.putStr(0, snapshot.getFirst()); // (1 is timestamp) row.putFloat(2, (float)snapshot.getSecond().load().cpu()); @@ -134,14 +125,14 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { @Override public void addClusterMetrics(ApplicationId application, Map snapshots) { - try (TableWriter writer = engine.getWriter(newContext().getCairoSecurityContext(), clusterTable)) { + try (TableWriter writer = clusterTable.getWriter()) { addClusterMetrics(application, snapshots, writer); } catch (CairoException e) { if (e.getMessage().contains("Cannot read offset")) { // This error seems non-recoverable - repair(e); - try (TableWriter writer = engine.getWriter(newContext().getCairoSecurityContext(), clusterTable)) { + clusterTable.repair(e); + try (TableWriter writer = clusterTable.getWriter()) { addClusterMetrics(application, snapshots, writer); } } @@ -150,10 +141,9 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { private void addClusterMetrics(ApplicationId applicationId, Map snapshots, TableWriter writer) { for (var snapshot : snapshots.entrySet()) { - long atMillis = adjustIfRecent(snapshot.getValue().at().toEpochMilli(), highestTimestampAdded); - if (atMillis < highestTimestampAdded) continue; // Ignore old data - highestTimestampAdded = atMillis; - TableWriter.Row row = writer.newRow(atMillis * 1000); // in microseconds + Optional atMillis = clusterTable.adjustOrDiscard(snapshot.getValue().at()); + if (atMillis.isEmpty()) return; + TableWriter.Row row = writer.newRow(atMillis.get() * 1000); // in microseconds row.putStr(0, applicationId.serializedForm()); row.putStr(1, snapshot.getKey().value()); // (2 is timestamp) @@ -192,39 +182,8 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { @Override public void gc() { nullRecords = 0; - gc(nodeTable); - gc(clusterTable); - } - - private void gc(String table) { - // We remove full days at once and we want to see at least three days to not every only see weekend data - Instant oldestToKeep = clock.instant().minus(Duration.ofDays(4)); - SqlExecutionContext context = newContext(); - int partitions = 0; - try { - File tableRoot = new File(dataDir, table); - List removeList = new ArrayList<>(); - for (String dirEntry : tableRoot.list()) { - File partitionDir = new File(tableRoot, dirEntry); - if ( ! partitionDir.isDirectory()) continue; - - partitions++; - DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneId.of("UTC")); - Instant partitionDay = Instant.from(formatter.parse(dirEntry + "T00:00:00")); - if (partitionDay.isBefore(oldestToKeep)) - removeList.add(dirEntry); - - } - // Remove unless all partitions are old: Removing all partitions "will be supported in the future" - if ( removeList.size() < partitions && ! removeList.isEmpty()) { - issue("alter table " + table + " drop partition list " + - removeList.stream().map(dir -> "'" + dir + "'").collect(Collectors.joining(",")), - context); - } - } - catch (SqlException e) { - log.log(Level.WARNING, "Failed to gc old metrics data in " + dataDir + " table " + table, e); - } + nodeTable.gc(); + clusterTable.gc(); } @Override @@ -236,122 +195,76 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { engine.close(); } - /** - * Repairs this db on corruption. - * - * @param e the exception indicating corruption - */ - private void repair(Exception e) { - log.log(Level.WARNING, "QuestDb seems corrupted, wiping data and starting over", e); - IOUtils.recursiveDeleteDir(new File(dataDir)); - initializeDb(); - } - - private boolean exists(String table, SqlExecutionContext context) { - return 0 == engine.getStatus(context.getCairoSecurityContext(), new Path(), table); - } - private void ensureTablesExist() { - SqlExecutionContext context = newContext(); - if (exists(nodeTable, context)) - ensureNodeTableIsUpdated(context); + if (nodeTable.exists()) + ensureNodeTableIsUpdated(); else - createNodeTable(context); + createNodeTable(); - if (exists(clusterTable, context)) - ensureClusterTableIsUpdated(context); + if (clusterTable.exists()) + ensureClusterTableIsUpdated(); else - createClusterTable(context); + createClusterTable(); } - private void createNodeTable(SqlExecutionContext context) { + private void ensureNodeTableIsUpdated() { try { - issue("create table " + nodeTable + + if (0 == engine.getStatus(newContext().getCairoSecurityContext(), new Path(), nodeTable.name)) { + // Example: nodeTable.ensureColumnExists("write_rate", "float"); + } + } catch (Exception e) { + nodeTable.repair(e); + } + } + + private void ensureClusterTableIsUpdated() { + try { + if (0 == engine.getStatus(newContext().getCairoSecurityContext(), new Path(), clusterTable.name)) { + // Example: clusterTable.ensureColumnExists("write_rate", "float"); + } + } catch (Exception e) { + clusterTable.repair(e); + } + } + + private void createNodeTable() { + try { + issue("create table " + nodeTable.name + " (hostname string, at timestamp, cpu_util float, mem_total_util float, disk_util float," + " application_generation long, inService boolean, stable boolean, queries_rate float)" + " timestamp(at)" + "PARTITION BY DAY;", - context); + newContext()); // We should do this if we get a version where selecting on strings work embedded, see below // compiler.compile("alter table " + tableName + " alter column hostname add index", context); } catch (SqlException e) { - throw new IllegalStateException("Could not create Quest db table '" + nodeTable + "'", e); + throw new IllegalStateException("Could not create Quest db table '" + nodeTable.name + "'", e); } } - private void createClusterTable(SqlExecutionContext context) { + private void createClusterTable() { try { - issue("create table " + clusterTable + + issue("create table " + clusterTable.name + " (application string, cluster string, at timestamp, queries_rate float, write_rate float)" + " timestamp(at)" + "PARTITION BY DAY;", - context); + newContext()); // We should do this if we get a version where selecting on strings work embedded, see below // compiler.compile("alter table " + tableName + " alter column cluster add index", context); } catch (SqlException e) { - throw new IllegalStateException("Could not create Quest db table '" + clusterTable + "'", e); - } - } - - private void ensureNodeTableIsUpdated(SqlExecutionContext context) { - try { - if (0 == engine.getStatus(context.getCairoSecurityContext(), new Path(), nodeTable)) { - ensureColumnExists("queries_rate", "float", nodeTable,context); // TODO: Remove after March 2021 - } - } catch (SqlException e) { - repair(e); - } - } - - private void ensureClusterTableIsUpdated(SqlExecutionContext context) { - try { - if (0 == engine.getStatus(context.getCairoSecurityContext(), new Path(), nodeTable)) { - ensureColumnExists("write_rate", "float", nodeTable, context); // TODO: Remove after March 2021 - } - } catch (SqlException e) { - repair(e); + throw new IllegalStateException("Could not create Quest db table '" + clusterTable.name + "'", e); } } - private void ensureColumnExists(String column, String columnType, - String table, SqlExecutionContext context) throws SqlException { - if (columnNamesOf(table, context).contains(column)) return; - issue("alter table " + table + " add column " + column + " " + columnType, context); - } - - private List columnNamesOf(String tableName, SqlExecutionContext context) throws SqlException { - List columns = new ArrayList<>(); - try (RecordCursorFactory factory = issue("show columns from " + tableName, context).getRecordCursorFactory()) { - try (RecordCursor cursor = factory.getCursor(context)) { - Record record = cursor.getRecord(); - while (cursor.hasNext()) { - columns.add(record.getStr(0).toString()); - } - } - } - return columns; - } - - private long adjustIfRecent(long timestamp, long highestTimestampAdded) { - if (timestamp >= highestTimestampAdded) return timestamp; - - // We cannot add old data to QuestDb, but we want to use all recent information - long oneMinute = 60 * 1000; - if (timestamp >= highestTimestampAdded - oneMinute) return highestTimestampAdded; - - // Too old; discard - return timestamp; - } - private ListMap getNodeSnapshots(Instant startTime, Set hostnames, SqlExecutionContext context) throws SqlException { DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneId.of("UTC")); String from = formatter.format(startTime).substring(0, 19) + ".000000Z"; String to = formatter.format(clock.instant()).substring(0, 19) + ".000000Z"; - String sql = "select * from " + nodeTable + " where at between('" + from + "', '" + to + "');"; + String sql = "select * from " + nodeTable.name + " where at between('" + from + "', '" + to + "');"; // WHERE clauses does not work: // String sql = "select * from " + tableName + " where hostname in('host1', 'host2', 'host3');"; @@ -384,7 +297,7 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { } private ClusterTimeseries getClusterSnapshots(ApplicationId application, ClusterSpec.Id cluster) throws SqlException { - String sql = "select * from " + clusterTable; + String sql = "select * from " + clusterTable.name; var context = newContext(); try (RecordCursorFactory factory = issue(sql, context).getRecordCursorFactory()) { List snapshots = new ArrayList<>(); @@ -414,4 +327,106 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { return new SqlExecutionContextImpl(engine, 1); } + /** A questDb table */ + private class Table { + + private final String name; + private final Clock clock; + private final File dir; + private long highestTimestampAdded = 0; + + Table(String dataDir, String name, Clock clock) { + this.name = name; + this.clock = clock; + this.dir = new File(dataDir, name); + IOUtils.createDirectory(dir.getPath()); + // https://stackoverflow.com/questions/67785629/what-does-max-txn-txn-inflight-limit-reached-in-questdb-and-how-to-i-avoid-it + new File(dir + "/_txn_scoreboard").delete(); + } + + boolean exists() { + return 0 == engine.getStatus(newContext().getCairoSecurityContext(), new Path(), name); + } + + TableWriter getWriter() { + return engine.getWriter(newContext().getCairoSecurityContext(), name); + } + + void gc() { + // We remove full days at once and we want to see at least three days to not every only see weekend data + Instant oldestToKeep = clock.instant().minus(Duration.ofDays(4)); + SqlExecutionContext context = newContext(); + int partitions = 0; + try { + List removeList = new ArrayList<>(); + for (String dirEntry : dir.list()) { + File partitionDir = new File(dir, dirEntry); + if ( ! partitionDir.isDirectory()) continue; + + partitions++; + DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneId.of("UTC")); + Instant partitionDay = Instant.from(formatter.parse(dirEntry + "T00:00:00")); + if (partitionDay.isBefore(oldestToKeep)) + removeList.add(dirEntry); + + } + // Remove unless all partitions are old: Removing all partitions "will be supported in the future" + if ( removeList.size() < partitions && ! removeList.isEmpty()) { + issue("alter table " + name + " drop partition list " + + removeList.stream().map(dir -> "'" + dir + "'").collect(Collectors.joining(",")), + context); + } + } + catch (SqlException e) { + log.log(Level.WARNING, "Failed to gc old metrics data in " + dir + " table " + name, e); + } + } + + /** + * Repairs this db on corruption. + * + * @param e the exception indicating corruption + */ + private void repair(Exception e) { + log.log(Level.WARNING, "QuestDb seems corrupted, wiping data and starting over", e); + IOUtils.recursiveDeleteDir(dir); + IOUtils.createDirectory(dir.getPath()); + ensureTablesExist(); + } + + void ensureColumnExists(String column, String columnType) throws SqlException { + if (columnNames().contains(column)) return; + issue("alter table " + name + " add column " + column + " " + columnType, newContext()); + } + + private Optional adjustOrDiscard(Instant at) { + long timestamp = at.toEpochMilli(); + if (timestamp >= highestTimestampAdded) { + highestTimestampAdded = timestamp; + return Optional.of(timestamp); + } + + // We cannot add old data to QuestDb, but we want to use all recent information + if (timestamp >= highestTimestampAdded - 60 * 1000) return Optional.of(highestTimestampAdded); + + // Too old; discard + return Optional.empty(); + } + + private List columnNames() throws SqlException { + var context = newContext(); + List columns = new ArrayList<>(); + try (RecordCursorFactory factory = issue("show columns from " + name, context).getRecordCursorFactory()) { + try (RecordCursor cursor = factory.getCursor(context)) { + Record record = cursor.getRecord(); + while (cursor.hasNext()) { + columns.add(record.getStr(0).toString()); + } + } + } + return columns; + } + + } + } -- cgit v1.2.3