diff options
-rw-r--r-- | node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java | 58 |
1 files changed, 43 insertions, 15 deletions
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java index 946d47e9246..ad989d5abfd 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java @@ -9,6 +9,7 @@ import com.yahoo.io.IOUtils; import com.yahoo.vespa.defaults.Defaults; import io.questdb.cairo.CairoConfiguration; import io.questdb.cairo.CairoEngine; +import io.questdb.cairo.CairoException; import io.questdb.cairo.DefaultCairoConfiguration; import io.questdb.cairo.TableWriter; import io.questdb.cairo.sql.Record; @@ -48,7 +49,7 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { private final Clock clock; private final String dataDir; - private final CairoEngine engine; + private CairoEngine engine; private long highestTimestampAdded = 0; @@ -64,7 +65,10 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { && ! new File(Defaults.getDefaults().vespaHome()).exists()) dataDir = "data"; // We're injected, but not on a node with Vespa installed this.dataDir = dataDir; + initializeDb(); + } + private void initializeDb() { IOUtils.createDirectory(dataDir + "/" + tableName); // silence Questdb's custom logging system @@ -80,24 +84,37 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { @Override public void add(Collection<Pair<String, MetricSnapshot>> snapshots) { try (TableWriter writer = engine.getWriter(newContext().getCairoSecurityContext(), tableName)) { - for (var snapshot : snapshots) { - long atMillis = adjustIfRecent(snapshot.getSecond().at().toEpochMilli(), highestTimestampAdded); - if (atMillis < highestTimestampAdded) continue; // Ignore old data - highestTimestampAdded = atMillis; - TableWriter.Row row = writer.newRow(atMillis * 1000); // in microseconds - row.putStr(0, snapshot.getFirst()); - row.putFloat(2, (float)snapshot.getSecond().cpu()); - row.putFloat(3, (float)snapshot.getSecond().memory()); - row.putFloat(4, (float)snapshot.getSecond().disk()); - row.putLong(5, snapshot.getSecond().generation()); - row.putBool(6, snapshot.getSecond().inService()); - row.putBool(7, snapshot.getSecond().stable()); - row.append(); + add(snapshots, writer); + } + catch (CairoException e) { + if (e.getMessage().contains("Cannot read offset")) { + // This error seems non-recoverable + repair(e); + try (TableWriter writer = engine.getWriter(newContext().getCairoSecurityContext(), tableName)) { + add(snapshots, writer); + } } - writer.commit(); } } + private void add(Collection<Pair<String, MetricSnapshot>> snapshots, TableWriter writer) { + for (var snapshot : snapshots) { + long atMillis = adjustIfRecent(snapshot.getSecond().at().toEpochMilli(), highestTimestampAdded); + if (atMillis < highestTimestampAdded) continue; // Ignore old data + highestTimestampAdded = atMillis; + TableWriter.Row row = writer.newRow(atMillis * 1000); // in microseconds + row.putStr(0, snapshot.getFirst()); + row.putFloat(2, (float)snapshot.getSecond().cpu()); + row.putFloat(3, (float)snapshot.getSecond().memory()); + row.putFloat(4, (float)snapshot.getSecond().disk()); + row.putLong(5, snapshot.getSecond().generation()); + row.putBool(6, snapshot.getSecond().inService()); + row.putBool(7, snapshot.getSecond().stable()); + row.append(); + } + writer.commit(); + } + @Override public List<NodeTimeseries> getNodeTimeseries(Instant startTime, Set<String> hostnames) { try (SqlCompiler compiler = new SqlCompiler(engine)) { @@ -152,6 +169,17 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { engine.close(); } + /** + * Repairs this db on corruption. + * + * @param e the exception indicating corruption + */ + private void repair(CairoException e) { + log.log(Level.WARNING, "QuestDb seems corrupted, wiping data and starting over", e); + IOUtils.recursiveDeleteDir(new File(dataDir)); + initializeDb(); + } + private void ensureExists(String tableName) { SqlExecutionContext context = newContext(); if (0 == engine.getStatus(context.getCairoSecurityContext(), new Path(), tableName)) return; |