author    Jon Bratseth <bratseth@oath.com>  2021-06-26 00:01:13 +0200
committer GitHub <noreply@github.com>       2021-06-26 00:01:13 +0200
commit    a644c0ffa9c047f1719999a250c498ac757fb31c (patch)
tree      9cf0f705ca624da740feff2fd54c9c2becd54f00
parent    b5a6980e0d88c55aed11d7a5db7cf60d32e1c771 (diff)
parent    e966dc36f5c086b853b151074f66619ce1a65cd0 (diff)
Merge pull request #18418 from vespa-engine/bratseth/lock-tables-rebased
Bratseth/lock tables rebased
-rw-r--r--  container-search/src/main/java/com/yahoo/prelude/fastsearch/GroupingListHit.java              1
-rw-r--r--  container-search/src/main/java/com/yahoo/search/grouping/GroupingRequest.java                 5
-rw-r--r--  node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java  360
3 files changed, 194 insertions, 172 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/GroupingListHit.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/GroupingListHit.java
index 740b9592efc..2b91941d1d4 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/GroupingListHit.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/GroupingListHit.java
@@ -26,4 +26,5 @@ public class GroupingListHit extends Hit {
private final List<Grouping> groupingList;
private final DocsumDefinitionSet defs;
+
}
diff --git a/container-search/src/main/java/com/yahoo/search/grouping/GroupingRequest.java b/container-search/src/main/java/com/yahoo/search/grouping/GroupingRequest.java
index 13c23234910..8e57434a049 100644
--- a/container-search/src/main/java/com/yahoo/search/grouping/GroupingRequest.java
+++ b/container-search/src/main/java/com/yahoo/search/grouping/GroupingRequest.java
@@ -2,7 +2,6 @@
package com.yahoo.search.grouping;
import com.yahoo.net.URI;
-import com.yahoo.processing.request.CompoundName;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.grouping.request.GroupingOperation;
@@ -12,7 +11,9 @@ import com.yahoo.search.grouping.result.RootId;
import com.yahoo.search.query.Select;
import com.yahoo.search.result.Hit;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TimeZone;
/**
* An instance of this class represents one of many grouping requests that are attached to a {@link Query}. Use the
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java
index dc9d5b22f5a..3a83486cddb 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -49,15 +50,14 @@ import java.util.stream.Collectors;
public class QuestMetricsDb extends AbstractComponent implements MetricsDb {
private static final Logger log = Logger.getLogger(QuestMetricsDb.class.getName());
- private static final String nodeTable = "metrics";
- private static final String clusterTable = "clusterMetrics";
+
+ private final Table nodeTable;
+ private final Table clusterTable;
private final Clock clock;
private final String dataDir;
- private CairoEngine engine;
- private ThreadLocal<SqlCompiler> sqlCompiler;
-
- private long highestTimestampAdded = 0;
+ private final CairoEngine engine;
+ private final ThreadLocal<SqlCompiler> sqlCompiler;
private volatile int nullRecords = 0;
@@ -72,24 +72,18 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb {
if (dataDir.startsWith(Defaults.getDefaults().vespaHome())
&& ! new File(Defaults.getDefaults().vespaHome()).exists())
dataDir = "data"; // We're injected, but not on a node with Vespa installed
- this.dataDir = dataDir;
- initializeDb();
- }
-
- private void initializeDb() {
- IOUtils.createDirectory(dataDir + "/" + nodeTable);
- IOUtils.createDirectory(dataDir + "/" + clusterTable);
-
- // https://stackoverflow.com/questions/67785629/what-does-max-txn-txn-inflight-limit-reached-in-questdb-and-how-to-i-avoid-it
- new File(dataDir + "/" + nodeTable + "/_txn_scoreboard").delete();
- new File(dataDir + "/" + clusterTable + "/_txn_scoreboard").delete();
// silence Questdb's custom logging system
- IOUtils.writeFile(new File(dataDir, "quest-log.conf"), new byte[0]);
- System.setProperty("out", dataDir + "/quest-log.conf");
- CairoConfiguration configuration = new DefaultCairoConfiguration(dataDir);
- engine = new CairoEngine(configuration);
+ String logConfig = dataDir + "/quest-log.conf";
+ IOUtils.createDirectory(logConfig);
+ IOUtils.writeFile(new File(logConfig), new byte[0]);
+ System.setProperty("out", logConfig);
+
+ this.dataDir = dataDir;
+ engine = new CairoEngine(new DefaultCairoConfiguration(dataDir));
sqlCompiler = ThreadLocal.withInitial(() -> new SqlCompiler(engine));
+ nodeTable = new Table(dataDir, "metrics", clock);
+ clusterTable = new Table(dataDir, "clusterMetrics", clock);
ensureTablesExist();
}
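A note on the ThreadLocal in the constructor above: a SqlCompiler is kept per thread, presumably because it is not safe for concurrent use. A minimal standalone sketch of the same pattern, using SimpleDateFormat (a standard non-thread-safe class) purely as an illustration — the class and method names here are not from the patch:

    import java.text.SimpleDateFormat;
    import java.util.Date;

    class PerThreadResource {
        // Each thread lazily creates its own instance on first get(), mirroring
        // ThreadLocal.withInitial(() -> new SqlCompiler(engine)) above.
        private static final ThreadLocal<SimpleDateFormat> FORMAT =
                ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));

        static String format(Date date) {
            return FORMAT.get().format(date); // safe without synchronization
        }
    }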
@@ -98,14 +92,14 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb {
@Override
public void addNodeMetrics(Collection<Pair<String, NodeMetricSnapshot>> snapshots) {
- try (TableWriter writer = engine.getWriter(newContext().getCairoSecurityContext(), nodeTable)) {
+ try (TableWriter writer = nodeTable.getWriter()) {
addNodeMetrics(snapshots, writer);
}
catch (CairoException e) {
if (e.getMessage().contains("Cannot read offset")) {
// This error seems non-recoverable
- repair(e);
- try (TableWriter writer = engine.getWriter(newContext().getCairoSecurityContext(), nodeTable)) {
+ nodeTable.repair(e);
+ try (TableWriter writer = nodeTable.getWriter()) {
addNodeMetrics(snapshots, writer);
}
}
@@ -113,35 +107,36 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb {
}
private void addNodeMetrics(Collection<Pair<String, NodeMetricSnapshot>> snapshots, TableWriter writer) {
- for (var snapshot : snapshots) {
- long atMillis = adjustIfRecent(snapshot.getSecond().at().toEpochMilli(), highestTimestampAdded);
- if (atMillis < highestTimestampAdded) continue; // Ignore old data
- highestTimestampAdded = atMillis;
- TableWriter.Row row = writer.newRow(atMillis * 1000); // in microseconds
- row.putStr(0, snapshot.getFirst());
- // (1 is timestamp)
- row.putFloat(2, (float)snapshot.getSecond().load().cpu());
- row.putFloat(3, (float)snapshot.getSecond().load().memory());
- row.putFloat(4, (float)snapshot.getSecond().load().disk());
- row.putLong(5, snapshot.getSecond().generation());
- row.putBool(6, snapshot.getSecond().inService());
- row.putBool(7, snapshot.getSecond().stable());
- row.putFloat(8, (float)snapshot.getSecond().queryRate());
- row.append();
+ synchronized (nodeTable.writeLock) {
+ for (var snapshot : snapshots) {
+ Optional<Long> atMillis = nodeTable.adjustOrDiscard(snapshot.getSecond().at());
+ if (atMillis.isEmpty()) continue;
+ TableWriter.Row row = writer.newRow(atMillis.get() * 1000); // in microseconds
+ row.putStr(0, snapshot.getFirst());
+ // (1 is timestamp)
+ row.putFloat(2, (float) snapshot.getSecond().load().cpu());
+ row.putFloat(3, (float) snapshot.getSecond().load().memory());
+ row.putFloat(4, (float) snapshot.getSecond().load().disk());
+ row.putLong(5, snapshot.getSecond().generation());
+ row.putBool(6, snapshot.getSecond().inService());
+ row.putBool(7, snapshot.getSecond().stable());
+ row.putFloat(8, (float) snapshot.getSecond().queryRate());
+ row.append();
+ }
+ writer.commit();
}
- writer.commit();
}
@Override
public void addClusterMetrics(ApplicationId application, Map<ClusterSpec.Id, ClusterMetricSnapshot> snapshots) {
- try (TableWriter writer = engine.getWriter(newContext().getCairoSecurityContext(), clusterTable)) {
+ try (TableWriter writer = clusterTable.getWriter()) {
addClusterMetrics(application, snapshots, writer);
}
catch (CairoException e) {
if (e.getMessage().contains("Cannot read offset")) {
// This error seems non-recoverable
- repair(e);
- try (TableWriter writer = engine.getWriter(newContext().getCairoSecurityContext(), clusterTable)) {
+ clusterTable.repair(e);
+ try (TableWriter writer = clusterTable.getWriter()) {
addClusterMetrics(application, snapshots, writer);
}
}
@@ -149,19 +144,20 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb {
}
private void addClusterMetrics(ApplicationId applicationId, Map<ClusterSpec.Id, ClusterMetricSnapshot> snapshots, TableWriter writer) {
- for (var snapshot : snapshots.entrySet()) {
- long atMillis = adjustIfRecent(snapshot.getValue().at().toEpochMilli(), highestTimestampAdded);
- if (atMillis < highestTimestampAdded) continue; // Ignore old data
- highestTimestampAdded = atMillis;
- TableWriter.Row row = writer.newRow(atMillis * 1000); // in microseconds
- row.putStr(0, applicationId.serializedForm());
- row.putStr(1, snapshot.getKey().value());
- // (2 is timestamp)
- row.putFloat(3, (float)snapshot.getValue().queryRate());
- row.putFloat(4, (float)snapshot.getValue().writeRate());
- row.append();
+ synchronized (clusterTable.writeLock) {
+ for (var snapshot : snapshots.entrySet()) {
+ Optional<Long> atMillis = clusterTable.adjustOrDiscard(snapshot.getValue().at());
+ if (atMillis.isEmpty()) continue;
+ TableWriter.Row row = writer.newRow(atMillis.get() * 1000); // in microseconds
+ row.putStr(0, applicationId.serializedForm());
+ row.putStr(1, snapshot.getKey().value());
+ // (2 is timestamp)
+ row.putFloat(3, (float) snapshot.getValue().queryRate());
+ row.putFloat(4, (float) snapshot.getValue().writeRate());
+ row.append();
+ }
+ writer.commit();
}
- writer.commit();
}
@Override
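Both add* methods above now follow the same shape: take the table's write lock, append the whole batch, then commit once. A minimal sketch of that pattern in isolation; the Sample type and its accessors are hypothetical, while the TableWriter calls are the ones used above:

    // Hypothetical batch append: the lock keeps concurrent callers from
    // interleaving rows and commits on the same table.
    void appendAll(Table table, TableWriter writer, List<Sample> batch) {
        synchronized (table.writeLock) {
            for (Sample sample : batch) {
                TableWriter.Row row = writer.newRow(sample.timestampMicros());
                row.putStr(0, sample.host());
                row.append();
            }
            writer.commit(); // one commit per batch: readers never see a partial batch
        }
    }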
@@ -192,39 +188,8 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb {
@Override
public void gc() {
nullRecords = 0;
- gc(nodeTable);
- gc(clusterTable);
- }
-
- private void gc(String table) {
- // We remove whole days at once, and we want to see at least three days so we don't only see weekend data
- Instant oldestToKeep = clock.instant().minus(Duration.ofDays(4));
- SqlExecutionContext context = newContext();
- int partitions = 0;
- try {
- File tableRoot = new File(dataDir, table);
- List<String> removeList = new ArrayList<>();
- for (String dirEntry : tableRoot.list()) {
- File partitionDir = new File(tableRoot, dirEntry);
- if ( ! partitionDir.isDirectory()) continue;
-
- partitions++;
- DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneId.of("UTC"));
- Instant partitionDay = Instant.from(formatter.parse(dirEntry + "T00:00:00"));
- if (partitionDay.isBefore(oldestToKeep))
- removeList.add(dirEntry);
-
- }
- // Remove unless all partitions are old: Removing all partitions "will be supported in the future"
- if ( removeList.size() < partitions && ! removeList.isEmpty()) {
- issue("alter table " + table + " drop partition list " +
- removeList.stream().map(dir -> "'" + dir + "'").collect(Collectors.joining(",")),
- context);
- }
- }
- catch (SqlException e) {
- log.log(Level.WARNING, "Failed to gc old metrics data in " + dataDir + " table " + table, e);
- }
+ nodeTable.gc();
+ clusterTable.gc();
}
@Override
@@ -236,122 +201,74 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb {
engine.close();
}
- /**
- * Repairs this db on corruption.
- *
- * @param e the exception indicating corruption
- */
- private void repair(Exception e) {
- log.log(Level.WARNING, "QuestDb seems corrupted, wiping data and starting over", e);
- IOUtils.recursiveDeleteDir(new File(dataDir));
- initializeDb();
- }
-
- private boolean exists(String table, SqlExecutionContext context) {
- return 0 == engine.getStatus(context.getCairoSecurityContext(), new Path(), table);
- }
-
private void ensureTablesExist() {
- SqlExecutionContext context = newContext();
- if (exists(nodeTable, context))
- ensureNodeTableIsUpdated(context);
+ if (nodeTable.exists())
+ ensureNodeTableIsUpdated();
else
- createNodeTable(context);
+ createNodeTable();
- if (exists(clusterTable, context))
- ensureClusterTableIsUpdated(context);
+ if (clusterTable.exists())
+ ensureClusterTableIsUpdated();
else
- createClusterTable(context);
+ createClusterTable();
+ }
+
+ private void ensureNodeTableIsUpdated() {
+ try {
+ // Example: nodeTable.ensureColumnExists("write_rate", "float");
+ } catch (Exception e) {
+ nodeTable.repair(e);
+ }
}
- private void createNodeTable(SqlExecutionContext context) {
+ private void ensureClusterTableIsUpdated() {
try {
- issue("create table " + nodeTable +
+ if (clusterTable.exists()) {
+ // Example: clusterTable.ensureColumnExists("write_rate", "float");
+ }
+ } catch (Exception e) {
+ clusterTable.repair(e);
+ }
+ }
+
+ private void createNodeTable() {
+ try {
+ issue("create table " + nodeTable.name +
" (hostname string, at timestamp, cpu_util float, mem_total_util float, disk_util float," +
" application_generation long, inService boolean, stable boolean, queries_rate float)" +
" timestamp(at)" +
"PARTITION BY DAY;",
- context);
+ newContext());
// We should do this if we get a version where selecting on strings works embedded, see below
// compiler.compile("alter table " + tableName + " alter column hostname add index", context);
}
catch (SqlException e) {
- throw new IllegalStateException("Could not create Quest db table '" + nodeTable + "'", e);
+ throw new IllegalStateException("Could not create Quest db table '" + nodeTable.name + "'", e);
}
}
- private void createClusterTable(SqlExecutionContext context) {
+ private void createClusterTable() {
try {
- issue("create table " + clusterTable +
+ issue("create table " + clusterTable.name +
" (application string, cluster string, at timestamp, queries_rate float, write_rate float)" +
" timestamp(at)" +
"PARTITION BY DAY;",
- context);
+ newContext());
// We should do this if we get a version where selecting on strings works embedded, see below
// compiler.compile("alter table " + tableName + " alter column cluster add index", context);
}
catch (SqlException e) {
- throw new IllegalStateException("Could not create Quest db table '" + clusterTable + "'", e);
+ throw new IllegalStateException("Could not create Quest db table '" + clusterTable.name + "'", e);
}
}
- private void ensureNodeTableIsUpdated(SqlExecutionContext context) {
- try {
- if (0 == engine.getStatus(context.getCairoSecurityContext(), new Path(), nodeTable)) {
- ensureColumnExists("queries_rate", "float", nodeTable,context); // TODO: Remove after March 2021
- }
- } catch (SqlException e) {
- repair(e);
- }
- }
-
- private void ensureClusterTableIsUpdated(SqlExecutionContext context) {
- try {
- if (0 == engine.getStatus(context.getCairoSecurityContext(), new Path(), nodeTable)) {
- ensureColumnExists("write_rate", "float", nodeTable, context); // TODO: Remove after March 2021
- }
- } catch (SqlException e) {
- repair(e);
- }
- }
-
- private void ensureColumnExists(String column, String columnType,
- String table, SqlExecutionContext context) throws SqlException {
- if (columnNamesOf(table, context).contains(column)) return;
- issue("alter table " + table + " add column " + column + " " + columnType, context);
- }
-
- private List<String> columnNamesOf(String tableName, SqlExecutionContext context) throws SqlException {
- List<String> columns = new ArrayList<>();
- try (RecordCursorFactory factory = issue("show columns from " + tableName, context).getRecordCursorFactory()) {
- try (RecordCursor cursor = factory.getCursor(context)) {
- Record record = cursor.getRecord();
- while (cursor.hasNext()) {
- columns.add(record.getStr(0).toString());
- }
- }
- }
- return columns;
- }
-
- private long adjustIfRecent(long timestamp, long highestTimestampAdded) {
- if (timestamp >= highestTimestampAdded) return timestamp;
-
- // We cannot add old data to QuestDb, but we want to use all recent information
- long oneMinute = 60 * 1000;
- if (timestamp >= highestTimestampAdded - oneMinute) return highestTimestampAdded;
-
- // Too old; discard
- return timestamp;
- }
-
private ListMap<String, NodeMetricSnapshot> getNodeSnapshots(Instant startTime,
Set<String> hostnames,
SqlExecutionContext context) throws SqlException {
DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneId.of("UTC"));
String from = formatter.format(startTime).substring(0, 19) + ".000000Z";
String to = formatter.format(clock.instant()).substring(0, 19) + ".000000Z";
- String sql = "select * from " + nodeTable + " where at between('" + from + "', '" + to + "');";
+ String sql = "select * from " + nodeTable.name + " where at between('" + from + "', '" + to + "');";
// WHERE clauses do not work:
// String sql = "select * from " + tableName + " where hostname in('host1', 'host2', 'host3');";
@@ -384,7 +301,7 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb {
}
private ClusterTimeseries getClusterSnapshots(ApplicationId application, ClusterSpec.Id cluster) throws SqlException {
- String sql = "select * from " + clusterTable;
+ String sql = "select * from " + clusterTable.name;
var context = newContext();
try (RecordCursorFactory factory = issue(sql, context).getRecordCursorFactory()) {
List<ClusterMetricSnapshot> snapshots = new ArrayList<>();
@@ -414,4 +331,107 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb {
return new SqlExecutionContextImpl(engine, 1);
}
+ /** A QuestDb table */
+ private class Table {
+
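+ /** Synchronizes writes to this table, including the highestTimestampAdded bookkeeping below */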
+ private final Object writeLock = new Object();
+ private final String name;
+ private final Clock clock;
+ private final File dir;
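+ /** The newest timestamp written so far: we cannot add older data to QuestDb, see adjustOrDiscard */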
+ private long highestTimestampAdded = 0;
+
+ Table(String dataDir, String name, Clock clock) {
+ this.name = name;
+ this.clock = clock;
+ this.dir = new File(dataDir, name);
+ IOUtils.createDirectory(dir.getPath());
+ // https://stackoverflow.com/questions/67785629/what-does-max-txn-txn-inflight-limit-reached-in-questdb-and-how-to-i-avoid-it
+ new File(dir + "/_txn_scoreboard").delete();
+ }
+
+ boolean exists() {
+ return 0 == engine.getStatus(newContext().getCairoSecurityContext(), new Path(), name);
+ }
+
+ TableWriter getWriter() {
+ return engine.getWriter(newContext().getCairoSecurityContext(), name);
+ }
+
+ void gc() {
+ // We remove whole days at once, and we want to see at least three days so we don't only see weekend data
+ Instant oldestToKeep = clock.instant().minus(Duration.ofDays(4));
+ SqlExecutionContext context = newContext();
+ int partitions = 0;
+ try {
+ List<String> removeList = new ArrayList<>();
+ for (String dirEntry : dir.list()) {
+ File partitionDir = new File(dir, dirEntry);
+ if ( ! partitionDir.isDirectory()) continue;
+
+ partitions++;
+ DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneId.of("UTC"));
+ Instant partitionDay = Instant.from(formatter.parse(dirEntry + "T00:00:00"));
+ if (partitionDay.isBefore(oldestToKeep))
+ removeList.add(dirEntry);
+
+ }
+ // Remove unless all partitions are old: Removing all partitions "will be supported in the future"
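+ // e.g. "alter table metrics drop partition list '2021-06-20','2021-06-21'" (dates illustrative)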
+ if ( removeList.size() < partitions && ! removeList.isEmpty()) {
+ issue("alter table " + name + " drop partition list " +
+ removeList.stream().map(dir -> "'" + dir + "'").collect(Collectors.joining(",")),
+ context);
+ }
+ }
+ catch (SqlException e) {
+ log.log(Level.WARNING, "Failed to gc old metrics data in " + dir + " table " + name, e);
+ }
+ }
+
+ /**
+ * Repairs this table on corruption.
+ *
+ * @param e the exception indicating corruption
+ */
+ private void repair(Exception e) {
+ log.log(Level.WARNING, "QuestDb table '" + name + "' seems corrupted, wiping its data and starting over", e);
+ IOUtils.recursiveDeleteDir(dir);
+ IOUtils.createDirectory(dir.getPath());
+ ensureTablesExist();
+ }
+
+ void ensureColumnExists(String column, String columnType) throws SqlException {
+ if (columnNames().contains(column)) return;
+ issue("alter table " + name + " add column " + column + " " + columnType, newContext());
+ }
+
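+ /**
+ * Returns the epoch millis to record this snapshot at, or empty if it should be discarded:
+ * timestamps newer than anything written are used as-is, timestamps up to a minute older
+ * are clamped forward to the newest written timestamp, and anything older is discarded.
+ */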
+ private Optional<Long> adjustOrDiscard(Instant at) {
+ long timestamp = at.toEpochMilli();
+ if (timestamp >= highestTimestampAdded) {
+ highestTimestampAdded = timestamp;
+ return Optional.of(timestamp);
+ }
+
+ // We cannot add old data to QuestDb, but we want to use all recent information
+ if (timestamp >= highestTimestampAdded - 60 * 1000) return Optional.of(highestTimestampAdded);
+
+ // Too old; discard
+ return Optional.empty();
+ }
+
+ private List<String> columnNames() throws SqlException {
+ var context = newContext();
+ List<String> columns = new ArrayList<>();
+ try (RecordCursorFactory factory = issue("show columns from " + name, context).getRecordCursorFactory()) {
+ try (RecordCursor cursor = factory.getCursor(context)) {
+ Record record = cursor.getRecord();
+ while (cursor.hasNext()) {
+ columns.add(record.getStr(0).toString());
+ }
+ }
+ }
+ return columns;
+ }
+
+ }
+
}
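To make the timestamp handling above concrete, here is a standalone, runnable restatement of Table.adjustOrDiscard with a worked example; the class name and sample values are illustrative only:

    import java.util.Optional;

    class AdjustOrDiscardExample {

        static long highestTimestampAdded = 1_000_000; // pretend this was the last write

        // Standalone restatement of Table.adjustOrDiscard above, for illustration only.
        static Optional<Long> adjustOrDiscard(long timestamp) {
            if (timestamp >= highestTimestampAdded) {
                highestTimestampAdded = timestamp;
                return Optional.of(timestamp);
            }
            if (timestamp >= highestTimestampAdded - 60 * 1000)
                return Optional.of(highestTimestampAdded); // recent but out of order: clamp forward
            return Optional.empty();                       // more than a minute old: discard
        }

        public static void main(String[] args) {
            System.out.println(adjustOrDiscard(1_005_000)); // Optional[1005000]: new high-water mark
            System.out.println(adjustOrDiscard(975_000));   // Optional[1005000]: clamped to newest write
            System.out.println(adjustOrDiscard(900_000));   // Optional.empty: discarded
        }
    }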