aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java53
-rw-r--r--node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeMetricsDbMaintainer.java3
-rw-r--r--yolean/abi-spec.json4
-rw-r--r--yolean/src/main/java/com/yahoo/yolean/concurrent/ConcurrentResourcePool.java24
-rw-r--r--yolean/src/main/java/com/yahoo/yolean/concurrent/ResourceFactory.java9
5 files changed, 56 insertions, 37 deletions
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java
index b90b8e24f91..dbfe90b5a5b 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java
@@ -10,7 +10,6 @@ import com.yahoo.config.provision.ClusterSpec;
import com.yahoo.io.IOUtils;
import com.yahoo.vespa.defaults.Defaults;
import com.yahoo.yolean.concurrent.ConcurrentResourcePool;
-import com.yahoo.yolean.concurrent.ResourceFactory;
import io.questdb.cairo.CairoEngine;
import io.questdb.cairo.CairoException;
import io.questdb.cairo.DefaultCairoConfiguration;
@@ -37,8 +36,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
@@ -59,9 +57,10 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb {
private final Clock clock;
private final String dataDir;
- private final AtomicReference<CairoEngine> engine = new AtomicReference<>();
+ private final CairoEngine engine;
private final ConcurrentResourcePool<SqlCompiler> sqlCompilerPool;
- private final AtomicInteger nullRecords = new AtomicInteger();
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
@Inject
public QuestMetricsDb() {
@@ -82,18 +81,19 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb {
System.setProperty("out", logConfig);
this.dataDir = dataDir;
- engine.set(new CairoEngine(new DefaultCairoConfiguration(dataDir)));
- sqlCompilerPool = new ConcurrentResourcePool<>(new ResourceFactory<>() {
- @Override
- public SqlCompiler create() {
- return new SqlCompiler(engine.get());
- }
- });
+ engine = new CairoEngine(new DefaultCairoConfiguration(dataDir));
+ sqlCompilerPool = new ConcurrentResourcePool<>(() -> new SqlCompiler(engine()));
nodeTable = new Table(dataDir, "metrics", clock);
clusterTable = new Table(dataDir, "clusterMetrics", clock);
ensureTablesExist();
}
+ private CairoEngine engine() {
+ if (closed.get())
+ throw new IllegalStateException("Attempted to access QuestDb after calling close");
+ return engine;
+ }
+
@Override
public Clock clock() { return clock; }
@@ -190,11 +190,8 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb {
}
}
- public int getNullRecordsCount() { return nullRecords.get(); }
-
@Override
public void gc() {
- nullRecords.set(0);
nodeTable.gc();
clusterTable.gc();
}
@@ -204,12 +201,14 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb {
@Override
public void close() {
- CairoEngine myEngine = engine.getAndSet(null);
- for (SqlCompiler sqlCompiler : sqlCompilerPool) {
- sqlCompiler.close();
- }
- if (myEngine != null) {
- myEngine.close();
+ if (closed.get()) return;
+ closed.set(true);
+ synchronized (nodeTable.writeLock) {
+ synchronized (clusterTable.writeLock) {
+ for (SqlCompiler sqlCompiler : sqlCompilerPool)
+ sqlCompiler.close();
+ engine.close();
+ }
}
}
@@ -235,7 +234,7 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb {
private void ensureClusterTableIsUpdated() {
try {
- if (0 == engine.get().getStatus(newContext().getCairoSecurityContext(), new Path(), clusterTable.name)) {
+ if (0 == engine().getStatus(newContext().getCairoSecurityContext(), new Path(), clusterTable.name)) {
// Example: clusterTable.ensureColumnExists("write_rate", "float");
}
} catch (Exception e) {
@@ -290,10 +289,6 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb {
try (RecordCursor cursor = factory.getCursor(context)) {
Record record = cursor.getRecord();
while (cursor.hasNext()) {
- if (record == null || record.getStr(0) == null) { // Observed to happen. QuestDb bug?
- nullRecords.incrementAndGet();
- continue;
- }
String hostname = record.getStr(0).toString();
if (hostnames.isEmpty() || hostnames.contains(hostname)) {
snapshots.put(hostname,
@@ -345,7 +340,7 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb {
}
private SqlExecutionContext newContext() {
- return new SqlExecutionContextImpl(engine.get(), 1);
+ return new SqlExecutionContextImpl(engine(), 1);
}
/** A questDb table */
@@ -367,11 +362,11 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb {
}
boolean exists() {
- return 0 == engine.get().getStatus(newContext().getCairoSecurityContext(), new Path(), name);
+ return 0 == engine().getStatus(newContext().getCairoSecurityContext(), new Path(), name);
}
TableWriter getWriter() {
- return engine.get().getWriter(newContext().getCairoSecurityContext(), name);
+ return engine().getWriter(newContext().getCairoSecurityContext(), name);
}
void gc() {
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeMetricsDbMaintainer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeMetricsDbMaintainer.java
index d671900d08c..4d16c90c002 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeMetricsDbMaintainer.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeMetricsDbMaintainer.java
@@ -53,9 +53,6 @@ public class NodeMetricsDbMaintainer extends NodeRepositoryMaintainer {
Thread.sleep(pauseMs);
}
- if (nodeRepository().metricsDb().getNullRecordsCount() > 0)
- log.warning(nodeRepository().metricsDb().getNullRecordsCount() + " records returned null");
-
nodeRepository().metricsDb().gc();
return asSuccessFactor(attempts, failures.get());
diff --git a/yolean/abi-spec.json b/yolean/abi-spec.json
index 82bf59ebf87..85aaaf5f64e 100644
--- a/yolean/abi-spec.json
+++ b/yolean/abi-spec.json
@@ -169,6 +169,7 @@
],
"methods": [
"public void <init>(com.yahoo.yolean.concurrent.ResourceFactory)",
+ "public void <init>(java.util.function.Supplier)",
"public final java.lang.Object alloc()",
"public final void free(java.lang.Object)",
"public java.util.Iterator iterator()"
@@ -209,7 +210,8 @@
],
"methods": [
"public void <init>()",
- "public abstract java.lang.Object create()"
+ "public abstract java.lang.Object create()",
+ "public final java.util.function.Supplier asSupplier()"
],
"fields": []
},
diff --git a/yolean/src/main/java/com/yahoo/yolean/concurrent/ConcurrentResourcePool.java b/yolean/src/main/java/com/yahoo/yolean/concurrent/ConcurrentResourcePool.java
index 24d4cfe4318..a00de4866d0 100644
--- a/yolean/src/main/java/com/yahoo/yolean/concurrent/ConcurrentResourcePool.java
+++ b/yolean/src/main/java/com/yahoo/yolean/concurrent/ConcurrentResourcePool.java
@@ -4,24 +4,41 @@ package com.yahoo.yolean.concurrent;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Supplier;
/**
+ * A pool of a resource. This create new instances of the resource on request until enough are created
+ * to deliver a unique one to all threads needing one concurrently and then reuse those instances
+ * in subsequent requests.
+ *
* @author baldersheim
*/
public class ConcurrentResourcePool<T> implements Iterable<T> {
private final Queue<T> pool = new ConcurrentLinkedQueue<>();
- private final ResourceFactory<T> factory;
+ private final Supplier<T> factory;
+ // TODO: Deprecate
public ConcurrentResourcePool(ResourceFactory<T> factory) {
+ this.factory = factory.asSupplier();
+ }
+
+ public ConcurrentResourcePool(Supplier<T> factory) {
this.factory = factory;
}
+ /**
+ * Allocates an instance of the resource to the requestor.
+ * The resource will be allocated exclusively to the requestor until it calls free(instance).
+ *
+ * @return a reused or newly created instance of the resource
+ */
public final T alloc() {
- final T e = pool.poll();
- return e != null ? e : factory.create();
+ T e = pool.poll();
+ return e != null ? e : factory.get();
}
+ /** Frees an instance previously acquired bty alloc */
public final void free(T e) {
pool.offer(e);
}
@@ -30,4 +47,5 @@ public class ConcurrentResourcePool<T> implements Iterable<T> {
public Iterator<T> iterator() {
return pool.iterator();
}
+
}
diff --git a/yolean/src/main/java/com/yahoo/yolean/concurrent/ResourceFactory.java b/yolean/src/main/java/com/yahoo/yolean/concurrent/ResourceFactory.java
index f926283a47f..3a99b189ed8 100644
--- a/yolean/src/main/java/com/yahoo/yolean/concurrent/ResourceFactory.java
+++ b/yolean/src/main/java/com/yahoo/yolean/concurrent/ResourceFactory.java
@@ -1,11 +1,18 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.yolean.concurrent;
+import java.util.function.Supplier;
+
/**
* @author baldersheim
- * @since 5.2
*/
+// TODO: Deprecate
public abstract class ResourceFactory<T> {
public abstract T create();
+
+ public final Supplier<T> asSupplier() {
+ return () -> create();
+ }
+
}