summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletRequestReader.java8
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java68
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java110
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java27
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/Logger.java13
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java127
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/Spooler.java175
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/package-info.java6
-rw-r--r--container-search/src/main/resources/configdefinitions/search.logging.local-disk-logger.def5
-rw-r--r--container-search/src/test/java/com/yahoo/search/logging/LocalDiskLoggerTest.java50
-rw-r--r--container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java107
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/archive/CuratorArchiveBucketDb.java5
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ArchiveAccessMaintainer.java2
-rw-r--r--eval/src/vespa/eval/instruction/simple_join_count.h2
-rw-r--r--flags/src/main/java/com/yahoo/vespa/flags/Flags.java12
-rw-r--r--parent/pom.xml2
-rw-r--r--searchlib/src/vespa/searchlib/attribute/postingstore.cpp14
-rw-r--r--searchlib/src/vespa/searchlib/tensor/small_subspaces_buffer_type.h2
-rw-r--r--vdslib/src/main/java/com/yahoo/vdslib/state/State.java29
-rw-r--r--vespalib/CMakeLists.txt1
-rw-r--r--vespalib/src/tests/btree/btree-stress/btree_stress_test.cpp27
-rw-r--r--vespalib/src/tests/btree/btree_store/btree_store_test.cpp18
-rw-r--r--vespalib/src/tests/datastore/array_store/array_store_test.cpp8
-rw-r--r--vespalib/src/tests/datastore/free_list/CMakeLists.txt9
-rw-r--r--vespalib/src/tests/datastore/free_list/free_list_test.cpp127
-rw-r--r--vespalib/src/vespa/vespalib/btree/btree.hpp4
-rw-r--r--vespalib/src/vespa/vespalib/btree/btreenodeallocator.h2
-rw-r--r--vespalib/src/vespa/vespalib/btree/btreenodestore.h4
-rw-r--r--vespalib/src/vespa/vespalib/btree/btreenodestore.hpp5
-rw-r--r--vespalib/src/vespa/vespalib/btree/btreestore.h12
-rw-r--r--vespalib/src/vespa/vespalib/btree/btreestore.hpp44
-rw-r--r--vespalib/src/vespa/vespalib/datastore/CMakeLists.txt8
-rw-r--r--vespalib/src/vespa/vespalib/datastore/array_store.hpp23
-rw-r--r--vespalib/src/vespa/vespalib/datastore/buffer_free_list.cpp56
-rw-r--r--vespalib/src/vespa/vespalib/datastore/buffer_free_list.h56
-rw-r--r--vespalib/src/vespa/vespalib/datastore/compacting_buffers.cpp38
-rw-r--r--vespalib/src/vespa/vespalib/datastore/compacting_buffers.h32
-rw-r--r--vespalib/src/vespa/vespalib/datastore/datastore.hpp2
-rw-r--r--vespalib/src/vespa/vespalib/datastore/datastorebase.cpp10
-rw-r--r--vespalib/src/vespa/vespalib/datastore/datastorebase.h6
-rw-r--r--vespalib/src/vespa/vespalib/datastore/free_list.cpp36
-rw-r--r--vespalib/src/vespa/vespalib/datastore/free_list.h34
-rw-r--r--vespalib/src/vespa/vespalib/datastore/unique_store.hpp36
-rw-r--r--vespalib/src/vespa/vespalib/datastore/unique_store_remapper.h10
44 files changed, 1195 insertions, 177 deletions
diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletRequestReader.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletRequestReader.java
index bcef4af6620..3703878f595 100644
--- a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletRequestReader.java
+++ b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletRequestReader.java
@@ -217,8 +217,12 @@ class ServletRequestReader {
synchronized (monitor) {
errorDuringRead = t;
- if (state != State.READING) return;
- state = State.ALL_DATA_READ;
+ if (state == State.REQUEST_CONTENT_CLOSED) {
+ return;
+ }
+ if (state == State.READING) {
+ state = State.ALL_DATA_READ;
+ }
shouldCloseRequestContentChannel = numberOfOutstandingUserCalls == 0;
if (shouldCloseRequestContentChannel) {
state = State.REQUEST_CONTENT_CLOSED;
diff --git a/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java b/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java
new file mode 100644
index 00000000000..5cb3aa24f98
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java
@@ -0,0 +1,68 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.logging;
+
+import com.yahoo.concurrent.DaemonThreadFactory;
+import java.io.IOException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+
+/**
+ * Abstract class that deals with storing event entries on disk and making sure all stored
+ * entries are eventually sent
+ *
+ * @author hmusum
+ */
+abstract class AbstractSpoolingLogger extends AbstractThreadedLogger implements Runnable {
+
+ protected static final java.util.logging.Logger log = java.util.logging.Logger.getLogger(Spooler.class.getName());
+
+ private static final ScheduledExecutorService executorService =
+ new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("AbstractSpoolingLogger-send-"));
+
+ protected final Spooler spooler;
+
+ public AbstractSpoolingLogger() {
+ this(new Spooler());
+ }
+
+ public AbstractSpoolingLogger(Spooler spooler) {
+ this.spooler = spooler;
+ executorService.scheduleWithFixedDelay(this, 0, 10L, TimeUnit.MILLISECONDS);
+ }
+
+ public void run() {
+ try {
+ var entries = spooler.processFiles();
+ log.log(Level.INFO, "Entries: " + entries.size());
+ entries.forEach(this::transport);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public boolean send(LoggerEntry entry) {
+ log.log(Level.INFO, "Sending");
+ try {
+ executor.execute(() -> spooler.write(entry));
+ } catch (RejectedExecutionException e) {
+ return false;
+ }
+ return true;
+ }
+
+ // TODO Call from a component or make this class a component
+ public void shutdown() {
+ executorService.shutdown();
+ try {
+ if ( ! executorService.awaitTermination(10, TimeUnit.SECONDS))
+ log.log(Level.WARNING, "Timeout elapsed waiting for termination");
+ } catch (InterruptedException e) {
+ log.log(Level.WARNING, "Failure when waiting for termination: " + e.getMessage());
+ }
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java b/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java
new file mode 100644
index 00000000000..1df236a037f
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java
@@ -0,0 +1,110 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.search.logging;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+
+abstract class AbstractThreadedLogger implements Logger {
+
+ private final static java.util.logging.Logger log = java.util.logging.Logger.getLogger(AbstractThreadedLogger.class.getName());
+
+ final static int DEFAULT_MAX_THREADS = 1;
+ final static int DEFAULT_QUEUE_SIZE = 1000;
+
+ protected final WorkerThreadExecutor executor;
+
+ AbstractThreadedLogger() {
+ this(DEFAULT_MAX_THREADS, DEFAULT_QUEUE_SIZE);
+ }
+
+ AbstractThreadedLogger(int threads, int queueSize) {
+ executor = new WorkerThreadExecutor(threads, queueSize);
+ }
+
+ AbstractThreadedLogger(int threads, int queueSize, ThreadFactory factory) {
+ executor = new WorkerThreadExecutor(threads, queueSize, factory);
+ }
+
+ @Override
+ public boolean send(LoggerEntry entry) {
+ return enqueue(entry);
+ }
+
+ protected boolean enqueue(LoggerEntry entry) {
+ // Todo: metric things
+ try {
+ executor.execute(() -> dequeue(entry));
+ } catch (RejectedExecutionException e) {
+ return false;
+ }
+ return true;
+ }
+
+ protected void dequeue(LoggerEntry entry) {
+ transport(entry); // This happens in worker thread
+ }
+
+ /**
+ * Actually transports the entry to it's destination
+ */
+ abstract void transport(LoggerEntry entry);
+
+
+ private static class WorkerThread extends Thread {
+
+ public WorkerThread(Runnable r) {
+ super(r);
+ }
+
+ @Override
+ public void run() {
+ try {
+ super.run();
+ } catch (Exception e) {
+ log.log(Level.SEVERE, String.format("Error while sending logger entry: %s", e), e);
+ }
+ }
+
+ }
+
+ protected static class WorkerThreadExecutor implements Executor {
+
+ protected final ThreadPoolExecutor executor;
+
+ WorkerThreadExecutor(int threads, int queueSize) {
+ this(threads, queueSize, WorkerThread::new);
+ }
+
+ WorkerThreadExecutor(int threads, int queueSize, ThreadFactory threadFactory) {
+ executor = new ThreadPoolExecutor(
+ threads, threads,
+ 0L, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(queueSize),
+ threadFactory);
+ }
+
+ public void close() {
+ try {
+ executor.shutdown();
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ //
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ @Override
+ public void execute(Runnable r) {
+ executor.execute(r);
+ }
+
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java b/container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java
new file mode 100644
index 00000000000..6d815059066
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java
@@ -0,0 +1,27 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.search.logging;
+
+import java.io.FileWriter;
+import java.io.IOException;
+
+public class LocalDiskLogger extends AbstractThreadedLogger {
+
+ private String logFilePath;
+
+ public LocalDiskLogger(LocalDiskLoggerConfig config) {
+ logFilePath = config.path();
+ }
+
+ @Override
+ void transport(LoggerEntry entry) {
+ String json = entry.toJson();
+ try (FileWriter fw = new FileWriter(logFilePath, true)) {
+ fw.write(json);
+ fw.write(System.getProperty("line.separator"));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/logging/Logger.java b/container-search/src/main/java/com/yahoo/search/logging/Logger.java
new file mode 100644
index 00000000000..3938b01c7b7
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/logging/Logger.java
@@ -0,0 +1,13 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.search.logging;
+
+public interface Logger {
+
+ default LoggerEntry.Builder newEntry() {
+ return new LoggerEntry.Builder(this);
+ }
+
+ boolean send(LoggerEntry entry);
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java b/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java
new file mode 100644
index 00000000000..692de3e6cef
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java
@@ -0,0 +1,127 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.search.logging;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.yahoo.search.Query;
+import com.yahoo.slime.JsonDecoder;
+import com.yahoo.slime.Slime;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.Base64;
+
+public class LoggerEntry {
+
+ private final Long timestamp;
+ private final Query query;
+ private final ByteBuffer blob;
+
+ private LoggerEntry(Builder builder) {
+ timestamp = builder.timestamp; // or set automatically if not set
+ query = builder.query;
+ blob = builder.blob;
+ }
+
+ public Long timestamp() {
+ return timestamp;
+ }
+
+ public Query query() {
+ return query;
+ }
+
+ public String queryString() {
+ String queryString = null;
+ if (query != null) {
+ if (query.getHttpRequest() != null && query.getHttpRequest().getUri() != null) {
+ queryString = query.getHttpRequest().getUri().getPath();
+ if (query.getHttpRequest().getUri().getQuery() != null) {
+ queryString += "?" + query.getHttpRequest().getUri().getRawQuery();
+ }
+ }
+ }
+ return queryString;
+ }
+
+ public ByteBuffer blob() {
+ return blob;
+ }
+
+ public String toString() {
+ return toJson();
+ }
+
+ public String toJson() {
+ try {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8);
+ g.writeStartObject();
+
+ g.writeNumberField("timestamp", timestamp == null ? 0 : timestamp);
+ g.writeStringField("query", queryString());
+ g.writeStringField("blob", Base64.getEncoder().encodeToString(blob.array()));
+
+ g.writeEndObject();
+ g.close();
+ return out.toString();
+
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ // TODO: Rename method here and above? (serialize/deserialize)
+ // TODO: Use Slime or Jackson for both
+ public static LoggerEntry fromJson(byte[] content) throws IOException {
+ var decoder = new JsonDecoder();
+ var slime = decoder.decode(new Slime(), content);
+
+ var timestamp = slime.get().field("timestamp").asLong();
+ var query = new Query(slime.get().field("query").asString());
+ var blob = slime.get().field("blob").asData();
+
+ return new LoggerEntry(new Builder().timestamp(timestamp).query(query).blob(blob));
+ }
+
+ public static class Builder {
+
+ private final Logger logger;
+
+ private Long timestamp;
+ private Query query;
+ private ByteBuffer blob;
+
+ // For testing
+ public Builder() { this(entry -> false); }
+
+ public Builder(Logger logger) {
+ this.logger = logger;
+ }
+
+ public Builder timestamp(long timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+
+ public Builder query(Query query) {
+ this.query = query;
+ return this;
+ }
+
+ public Builder blob(byte[] bytes) {
+ blob = ByteBuffer.allocate(bytes.length);
+ blob.put(bytes).limit(blob.position()).position(0);
+ return this;
+ }
+
+ public boolean send() {
+ return logger.send(new LoggerEntry(this));
+ }
+
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
new file mode 100644
index 00000000000..8fa51a0b47c
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
@@ -0,0 +1,175 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.logging;
+
+import ai.vespa.validation.Validation;
+import com.yahoo.vespa.defaults.Defaults;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.stream.Stream;
+
+/**
+ * Spooler that will write an entry to a file and read files that are ready to be sent
+ *
+ * @author hmusum
+ */
+public class Spooler {
+
+ private static final java.util.logging.Logger log = java.util.logging.Logger.getLogger(Spooler.class.getName());
+ private static final Path defaultSpoolPath = Path.of(Defaults.getDefaults().underVespaHome("var/spool/vespa/events"));
+ private static final Comparator<File> ordering = new TimestampCompare();
+
+ private Path processingPath;
+ private Path readyPath;
+ private Path failuresPath;
+ private Path successesPath;
+
+ AtomicInteger fileCounter = new AtomicInteger(1);
+
+ private final Path spoolPath;
+
+ public Spooler() {
+ this(defaultSpoolPath);
+ }
+
+ public Spooler(Path spoolPath) {
+ this.spoolPath = spoolPath;
+ createDirs(spoolPath);
+ }
+
+ public boolean write(LoggerEntry entry) {
+ writeEntry(entry);
+
+ return true;
+ }
+
+ public List<LoggerEntry> processFiles() throws IOException {
+ List<Path> files = listFilesInPath(readyPath);
+
+ if (files.size() == 0) {
+ log.log(Level.INFO, "No files in ready path " + readyPath.toFile().getAbsolutePath());
+ return List.of();
+ }
+ log.log(Level.FINE, "Files in ready path: " + files.size());
+
+ List<File> fileList = getFiles(files, 50); // TODO
+ if ( ! fileList.isEmpty()) {
+ return processFiles(fileList);
+ }
+
+ return List.of();
+ }
+
+ List<Path> listFilesInPath(Path path) throws IOException {
+ List<Path> files;
+ System.out.println("Path " + path + " exists: " + path.toFile().exists());
+ try (Stream<Path> stream = Files.list(path)) {
+ files = stream.toList();
+ // TODO: Or check if stream is empty
+ } catch (NoSuchFileException e) {
+ return List.of(); // No files, this is OK
+ }
+ return files;
+ }
+
+ public ArrayList<LoggerEntry> processFiles(List<File> files) throws IOException {
+ ArrayList<LoggerEntry> entries = new ArrayList<>();
+ for (File f : files) {
+ log.log(Level.INFO, "Found file " + f);
+ var content = Files.readAllBytes(f.toPath());
+ var entry = LoggerEntry.fromJson(content);
+ Path file = f.toPath();
+ Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName());
+ Files.move(file, target);
+ entries.add(entry);
+ }
+ return entries;
+ }
+
+ public Path processingPath() { return processingPath; }
+ public Path readyPath() { return readyPath; }
+ public Path successesPath() { return successesPath; }
+ public Path failuresPath() { return failuresPath; }
+
+ List<File> getDirectories(File[] files) {
+ List<File> fileList = new ArrayList<>();
+
+ for (File f : files) {
+ if (f.isDirectory()) {
+ fileList.add(f);
+ }
+ }
+
+ Collections.sort(fileList);
+ return fileList;
+ }
+
+ List<File> getFiles(List<Path> files, int count) {
+ Validation.requireAtLeast(count, "count must be a positive number", 1);
+ List<File> fileList = new ArrayList<>();
+
+ for (Path p : files) {
+ File f = p.toFile();
+ if (!f.isDirectory()) {
+ fileList.add(f);
+ }
+
+ // Grab only some files
+ if (fileList.size() > count) {
+ break;
+ }
+ }
+
+ fileList.sort(ordering);
+ return fileList;
+ }
+
+ private void writeEntry(LoggerEntry entry) {
+ String fileName = String.valueOf(fileCounter);
+ Path file = spoolPath.resolve(processingPath).resolve(fileName);
+ try {
+ Files.writeString(file, entry.toJson(), StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
+ Path target = spoolPath.resolve(readyPath).resolve(file.relativize(file)).resolve(fileName);
+ log.log(Level.INFO, "Moving file from " + file + " to " + target);
+ Files.move(file, target);
+ fileCounter.addAndGet(1);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void createDirs(Path spoolerPath) {
+ processingPath = createDir(spoolerPath.resolve("processing"));
+ readyPath = createDir(spoolerPath.resolve("ready"));
+ failuresPath = createDir(spoolerPath.resolve("failures"));
+ successesPath = createDir(spoolerPath.resolve("successes"));
+ }
+
+ private static Path createDir(Path path) {
+ File file = path.toFile();
+ if (file.exists() && file.canRead() && file.canWrite()) {
+ log.log(Level.INFO, "Directory " + path + " already exists");
+ } else if (file.mkdirs()) {
+ log.log(Level.INFO, "Created " + path);
+ } else {
+ log.log(Level.WARNING, "Could not create " + path + ", please check permissions");
+ }
+ return path;
+ }
+
+ private static class TimestampCompare implements Comparator<File> {
+ public int compare(File a, File b) {
+ return (int) (a.lastModified() - b.lastModified());
+ }
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/logging/package-info.java b/container-search/src/main/java/com/yahoo/search/logging/package-info.java
new file mode 100644
index 00000000000..e2bc14f4faf
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/logging/package-info.java
@@ -0,0 +1,6 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+@ExportPackage
+package com.yahoo.search.logging;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/container-search/src/main/resources/configdefinitions/search.logging.local-disk-logger.def b/container-search/src/main/resources/configdefinitions/search.logging.local-disk-logger.def
new file mode 100644
index 00000000000..fc42c0dbd20
--- /dev/null
+++ b/container-search/src/main/resources/configdefinitions/search.logging.local-disk-logger.def
@@ -0,0 +1,5 @@
+# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+namespace=search.logging
+
+## Path where logs should be put
+path string default=/dev/null
diff --git a/container-search/src/test/java/com/yahoo/search/logging/LocalDiskLoggerTest.java b/container-search/src/test/java/com/yahoo/search/logging/LocalDiskLoggerTest.java
new file mode 100644
index 00000000000..bf6938424d7
--- /dev/null
+++ b/container-search/src/test/java/com/yahoo/search/logging/LocalDiskLoggerTest.java
@@ -0,0 +1,50 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.search.logging;
+
+import com.yahoo.io.IOUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Base64;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class LocalDiskLoggerTest {
+
+ @TempDir
+ Path tempDir;
+
+ @Test
+ public void testLocalDiskLogger() throws InterruptedException, IOException {
+ File logFile = tempDir.resolve("localdisklogger.log").toFile();
+
+ LocalDiskLoggerConfig.Builder builder = new LocalDiskLoggerConfig.Builder();
+ builder.path(logFile.getAbsolutePath());
+ LocalDiskLogger logger = new LocalDiskLogger(builder.build());
+
+ logger.newEntry()
+ .blob("Yo entry".getBytes())
+ .send();
+ waitForFile(logFile);
+
+ String test = IOUtils.readAll(new FileReader(logFile));
+ assertTrue(test.contains(Base64.getEncoder().encodeToString("Yo entry".getBytes())));
+ }
+
+ private void waitForFile(File file) throws InterruptedException {
+ int waitFor = 10;
+ while ( ! file.exists() && --waitFor > 0) {
+ Thread.sleep(10);
+ }
+ if ( ! file.exists()) {
+ fail("Local disk logger file was not created");
+ }
+ }
+
+}
diff --git a/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java b/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java
new file mode 100644
index 00000000000..abd3b27b70b
--- /dev/null
+++ b/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java
@@ -0,0 +1,107 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.logging;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class SpoolerTest {
+
+ @TempDir
+ Path tempDir;
+
+ @Test
+ public void testSpoolingLogger() throws IOException {
+ Path spoolDir = tempDir.resolve("spool");
+
+ Spooler spooler = new Spooler(spoolDir);
+
+ TestLogger logger = new TestLogger(spooler);
+ assertTrue(logger.newEntry()
+ .blob("Yo entry".getBytes())
+ .send());
+ assertTrue(logger.newEntry()
+ .blob("Yo entry 2".getBytes())
+ .send());
+
+ Path readyPath = spooler.readyPath();
+ Path readyFile1 = readyPath.resolve("1");
+ waitUntilFileExists(readyFile1);
+ Path readyFile2 = readyPath.resolve("2");
+ waitUntilFileExists(readyFile2);
+
+ // Check content after being moved to ready path
+ String content = Files.readString(readyFile1);
+ assertTrue(content.contains(Base64.getEncoder().encodeToString("Yo entry".getBytes())));
+ assertTrue(Files.readString(readyFile2).contains(Base64.getEncoder().encodeToString("Yo entry 2".getBytes())));
+
+ // Process files (read, transport files)
+ logger.manualRun();
+ assertEquals(2, logger.entriesSent());
+
+ // No files in processing or ready, 2 files in successes
+ assertEquals(0, spooler.listFilesInPath(spooler.processingPath()).size());
+ assertEquals(0, spooler.listFilesInPath(readyPath).size());
+ assertEquals(2, spooler.listFilesInPath(spooler.successesPath()).size());
+ assertEquals(0, spooler.listFilesInPath(spooler.failuresPath()).size());
+ }
+
+ private void waitUntilFileExists(Path path) {
+ Instant end = Instant.now().plus(Duration.ofSeconds(1));
+ while (!path.toFile().exists() && Instant.now().isBefore(end)) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ assertTrue(path.toFile().exists());
+ }
+
+
+ private static class TestLogger extends AbstractSpoolingLogger {
+
+ private final List<LoggerEntry> entriesSent = new ArrayList<>();
+
+ public TestLogger(Spooler spooler) {
+ super(spooler);
+ }
+
+ @Override
+ void transport(LoggerEntry entry) {
+ System.out.println("Called transport()");
+ entriesSent.add(entry);
+ }
+
+ @Override
+ public void run() {
+ // Do nothing, use manualRun
+ }
+
+ @Override
+ public boolean send(LoggerEntry entry) {
+ return spooler.write(entry);
+ }
+
+ public void manualRun() {
+ super.run();
+ }
+
+ int entriesSent() {
+ return entriesSent.size();
+ }
+
+ }
+
+}
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/archive/CuratorArchiveBucketDb.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/archive/CuratorArchiveBucketDb.java
index 0a0adcfc252..ac32fe5799d 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/archive/CuratorArchiveBucketDb.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/archive/CuratorArchiveBucketDb.java
@@ -1,10 +1,8 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.hosted.controller.archive;
-import com.yahoo.config.provision.SystemName;
import com.yahoo.config.provision.TenantName;
import com.yahoo.config.provision.zone.ZoneId;
-import com.yahoo.text.Text;
import com.yahoo.vespa.hosted.controller.Controller;
import com.yahoo.vespa.hosted.controller.api.integration.archive.ArchiveBucket;
import com.yahoo.vespa.hosted.controller.api.integration.archive.ArchiveService;
@@ -14,7 +12,6 @@ import java.net.URI;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
-import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -34,12 +31,10 @@ public class CuratorArchiveBucketDb {
private final ArchiveService archiveService;
private final CuratorDb curatorDb;
- private final SystemName system;
public CuratorArchiveBucketDb(Controller controller) {
this.archiveService = controller.serviceRegistry().archiveService();
this.curatorDb = controller.curator();
- this.system = controller.zoneRegistry().system();
}
public Optional<URI> archiveUriFor(ZoneId zoneId, TenantName tenant, boolean createIfMissing) {
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ArchiveAccessMaintainer.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ArchiveAccessMaintainer.java
index 788360996ff..eed4fd0245d 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ArchiveAccessMaintainer.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ArchiveAccessMaintainer.java
@@ -73,7 +73,7 @@ public class ArchiveAccessMaintainer extends ControllerMaintainer {
.filter(t -> t instanceof CloudTenant)
.map(t -> (CloudTenant) t)
.collect(Collectors.toUnmodifiableMap(
- Tenant::name, cloudTenant -> cloudTenant.archiveAccess()));
+ Tenant::name, CloudTenant::archiveAccess));
}
}
diff --git a/eval/src/vespa/eval/instruction/simple_join_count.h b/eval/src/vespa/eval/instruction/simple_join_count.h
index a566d2a7e68..1c74388d914 100644
--- a/eval/src/vespa/eval/instruction/simple_join_count.h
+++ b/eval/src/vespa/eval/instruction/simple_join_count.h
@@ -16,7 +16,7 @@ class SimpleJoinCount : public tensor_function::Op2
private:
uint64_t _dense_factor;
public:
- SimpleJoinCount(const TensorFunction &lhs_in, const TensorFunction &rhs_in, size_t dense_factor_in);
+ SimpleJoinCount(const TensorFunction &lhs_in, const TensorFunction &rhs_in, uint64_t dense_factor_in);
InterpretedFunction::Instruction compile_self(const ValueBuilderFactory &factory, Stash &stash) const override;
bool result_is_mutable() const override { return true; }
uint64_t dense_factor() const { return _dense_factor; }
diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
index 2dca3511044..d8e0ceb01bd 100644
--- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
+++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
@@ -46,13 +46,13 @@ public class Flags {
private static volatile TreeMap<FlagId, FlagDefinition> flags = new TreeMap<>();
- public static final UnboundBooleanFlag MAIN_CHAIN_GRAPH = defineFeatureFlag(
- "main-chain-graph", true,
- List.of("hakonhall"), "2022-07-06", "2022-10-05",
- "Whether to run all tasks in the main task chain up to the one failing to converge (false), or " +
- "run all tasks in the main task chain whose dependencies have converged (true). And when suspending, " +
+ public static final UnboundBooleanFlag ROOT_CHAIN_GRAPH = defineFeatureFlag(
+ "root-chain-graph", true,
+ List.of("hakonhall"), "2022-10-05", "2022-11-04",
+ "Whether to run all tasks in the root task chain up to the one failing to converge (false), or " +
+ "run all tasks in the root task chain whose dependencies have converged (true). And when suspending, " +
"whether to run the tasks in sequence (false) or in reverse sequence (true).",
- "On first tick of the main chain after (re)start of host admin.",
+ "On first tick of the root chain after (re)start of host admin.",
ZONE_ID, NODE_TYPE, HOSTNAME);
public static final UnboundDoubleFlag DEFAULT_TERM_WISE_LIMIT = defineDoubleFlag(
diff --git a/parent/pom.xml b/parent/pom.xml
index fd076c5cc35..f9155acbfd5 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -1061,7 +1061,7 @@
<org.json.version>20220320</org.json.version>
<org.lz4.version>1.8.0</org.lz4.version>
<prometheus.client.version>0.6.0</prometheus.client.version>
- <protobuf.version>3.19.2</protobuf.version>
+ <protobuf.version>3.21.7</protobuf.version>
<spifly.version>1.3.5</spifly.version>
<surefire.version>2.22.2</surefire.version>
<zookeeper.client.version>3.8.0</zookeeper.client.version>
diff --git a/searchlib/src/vespa/searchlib/attribute/postingstore.cpp b/searchlib/src/vespa/searchlib/attribute/postingstore.cpp
index 79ec976ac50..2aa05e7fa9f 100644
--- a/searchlib/src/vespa/searchlib/attribute/postingstore.cpp
+++ b/searchlib/src/vespa/searchlib/attribute/postingstore.cpp
@@ -8,6 +8,7 @@
#include <vespa/vespalib/btree/btreeiterator.hpp>
#include <vespa/vespalib/btree/btreerootbase.cpp>
#include <vespa/vespalib/datastore/datastore.hpp>
+#include <vespa/vespalib/datastore/compacting_buffers.h>
#include <vespa/vespalib/datastore/compaction_spec.h>
#include <vespa/vespalib/datastore/entry_ref_filter.h>
#include <vespa/vespalib/datastore/buffer_type.hpp>
@@ -712,14 +713,14 @@ template <typename DataT>
void
PostingStore<DataT>::compact_worst_btree_nodes(const CompactionStrategy& compaction_strategy)
{
- auto to_hold = this->start_compact_worst_btree_nodes(compaction_strategy);
+ auto compacting_buffers = this->start_compact_worst_btree_nodes(compaction_strategy);
EntryRefFilter filter(RefType::numBuffers(), RefType::offset_bits);
// Only look at buffers containing bitvectors and btree roots
filter.add_buffers(this->_treeType.get_active_buffers());
filter.add_buffers(_bvType.get_active_buffers());
_dictionary.foreach_posting_list([this](const std::vector<EntryRef>& refs)
{ move_btree_nodes(refs); }, filter);
- this->finish_compact_worst_btree_nodes(to_hold);
+ compacting_buffers->finish();
}
template <typename DataT>
@@ -727,12 +728,11 @@ void
PostingStore<DataT>::compact_worst_buffers(CompactionSpec compaction_spec, const CompactionStrategy& compaction_strategy)
{
- auto to_hold = this->start_compact_worst_buffers(compaction_spec, compaction_strategy);
+ auto compacting_buffers = this->start_compact_worst_buffers(compaction_spec, compaction_strategy);
bool compact_btree_roots = false;
- EntryRefFilter filter(RefType::numBuffers(), RefType::offset_bits);
- filter.add_buffers(to_hold);
+ auto filter = compacting_buffers->make_entry_ref_filter();
// Start with looking at buffers being compacted
- for (uint32_t buffer_id : to_hold) {
+ for (uint32_t buffer_id : compacting_buffers->get_buffer_ids()) {
if (isBTree(_store.getBufferState(buffer_id).getTypeId())) {
compact_btree_roots = true;
}
@@ -745,7 +745,7 @@ PostingStore<DataT>::compact_worst_buffers(CompactionSpec compaction_spec, const
_dictionary.normalize_posting_lists([this](std::vector<EntryRef>& refs)
{ return move(refs); },
filter);
- this->finishCompact(to_hold);
+ compacting_buffers->finish();
}
template <typename DataT>
diff --git a/searchlib/src/vespa/searchlib/tensor/small_subspaces_buffer_type.h b/searchlib/src/vespa/searchlib/tensor/small_subspaces_buffer_type.h
index a778183c5a2..5622e9970b8 100644
--- a/searchlib/src/vespa/searchlib/tensor/small_subspaces_buffer_type.h
+++ b/searchlib/src/vespa/searchlib/tensor/small_subspaces_buffer_type.h
@@ -27,7 +27,7 @@ public:
SmallSubspacesBufferType(const SmallSubspacesBufferType&) = delete;
SmallSubspacesBufferType& operator=(const SmallSubspacesBufferType&) = delete;
SmallSubspacesBufferType(SmallSubspacesBufferType&&) noexcept = default;
- SmallSubspacesBufferType& operator=(SmallSubspacesBufferType&&) noexcept = default;
+ SmallSubspacesBufferType& operator=(SmallSubspacesBufferType&&) noexcept = delete;
SmallSubspacesBufferType(uint32_t array_size, const AllocSpec& spec, std::shared_ptr<vespalib::alloc::MemoryAllocator> memory_allocator, TensorBufferTypeMapper& type_mapper) noexcept;
~SmallSubspacesBufferType() override;
void cleanHold(void* buffer, size_t offset, ElemCount numElems, CleanContext cleanCtx) override;
diff --git a/vdslib/src/main/java/com/yahoo/vdslib/state/State.java b/vdslib/src/main/java/com/yahoo/vdslib/state/State.java
index d4baff5861a..541364d90a9 100644
--- a/vdslib/src/main/java/com/yahoo/vdslib/state/State.java
+++ b/vdslib/src/main/java/com/yahoo/vdslib/state/State.java
@@ -13,31 +13,25 @@ import java.util.ArrayList;
public enum State {
// The order declares the ordinals, and defines what states are above/below others
- UNKNOWN ("-", false, true, true, false, false, false, false, false), // This state is used by the fleetcontroller to indicate
- // that we have failed to contact the node. It should never be
- // sent out of the fleetcontroller
- MAINTENANCE ("m", false, false, false, true, true, false, true, true),
- DOWN ("d", true, true, true, true, true, true, true, true), // Down is not valid reported state sent from the node itself.
- STOPPING ("s", false, true, true, false, false, true, true, true),
- INITIALIZING("i", false, true, true, false, false, true, true, true),
- RETIRED ("r", false, false, false, false, true, false, true, true),
- UP ("u", true, true, true, true, true, true, true, true);
+ UNKNOWN ("-", true, true, false, false, false, false, false), // This state is used by the fleetcontroller to indicate
+ // that we have failed to contact the node. It should never be
+ // sent out of the fleetcontroller
+ MAINTENANCE ("m", false, false, true, true, false, true, true),
+ DOWN ("d", true, true, true, true, true, true, true), // Down is not valid reported state sent from the node itself.
+ STOPPING ("s", true, true, false, false, true, true, true),
+ INITIALIZING("i", true, true, false, false, true, true, true),
+ RETIRED ("r", false, false, false, true, false, true, true),
+ UP ("u", true, true, true, true, true, true, true);
- private final boolean validDiskState;
private final boolean validClusterState;
private final ArrayList<Boolean> validReportedNodeState = new ArrayList<>();
private final ArrayList<Boolean> validWantedNodeState = new ArrayList<>();
private final ArrayList<Boolean> validCurrentNodeState = new ArrayList<>();
private final String serializedAs;
- private State(String serialized, boolean validDisk, boolean validDistReported, boolean validStorReported,
- boolean validDistWanted, boolean validStorWanted, boolean validCluster, boolean validDistCurrent,
- boolean validStorCurrent)
- {
- validDiskState = validDisk;
+ State(String serialized, boolean validDistReported, boolean validStorReported, boolean validDistWanted,
+ boolean validStorWanted, boolean validCluster, boolean validDistCurrent, boolean validStorCurrent) {
validClusterState = validCluster;
- assert(NodeType.STORAGE.ordinal() == 0);
- assert(NodeType.DISTRIBUTOR.ordinal() == 1);
validReportedNodeState.add(validStorReported);
validReportedNodeState.add(validDistReported);
validWantedNodeState.add(validStorWanted);
@@ -56,7 +50,6 @@ public enum State {
public String serialize() { return serializedAs; }
- public boolean validDiskState() { return validDiskState; }
public boolean validClusterState() { return validClusterState; }
public boolean validReportedNodeState(NodeType type) { return validReportedNodeState.get(type.ordinal()); }
public boolean validWantedNodeState(NodeType type) { return validWantedNodeState.get(type.ordinal()); }
diff --git a/vespalib/CMakeLists.txt b/vespalib/CMakeLists.txt
index df1e1006828..7aafb7c364e 100644
--- a/vespalib/CMakeLists.txt
+++ b/vespalib/CMakeLists.txt
@@ -58,6 +58,7 @@ vespa_define_module(
src/tests/datastore/compact_buffer_candidates
src/tests/datastore/datastore
src/tests/datastore/fixed_size_hash_map
+ src/tests/datastore/free_list
src/tests/datastore/sharded_hash_map
src/tests/datastore/unique_store
src/tests/datastore/unique_store_dictionary
diff --git a/vespalib/src/tests/btree/btree-stress/btree_stress_test.cpp b/vespalib/src/tests/btree/btree-stress/btree_stress_test.cpp
index 4716e91c2c4..c68ff07491e 100644
--- a/vespalib/src/tests/btree/btree-stress/btree_stress_test.cpp
+++ b/vespalib/src/tests/btree/btree-stress/btree_stress_test.cpp
@@ -64,8 +64,7 @@ public:
uint32_t get(EntryRef ref) const { return _store.getEntry(ref); }
uint32_t get_acquire(const AtomicEntryRef& ref) const { return get(ref.load_acquire()); }
uint32_t get_relaxed(const AtomicEntryRef& ref) const { return get(ref.load_relaxed()); }
- std::vector<uint32_t> start_compact();
- void finish_compact(std::vector<uint32_t> to_hold);
+ std::unique_ptr<vespalib::datastore::CompactingBuffers> start_compact();
static constexpr bool is_indirect = true;
static uint32_t get_offset_bits() { return StoreRefType::offset_bits; }
static uint32_t get_num_buffers() { return StoreRefType::numBuffers(); }
@@ -79,19 +78,13 @@ RealIntStore::RealIntStore()
RealIntStore::~RealIntStore() = default;
-std::vector<uint32_t>
+std::unique_ptr<vespalib::datastore::CompactingBuffers>
RealIntStore::start_compact()
{
// Use a compaction strategy that will compact all active buffers
CompactionStrategy compaction_strategy(0.0, 0.0, get_num_buffers(), 1.0);
CompactionSpec compaction_spec(true, false);
- return _store.startCompactWorstBuffers(compaction_spec, compaction_strategy);
-}
-
-void
-RealIntStore::finish_compact(std::vector<uint32_t> to_hold)
-{
- _store.finishCompact(to_hold);
+ return _store.start_compact_worst_buffers(compaction_spec, compaction_strategy);
}
EntryRef
@@ -347,9 +340,8 @@ void
Fixture<Params>::compact_keys()
{
if constexpr (KeyStore::is_indirect) {
- auto to_hold = _keys.start_compact();
- EntryRefFilter filter(_keys.get_num_buffers(), _keys.get_offset_bits());
- filter.add_buffers(to_hold);
+ auto compacting_buffers = _keys.start_compact();
+ auto filter = compacting_buffers->make_entry_ref_filter();
auto itr = _tree.begin();
while (itr.valid()) {
auto old_ref = itr.getKey().load_relaxed();
@@ -359,7 +351,7 @@ Fixture<Params>::compact_keys()
}
++itr;
}
- _keys.finish_compact(std::move(to_hold));
+ compacting_buffers->finish();
}
_compact_keys.track_compacted();
}
@@ -369,9 +361,8 @@ void
Fixture<Params>::compact_values()
{
if constexpr (ValueStore::is_indirect) {
- auto to_hold = _values.start_compact();
- EntryRefFilter filter(_values.get_num_buffers(), _values.get_offset_bits());
- filter.add_buffers(to_hold);
+ auto compacting_buffers = _values.start_compact();
+ auto filter = compacting_buffers->make_entry_ref_filter();
auto itr = _tree.begin();
while (itr.valid()) {
auto old_ref = itr.getData().load_relaxed();
@@ -381,7 +372,7 @@ Fixture<Params>::compact_values()
}
++itr;
}
- _values.finish_compact(std::move(to_hold));
+ compacting_buffers->finish();
}
_compact_values.track_compacted();
}
diff --git a/vespalib/src/tests/btree/btree_store/btree_store_test.cpp b/vespalib/src/tests/btree/btree_store/btree_store_test.cpp
index 5e2aa89b59e..4da34c64ed9 100644
--- a/vespalib/src/tests/btree/btree_store/btree_store_test.cpp
+++ b/vespalib/src/tests/btree/btree_store/btree_store_test.cpp
@@ -5,7 +5,9 @@
#include <vespa/vespalib/btree/btreeroot.hpp>
#include <vespa/vespalib/btree/btreestore.hpp>
#include <vespa/vespalib/datastore/buffer_type.hpp>
+#include <vespa/vespalib/datastore/compacting_buffers.h>
#include <vespa/vespalib/datastore/compaction_strategy.h>
+#include <vespa/vespalib/datastore/entry_ref_filter.h>
#include <vespa/vespalib/gtest/gtest.h>
using vespalib::GenerationHandler;
@@ -114,7 +116,6 @@ void
BTreeStoreTest::test_compact_sequence(uint32_t sequence_length)
{
auto &store = _store;
- uint32_t entry_ref_offset_bits = TreeStore::RefType::offset_bits;
EntryRef ref1 = add_sequence(4, 4 + sequence_length);
EntryRef ref2 = add_sequence(5, 5 + sequence_length);
std::vector<EntryRef> refs;
@@ -136,13 +137,10 @@ BTreeStoreTest::test_compact_sequence(uint32_t sequence_length)
for (uint32_t pass = 0; pass < 15; ++pass) {
CompactionSpec compaction_spec(true, false);
CompactionStrategy compaction_strategy;
- auto to_hold = store.start_compact_worst_buffers(compaction_spec, compaction_strategy);
- std::vector<bool> filter(TreeStore::RefType::numBuffers());
- for (auto buffer_id : to_hold) {
- filter[buffer_id] = true;
- }
+ auto compacting_buffers = store.start_compact_worst_buffers(compaction_spec, compaction_strategy);
+ auto filter = compacting_buffers->make_entry_ref_filter();
for (auto& ref : refs) {
- if (ref.valid() && filter[ref.buffer_id(entry_ref_offset_bits)]) {
+ if (ref.valid() && filter.has(ref)) {
move_refs.emplace_back(ref);
change_writer.emplace_back(ref);
}
@@ -150,7 +148,7 @@ BTreeStoreTest::test_compact_sequence(uint32_t sequence_length)
store.move(move_refs);
change_writer.write(move_refs);
move_refs.clear();
- store.finishCompact(to_hold);
+ compacting_buffers->finish();
inc_generation();
}
EXPECT_NE(ref1, refs[0]);
@@ -174,9 +172,9 @@ TEST_F(BTreeStoreTest, require_that_nodes_for_multiple_btrees_are_compacted)
auto usage_before = store.getMemoryUsage();
for (uint32_t pass = 0; pass < 15; ++pass) {
CompactionStrategy compaction_strategy;
- auto to_hold = store.start_compact_worst_btree_nodes(compaction_strategy);
+ auto compacting_buffers = store.start_compact_worst_btree_nodes(compaction_strategy);
store.move_btree_nodes(refs);
- store.finish_compact_worst_btree_nodes(to_hold);
+ compacting_buffers->finish();
inc_generation();
}
EXPECT_EQ(make_exp_sequence(4, 40), get_sequence(refs[0]));
diff --git a/vespalib/src/tests/datastore/array_store/array_store_test.cpp b/vespalib/src/tests/datastore/array_store/array_store_test.cpp
index 2ff2897461b..1e8632aee95 100644
--- a/vespalib/src/tests/datastore/array_store/array_store_test.cpp
+++ b/vespalib/src/tests/datastore/array_store/array_store_test.cpp
@@ -205,11 +205,11 @@ VESPA_GTEST_INSTANTIATE_TEST_SUITE_P(NumberStoreFreeListsDisabledMultiTest,
TEST_P(NumberStoreTest, control_static_sizes) {
#ifdef _LIBCPP_VERSION
- EXPECT_EQ(440u, sizeof(f.store));
- EXPECT_EQ(296u, sizeof(NumberStoreTest::ArrayStoreType::DataStoreType));
+ EXPECT_EQ(464u, sizeof(store));
+ EXPECT_EQ(304u, sizeof(NumberStoreTest::ArrayStoreType::DataStoreType));
#else
- EXPECT_EQ(488u, sizeof(store));
- EXPECT_EQ(328u, sizeof(NumberStoreTest::ArrayStoreType::DataStoreType));
+ EXPECT_EQ(496u, sizeof(store));
+ EXPECT_EQ(336u, sizeof(NumberStoreTest::ArrayStoreType::DataStoreType));
#endif
EXPECT_EQ(112u, sizeof(NumberStoreTest::ArrayStoreType::SmallBufferType));
MemoryUsage usage = store.getMemoryUsage();
diff --git a/vespalib/src/tests/datastore/free_list/CMakeLists.txt b/vespalib/src/tests/datastore/free_list/CMakeLists.txt
new file mode 100644
index 00000000000..97aff6d7ae7
--- /dev/null
+++ b/vespalib/src/tests/datastore/free_list/CMakeLists.txt
@@ -0,0 +1,9 @@
+# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(vespalib_datastore_free_list_test_app TEST
+ SOURCES
+ free_list_test.cpp
+ DEPENDS
+ vespalib
+ GTest::GTest
+)
+vespa_add_test(NAME vespalib_datastore_free_list_test_app COMMAND vespalib_datastore_free_list_test_app)
diff --git a/vespalib/src/tests/datastore/free_list/free_list_test.cpp b/vespalib/src/tests/datastore/free_list/free_list_test.cpp
new file mode 100644
index 00000000000..d80020d3dc5
--- /dev/null
+++ b/vespalib/src/tests/datastore/free_list/free_list_test.cpp
@@ -0,0 +1,127 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vespalib/datastore/entryref.hpp>
+#include <vespa/vespalib/datastore/free_list.h>
+#include <vespa/vespalib/gtest/gtest.h>
+#include <vector>
+
+using namespace vespalib::datastore;
+
+using MyEntryRef = EntryRefT<8, 4>;
+constexpr uint32_t array_size = 6;
+
+struct FreeListTest : public testing::Test
+{
+ FreeList list;
+ std::atomic<ElemCount> dead_elems;
+ std::vector<BufferFreeList> bufs;
+ FreeListTest()
+ : list(),
+ bufs()
+ {
+ for (size_t i = 0; i < 3; ++i) {
+ bufs.emplace_back(dead_elems);
+ bufs.back().on_active(array_size);
+ }
+ }
+ void TearDown() override {
+ for (auto& buf : bufs) {
+ buf.disable();
+ }
+ }
+ void enable(uint32_t buffer_id) {
+ bufs[buffer_id].enable(list);
+ }
+ void enable_all() {
+ for (auto& buf : bufs) {
+ buf.enable(list);
+ }
+ }
+ void push_entry(MyEntryRef ref) {
+ bufs[ref.bufferId()].push_entry(ref);
+ }
+ MyEntryRef pop_entry() {
+ return {list.pop_entry()};
+ }
+};
+
+TEST_F(FreeListTest, entry_refs_are_reused_in_lifo_order)
+{
+ enable(0);
+ push_entry({10, 0});
+ push_entry({11, 0});
+ push_entry({12, 0});
+ EXPECT_EQ(MyEntryRef(12, 0), pop_entry());
+ EXPECT_EQ(MyEntryRef(11, 0), pop_entry());
+ EXPECT_EQ(MyEntryRef(10, 0), pop_entry());
+}
+
+TEST_F(FreeListTest, buffer_free_list_attaches_and_detaches_from_free_list)
+{
+ enable(0);
+ EXPECT_TRUE(list.empty());
+ push_entry({10, 0});
+ EXPECT_EQ(1, list.size());
+ push_entry({11, 0});
+ pop_entry();
+ EXPECT_EQ(1, list.size());
+ pop_entry();
+ EXPECT_TRUE(list.empty());
+}
+
+TEST_F(FreeListTest, disable_clears_all_entry_refs_and_detaches_from_free_list)
+{
+ enable(0);
+ push_entry({10, 0});
+ EXPECT_EQ(1, list.size());
+ EXPECT_FALSE(bufs[0].empty());
+ EXPECT_TRUE(bufs[0].enabled());
+
+ bufs[0].disable();
+ EXPECT_TRUE(list.empty());
+ EXPECT_TRUE(bufs[0].empty());
+ EXPECT_FALSE(bufs[0].enabled());
+}
+
+TEST_F(FreeListTest, buffer_free_lists_are_reused_in_lifo_order)
+{
+ enable_all();
+ EXPECT_TRUE(list.empty());
+ push_entry({10, 0});
+ EXPECT_EQ(1, list.size());
+ push_entry({11, 0});
+ push_entry({20, 1});
+ EXPECT_EQ(2, list.size());
+ push_entry({21, 1});
+ push_entry({30, 2});
+ EXPECT_EQ(3, list.size());
+ push_entry({31, 2});
+
+ EXPECT_EQ(MyEntryRef(31, 2), pop_entry());
+ EXPECT_EQ(MyEntryRef(30, 2), pop_entry());
+ EXPECT_EQ(2, list.size());
+ EXPECT_EQ(MyEntryRef(21, 1), pop_entry());
+ EXPECT_EQ(MyEntryRef(20, 1), pop_entry());
+ EXPECT_EQ(1, list.size());
+ EXPECT_EQ(MyEntryRef(11, 0), pop_entry());
+
+ push_entry({32, 2});
+ EXPECT_EQ(2, list.size());
+
+ EXPECT_EQ(MyEntryRef(32, 2), pop_entry());
+ EXPECT_EQ(1, list.size());
+ EXPECT_EQ(MyEntryRef(10, 0), pop_entry());
+ EXPECT_TRUE(list.empty());
+}
+
+TEST_F(FreeListTest, dead_elems_count_is_updated_when_popping_an_entry)
+{
+ enable(0);
+ push_entry({10, 0});
+ dead_elems.store(18, std::memory_order_relaxed);
+ pop_entry();
+ EXPECT_EQ(18 - array_size, dead_elems.load(std::memory_order_relaxed));
+}
+
+GTEST_MAIN_RUN_ALL_TESTS()
+
diff --git a/vespalib/src/vespa/vespalib/btree/btree.hpp b/vespalib/src/vespa/vespalib/btree/btree.hpp
index 473d1f4735e..c6d8886254d 100644
--- a/vespalib/src/vespa/vespalib/btree/btree.hpp
+++ b/vespalib/src/vespa/vespalib/btree/btree.hpp
@@ -28,9 +28,9 @@ template <typename KeyT, typename DataT, typename AggrT, typename CompareT,
void
BTree<KeyT, DataT, AggrT, CompareT, TraitsT, AggrCalcT>::compact_worst(const datastore::CompactionStrategy& compaction_strategy)
{
- auto to_hold = _alloc.start_compact_worst(compaction_strategy);
+ auto compacting_buffers = _alloc.start_compact_worst(compaction_strategy);
_tree.move_nodes(_alloc);
- _alloc.finishCompact(to_hold);
+ compacting_buffers->finish();
}
}
diff --git a/vespalib/src/vespa/vespalib/btree/btreenodeallocator.h b/vespalib/src/vespa/vespalib/btree/btreenodeallocator.h
index 27e73b3a2b6..86c9621f869 100644
--- a/vespalib/src/vespa/vespalib/btree/btreenodeallocator.h
+++ b/vespalib/src/vespa/vespalib/btree/btreenodeallocator.h
@@ -166,7 +166,7 @@ public:
bool getCompacting(EntryRef ref) const { return _nodeStore.getCompacting(ref); }
std::vector<uint32_t> startCompact() { return _nodeStore.startCompact(); }
- std::vector<uint32_t> start_compact_worst(const CompactionStrategy& compaction_strategy) { return _nodeStore.start_compact_worst(compaction_strategy); }
+ std::unique_ptr<vespalib::datastore::CompactingBuffers> start_compact_worst(const CompactionStrategy& compaction_strategy) { return _nodeStore.start_compact_worst(compaction_strategy); }
void finishCompact(const std::vector<uint32_t> &toHold) {
return _nodeStore.finishCompact(toHold);
diff --git a/vespalib/src/vespa/vespalib/btree/btreenodestore.h b/vespalib/src/vespa/vespalib/btree/btreenodestore.h
index d4a5ae42ef8..d05ec840f83 100644
--- a/vespalib/src/vespa/vespalib/btree/btreenodestore.h
+++ b/vespalib/src/vespa/vespalib/btree/btreenodestore.h
@@ -6,6 +6,8 @@
#include "btreetraits.h"
#include <vespa/vespalib/datastore/datastore.h>
+namespace vespalib::datastore { class CompactingBuffers; }
+
namespace vespalib::btree {
class BTreeNodeReclaimer
@@ -160,7 +162,7 @@ public:
std::vector<uint32_t> startCompact();
- std::vector<uint32_t> start_compact_worst(const CompactionStrategy& compaction_strategy);
+ std::unique_ptr<vespalib::datastore::CompactingBuffers> start_compact_worst(const CompactionStrategy& compaction_strategy);
void finishCompact(const std::vector<uint32_t> &toHold);
diff --git a/vespalib/src/vespa/vespalib/btree/btreenodestore.hpp b/vespalib/src/vespa/vespalib/btree/btreenodestore.hpp
index 91953507eb0..0f9eeb9daec 100644
--- a/vespalib/src/vespa/vespalib/btree/btreenodestore.hpp
+++ b/vespalib/src/vespa/vespalib/btree/btreenodestore.hpp
@@ -3,6 +3,7 @@
#pragma once
#include "btreenodestore.h"
+#include <vespa/vespalib/datastore/compacting_buffers.h>
#include <vespa/vespalib/datastore/compaction_spec.h>
#include <vespa/vespalib/datastore/datastore.hpp>
@@ -70,11 +71,11 @@ startCompact()
template <typename KeyT, typename DataT, typename AggrT,
size_t INTERNAL_SLOTS, size_t LEAF_SLOTS>
-std::vector<uint32_t>
+std::unique_ptr<vespalib::datastore::CompactingBuffers>
BTreeNodeStore<KeyT, DataT, AggrT, INTERNAL_SLOTS, LEAF_SLOTS>::
start_compact_worst(const CompactionStrategy &compaction_strategy)
{
- return _store.startCompactWorstBuffers(datastore::CompactionSpec(true, false), compaction_strategy);
+ return _store.start_compact_worst_buffers(datastore::CompactionSpec(true, false), compaction_strategy);
}
template <typename KeyT, typename DataT, typename AggrT,
diff --git a/vespalib/src/vespa/vespalib/btree/btreestore.h b/vespalib/src/vespa/vespalib/btree/btreestore.h
index a79259c6e57..54bc397175d 100644
--- a/vespalib/src/vespa/vespalib/btree/btreestore.h
+++ b/vespalib/src/vespa/vespalib/btree/btreestore.h
@@ -149,13 +149,6 @@ public:
KeyDataTypeRefPair
allocKeyDataCopy(const KeyDataType *rhs, uint32_t clusterSize);
- std::vector<uint32_t>
- startCompact();
-
- void
- finishCompact(const std::vector<uint32_t> &toHold);
-
-
const KeyDataType *
lower_bound(const KeyDataType *b, const KeyDataType *e,
const KeyType &key, CompareT comp);
@@ -394,11 +387,10 @@ public:
void
foreach_frozen(EntryRef ref, FunctionType func) const;
- std::vector<uint32_t> start_compact_worst_btree_nodes(const CompactionStrategy& compaction_strategy);
- void finish_compact_worst_btree_nodes(const std::vector<uint32_t>& to_hold);
+ std::unique_ptr<vespalib::datastore::CompactingBuffers> start_compact_worst_btree_nodes(const CompactionStrategy& compaction_strategy);
void move_btree_nodes(const std::vector<EntryRef>& refs);
- std::vector<uint32_t> start_compact_worst_buffers(CompactionSpec compaction_spec, const CompactionStrategy& compaction_strategy);
+ std::unique_ptr<vespalib::datastore::CompactingBuffers> start_compact_worst_buffers(CompactionSpec compaction_spec, const CompactionStrategy& compaction_strategy);
void move(std::vector<EntryRef>& refs);
private:
diff --git a/vespalib/src/vespa/vespalib/btree/btreestore.hpp b/vespalib/src/vespa/vespalib/btree/btreestore.hpp
index c0985ff8f94..ffd337d642b 100644
--- a/vespalib/src/vespa/vespalib/btree/btreestore.hpp
+++ b/vespalib/src/vespa/vespalib/btree/btreestore.hpp
@@ -5,6 +5,7 @@
#include "btreestore.h"
#include "btreebuilder.h"
#include "btreebuilder.hpp"
+#include <vespa/vespalib/datastore/compacting_buffers.h>
#include <vespa/vespalib/datastore/compaction_spec.h>
#include <vespa/vespalib/datastore/datastore.hpp>
#include <vespa/vespalib/util/optimized.h>
@@ -116,34 +117,6 @@ allocKeyDataCopy(const KeyDataType *rhs, uint32_t clusterSize)
allocArray(vespalib::ConstArrayRef<KeyDataType>(rhs, clusterSize));
}
-
-template <typename KeyT, typename DataT, typename AggrT, typename CompareT,
- typename TraitsT, typename AggrCalcT>
-std::vector<uint32_t>
-BTreeStore<KeyT, DataT, AggrT, CompareT, TraitsT, AggrCalcT>::startCompact()
-{
- std::vector<uint32_t> ret = _store.startCompact(clusterLimit);
- for (uint32_t clusterSize = 1; clusterSize <= clusterLimit; ++clusterSize) {
- uint32_t typeId = clusterSize - 1;
- std::vector<uint32_t> toHold = _store.startCompact(typeId);
- for (auto i : toHold) {
- ret.push_back(i);
- }
- }
- return ret;
-}
-
-
-template <typename KeyT, typename DataT, typename AggrT, typename CompareT,
- typename TraitsT, typename AggrCalcT>
-void
-BTreeStore<KeyT, DataT, AggrT, CompareT, TraitsT, AggrCalcT>::
-finishCompact(const std::vector<uint32_t> &toHold)
-{
- _store.finishCompact(toHold);
-}
-
-
template <typename KeyT, typename DataT, typename AggrT, typename CompareT,
typename TraitsT, typename AggrCalcT>
const typename BTreeStore<KeyT, DataT, AggrT, CompareT, TraitsT, AggrCalcT>::
@@ -971,7 +944,7 @@ getAggregated(const EntryRef ref) const
template <typename KeyT, typename DataT, typename AggrT, typename CompareT,
typename TraitsT, typename AggrCalcT>
-std::vector<uint32_t>
+std::unique_ptr<vespalib::datastore::CompactingBuffers>
BTreeStore<KeyT, DataT, AggrT, CompareT, TraitsT, AggrCalcT>::
start_compact_worst_btree_nodes(const CompactionStrategy& compaction_strategy)
{
@@ -983,15 +956,6 @@ template <typename KeyT, typename DataT, typename AggrT, typename CompareT,
typename TraitsT, typename AggrCalcT>
void
BTreeStore<KeyT, DataT, AggrT, CompareT, TraitsT, AggrCalcT>::
-finish_compact_worst_btree_nodes(const std::vector<uint32_t>& to_hold)
-{
- _allocator.finishCompact(to_hold);
-}
-
-template <typename KeyT, typename DataT, typename AggrT, typename CompareT,
- typename TraitsT, typename AggrCalcT>
-void
-BTreeStore<KeyT, DataT, AggrT, CompareT, TraitsT, AggrCalcT>::
move_btree_nodes(const std::vector<EntryRef>& refs)
{
for (auto& ref : refs) {
@@ -1006,12 +970,12 @@ move_btree_nodes(const std::vector<EntryRef>& refs)
template <typename KeyT, typename DataT, typename AggrT, typename CompareT,
typename TraitsT, typename AggrCalcT>
-std::vector<uint32_t>
+std::unique_ptr<vespalib::datastore::CompactingBuffers>
BTreeStore<KeyT, DataT, AggrT, CompareT, TraitsT, AggrCalcT>::
start_compact_worst_buffers(CompactionSpec compaction_spec, const CompactionStrategy& compaction_strategy)
{
freeze();
- return _store.startCompactWorstBuffers(compaction_spec, compaction_strategy);
+ return _store.start_compact_worst_buffers(compaction_spec, compaction_strategy);
}
template <typename KeyT, typename DataT, typename AggrT, typename CompareT,
diff --git a/vespalib/src/vespa/vespalib/datastore/CMakeLists.txt b/vespalib/src/vespa/vespalib/datastore/CMakeLists.txt
index abbdc79c527..a2b561b6792 100644
--- a/vespalib/src/vespa/vespalib/datastore/CMakeLists.txt
+++ b/vespalib/src/vespa/vespalib/datastore/CMakeLists.txt
@@ -4,15 +4,19 @@ vespa_add_library(vespalib_vespalib_datastore OBJECT
array_store.cpp
array_store_config.cpp
atomic_entry_ref.cpp
+ buffer_free_list.cpp
buffer_type.cpp
bufferstate.cpp
- compaction_strategy.cpp
compact_buffer_candidates.cpp
+ compacting_buffers.cpp
+ compaction_strategy.cpp
+ compaction_strategy.cpp
datastore.cpp
datastorebase.cpp
- entryref.cpp
entry_ref_filter.cpp
+ entryref.cpp
fixed_size_hash_map.cpp
+ free_list.cpp
large_array_buffer_type.cpp
sharded_hash_map.cpp
small_array_buffer_type.cpp
diff --git a/vespalib/src/vespa/vespalib/datastore/array_store.hpp b/vespalib/src/vespa/vespalib/datastore/array_store.hpp
index 4fc13396f6b..e79398271fb 100644
--- a/vespalib/src/vespa/vespalib/datastore/array_store.hpp
+++ b/vespalib/src/vespa/vespalib/datastore/array_store.hpp
@@ -3,6 +3,7 @@
#pragma once
#include "array_store.h"
+#include "compacting_buffers.h"
#include "compaction_spec.h"
#include "entry_ref_filter.h"
#include "datastore.hpp"
@@ -150,24 +151,20 @@ template <typename EntryT, typename RefT, typename TypeMapperT>
class CompactionContext : public ICompactionContext {
private:
using ArrayStoreType = ArrayStore<EntryT, RefT, TypeMapperT>;
- DataStoreBase &_dataStore;
ArrayStoreType &_store;
- std::vector<uint32_t> _bufferIdsToCompact;
+ std::unique_ptr<vespalib::datastore::CompactingBuffers> _compacting_buffers;
EntryRefFilter _filter;
public:
- CompactionContext(DataStoreBase &dataStore,
- ArrayStoreType &store,
- std::vector<uint32_t> bufferIdsToCompact)
- : _dataStore(dataStore),
- _store(store),
- _bufferIdsToCompact(std::move(bufferIdsToCompact)),
- _filter(RefT::numBuffers(), RefT::offset_bits)
+ CompactionContext(ArrayStoreType &store,
+ std::unique_ptr<vespalib::datastore::CompactingBuffers> compacting_buffers)
+ : _store(store),
+ _compacting_buffers(std::move(compacting_buffers)),
+ _filter(_compacting_buffers->make_entry_ref_filter())
{
- _filter.add_buffers(_bufferIdsToCompact);
}
~CompactionContext() override {
- _dataStore.finishCompact(_bufferIdsToCompact);
+ _compacting_buffers->finish();
}
void compact(vespalib::ArrayRef<AtomicEntryRef> refs) override {
for (auto &atomic_entry_ref : refs) {
@@ -186,9 +183,9 @@ template <typename EntryT, typename RefT, typename TypeMapperT>
ICompactionContext::UP
ArrayStore<EntryT, RefT, TypeMapperT>::compactWorst(CompactionSpec compaction_spec, const CompactionStrategy &compaction_strategy)
{
- std::vector<uint32_t> bufferIdsToCompact = _store.startCompactWorstBuffers(compaction_spec, compaction_strategy);
+ auto compacting_buffers = _store.start_compact_worst_buffers(compaction_spec, compaction_strategy);
return std::make_unique<arraystore::CompactionContext<EntryT, RefT, TypeMapperT>>
- (_store, *this, std::move(bufferIdsToCompact));
+ (*this, std::move(compacting_buffers));
}
template <typename EntryT, typename RefT, typename TypeMapperT>
diff --git a/vespalib/src/vespa/vespalib/datastore/buffer_free_list.cpp b/vespalib/src/vespa/vespalib/datastore/buffer_free_list.cpp
new file mode 100644
index 00000000000..0a440e3e867
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/datastore/buffer_free_list.cpp
@@ -0,0 +1,56 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "buffer_free_list.h"
+#include "free_list.h"
+#include <cassert>
+
+namespace vespalib::datastore {
+
+void
+BufferFreeList::attach()
+{
+ assert(_free_list != nullptr);
+ _free_list->attach(*this);
+}
+
+void
+BufferFreeList::detach()
+{
+ assert(_free_list != nullptr);
+ _free_list->detach(*this);
+}
+
+BufferFreeList::BufferFreeList(std::atomic<ElemCount>& dead_elems)
+ : _dead_elems(dead_elems),
+ _array_size(0),
+ _free_list(),
+ _free_refs()
+{
+}
+
+BufferFreeList::~BufferFreeList()
+{
+ assert(_free_list == nullptr);
+ assert(_free_refs.empty());
+}
+
+void
+BufferFreeList::enable(FreeList& free_list)
+{
+ assert(_free_list == nullptr);
+ assert(_free_refs.empty());
+ _free_list = &free_list;
+}
+
+void
+BufferFreeList::disable()
+{
+ if (!empty()) {
+ detach();
+ EntryRefArray().swap(_free_refs);
+ }
+ _free_list = nullptr;
+}
+
+}
+
diff --git a/vespalib/src/vespa/vespalib/datastore/buffer_free_list.h b/vespalib/src/vespa/vespalib/datastore/buffer_free_list.h
new file mode 100644
index 00000000000..9ad24f7f03e
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/datastore/buffer_free_list.h
@@ -0,0 +1,56 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "entryref.h"
+#include "buffer_type.h"
+#include <vespa/vespalib/util/array.h>
+
+namespace vespalib::datastore {
+
+class FreeList;
+
+/**
+ * Class containing the free list for a single buffer.
+ *
+ * The free list is a stack of EntryRef's that can be reused.
+ */
+class BufferFreeList {
+private:
+ using EntryRefArray = vespalib::Array<EntryRef>;
+
+ std::atomic<ElemCount>& _dead_elems;
+ uint32_t _array_size;
+ FreeList* _free_list;
+ EntryRefArray _free_refs;
+
+ void attach();
+ void detach();
+
+public:
+ BufferFreeList(std::atomic<ElemCount>& dead_elems);
+ ~BufferFreeList();
+ void enable(FreeList& free_list);
+ void disable();
+
+ void on_active(uint32_t array_size) { _array_size = array_size; }
+ bool enabled() const { return _free_list != nullptr; }
+ bool empty() const { return _free_refs.empty(); }
+ void push_entry(EntryRef ref) {
+ if (empty()) {
+ attach();
+ }
+ _free_refs.push_back(ref);
+ }
+ EntryRef pop_entry() {
+ EntryRef ret = _free_refs.back();
+ _free_refs.pop_back();
+ if (empty()) {
+ detach();
+ }
+ _dead_elems.store(_dead_elems.load(std::memory_order_relaxed) - _array_size, std::memory_order_relaxed);
+ return ret;
+ }
+};
+
+}
diff --git a/vespalib/src/vespa/vespalib/datastore/compacting_buffers.cpp b/vespalib/src/vespa/vespalib/datastore/compacting_buffers.cpp
new file mode 100644
index 00000000000..e350ef5056e
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/datastore/compacting_buffers.cpp
@@ -0,0 +1,38 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "compacting_buffers.h"
+#include "datastorebase.h"
+#include "entry_ref_filter.h"
+#include <cassert>
+
+namespace vespalib::datastore {
+
+CompactingBuffers::CompactingBuffers(DataStoreBase& store, uint32_t num_buffers, uint32_t offset_bits, std::vector<uint32_t> buffer_ids)
+ : _store(store),
+ _num_buffers(num_buffers),
+ _offset_bits(offset_bits),
+ _buffer_ids(std::move(buffer_ids))
+{
+}
+
+CompactingBuffers::~CompactingBuffers()
+{
+ assert(_buffer_ids.empty());
+}
+
+void
+CompactingBuffers::finish()
+{
+ _store.finishCompact(_buffer_ids);
+ _buffer_ids.clear();
+}
+
+EntryRefFilter
+CompactingBuffers::make_entry_ref_filter() const
+{
+ EntryRefFilter filter(_num_buffers, _offset_bits);
+ filter.add_buffers(_buffer_ids);
+ return filter;
+}
+
+}
diff --git a/vespalib/src/vespa/vespalib/datastore/compacting_buffers.h b/vespalib/src/vespa/vespalib/datastore/compacting_buffers.h
new file mode 100644
index 00000000000..87e698c4eca
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/datastore/compacting_buffers.h
@@ -0,0 +1,32 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <cstdint>
+#include <vector>
+
+namespace vespalib::datastore {
+
+class DataStoreBase;
+class EntryRefFilter;
+
+/*
+ * Class representing the buffers currently being compacted in a data store.
+ */
+class CompactingBuffers
+{
+ DataStoreBase& _store;
+ uint32_t _num_buffers;
+ uint32_t _offset_bits;
+ std::vector<uint32_t> _buffer_ids;
+public:
+ CompactingBuffers(DataStoreBase& store, uint32_t num_buffers, uint32_t offset_bits, std::vector<uint32_t> buffer_ids);
+ ~CompactingBuffers();
+ DataStoreBase& get_store() const noexcept { return _store; }
+ const std::vector<uint32_t>& get_buffer_ids() const noexcept { return _buffer_ids; }
+ bool empty() const noexcept { return _buffer_ids.empty(); }
+ void finish();
+ EntryRefFilter make_entry_ref_filter() const;
+};
+
+}
diff --git a/vespalib/src/vespa/vespalib/datastore/datastore.hpp b/vespalib/src/vespa/vespalib/datastore/datastore.hpp
index 4d09ffe4bc6..72d08460eea 100644
--- a/vespalib/src/vespa/vespalib/datastore/datastore.hpp
+++ b/vespalib/src/vespa/vespalib/datastore/datastore.hpp
@@ -13,7 +13,7 @@ namespace vespalib::datastore {
template <typename RefT>
DataStoreT<RefT>::DataStoreT()
- : DataStoreBase(RefType::numBuffers(), RefType::offsetSize())
+ : DataStoreBase(RefType::numBuffers(), RefType::offset_bits, RefType::offsetSize())
{
}
diff --git a/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp b/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp
index 60671afb1a0..79113f76941 100644
--- a/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp
+++ b/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp
@@ -2,6 +2,7 @@
#include "datastorebase.h"
#include "compact_buffer_candidates.h"
+#include "compacting_buffers.h"
#include "compaction_spec.h"
#include "compaction_strategy.h"
#include <vespa/vespalib/util/array.hpp>
@@ -79,7 +80,7 @@ public:
}
};
-DataStoreBase::DataStoreBase(uint32_t numBuffers, size_t maxArrays)
+DataStoreBase::DataStoreBase(uint32_t numBuffers, uint32_t offset_bits, size_t maxArrays)
: _buffers(numBuffers),
_primary_buffer_ids(),
_states(numBuffers),
@@ -90,6 +91,7 @@ DataStoreBase::DataStoreBase(uint32_t numBuffers, size_t maxArrays)
_elemHold1List(),
_elemHold2List(),
_numBuffers(numBuffers),
+ _offset_bits(offset_bits),
_hold_buffer_count(0u),
_maxArrays(maxArrays),
_compaction_count(0u),
@@ -529,8 +531,8 @@ DataStoreBase::markCompacting(uint32_t bufferId)
inc_compaction_count();
}
-std::vector<uint32_t>
-DataStoreBase::startCompactWorstBuffers(CompactionSpec compaction_spec, const CompactionStrategy& compaction_strategy)
+std::unique_ptr<CompactingBuffers>
+DataStoreBase::start_compact_worst_buffers(CompactionSpec compaction_spec, const CompactionStrategy& compaction_strategy)
{
// compact memory usage
CompactBufferCandidates elem_buffers(_numBuffers, compaction_strategy.get_max_buffers(), compaction_strategy.get_active_buffers_ratio(), compaction_strategy.getMaxDeadBytesRatio() / 2, CompactionStrategy::DEAD_BYTES_SLACK);
@@ -567,7 +569,7 @@ DataStoreBase::startCompactWorstBuffers(CompactionSpec compaction_spec, const Co
for (auto buffer_id : result) {
markCompacting(buffer_id);
}
- return result;
+ return std::make_unique<CompactingBuffers>(*this, _numBuffers, _offset_bits, std::move(result));
}
void
diff --git a/vespalib/src/vespa/vespalib/datastore/datastorebase.h b/vespalib/src/vespa/vespalib/datastore/datastorebase.h
index 20104670085..40730252139 100644
--- a/vespalib/src/vespa/vespalib/datastore/datastorebase.h
+++ b/vespalib/src/vespa/vespalib/datastore/datastorebase.h
@@ -12,6 +12,7 @@
namespace vespalib::datastore {
+class CompactingBuffers;
class CompactionSpec;
class CompactionStrategy;
@@ -159,13 +160,14 @@ protected:
ElemHold2List _elemHold2List;
const uint32_t _numBuffers;
+ const uint32_t _offset_bits;
uint32_t _hold_buffer_count;
const size_t _maxArrays;
mutable std::atomic<uint64_t> _compaction_count;
vespalib::GenerationHolder _genHolder;
- DataStoreBase(uint32_t numBuffers, size_t maxArrays);
+ DataStoreBase(uint32_t numBuffers, uint32_t offset_bits, size_t maxArrays);
DataStoreBase(const DataStoreBase &) = delete;
DataStoreBase &operator=(const DataStoreBase &) = delete;
@@ -376,7 +378,7 @@ public:
}
uint32_t startCompactWorstBuffer(uint32_t typeId);
- std::vector<uint32_t> startCompactWorstBuffers(CompactionSpec compaction_spec, const CompactionStrategy &compaction_strategy);
+ std::unique_ptr<CompactingBuffers> start_compact_worst_buffers(CompactionSpec compaction_spec, const CompactionStrategy &compaction_strategy);
uint64_t get_compaction_count() const { return _compaction_count.load(std::memory_order_relaxed); }
void inc_compaction_count() const { ++_compaction_count; }
bool has_held_buffers() const noexcept { return _hold_buffer_count != 0u; }
diff --git a/vespalib/src/vespa/vespalib/datastore/free_list.cpp b/vespalib/src/vespa/vespalib/datastore/free_list.cpp
new file mode 100644
index 00000000000..6c96e51241c
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/datastore/free_list.cpp
@@ -0,0 +1,36 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "free_list.h"
+#include <cassert>
+
+namespace vespalib::datastore {
+
+FreeList::FreeList()
+ : _free_lists()
+{
+}
+
+FreeList::~FreeList()
+{
+ assert(_free_lists.empty());
+}
+
+void
+FreeList::attach(BufferFreeList& buf_list)
+{
+ _free_lists.push_back(&buf_list);
+}
+
+void
+FreeList::detach(BufferFreeList& buf_list)
+{
+ if (!_free_lists.empty() && (_free_lists.back() == &buf_list)) {
+ _free_lists.pop_back();
+ return;
+ }
+ auto itr = std::find(_free_lists.begin(), _free_lists.end(), &buf_list);
+ assert(itr != _free_lists.end());
+ _free_lists.erase(itr);
+}
+
+}
diff --git a/vespalib/src/vespa/vespalib/datastore/free_list.h b/vespalib/src/vespa/vespalib/datastore/free_list.h
new file mode 100644
index 00000000000..39e1713e47d
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/datastore/free_list.h
@@ -0,0 +1,34 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "buffer_free_list.h"
+#include "entryref.h"
+#include <vector>
+
+namespace vespalib::datastore {
+
+/**
+ * Class containing the free list for a single buffer type id.
+ *
+ * This consists of a stack of buffer free lists,
+ * where the newest attached is used when getting an EntryRef for reuse.
+ */
+class FreeList {
+private:
+ std::vector<BufferFreeList*> _free_lists;
+
+public:
+ FreeList();
+ ~FreeList();
+ void attach(BufferFreeList& buf_list);
+ void detach(BufferFreeList& buf_list);
+
+ bool empty() const { return _free_lists.empty(); }
+ size_t size() const { return _free_lists.size(); }
+ EntryRef pop_entry() {
+ return _free_lists.back()->pop_entry();
+ }
+};
+
+}
diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store.hpp b/vespalib/src/vespa/vespalib/datastore/unique_store.hpp
index cbb8369e1f2..37a56bf2561 100644
--- a/vespalib/src/vespa/vespalib/datastore/unique_store.hpp
+++ b/vespalib/src/vespa/vespalib/datastore/unique_store.hpp
@@ -94,17 +94,17 @@ private:
btree::NoAggregated,
EntryComparatorWrapper,
DictionaryTraits>;
- using UniqueStoreRemapper<RefT>::_compacting_buffer;
+ using UniqueStoreRemapper<RefT>::_filter;
using UniqueStoreRemapper<RefT>::_mapping;
- DataStoreBase &_dataStore;
IUniqueStoreDictionary &_dict;
ICompactable &_store;
- std::vector<uint32_t> _bufferIdsToCompact;
+ std::unique_ptr<CompactingBuffers> _compacting_buffers;
void allocMapping() {
_mapping.resize(RefT::numBuffers());
- for (const auto bufferId : _bufferIdsToCompact) {
- BufferState &state = _dataStore.getBufferState(bufferId);
+ auto& data_store = _compacting_buffers->get_store();
+ for (const auto bufferId : _compacting_buffers->get_buffer_ids()) {
+ BufferState &state = data_store.getBufferState(bufferId);
_mapping[bufferId].resize(state.get_used_arrays());
}
}
@@ -122,34 +122,30 @@ private:
}
void fillMapping() {
- _dict.move_keys(*this, _compacting_buffer);
+ _dict.move_keys(*this, _filter);
}
public:
- CompactionContext(DataStoreBase &dataStore,
- IUniqueStoreDictionary &dict,
+ CompactionContext(IUniqueStoreDictionary &dict,
ICompactable &store,
- std::vector<uint32_t> bufferIdsToCompact)
- : UniqueStoreRemapper<RefT>(),
+ std::unique_ptr<CompactingBuffers> compacting_buffers)
+ : UniqueStoreRemapper<RefT>(compacting_buffers->make_entry_ref_filter()),
ICompactable(),
- _dataStore(dataStore),
_dict(dict),
_store(store),
- _bufferIdsToCompact(std::move(bufferIdsToCompact))
+ _compacting_buffers(std::move(compacting_buffers))
{
- if (!_bufferIdsToCompact.empty()) {
- _compacting_buffer.add_buffers(_bufferIdsToCompact);
+ if (!_compacting_buffers->empty()) {
allocMapping();
fillMapping();
}
}
void done() override {
- _dataStore.finishCompact(_bufferIdsToCompact);
- _bufferIdsToCompact.clear();
+ _compacting_buffers->finish();
}
~CompactionContext() override {
- assert(_bufferIdsToCompact.empty());
+ assert(_compacting_buffers->empty());
}
};
@@ -159,11 +155,11 @@ template <typename EntryT, typename RefT, typename Compare, typename Allocator>
std::unique_ptr<typename UniqueStore<EntryT, RefT, Compare, Allocator>::Remapper>
UniqueStore<EntryT, RefT, Compare, Allocator>::compact_worst(CompactionSpec compaction_spec, const CompactionStrategy& compaction_strategy)
{
- std::vector<uint32_t> bufferIdsToCompact = _store.startCompactWorstBuffers(compaction_spec, compaction_strategy);
- if (bufferIdsToCompact.empty()) {
+ auto compacting_buffers = _store.start_compact_worst_buffers(compaction_spec, compaction_strategy);
+ if (compacting_buffers->empty()) {
return std::unique_ptr<Remapper>();
} else {
- return std::make_unique<uniquestore::CompactionContext<RefT>>(_store, *_dict, _allocator, std::move(bufferIdsToCompact));
+ return std::make_unique<uniquestore::CompactionContext<RefT>>(*_dict, _allocator, std::move(compacting_buffers));
}
}
diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store_remapper.h b/vespalib/src/vespa/vespalib/datastore/unique_store_remapper.h
index 4babd6204c7..174c74a62d2 100644
--- a/vespalib/src/vespa/vespalib/datastore/unique_store_remapper.h
+++ b/vespalib/src/vespa/vespalib/datastore/unique_store_remapper.h
@@ -19,11 +19,11 @@ public:
using RefType = RefT;
protected:
- EntryRefFilter _compacting_buffer;
+ EntryRefFilter _filter;
std::vector<std::vector<EntryRef, allocator_large<EntryRef>>> _mapping;
public:
- UniqueStoreRemapper()
- : _compacting_buffer(RefT::numBuffers(), RefT::offset_bits),
+ UniqueStoreRemapper(EntryRefFilter&& filter)
+ : _filter(std::move(filter)),
_mapping()
{
}
@@ -41,13 +41,13 @@ public:
void remap(vespalib::ArrayRef<AtomicEntryRef> refs) const {
for (auto &atomic_ref : refs) {
auto ref = atomic_ref.load_relaxed();
- if (ref.valid() && _compacting_buffer.has(ref)) {
+ if (ref.valid() && _filter.has(ref)) {
atomic_ref.store_release(remap(ref));
}
}
}
- const EntryRefFilter& get_entry_ref_filter() const noexcept { return _compacting_buffer; }
+ const EntryRefFilter& get_entry_ref_filter() const noexcept { return _filter; }
virtual void done() = 0;
};