diff options
5 files changed, 56 insertions, 37 deletions
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java index b90b8e24f91..dbfe90b5a5b 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/QuestMetricsDb.java @@ -10,7 +10,6 @@ import com.yahoo.config.provision.ClusterSpec; import com.yahoo.io.IOUtils; import com.yahoo.vespa.defaults.Defaults; import com.yahoo.yolean.concurrent.ConcurrentResourcePool; -import com.yahoo.yolean.concurrent.ResourceFactory; import io.questdb.cairo.CairoEngine; import io.questdb.cairo.CairoException; import io.questdb.cairo.DefaultCairoConfiguration; @@ -37,8 +36,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -59,9 +57,10 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { private final Clock clock; private final String dataDir; - private final AtomicReference<CairoEngine> engine = new AtomicReference<>(); + private final CairoEngine engine; private final ConcurrentResourcePool<SqlCompiler> sqlCompilerPool; - private final AtomicInteger nullRecords = new AtomicInteger(); + private final AtomicBoolean closed = new AtomicBoolean(false); + @Inject public QuestMetricsDb() { @@ -82,18 +81,19 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { System.setProperty("out", logConfig); this.dataDir = dataDir; - engine.set(new CairoEngine(new DefaultCairoConfiguration(dataDir))); - sqlCompilerPool = new ConcurrentResourcePool<>(new ResourceFactory<>() { - @Override - public SqlCompiler create() { - return new SqlCompiler(engine.get()); - } - }); + engine = new CairoEngine(new DefaultCairoConfiguration(dataDir)); + sqlCompilerPool = new ConcurrentResourcePool<>(() -> new SqlCompiler(engine())); nodeTable = new Table(dataDir, "metrics", clock); clusterTable = new Table(dataDir, "clusterMetrics", clock); ensureTablesExist(); } + private CairoEngine engine() { + if (closed.get()) + throw new IllegalStateException("Attempted to access QuestDb after calling close"); + return engine; + } + @Override public Clock clock() { return clock; } @@ -190,11 +190,8 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { } } - public int getNullRecordsCount() { return nullRecords.get(); } - @Override public void gc() { - nullRecords.set(0); nodeTable.gc(); clusterTable.gc(); } @@ -204,12 +201,14 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { @Override public void close() { - CairoEngine myEngine = engine.getAndSet(null); - for (SqlCompiler sqlCompiler : sqlCompilerPool) { - sqlCompiler.close(); - } - if (myEngine != null) { - myEngine.close(); + if (closed.get()) return; + closed.set(true); + synchronized (nodeTable.writeLock) { + synchronized (clusterTable.writeLock) { + for (SqlCompiler sqlCompiler : sqlCompilerPool) + sqlCompiler.close(); + engine.close(); + } } } @@ -235,7 +234,7 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { private void ensureClusterTableIsUpdated() { try { - if (0 == engine.get().getStatus(newContext().getCairoSecurityContext(), new Path(), clusterTable.name)) { + if (0 == engine().getStatus(newContext().getCairoSecurityContext(), new Path(), clusterTable.name)) { // Example: clusterTable.ensureColumnExists("write_rate", "float"); } } catch (Exception e) { @@ -290,10 +289,6 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { try (RecordCursor cursor = factory.getCursor(context)) { Record record = cursor.getRecord(); while (cursor.hasNext()) { - if (record == null || record.getStr(0) == null) { // Observed to happen. QuestDb bug? - nullRecords.incrementAndGet(); - continue; - } String hostname = record.getStr(0).toString(); if (hostnames.isEmpty() || hostnames.contains(hostname)) { snapshots.put(hostname, @@ -345,7 +340,7 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { } private SqlExecutionContext newContext() { - return new SqlExecutionContextImpl(engine.get(), 1); + return new SqlExecutionContextImpl(engine(), 1); } /** A questDb table */ @@ -367,11 +362,11 @@ public class QuestMetricsDb extends AbstractComponent implements MetricsDb { } boolean exists() { - return 0 == engine.get().getStatus(newContext().getCairoSecurityContext(), new Path(), name); + return 0 == engine().getStatus(newContext().getCairoSecurityContext(), new Path(), name); } TableWriter getWriter() { - return engine.get().getWriter(newContext().getCairoSecurityContext(), name); + return engine().getWriter(newContext().getCairoSecurityContext(), name); } void gc() { diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeMetricsDbMaintainer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeMetricsDbMaintainer.java index d671900d08c..4d16c90c002 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeMetricsDbMaintainer.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeMetricsDbMaintainer.java @@ -53,9 +53,6 @@ public class NodeMetricsDbMaintainer extends NodeRepositoryMaintainer { Thread.sleep(pauseMs); } - if (nodeRepository().metricsDb().getNullRecordsCount() > 0) - log.warning(nodeRepository().metricsDb().getNullRecordsCount() + " records returned null"); - nodeRepository().metricsDb().gc(); return asSuccessFactor(attempts, failures.get()); diff --git a/yolean/abi-spec.json b/yolean/abi-spec.json index 82bf59ebf87..85aaaf5f64e 100644 --- a/yolean/abi-spec.json +++ b/yolean/abi-spec.json @@ -169,6 +169,7 @@ ], "methods": [ "public void <init>(com.yahoo.yolean.concurrent.ResourceFactory)", + "public void <init>(java.util.function.Supplier)", "public final java.lang.Object alloc()", "public final void free(java.lang.Object)", "public java.util.Iterator iterator()" @@ -209,7 +210,8 @@ ], "methods": [ "public void <init>()", - "public abstract java.lang.Object create()" + "public abstract java.lang.Object create()", + "public final java.util.function.Supplier asSupplier()" ], "fields": [] }, diff --git a/yolean/src/main/java/com/yahoo/yolean/concurrent/ConcurrentResourcePool.java b/yolean/src/main/java/com/yahoo/yolean/concurrent/ConcurrentResourcePool.java index 24d4cfe4318..a00de4866d0 100644 --- a/yolean/src/main/java/com/yahoo/yolean/concurrent/ConcurrentResourcePool.java +++ b/yolean/src/main/java/com/yahoo/yolean/concurrent/ConcurrentResourcePool.java @@ -4,24 +4,41 @@ package com.yahoo.yolean.concurrent; import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.Supplier; /** + * A pool of a resource. This create new instances of the resource on request until enough are created + * to deliver a unique one to all threads needing one concurrently and then reuse those instances + * in subsequent requests. + * * @author baldersheim */ public class ConcurrentResourcePool<T> implements Iterable<T> { private final Queue<T> pool = new ConcurrentLinkedQueue<>(); - private final ResourceFactory<T> factory; + private final Supplier<T> factory; + // TODO: Deprecate public ConcurrentResourcePool(ResourceFactory<T> factory) { + this.factory = factory.asSupplier(); + } + + public ConcurrentResourcePool(Supplier<T> factory) { this.factory = factory; } + /** + * Allocates an instance of the resource to the requestor. + * The resource will be allocated exclusively to the requestor until it calls free(instance). + * + * @return a reused or newly created instance of the resource + */ public final T alloc() { - final T e = pool.poll(); - return e != null ? e : factory.create(); + T e = pool.poll(); + return e != null ? e : factory.get(); } + /** Frees an instance previously acquired bty alloc */ public final void free(T e) { pool.offer(e); } @@ -30,4 +47,5 @@ public class ConcurrentResourcePool<T> implements Iterable<T> { public Iterator<T> iterator() { return pool.iterator(); } + } diff --git a/yolean/src/main/java/com/yahoo/yolean/concurrent/ResourceFactory.java b/yolean/src/main/java/com/yahoo/yolean/concurrent/ResourceFactory.java index f926283a47f..3a99b189ed8 100644 --- a/yolean/src/main/java/com/yahoo/yolean/concurrent/ResourceFactory.java +++ b/yolean/src/main/java/com/yahoo/yolean/concurrent/ResourceFactory.java @@ -1,11 +1,18 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.yolean.concurrent; +import java.util.function.Supplier; + /** * @author baldersheim - * @since 5.2 */ +// TODO: Deprecate public abstract class ResourceFactory<T> { public abstract T create(); + + public final Supplier<T> asSupplier() { + return () -> create(); + } + } |