diff options
44 files changed, 1195 insertions, 177 deletions
diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletRequestReader.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletRequestReader.java index bcef4af6620..3703878f595 100644 --- a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletRequestReader.java +++ b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletRequestReader.java @@ -217,8 +217,12 @@ class ServletRequestReader { synchronized (monitor) { errorDuringRead = t; - if (state != State.READING) return; - state = State.ALL_DATA_READ; + if (state == State.REQUEST_CONTENT_CLOSED) { + return; + } + if (state == State.READING) { + state = State.ALL_DATA_READ; + } shouldCloseRequestContentChannel = numberOfOutstandingUserCalls == 0; if (shouldCloseRequestContentChannel) { state = State.REQUEST_CONTENT_CLOSED; diff --git a/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java b/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java new file mode 100644 index 00000000000..5cb3aa24f98 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java @@ -0,0 +1,68 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.logging; + +import com.yahoo.concurrent.DaemonThreadFactory; +import java.io.IOException; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; + +/** + * Abstract class that deals with storing event entries on disk and making sure all stored + * entries are eventually sent + * + * @author hmusum + */ +abstract class AbstractSpoolingLogger extends AbstractThreadedLogger implements Runnable { + + protected static final java.util.logging.Logger log = java.util.logging.Logger.getLogger(Spooler.class.getName()); + + private static final ScheduledExecutorService executorService = + new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("AbstractSpoolingLogger-send-")); + + protected final Spooler spooler; + + public AbstractSpoolingLogger() { + this(new Spooler()); + } + + public AbstractSpoolingLogger(Spooler spooler) { + this.spooler = spooler; + executorService.scheduleWithFixedDelay(this, 0, 10L, TimeUnit.MILLISECONDS); + } + + public void run() { + try { + var entries = spooler.processFiles(); + log.log(Level.INFO, "Entries: " + entries.size()); + entries.forEach(this::transport); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public boolean send(LoggerEntry entry) { + log.log(Level.INFO, "Sending"); + try { + executor.execute(() -> spooler.write(entry)); + } catch (RejectedExecutionException e) { + return false; + } + return true; + } + + // TODO Call from a component or make this class a component + public void shutdown() { + executorService.shutdown(); + try { + if ( ! executorService.awaitTermination(10, TimeUnit.SECONDS)) + log.log(Level.WARNING, "Timeout elapsed waiting for termination"); + } catch (InterruptedException e) { + log.log(Level.WARNING, "Failure when waiting for termination: " + e.getMessage()); + } + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java b/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java new file mode 100644 index 00000000000..1df236a037f --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java @@ -0,0 +1,110 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +package com.yahoo.search.logging; + +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; + +abstract class AbstractThreadedLogger implements Logger { + + private final static java.util.logging.Logger log = java.util.logging.Logger.getLogger(AbstractThreadedLogger.class.getName()); + + final static int DEFAULT_MAX_THREADS = 1; + final static int DEFAULT_QUEUE_SIZE = 1000; + + protected final WorkerThreadExecutor executor; + + AbstractThreadedLogger() { + this(DEFAULT_MAX_THREADS, DEFAULT_QUEUE_SIZE); + } + + AbstractThreadedLogger(int threads, int queueSize) { + executor = new WorkerThreadExecutor(threads, queueSize); + } + + AbstractThreadedLogger(int threads, int queueSize, ThreadFactory factory) { + executor = new WorkerThreadExecutor(threads, queueSize, factory); + } + + @Override + public boolean send(LoggerEntry entry) { + return enqueue(entry); + } + + protected boolean enqueue(LoggerEntry entry) { + // Todo: metric things + try { + executor.execute(() -> dequeue(entry)); + } catch (RejectedExecutionException e) { + return false; + } + return true; + } + + protected void dequeue(LoggerEntry entry) { + transport(entry); // This happens in worker thread + } + + /** + * Actually transports the entry to it's destination + */ + abstract void transport(LoggerEntry entry); + + + private static class WorkerThread extends Thread { + + public WorkerThread(Runnable r) { + super(r); + } + + @Override + public void run() { + try { + super.run(); + } catch (Exception e) { + log.log(Level.SEVERE, String.format("Error while sending logger entry: %s", e), e); + } + } + + } + + protected static class WorkerThreadExecutor implements Executor { + + protected final ThreadPoolExecutor executor; + + WorkerThreadExecutor(int threads, int queueSize) { + this(threads, queueSize, WorkerThread::new); + } + + WorkerThreadExecutor(int threads, int queueSize, ThreadFactory threadFactory) { + executor = new ThreadPoolExecutor( + threads, threads, + 0L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(queueSize), + threadFactory); + } + + public void close() { + try { + executor.shutdown(); + executor.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // + } finally { + executor.shutdownNow(); + } + } + + @Override + public void execute(Runnable r) { + executor.execute(r); + } + + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java b/container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java new file mode 100644 index 00000000000..6d815059066 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java @@ -0,0 +1,27 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +package com.yahoo.search.logging; + +import java.io.FileWriter; +import java.io.IOException; + +public class LocalDiskLogger extends AbstractThreadedLogger { + + private String logFilePath; + + public LocalDiskLogger(LocalDiskLoggerConfig config) { + logFilePath = config.path(); + } + + @Override + void transport(LoggerEntry entry) { + String json = entry.toJson(); + try (FileWriter fw = new FileWriter(logFilePath, true)) { + fw.write(json); + fw.write(System.getProperty("line.separator")); + } catch (IOException e) { + e.printStackTrace(); + } + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/logging/Logger.java b/container-search/src/main/java/com/yahoo/search/logging/Logger.java new file mode 100644 index 00000000000..3938b01c7b7 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/logging/Logger.java @@ -0,0 +1,13 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +package com.yahoo.search.logging; + +public interface Logger { + + default LoggerEntry.Builder newEntry() { + return new LoggerEntry.Builder(this); + } + + boolean send(LoggerEntry entry); + +} diff --git a/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java b/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java new file mode 100644 index 00000000000..692de3e6cef --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java @@ -0,0 +1,127 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +package com.yahoo.search.logging; + +import com.fasterxml.jackson.core.JsonEncoding; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.yahoo.search.Query; +import com.yahoo.slime.JsonDecoder; +import com.yahoo.slime.Slime; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.util.Base64; + +public class LoggerEntry { + + private final Long timestamp; + private final Query query; + private final ByteBuffer blob; + + private LoggerEntry(Builder builder) { + timestamp = builder.timestamp; // or set automatically if not set + query = builder.query; + blob = builder.blob; + } + + public Long timestamp() { + return timestamp; + } + + public Query query() { + return query; + } + + public String queryString() { + String queryString = null; + if (query != null) { + if (query.getHttpRequest() != null && query.getHttpRequest().getUri() != null) { + queryString = query.getHttpRequest().getUri().getPath(); + if (query.getHttpRequest().getUri().getQuery() != null) { + queryString += "?" + query.getHttpRequest().getUri().getRawQuery(); + } + } + } + return queryString; + } + + public ByteBuffer blob() { + return blob; + } + + public String toString() { + return toJson(); + } + + public String toJson() { + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8); + g.writeStartObject(); + + g.writeNumberField("timestamp", timestamp == null ? 0 : timestamp); + g.writeStringField("query", queryString()); + g.writeStringField("blob", Base64.getEncoder().encodeToString(blob.array())); + + g.writeEndObject(); + g.close(); + return out.toString(); + + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + // TODO: Rename method here and above? (serialize/deserialize) + // TODO: Use Slime or Jackson for both + public static LoggerEntry fromJson(byte[] content) throws IOException { + var decoder = new JsonDecoder(); + var slime = decoder.decode(new Slime(), content); + + var timestamp = slime.get().field("timestamp").asLong(); + var query = new Query(slime.get().field("query").asString()); + var blob = slime.get().field("blob").asData(); + + return new LoggerEntry(new Builder().timestamp(timestamp).query(query).blob(blob)); + } + + public static class Builder { + + private final Logger logger; + + private Long timestamp; + private Query query; + private ByteBuffer blob; + + // For testing + public Builder() { this(entry -> false); } + + public Builder(Logger logger) { + this.logger = logger; + } + + public Builder timestamp(long timestamp) { + this.timestamp = timestamp; + return this; + } + + public Builder query(Query query) { + this.query = query; + return this; + } + + public Builder blob(byte[] bytes) { + blob = ByteBuffer.allocate(bytes.length); + blob.put(bytes).limit(blob.position()).position(0); + return this; + } + + public boolean send() { + return logger.send(new LoggerEntry(this)); + } + + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java new file mode 100644 index 00000000000..8fa51a0b47c --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java @@ -0,0 +1,175 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.logging; + +import ai.vespa.validation.Validation; +import com.yahoo.vespa.defaults.Defaults; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.stream.Stream; + +/** + * Spooler that will write an entry to a file and read files that are ready to be sent + * + * @author hmusum + */ +public class Spooler { + + private static final java.util.logging.Logger log = java.util.logging.Logger.getLogger(Spooler.class.getName()); + private static final Path defaultSpoolPath = Path.of(Defaults.getDefaults().underVespaHome("var/spool/vespa/events")); + private static final Comparator<File> ordering = new TimestampCompare(); + + private Path processingPath; + private Path readyPath; + private Path failuresPath; + private Path successesPath; + + AtomicInteger fileCounter = new AtomicInteger(1); + + private final Path spoolPath; + + public Spooler() { + this(defaultSpoolPath); + } + + public Spooler(Path spoolPath) { + this.spoolPath = spoolPath; + createDirs(spoolPath); + } + + public boolean write(LoggerEntry entry) { + writeEntry(entry); + + return true; + } + + public List<LoggerEntry> processFiles() throws IOException { + List<Path> files = listFilesInPath(readyPath); + + if (files.size() == 0) { + log.log(Level.INFO, "No files in ready path " + readyPath.toFile().getAbsolutePath()); + return List.of(); + } + log.log(Level.FINE, "Files in ready path: " + files.size()); + + List<File> fileList = getFiles(files, 50); // TODO + if ( ! fileList.isEmpty()) { + return processFiles(fileList); + } + + return List.of(); + } + + List<Path> listFilesInPath(Path path) throws IOException { + List<Path> files; + System.out.println("Path " + path + " exists: " + path.toFile().exists()); + try (Stream<Path> stream = Files.list(path)) { + files = stream.toList(); + // TODO: Or check if stream is empty + } catch (NoSuchFileException e) { + return List.of(); // No files, this is OK + } + return files; + } + + public ArrayList<LoggerEntry> processFiles(List<File> files) throws IOException { + ArrayList<LoggerEntry> entries = new ArrayList<>(); + for (File f : files) { + log.log(Level.INFO, "Found file " + f); + var content = Files.readAllBytes(f.toPath()); + var entry = LoggerEntry.fromJson(content); + Path file = f.toPath(); + Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName()); + Files.move(file, target); + entries.add(entry); + } + return entries; + } + + public Path processingPath() { return processingPath; } + public Path readyPath() { return readyPath; } + public Path successesPath() { return successesPath; } + public Path failuresPath() { return failuresPath; } + + List<File> getDirectories(File[] files) { + List<File> fileList = new ArrayList<>(); + + for (File f : files) { + if (f.isDirectory()) { + fileList.add(f); + } + } + + Collections.sort(fileList); + return fileList; + } + + List<File> getFiles(List<Path> files, int count) { + Validation.requireAtLeast(count, "count must be a positive number", 1); + List<File> fileList = new ArrayList<>(); + + for (Path p : files) { + File f = p.toFile(); + if (!f.isDirectory()) { + fileList.add(f); + } + + // Grab only some files + if (fileList.size() > count) { + break; + } + } + + fileList.sort(ordering); + return fileList; + } + + private void writeEntry(LoggerEntry entry) { + String fileName = String.valueOf(fileCounter); + Path file = spoolPath.resolve(processingPath).resolve(fileName); + try { + Files.writeString(file, entry.toJson(), StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE); + Path target = spoolPath.resolve(readyPath).resolve(file.relativize(file)).resolve(fileName); + log.log(Level.INFO, "Moving file from " + file + " to " + target); + Files.move(file, target); + fileCounter.addAndGet(1); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void createDirs(Path spoolerPath) { + processingPath = createDir(spoolerPath.resolve("processing")); + readyPath = createDir(spoolerPath.resolve("ready")); + failuresPath = createDir(spoolerPath.resolve("failures")); + successesPath = createDir(spoolerPath.resolve("successes")); + } + + private static Path createDir(Path path) { + File file = path.toFile(); + if (file.exists() && file.canRead() && file.canWrite()) { + log.log(Level.INFO, "Directory " + path + " already exists"); + } else if (file.mkdirs()) { + log.log(Level.INFO, "Created " + path); + } else { + log.log(Level.WARNING, "Could not create " + path + ", please check permissions"); + } + return path; + } + + private static class TimestampCompare implements Comparator<File> { + public int compare(File a, File b) { + return (int) (a.lastModified() - b.lastModified()); + } + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/logging/package-info.java b/container-search/src/main/java/com/yahoo/search/logging/package-info.java new file mode 100644 index 00000000000..e2bc14f4faf --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/logging/package-info.java @@ -0,0 +1,6 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +@ExportPackage +package com.yahoo.search.logging; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/container-search/src/main/resources/configdefinitions/search.logging.local-disk-logger.def b/container-search/src/main/resources/configdefinitions/search.logging.local-disk-logger.def new file mode 100644 index 00000000000..fc42c0dbd20 --- /dev/null +++ b/container-search/src/main/resources/configdefinitions/search.logging.local-disk-logger.def @@ -0,0 +1,5 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +namespace=search.logging + +## Path where logs should be put +path string default=/dev/null diff --git a/container-search/src/test/java/com/yahoo/search/logging/LocalDiskLoggerTest.java b/container-search/src/test/java/com/yahoo/search/logging/LocalDiskLoggerTest.java new file mode 100644 index 00000000000..bf6938424d7 --- /dev/null +++ b/container-search/src/test/java/com/yahoo/search/logging/LocalDiskLoggerTest.java @@ -0,0 +1,50 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +package com.yahoo.search.logging; + +import com.yahoo.io.IOUtils; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Base64; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class LocalDiskLoggerTest { + + @TempDir + Path tempDir; + + @Test + public void testLocalDiskLogger() throws InterruptedException, IOException { + File logFile = tempDir.resolve("localdisklogger.log").toFile(); + + LocalDiskLoggerConfig.Builder builder = new LocalDiskLoggerConfig.Builder(); + builder.path(logFile.getAbsolutePath()); + LocalDiskLogger logger = new LocalDiskLogger(builder.build()); + + logger.newEntry() + .blob("Yo entry".getBytes()) + .send(); + waitForFile(logFile); + + String test = IOUtils.readAll(new FileReader(logFile)); + assertTrue(test.contains(Base64.getEncoder().encodeToString("Yo entry".getBytes()))); + } + + private void waitForFile(File file) throws InterruptedException { + int waitFor = 10; + while ( ! file.exists() && --waitFor > 0) { + Thread.sleep(10); + } + if ( ! file.exists()) { + fail("Local disk logger file was not created"); + } + } + +} diff --git a/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java b/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java new file mode 100644 index 00000000000..abd3b27b70b --- /dev/null +++ b/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java @@ -0,0 +1,107 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.logging; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class SpoolerTest { + + @TempDir + Path tempDir; + + @Test + public void testSpoolingLogger() throws IOException { + Path spoolDir = tempDir.resolve("spool"); + + Spooler spooler = new Spooler(spoolDir); + + TestLogger logger = new TestLogger(spooler); + assertTrue(logger.newEntry() + .blob("Yo entry".getBytes()) + .send()); + assertTrue(logger.newEntry() + .blob("Yo entry 2".getBytes()) + .send()); + + Path readyPath = spooler.readyPath(); + Path readyFile1 = readyPath.resolve("1"); + waitUntilFileExists(readyFile1); + Path readyFile2 = readyPath.resolve("2"); + waitUntilFileExists(readyFile2); + + // Check content after being moved to ready path + String content = Files.readString(readyFile1); + assertTrue(content.contains(Base64.getEncoder().encodeToString("Yo entry".getBytes()))); + assertTrue(Files.readString(readyFile2).contains(Base64.getEncoder().encodeToString("Yo entry 2".getBytes()))); + + // Process files (read, transport files) + logger.manualRun(); + assertEquals(2, logger.entriesSent()); + + // No files in processing or ready, 2 files in successes + assertEquals(0, spooler.listFilesInPath(spooler.processingPath()).size()); + assertEquals(0, spooler.listFilesInPath(readyPath).size()); + assertEquals(2, spooler.listFilesInPath(spooler.successesPath()).size()); + assertEquals(0, spooler.listFilesInPath(spooler.failuresPath()).size()); + } + + private void waitUntilFileExists(Path path) { + Instant end = Instant.now().plus(Duration.ofSeconds(1)); + while (!path.toFile().exists() && Instant.now().isBefore(end)) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + assertTrue(path.toFile().exists()); + } + + + private static class TestLogger extends AbstractSpoolingLogger { + + private final List<LoggerEntry> entriesSent = new ArrayList<>(); + + public TestLogger(Spooler spooler) { + super(spooler); + } + + @Override + void transport(LoggerEntry entry) { + System.out.println("Called transport()"); + entriesSent.add(entry); + } + + @Override + public void run() { + // Do nothing, use manualRun + } + + @Override + public boolean send(LoggerEntry entry) { + return spooler.write(entry); + } + + public void manualRun() { + super.run(); + } + + int entriesSent() { + return entriesSent.size(); + } + + } + +} diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/archive/CuratorArchiveBucketDb.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/archive/CuratorArchiveBucketDb.java index 0a0adcfc252..ac32fe5799d 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/archive/CuratorArchiveBucketDb.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/archive/CuratorArchiveBucketDb.java @@ -1,10 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hosted.controller.archive; -import com.yahoo.config.provision.SystemName; import com.yahoo.config.provision.TenantName; import com.yahoo.config.provision.zone.ZoneId; -import com.yahoo.text.Text; import com.yahoo.vespa.hosted.controller.Controller; import com.yahoo.vespa.hosted.controller.api.integration.archive.ArchiveBucket; import com.yahoo.vespa.hosted.controller.api.integration.archive.ArchiveService; @@ -14,7 +12,6 @@ import java.net.URI; import java.util.HashSet; import java.util.Map; import java.util.Optional; -import java.util.OptionalInt; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -34,12 +31,10 @@ public class CuratorArchiveBucketDb { private final ArchiveService archiveService; private final CuratorDb curatorDb; - private final SystemName system; public CuratorArchiveBucketDb(Controller controller) { this.archiveService = controller.serviceRegistry().archiveService(); this.curatorDb = controller.curator(); - this.system = controller.zoneRegistry().system(); } public Optional<URI> archiveUriFor(ZoneId zoneId, TenantName tenant, boolean createIfMissing) { diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ArchiveAccessMaintainer.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ArchiveAccessMaintainer.java index 788360996ff..eed4fd0245d 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ArchiveAccessMaintainer.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ArchiveAccessMaintainer.java @@ -73,7 +73,7 @@ public class ArchiveAccessMaintainer extends ControllerMaintainer { .filter(t -> t instanceof CloudTenant) .map(t -> (CloudTenant) t) .collect(Collectors.toUnmodifiableMap( - Tenant::name, cloudTenant -> cloudTenant.archiveAccess())); + Tenant::name, CloudTenant::archiveAccess)); } } diff --git a/eval/src/vespa/eval/instruction/simple_join_count.h b/eval/src/vespa/eval/instruction/simple_join_count.h index a566d2a7e68..1c74388d914 100644 --- a/eval/src/vespa/eval/instruction/simple_join_count.h +++ b/eval/src/vespa/eval/instruction/simple_join_count.h @@ -16,7 +16,7 @@ class SimpleJoinCount : public tensor_function::Op2 private: uint64_t _dense_factor; public: - SimpleJoinCount(const TensorFunction &lhs_in, const TensorFunction &rhs_in, size_t dense_factor_in); + SimpleJoinCount(const TensorFunction &lhs_in, const TensorFunction &rhs_in, uint64_t dense_factor_in); InterpretedFunction::Instruction compile_self(const ValueBuilderFactory &factory, Stash &stash) const override; bool result_is_mutable() const override { return true; } uint64_t dense_factor() const { return _dense_factor; } diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java index 2dca3511044..d8e0ceb01bd 100644 --- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java +++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java @@ -46,13 +46,13 @@ public class Flags { private static volatile TreeMap<FlagId, FlagDefinition> flags = new TreeMap<>(); - public static final UnboundBooleanFlag MAIN_CHAIN_GRAPH = defineFeatureFlag( - "main-chain-graph", true, - List.of("hakonhall"), "2022-07-06", "2022-10-05", - "Whether to run all tasks in the main task chain up to the one failing to converge (false), or " + - "run all tasks in the main task chain whose dependencies have converged (true). And when suspending, " + + public static final UnboundBooleanFlag ROOT_CHAIN_GRAPH = defineFeatureFlag( + "root-chain-graph", true, + List.of("hakonhall"), "2022-10-05", "2022-11-04", + "Whether to run all tasks in the root task chain up to the one failing to converge (false), or " + + "run all tasks in the root task chain whose dependencies have converged (true). And when suspending, " + "whether to run the tasks in sequence (false) or in reverse sequence (true).", - "On first tick of the main chain after (re)start of host admin.", + "On first tick of the root chain after (re)start of host admin.", ZONE_ID, NODE_TYPE, HOSTNAME); public static final UnboundDoubleFlag DEFAULT_TERM_WISE_LIMIT = defineDoubleFlag( diff --git a/parent/pom.xml b/parent/pom.xml index fd076c5cc35..f9155acbfd5 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -1061,7 +1061,7 @@ <org.json.version>20220320</org.json.version> <org.lz4.version>1.8.0</org.lz4.version> <prometheus.client.version>0.6.0</prometheus.client.version> - <protobuf.version>3.19.2</protobuf.version> + <protobuf.version>3.21.7</protobuf.version> <spifly.version>1.3.5</spifly.version> <surefire.version>2.22.2</surefire.version> <zookeeper.client.version>3.8.0</zookeeper.client.version> diff --git a/searchlib/src/vespa/searchlib/attribute/postingstore.cpp b/searchlib/src/vespa/searchlib/attribute/postingstore.cpp index 79ec976ac50..2aa05e7fa9f 100644 --- a/searchlib/src/vespa/searchlib/attribute/postingstore.cpp +++ b/searchlib/src/vespa/searchlib/attribute/postingstore.cpp @@ -8,6 +8,7 @@ #include <vespa/vespalib/btree/btreeiterator.hpp> #include <vespa/vespalib/btree/btreerootbase.cpp> #include <vespa/vespalib/datastore/datastore.hpp> +#include <vespa/vespalib/datastore/compacting_buffers.h> #include <vespa/vespalib/datastore/compaction_spec.h> #include <vespa/vespalib/datastore/entry_ref_filter.h> #include <vespa/vespalib/datastore/buffer_type.hpp> @@ -712,14 +713,14 @@ template <typename DataT> void PostingStore<DataT>::compact_worst_btree_nodes(const CompactionStrategy& compaction_strategy) { - auto to_hold = this->start_compact_worst_btree_nodes(compaction_strategy); + auto compacting_buffers = this->start_compact_worst_btree_nodes(compaction_strategy); EntryRefFilter filter(RefType::numBuffers(), RefType::offset_bits); // Only look at buffers containing bitvectors and btree roots filter.add_buffers(this->_treeType.get_active_buffers()); filter.add_buffers(_bvType.get_active_buffers()); _dictionary.foreach_posting_list([this](const std::vector<EntryRef>& refs) { move_btree_nodes(refs); }, filter); - this->finish_compact_worst_btree_nodes(to_hold); + compacting_buffers->finish(); } template <typename DataT> @@ -727,12 +728,11 @@ void PostingStore<DataT>::compact_worst_buffers(CompactionSpec compaction_spec, const CompactionStrategy& compaction_strategy) { - auto to_hold = this->start_compact_worst_buffers(compaction_spec, compaction_strategy); + auto compacting_buffers = this->start_compact_worst_buffers(compaction_spec, compaction_strategy); bool compact_btree_roots = false; - EntryRefFilter filter(RefType::numBuffers(), RefType::offset_bits); - filter.add_buffers(to_hold); + auto filter = compacting_buffers->make_entry_ref_filter(); // Start with looking at buffers being compacted - for (uint32_t buffer_id : to_hold) { + for (uint32_t buffer_id : compacting_buffers->get_buffer_ids()) { if (isBTree(_store.getBufferState(buffer_id).getTypeId())) { compact_btree_roots = true; } @@ -745,7 +745,7 @@ PostingStore<DataT>::compact_worst_buffers(CompactionSpec compaction_spec, const _dictionary.normalize_posting_lists([this](std::vector<EntryRef>& refs) { return move(refs); }, filter); - this->finishCompact(to_hold); + compacting_buffers->finish(); } template <typename DataT> diff --git a/searchlib/src/vespa/searchlib/tensor/small_subspaces_buffer_type.h b/searchlib/src/vespa/searchlib/tensor/small_subspaces_buffer_type.h index a778183c5a2..5622e9970b8 100644 --- a/searchlib/src/vespa/searchlib/tensor/small_subspaces_buffer_type.h +++ b/searchlib/src/vespa/searchlib/tensor/small_subspaces_buffer_type.h @@ -27,7 +27,7 @@ public: SmallSubspacesBufferType(const SmallSubspacesBufferType&) = delete; SmallSubspacesBufferType& operator=(const SmallSubspacesBufferType&) = delete; SmallSubspacesBufferType(SmallSubspacesBufferType&&) noexcept = default; - SmallSubspacesBufferType& operator=(SmallSubspacesBufferType&&) noexcept = default; + SmallSubspacesBufferType& operator=(SmallSubspacesBufferType&&) noexcept = delete; SmallSubspacesBufferType(uint32_t array_size, const AllocSpec& spec, std::shared_ptr<vespalib::alloc::MemoryAllocator> memory_allocator, TensorBufferTypeMapper& type_mapper) noexcept; ~SmallSubspacesBufferType() override; void cleanHold(void* buffer, size_t offset, ElemCount numElems, CleanContext cleanCtx) override; diff --git a/vdslib/src/main/java/com/yahoo/vdslib/state/State.java b/vdslib/src/main/java/com/yahoo/vdslib/state/State.java index d4baff5861a..541364d90a9 100644 --- a/vdslib/src/main/java/com/yahoo/vdslib/state/State.java +++ b/vdslib/src/main/java/com/yahoo/vdslib/state/State.java @@ -13,31 +13,25 @@ import java.util.ArrayList; public enum State { // The order declares the ordinals, and defines what states are above/below others - UNKNOWN ("-", false, true, true, false, false, false, false, false), // This state is used by the fleetcontroller to indicate - // that we have failed to contact the node. It should never be - // sent out of the fleetcontroller - MAINTENANCE ("m", false, false, false, true, true, false, true, true), - DOWN ("d", true, true, true, true, true, true, true, true), // Down is not valid reported state sent from the node itself. - STOPPING ("s", false, true, true, false, false, true, true, true), - INITIALIZING("i", false, true, true, false, false, true, true, true), - RETIRED ("r", false, false, false, false, true, false, true, true), - UP ("u", true, true, true, true, true, true, true, true); + UNKNOWN ("-", true, true, false, false, false, false, false), // This state is used by the fleetcontroller to indicate + // that we have failed to contact the node. It should never be + // sent out of the fleetcontroller + MAINTENANCE ("m", false, false, true, true, false, true, true), + DOWN ("d", true, true, true, true, true, true, true), // Down is not valid reported state sent from the node itself. + STOPPING ("s", true, true, false, false, true, true, true), + INITIALIZING("i", true, true, false, false, true, true, true), + RETIRED ("r", false, false, false, true, false, true, true), + UP ("u", true, true, true, true, true, true, true); - private final boolean validDiskState; private final boolean validClusterState; private final ArrayList<Boolean> validReportedNodeState = new ArrayList<>(); private final ArrayList<Boolean> validWantedNodeState = new ArrayList<>(); private final ArrayList<Boolean> validCurrentNodeState = new ArrayList<>(); private final String serializedAs; - private State(String serialized, boolean validDisk, boolean validDistReported, boolean validStorReported, - boolean validDistWanted, boolean validStorWanted, boolean validCluster, boolean validDistCurrent, - boolean validStorCurrent) - { - validDiskState = validDisk; + State(String serialized, boolean validDistReported, boolean validStorReported, boolean validDistWanted, + boolean validStorWanted, boolean validCluster, boolean validDistCurrent, boolean validStorCurrent) { validClusterState = validCluster; - assert(NodeType.STORAGE.ordinal() == 0); - assert(NodeType.DISTRIBUTOR.ordinal() == 1); validReportedNodeState.add(validStorReported); validReportedNodeState.add(validDistReported); validWantedNodeState.add(validStorWanted); @@ -56,7 +50,6 @@ public enum State { public String serialize() { return serializedAs; } - public boolean validDiskState() { return validDiskState; } public boolean validClusterState() { return validClusterState; } public boolean validReportedNodeState(NodeType type) { return validReportedNodeState.get(type.ordinal()); } public boolean validWantedNodeState(NodeType type) { return validWantedNodeState.get(type.ordinal()); } diff --git a/vespalib/CMakeLists.txt b/vespalib/CMakeLists.txt index df1e1006828..7aafb7c364e 100644 --- a/vespalib/CMakeLists.txt +++ b/vespalib/CMakeLists.txt @@ -58,6 +58,7 @@ vespa_define_module( src/tests/datastore/compact_buffer_candidates src/tests/datastore/datastore src/tests/datastore/fixed_size_hash_map + src/tests/datastore/free_list src/tests/datastore/sharded_hash_map src/tests/datastore/unique_store src/tests/datastore/unique_store_dictionary diff --git a/vespalib/src/tests/btree/btree-stress/btree_stress_test.cpp b/vespalib/src/tests/btree/btree-stress/btree_stress_test.cpp index 4716e91c2c4..c68ff07491e 100644 --- a/vespalib/src/tests/btree/btree-stress/btree_stress_test.cpp +++ b/vespalib/src/tests/btree/btree-stress/btree_stress_test.cpp @@ -64,8 +64,7 @@ public: uint32_t get(EntryRef ref) const { return _store.getEntry(ref); } uint32_t get_acquire(const AtomicEntryRef& ref) const { return get(ref.load_acquire()); } uint32_t get_relaxed(const AtomicEntryRef& ref) const { return get(ref.load_relaxed()); } - std::vector<uint32_t> start_compact(); - void finish_compact(std::vector<uint32_t> to_hold); + std::unique_ptr<vespalib::datastore::CompactingBuffers> start_compact(); static constexpr bool is_indirect = true; static uint32_t get_offset_bits() { return StoreRefType::offset_bits; } static uint32_t get_num_buffers() { return StoreRefType::numBuffers(); } @@ -79,19 +78,13 @@ RealIntStore::RealIntStore() RealIntStore::~RealIntStore() = default; -std::vector<uint32_t> +std::unique_ptr<vespalib::datastore::CompactingBuffers> RealIntStore::start_compact() { // Use a compaction strategy that will compact all active buffers CompactionStrategy compaction_strategy(0.0, 0.0, get_num_buffers(), 1.0); CompactionSpec compaction_spec(true, false); - return _store.startCompactWorstBuffers(compaction_spec, compaction_strategy); -} - -void -RealIntStore::finish_compact(std::vector<uint32_t> to_hold) -{ - _store.finishCompact(to_hold); + return _store.start_compact_worst_buffers(compaction_spec, compaction_strategy); } EntryRef @@ -347,9 +340,8 @@ void Fixture<Params>::compact_keys() { if constexpr (KeyStore::is_indirect) { - auto to_hold = _keys.start_compact(); - EntryRefFilter filter(_keys.get_num_buffers(), _keys.get_offset_bits()); - filter.add_buffers(to_hold); + auto compacting_buffers = _keys.start_compact(); + auto filter = compacting_buffers->make_entry_ref_filter(); auto itr = _tree.begin(); while (itr.valid()) { auto old_ref = itr.getKey().load_relaxed(); @@ -359,7 +351,7 @@ Fixture<Params>::compact_keys() } ++itr; } - _keys.finish_compact(std::move(to_hold)); + compacting_buffers->finish(); } _compact_keys.track_compacted(); } @@ -369,9 +361,8 @@ void Fixture<Params>::compact_values() { if constexpr (ValueStore::is_indirect) { - auto to_hold = _values.start_compact(); - EntryRefFilter filter(_values.get_num_buffers(), _values.get_offset_bits()); - filter.add_buffers(to_hold); + auto compacting_buffers = _values.start_compact(); + auto filter = compacting_buffers->make_entry_ref_filter(); auto itr = _tree.begin(); while (itr.valid()) { auto old_ref = itr.getData().load_relaxed(); @@ -381,7 +372,7 @@ Fixture<Params>::compact_values() } ++itr; } - _values.finish_compact(std::move(to_hold)); + compacting_buffers->finish(); } _compact_values.track_compacted(); } diff --git a/vespalib/src/tests/btree/btree_store/btree_store_test.cpp b/vespalib/src/tests/btree/btree_store/btree_store_test.cpp index 5e2aa89b59e..4da34c64ed9 100644 --- a/vespalib/src/tests/btree/btree_store/btree_store_test.cpp +++ b/vespalib/src/tests/btree/btree_store/btree_store_test.cpp @@ -5,7 +5,9 @@ #include <vespa/vespalib/btree/btreeroot.hpp> #include <vespa/vespalib/btree/btreestore.hpp> #include <vespa/vespalib/datastore/buffer_type.hpp> +#include <vespa/vespalib/datastore/compacting_buffers.h> #include <vespa/vespalib/datastore/compaction_strategy.h> +#include <vespa/vespalib/datastore/entry_ref_filter.h> #include <vespa/vespalib/gtest/gtest.h> using vespalib::GenerationHandler; @@ -114,7 +116,6 @@ void BTreeStoreTest::test_compact_sequence(uint32_t sequence_length) { auto &store = _store; - uint32_t entry_ref_offset_bits = TreeStore::RefType::offset_bits; EntryRef ref1 = add_sequence(4, 4 + sequence_length); EntryRef ref2 = add_sequence(5, 5 + sequence_length); std::vector<EntryRef> refs; @@ -136,13 +137,10 @@ BTreeStoreTest::test_compact_sequence(uint32_t sequence_length) for (uint32_t pass = 0; pass < 15; ++pass) { CompactionSpec compaction_spec(true, false); CompactionStrategy compaction_strategy; - auto to_hold = store.start_compact_worst_buffers(compaction_spec, compaction_strategy); - std::vector<bool> filter(TreeStore::RefType::numBuffers()); - for (auto buffer_id : to_hold) { - filter[buffer_id] = true; - } + auto compacting_buffers = store.start_compact_worst_buffers(compaction_spec, compaction_strategy); + auto filter = compacting_buffers->make_entry_ref_filter(); for (auto& ref : refs) { - if (ref.valid() && filter[ref.buffer_id(entry_ref_offset_bits)]) { + if (ref.valid() && filter.has(ref)) { move_refs.emplace_back(ref); change_writer.emplace_back(ref); } @@ -150,7 +148,7 @@ BTreeStoreTest::test_compact_sequence(uint32_t sequence_length) store.move(move_refs); change_writer.write(move_refs); move_refs.clear(); - store.finishCompact(to_hold); + compacting_buffers->finish(); inc_generation(); } EXPECT_NE(ref1, refs[0]); @@ -174,9 +172,9 @@ TEST_F(BTreeStoreTest, require_that_nodes_for_multiple_btrees_are_compacted) auto usage_before = store.getMemoryUsage(); for (uint32_t pass = 0; pass < 15; ++pass) { CompactionStrategy compaction_strategy; - auto to_hold = store.start_compact_worst_btree_nodes(compaction_strategy); + auto compacting_buffers = store.start_compact_worst_btree_nodes(compaction_strategy); store.move_btree_nodes(refs); - store.finish_compact_worst_btree_nodes(to_hold); + compacting_buffers->finish(); inc_generation(); } EXPECT_EQ(make_exp_sequence(4, 40), get_sequence(refs[0])); diff --git a/vespalib/src/tests/datastore/array_store/array_store_test.cpp b/vespalib/src/tests/datastore/array_store/array_store_test.cpp index 2ff2897461b..1e8632aee95 100644 --- a/vespalib/src/tests/datastore/array_store/array_store_test.cpp +++ b/vespalib/src/tests/datastore/array_store/array_store_test.cpp @@ -205,11 +205,11 @@ VESPA_GTEST_INSTANTIATE_TEST_SUITE_P(NumberStoreFreeListsDisabledMultiTest, TEST_P(NumberStoreTest, control_static_sizes) { #ifdef _LIBCPP_VERSION - EXPECT_EQ(440u, sizeof(f.store)); - EXPECT_EQ(296u, sizeof(NumberStoreTest::ArrayStoreType::DataStoreType)); + EXPECT_EQ(464u, sizeof(store)); + EXPECT_EQ(304u, sizeof(NumberStoreTest::ArrayStoreType::DataStoreType)); #else - EXPECT_EQ(488u, sizeof(store)); - EXPECT_EQ(328u, sizeof(NumberStoreTest::ArrayStoreType::DataStoreType)); + EXPECT_EQ(496u, sizeof(store)); + EXPECT_EQ(336u, sizeof(NumberStoreTest::ArrayStoreType::DataStoreType)); #endif EXPECT_EQ(112u, sizeof(NumberStoreTest::ArrayStoreType::SmallBufferType)); MemoryUsage usage = store.getMemoryUsage(); diff --git a/vespalib/src/tests/datastore/free_list/CMakeLists.txt b/vespalib/src/tests/datastore/free_list/CMakeLists.txt new file mode 100644 index 00000000000..97aff6d7ae7 --- /dev/null +++ b/vespalib/src/tests/datastore/free_list/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(vespalib_datastore_free_list_test_app TEST + SOURCES + free_list_test.cpp + DEPENDS + vespalib + GTest::GTest +) +vespa_add_test(NAME vespalib_datastore_free_list_test_app COMMAND vespalib_datastore_free_list_test_app) diff --git a/vespalib/src/tests/datastore/free_list/free_list_test.cpp b/vespalib/src/tests/datastore/free_list/free_list_test.cpp new file mode 100644 index 00000000000..d80020d3dc5 --- /dev/null +++ b/vespalib/src/tests/datastore/free_list/free_list_test.cpp @@ -0,0 +1,127 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/vespalib/datastore/entryref.hpp> +#include <vespa/vespalib/datastore/free_list.h> +#include <vespa/vespalib/gtest/gtest.h> +#include <vector> + +using namespace vespalib::datastore; + +using MyEntryRef = EntryRefT<8, 4>; +constexpr uint32_t array_size = 6; + +struct FreeListTest : public testing::Test +{ + FreeList list; + std::atomic<ElemCount> dead_elems; + std::vector<BufferFreeList> bufs; + FreeListTest() + : list(), + bufs() + { + for (size_t i = 0; i < 3; ++i) { + bufs.emplace_back(dead_elems); + bufs.back().on_active(array_size); + } + } + void TearDown() override { + for (auto& buf : bufs) { + buf.disable(); + } + } + void enable(uint32_t buffer_id) { + bufs[buffer_id].enable(list); + } + void enable_all() { + for (auto& buf : bufs) { + buf.enable(list); + } + } + void push_entry(MyEntryRef ref) { + bufs[ref.bufferId()].push_entry(ref); + } + MyEntryRef pop_entry() { + return {list.pop_entry()}; + } +}; + +TEST_F(FreeListTest, entry_refs_are_reused_in_lifo_order) +{ + enable(0); + push_entry({10, 0}); + push_entry({11, 0}); + push_entry({12, 0}); + EXPECT_EQ(MyEntryRef(12, 0), pop_entry()); + EXPECT_EQ(MyEntryRef(11, 0), pop_entry()); + EXPECT_EQ(MyEntryRef(10, 0), pop_entry()); +} + +TEST_F(FreeListTest, buffer_free_list_attaches_and_detaches_from_free_list) +{ + enable(0); + EXPECT_TRUE(list.empty()); + push_entry({10, 0}); + EXPECT_EQ(1, list.size()); + push_entry({11, 0}); + pop_entry(); + EXPECT_EQ(1, list.size()); + pop_entry(); + EXPECT_TRUE(list.empty()); +} + +TEST_F(FreeListTest, disable_clears_all_entry_refs_and_detaches_from_free_list) +{ + enable(0); + push_entry({10, 0}); + EXPECT_EQ(1, list.size()); + EXPECT_FALSE(bufs[0].empty()); + EXPECT_TRUE(bufs[0].enabled()); + + bufs[0].disable(); + EXPECT_TRUE(list.empty()); + EXPECT_TRUE(bufs[0].empty()); + EXPECT_FALSE(bufs[0].enabled()); +} + +TEST_F(FreeListTest, buffer_free_lists_are_reused_in_lifo_order) +{ + enable_all(); + EXPECT_TRUE(list.empty()); + push_entry({10, 0}); + EXPECT_EQ(1, list.size()); + push_entry({11, 0}); + push_entry({20, 1}); + EXPECT_EQ(2, list.size()); + push_entry({21, 1}); + push_entry({30, 2}); + EXPECT_EQ(3, list.size()); + push_entry({31, 2}); + + EXPECT_EQ(MyEntryRef(31, 2), pop_entry()); + EXPECT_EQ(MyEntryRef(30, 2), pop_entry()); + EXPECT_EQ(2, list.size()); + EXPECT_EQ(MyEntryRef(21, 1), pop_entry()); + EXPECT_EQ(MyEntryRef(20, 1), pop_entry()); + EXPECT_EQ(1, list.size()); + EXPECT_EQ(MyEntryRef(11, 0), pop_entry()); + + push_entry({32, 2}); + EXPECT_EQ(2, list.size()); + + EXPECT_EQ(MyEntryRef(32, 2), pop_entry()); + EXPECT_EQ(1, list.size()); + EXPECT_EQ(MyEntryRef(10, 0), pop_entry()); + EXPECT_TRUE(list.empty()); +} + +TEST_F(FreeListTest, dead_elems_count_is_updated_when_popping_an_entry) +{ + enable(0); + push_entry({10, 0}); + dead_elems.store(18, std::memory_order_relaxed); + pop_entry(); + EXPECT_EQ(18 - array_size, dead_elems.load(std::memory_order_relaxed)); +} + +GTEST_MAIN_RUN_ALL_TESTS() + diff --git a/vespalib/src/vespa/vespalib/btree/btree.hpp b/vespalib/src/vespa/vespalib/btree/btree.hpp index 473d1f4735e..c6d8886254d 100644 --- a/vespalib/src/vespa/vespalib/btree/btree.hpp +++ b/vespalib/src/vespa/vespalib/btree/btree.hpp @@ -28,9 +28,9 @@ template <typename KeyT, typename DataT, typename AggrT, typename CompareT, void BTree<KeyT, DataT, AggrT, CompareT, TraitsT, AggrCalcT>::compact_worst(const datastore::CompactionStrategy& compaction_strategy) { - auto to_hold = _alloc.start_compact_worst(compaction_strategy); + auto compacting_buffers = _alloc.start_compact_worst(compaction_strategy); _tree.move_nodes(_alloc); - _alloc.finishCompact(to_hold); + compacting_buffers->finish(); } } diff --git a/vespalib/src/vespa/vespalib/btree/btreenodeallocator.h b/vespalib/src/vespa/vespalib/btree/btreenodeallocator.h index 27e73b3a2b6..86c9621f869 100644 --- a/vespalib/src/vespa/vespalib/btree/btreenodeallocator.h +++ b/vespalib/src/vespa/vespalib/btree/btreenodeallocator.h @@ -166,7 +166,7 @@ public: bool getCompacting(EntryRef ref) const { return _nodeStore.getCompacting(ref); } std::vector<uint32_t> startCompact() { return _nodeStore.startCompact(); } - std::vector<uint32_t> start_compact_worst(const CompactionStrategy& compaction_strategy) { return _nodeStore.start_compact_worst(compaction_strategy); } + std::unique_ptr<vespalib::datastore::CompactingBuffers> start_compact_worst(const CompactionStrategy& compaction_strategy) { return _nodeStore.start_compact_worst(compaction_strategy); } void finishCompact(const std::vector<uint32_t> &toHold) { return _nodeStore.finishCompact(toHold); diff --git a/vespalib/src/vespa/vespalib/btree/btreenodestore.h b/vespalib/src/vespa/vespalib/btree/btreenodestore.h index d4a5ae42ef8..d05ec840f83 100644 --- a/vespalib/src/vespa/vespalib/btree/btreenodestore.h +++ b/vespalib/src/vespa/vespalib/btree/btreenodestore.h @@ -6,6 +6,8 @@ #include "btreetraits.h" #include <vespa/vespalib/datastore/datastore.h> +namespace vespalib::datastore { class CompactingBuffers; } + namespace vespalib::btree { class BTreeNodeReclaimer @@ -160,7 +162,7 @@ public: std::vector<uint32_t> startCompact(); - std::vector<uint32_t> start_compact_worst(const CompactionStrategy& compaction_strategy); + std::unique_ptr<vespalib::datastore::CompactingBuffers> start_compact_worst(const CompactionStrategy& compaction_strategy); void finishCompact(const std::vector<uint32_t> &toHold); diff --git a/vespalib/src/vespa/vespalib/btree/btreenodestore.hpp b/vespalib/src/vespa/vespalib/btree/btreenodestore.hpp index 91953507eb0..0f9eeb9daec 100644 --- a/vespalib/src/vespa/vespalib/btree/btreenodestore.hpp +++ b/vespalib/src/vespa/vespalib/btree/btreenodestore.hpp @@ -3,6 +3,7 @@ #pragma once #include "btreenodestore.h" +#include <vespa/vespalib/datastore/compacting_buffers.h> #include <vespa/vespalib/datastore/compaction_spec.h> #include <vespa/vespalib/datastore/datastore.hpp> @@ -70,11 +71,11 @@ startCompact() template <typename KeyT, typename DataT, typename AggrT, size_t INTERNAL_SLOTS, size_t LEAF_SLOTS> -std::vector<uint32_t> +std::unique_ptr<vespalib::datastore::CompactingBuffers> BTreeNodeStore<KeyT, DataT, AggrT, INTERNAL_SLOTS, LEAF_SLOTS>:: start_compact_worst(const CompactionStrategy &compaction_strategy) { - return _store.startCompactWorstBuffers(datastore::CompactionSpec(true, false), compaction_strategy); + return _store.start_compact_worst_buffers(datastore::CompactionSpec(true, false), compaction_strategy); } template <typename KeyT, typename DataT, typename AggrT, diff --git a/vespalib/src/vespa/vespalib/btree/btreestore.h b/vespalib/src/vespa/vespalib/btree/btreestore.h index a79259c6e57..54bc397175d 100644 --- a/vespalib/src/vespa/vespalib/btree/btreestore.h +++ b/vespalib/src/vespa/vespalib/btree/btreestore.h @@ -149,13 +149,6 @@ public: KeyDataTypeRefPair allocKeyDataCopy(const KeyDataType *rhs, uint32_t clusterSize); - std::vector<uint32_t> - startCompact(); - - void - finishCompact(const std::vector<uint32_t> &toHold); - - const KeyDataType * lower_bound(const KeyDataType *b, const KeyDataType *e, const KeyType &key, CompareT comp); @@ -394,11 +387,10 @@ public: void foreach_frozen(EntryRef ref, FunctionType func) const; - std::vector<uint32_t> start_compact_worst_btree_nodes(const CompactionStrategy& compaction_strategy); - void finish_compact_worst_btree_nodes(const std::vector<uint32_t>& to_hold); + std::unique_ptr<vespalib::datastore::CompactingBuffers> start_compact_worst_btree_nodes(const CompactionStrategy& compaction_strategy); void move_btree_nodes(const std::vector<EntryRef>& refs); - std::vector<uint32_t> start_compact_worst_buffers(CompactionSpec compaction_spec, const CompactionStrategy& compaction_strategy); + std::unique_ptr<vespalib::datastore::CompactingBuffers> start_compact_worst_buffers(CompactionSpec compaction_spec, const CompactionStrategy& compaction_strategy); void move(std::vector<EntryRef>& refs); private: diff --git a/vespalib/src/vespa/vespalib/btree/btreestore.hpp b/vespalib/src/vespa/vespalib/btree/btreestore.hpp index c0985ff8f94..ffd337d642b 100644 --- a/vespalib/src/vespa/vespalib/btree/btreestore.hpp +++ b/vespalib/src/vespa/vespalib/btree/btreestore.hpp @@ -5,6 +5,7 @@ #include "btreestore.h" #include "btreebuilder.h" #include "btreebuilder.hpp" +#include <vespa/vespalib/datastore/compacting_buffers.h> #include <vespa/vespalib/datastore/compaction_spec.h> #include <vespa/vespalib/datastore/datastore.hpp> #include <vespa/vespalib/util/optimized.h> @@ -116,34 +117,6 @@ allocKeyDataCopy(const KeyDataType *rhs, uint32_t clusterSize) allocArray(vespalib::ConstArrayRef<KeyDataType>(rhs, clusterSize)); } - -template <typename KeyT, typename DataT, typename AggrT, typename CompareT, - typename TraitsT, typename AggrCalcT> -std::vector<uint32_t> -BTreeStore<KeyT, DataT, AggrT, CompareT, TraitsT, AggrCalcT>::startCompact() -{ - std::vector<uint32_t> ret = _store.startCompact(clusterLimit); - for (uint32_t clusterSize = 1; clusterSize <= clusterLimit; ++clusterSize) { - uint32_t typeId = clusterSize - 1; - std::vector<uint32_t> toHold = _store.startCompact(typeId); - for (auto i : toHold) { - ret.push_back(i); - } - } - return ret; -} - - -template <typename KeyT, typename DataT, typename AggrT, typename CompareT, - typename TraitsT, typename AggrCalcT> -void -BTreeStore<KeyT, DataT, AggrT, CompareT, TraitsT, AggrCalcT>:: -finishCompact(const std::vector<uint32_t> &toHold) -{ - _store.finishCompact(toHold); -} - - template <typename KeyT, typename DataT, typename AggrT, typename CompareT, typename TraitsT, typename AggrCalcT> const typename BTreeStore<KeyT, DataT, AggrT, CompareT, TraitsT, AggrCalcT>:: @@ -971,7 +944,7 @@ getAggregated(const EntryRef ref) const template <typename KeyT, typename DataT, typename AggrT, typename CompareT, typename TraitsT, typename AggrCalcT> -std::vector<uint32_t> +std::unique_ptr<vespalib::datastore::CompactingBuffers> BTreeStore<KeyT, DataT, AggrT, CompareT, TraitsT, AggrCalcT>:: start_compact_worst_btree_nodes(const CompactionStrategy& compaction_strategy) { @@ -983,15 +956,6 @@ template <typename KeyT, typename DataT, typename AggrT, typename CompareT, typename TraitsT, typename AggrCalcT> void BTreeStore<KeyT, DataT, AggrT, CompareT, TraitsT, AggrCalcT>:: -finish_compact_worst_btree_nodes(const std::vector<uint32_t>& to_hold) -{ - _allocator.finishCompact(to_hold); -} - -template <typename KeyT, typename DataT, typename AggrT, typename CompareT, - typename TraitsT, typename AggrCalcT> -void -BTreeStore<KeyT, DataT, AggrT, CompareT, TraitsT, AggrCalcT>:: move_btree_nodes(const std::vector<EntryRef>& refs) { for (auto& ref : refs) { @@ -1006,12 +970,12 @@ move_btree_nodes(const std::vector<EntryRef>& refs) template <typename KeyT, typename DataT, typename AggrT, typename CompareT, typename TraitsT, typename AggrCalcT> -std::vector<uint32_t> +std::unique_ptr<vespalib::datastore::CompactingBuffers> BTreeStore<KeyT, DataT, AggrT, CompareT, TraitsT, AggrCalcT>:: start_compact_worst_buffers(CompactionSpec compaction_spec, const CompactionStrategy& compaction_strategy) { freeze(); - return _store.startCompactWorstBuffers(compaction_spec, compaction_strategy); + return _store.start_compact_worst_buffers(compaction_spec, compaction_strategy); } template <typename KeyT, typename DataT, typename AggrT, typename CompareT, diff --git a/vespalib/src/vespa/vespalib/datastore/CMakeLists.txt b/vespalib/src/vespa/vespalib/datastore/CMakeLists.txt index abbdc79c527..a2b561b6792 100644 --- a/vespalib/src/vespa/vespalib/datastore/CMakeLists.txt +++ b/vespalib/src/vespa/vespalib/datastore/CMakeLists.txt @@ -4,15 +4,19 @@ vespa_add_library(vespalib_vespalib_datastore OBJECT array_store.cpp array_store_config.cpp atomic_entry_ref.cpp + buffer_free_list.cpp buffer_type.cpp bufferstate.cpp - compaction_strategy.cpp compact_buffer_candidates.cpp + compacting_buffers.cpp + compaction_strategy.cpp + compaction_strategy.cpp datastore.cpp datastorebase.cpp - entryref.cpp entry_ref_filter.cpp + entryref.cpp fixed_size_hash_map.cpp + free_list.cpp large_array_buffer_type.cpp sharded_hash_map.cpp small_array_buffer_type.cpp diff --git a/vespalib/src/vespa/vespalib/datastore/array_store.hpp b/vespalib/src/vespa/vespalib/datastore/array_store.hpp index 4fc13396f6b..e79398271fb 100644 --- a/vespalib/src/vespa/vespalib/datastore/array_store.hpp +++ b/vespalib/src/vespa/vespalib/datastore/array_store.hpp @@ -3,6 +3,7 @@ #pragma once #include "array_store.h" +#include "compacting_buffers.h" #include "compaction_spec.h" #include "entry_ref_filter.h" #include "datastore.hpp" @@ -150,24 +151,20 @@ template <typename EntryT, typename RefT, typename TypeMapperT> class CompactionContext : public ICompactionContext { private: using ArrayStoreType = ArrayStore<EntryT, RefT, TypeMapperT>; - DataStoreBase &_dataStore; ArrayStoreType &_store; - std::vector<uint32_t> _bufferIdsToCompact; + std::unique_ptr<vespalib::datastore::CompactingBuffers> _compacting_buffers; EntryRefFilter _filter; public: - CompactionContext(DataStoreBase &dataStore, - ArrayStoreType &store, - std::vector<uint32_t> bufferIdsToCompact) - : _dataStore(dataStore), - _store(store), - _bufferIdsToCompact(std::move(bufferIdsToCompact)), - _filter(RefT::numBuffers(), RefT::offset_bits) + CompactionContext(ArrayStoreType &store, + std::unique_ptr<vespalib::datastore::CompactingBuffers> compacting_buffers) + : _store(store), + _compacting_buffers(std::move(compacting_buffers)), + _filter(_compacting_buffers->make_entry_ref_filter()) { - _filter.add_buffers(_bufferIdsToCompact); } ~CompactionContext() override { - _dataStore.finishCompact(_bufferIdsToCompact); + _compacting_buffers->finish(); } void compact(vespalib::ArrayRef<AtomicEntryRef> refs) override { for (auto &atomic_entry_ref : refs) { @@ -186,9 +183,9 @@ template <typename EntryT, typename RefT, typename TypeMapperT> ICompactionContext::UP ArrayStore<EntryT, RefT, TypeMapperT>::compactWorst(CompactionSpec compaction_spec, const CompactionStrategy &compaction_strategy) { - std::vector<uint32_t> bufferIdsToCompact = _store.startCompactWorstBuffers(compaction_spec, compaction_strategy); + auto compacting_buffers = _store.start_compact_worst_buffers(compaction_spec, compaction_strategy); return std::make_unique<arraystore::CompactionContext<EntryT, RefT, TypeMapperT>> - (_store, *this, std::move(bufferIdsToCompact)); + (*this, std::move(compacting_buffers)); } template <typename EntryT, typename RefT, typename TypeMapperT> diff --git a/vespalib/src/vespa/vespalib/datastore/buffer_free_list.cpp b/vespalib/src/vespa/vespalib/datastore/buffer_free_list.cpp new file mode 100644 index 00000000000..0a440e3e867 --- /dev/null +++ b/vespalib/src/vespa/vespalib/datastore/buffer_free_list.cpp @@ -0,0 +1,56 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "buffer_free_list.h" +#include "free_list.h" +#include <cassert> + +namespace vespalib::datastore { + +void +BufferFreeList::attach() +{ + assert(_free_list != nullptr); + _free_list->attach(*this); +} + +void +BufferFreeList::detach() +{ + assert(_free_list != nullptr); + _free_list->detach(*this); +} + +BufferFreeList::BufferFreeList(std::atomic<ElemCount>& dead_elems) + : _dead_elems(dead_elems), + _array_size(0), + _free_list(), + _free_refs() +{ +} + +BufferFreeList::~BufferFreeList() +{ + assert(_free_list == nullptr); + assert(_free_refs.empty()); +} + +void +BufferFreeList::enable(FreeList& free_list) +{ + assert(_free_list == nullptr); + assert(_free_refs.empty()); + _free_list = &free_list; +} + +void +BufferFreeList::disable() +{ + if (!empty()) { + detach(); + EntryRefArray().swap(_free_refs); + } + _free_list = nullptr; +} + +} + diff --git a/vespalib/src/vespa/vespalib/datastore/buffer_free_list.h b/vespalib/src/vespa/vespalib/datastore/buffer_free_list.h new file mode 100644 index 00000000000..9ad24f7f03e --- /dev/null +++ b/vespalib/src/vespa/vespalib/datastore/buffer_free_list.h @@ -0,0 +1,56 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "entryref.h" +#include "buffer_type.h" +#include <vespa/vespalib/util/array.h> + +namespace vespalib::datastore { + +class FreeList; + +/** + * Class containing the free list for a single buffer. + * + * The free list is a stack of EntryRef's that can be reused. + */ +class BufferFreeList { +private: + using EntryRefArray = vespalib::Array<EntryRef>; + + std::atomic<ElemCount>& _dead_elems; + uint32_t _array_size; + FreeList* _free_list; + EntryRefArray _free_refs; + + void attach(); + void detach(); + +public: + BufferFreeList(std::atomic<ElemCount>& dead_elems); + ~BufferFreeList(); + void enable(FreeList& free_list); + void disable(); + + void on_active(uint32_t array_size) { _array_size = array_size; } + bool enabled() const { return _free_list != nullptr; } + bool empty() const { return _free_refs.empty(); } + void push_entry(EntryRef ref) { + if (empty()) { + attach(); + } + _free_refs.push_back(ref); + } + EntryRef pop_entry() { + EntryRef ret = _free_refs.back(); + _free_refs.pop_back(); + if (empty()) { + detach(); + } + _dead_elems.store(_dead_elems.load(std::memory_order_relaxed) - _array_size, std::memory_order_relaxed); + return ret; + } +}; + +} diff --git a/vespalib/src/vespa/vespalib/datastore/compacting_buffers.cpp b/vespalib/src/vespa/vespalib/datastore/compacting_buffers.cpp new file mode 100644 index 00000000000..e350ef5056e --- /dev/null +++ b/vespalib/src/vespa/vespalib/datastore/compacting_buffers.cpp @@ -0,0 +1,38 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "compacting_buffers.h" +#include "datastorebase.h" +#include "entry_ref_filter.h" +#include <cassert> + +namespace vespalib::datastore { + +CompactingBuffers::CompactingBuffers(DataStoreBase& store, uint32_t num_buffers, uint32_t offset_bits, std::vector<uint32_t> buffer_ids) + : _store(store), + _num_buffers(num_buffers), + _offset_bits(offset_bits), + _buffer_ids(std::move(buffer_ids)) +{ +} + +CompactingBuffers::~CompactingBuffers() +{ + assert(_buffer_ids.empty()); +} + +void +CompactingBuffers::finish() +{ + _store.finishCompact(_buffer_ids); + _buffer_ids.clear(); +} + +EntryRefFilter +CompactingBuffers::make_entry_ref_filter() const +{ + EntryRefFilter filter(_num_buffers, _offset_bits); + filter.add_buffers(_buffer_ids); + return filter; +} + +} diff --git a/vespalib/src/vespa/vespalib/datastore/compacting_buffers.h b/vespalib/src/vespa/vespalib/datastore/compacting_buffers.h new file mode 100644 index 00000000000..87e698c4eca --- /dev/null +++ b/vespalib/src/vespa/vespalib/datastore/compacting_buffers.h @@ -0,0 +1,32 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <cstdint> +#include <vector> + +namespace vespalib::datastore { + +class DataStoreBase; +class EntryRefFilter; + +/* + * Class representing the buffers currently being compacted in a data store. + */ +class CompactingBuffers +{ + DataStoreBase& _store; + uint32_t _num_buffers; + uint32_t _offset_bits; + std::vector<uint32_t> _buffer_ids; +public: + CompactingBuffers(DataStoreBase& store, uint32_t num_buffers, uint32_t offset_bits, std::vector<uint32_t> buffer_ids); + ~CompactingBuffers(); + DataStoreBase& get_store() const noexcept { return _store; } + const std::vector<uint32_t>& get_buffer_ids() const noexcept { return _buffer_ids; } + bool empty() const noexcept { return _buffer_ids.empty(); } + void finish(); + EntryRefFilter make_entry_ref_filter() const; +}; + +} diff --git a/vespalib/src/vespa/vespalib/datastore/datastore.hpp b/vespalib/src/vespa/vespalib/datastore/datastore.hpp index 4d09ffe4bc6..72d08460eea 100644 --- a/vespalib/src/vespa/vespalib/datastore/datastore.hpp +++ b/vespalib/src/vespa/vespalib/datastore/datastore.hpp @@ -13,7 +13,7 @@ namespace vespalib::datastore { template <typename RefT> DataStoreT<RefT>::DataStoreT() - : DataStoreBase(RefType::numBuffers(), RefType::offsetSize()) + : DataStoreBase(RefType::numBuffers(), RefType::offset_bits, RefType::offsetSize()) { } diff --git a/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp b/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp index 60671afb1a0..79113f76941 100644 --- a/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp +++ b/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp @@ -2,6 +2,7 @@ #include "datastorebase.h" #include "compact_buffer_candidates.h" +#include "compacting_buffers.h" #include "compaction_spec.h" #include "compaction_strategy.h" #include <vespa/vespalib/util/array.hpp> @@ -79,7 +80,7 @@ public: } }; -DataStoreBase::DataStoreBase(uint32_t numBuffers, size_t maxArrays) +DataStoreBase::DataStoreBase(uint32_t numBuffers, uint32_t offset_bits, size_t maxArrays) : _buffers(numBuffers), _primary_buffer_ids(), _states(numBuffers), @@ -90,6 +91,7 @@ DataStoreBase::DataStoreBase(uint32_t numBuffers, size_t maxArrays) _elemHold1List(), _elemHold2List(), _numBuffers(numBuffers), + _offset_bits(offset_bits), _hold_buffer_count(0u), _maxArrays(maxArrays), _compaction_count(0u), @@ -529,8 +531,8 @@ DataStoreBase::markCompacting(uint32_t bufferId) inc_compaction_count(); } -std::vector<uint32_t> -DataStoreBase::startCompactWorstBuffers(CompactionSpec compaction_spec, const CompactionStrategy& compaction_strategy) +std::unique_ptr<CompactingBuffers> +DataStoreBase::start_compact_worst_buffers(CompactionSpec compaction_spec, const CompactionStrategy& compaction_strategy) { // compact memory usage CompactBufferCandidates elem_buffers(_numBuffers, compaction_strategy.get_max_buffers(), compaction_strategy.get_active_buffers_ratio(), compaction_strategy.getMaxDeadBytesRatio() / 2, CompactionStrategy::DEAD_BYTES_SLACK); @@ -567,7 +569,7 @@ DataStoreBase::startCompactWorstBuffers(CompactionSpec compaction_spec, const Co for (auto buffer_id : result) { markCompacting(buffer_id); } - return result; + return std::make_unique<CompactingBuffers>(*this, _numBuffers, _offset_bits, std::move(result)); } void diff --git a/vespalib/src/vespa/vespalib/datastore/datastorebase.h b/vespalib/src/vespa/vespalib/datastore/datastorebase.h index 20104670085..40730252139 100644 --- a/vespalib/src/vespa/vespalib/datastore/datastorebase.h +++ b/vespalib/src/vespa/vespalib/datastore/datastorebase.h @@ -12,6 +12,7 @@ namespace vespalib::datastore { +class CompactingBuffers; class CompactionSpec; class CompactionStrategy; @@ -159,13 +160,14 @@ protected: ElemHold2List _elemHold2List; const uint32_t _numBuffers; + const uint32_t _offset_bits; uint32_t _hold_buffer_count; const size_t _maxArrays; mutable std::atomic<uint64_t> _compaction_count; vespalib::GenerationHolder _genHolder; - DataStoreBase(uint32_t numBuffers, size_t maxArrays); + DataStoreBase(uint32_t numBuffers, uint32_t offset_bits, size_t maxArrays); DataStoreBase(const DataStoreBase &) = delete; DataStoreBase &operator=(const DataStoreBase &) = delete; @@ -376,7 +378,7 @@ public: } uint32_t startCompactWorstBuffer(uint32_t typeId); - std::vector<uint32_t> startCompactWorstBuffers(CompactionSpec compaction_spec, const CompactionStrategy &compaction_strategy); + std::unique_ptr<CompactingBuffers> start_compact_worst_buffers(CompactionSpec compaction_spec, const CompactionStrategy &compaction_strategy); uint64_t get_compaction_count() const { return _compaction_count.load(std::memory_order_relaxed); } void inc_compaction_count() const { ++_compaction_count; } bool has_held_buffers() const noexcept { return _hold_buffer_count != 0u; } diff --git a/vespalib/src/vespa/vespalib/datastore/free_list.cpp b/vespalib/src/vespa/vespalib/datastore/free_list.cpp new file mode 100644 index 00000000000..6c96e51241c --- /dev/null +++ b/vespalib/src/vespa/vespalib/datastore/free_list.cpp @@ -0,0 +1,36 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "free_list.h" +#include <cassert> + +namespace vespalib::datastore { + +FreeList::FreeList() + : _free_lists() +{ +} + +FreeList::~FreeList() +{ + assert(_free_lists.empty()); +} + +void +FreeList::attach(BufferFreeList& buf_list) +{ + _free_lists.push_back(&buf_list); +} + +void +FreeList::detach(BufferFreeList& buf_list) +{ + if (!_free_lists.empty() && (_free_lists.back() == &buf_list)) { + _free_lists.pop_back(); + return; + } + auto itr = std::find(_free_lists.begin(), _free_lists.end(), &buf_list); + assert(itr != _free_lists.end()); + _free_lists.erase(itr); +} + +} diff --git a/vespalib/src/vespa/vespalib/datastore/free_list.h b/vespalib/src/vespa/vespalib/datastore/free_list.h new file mode 100644 index 00000000000..39e1713e47d --- /dev/null +++ b/vespalib/src/vespa/vespalib/datastore/free_list.h @@ -0,0 +1,34 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "buffer_free_list.h" +#include "entryref.h" +#include <vector> + +namespace vespalib::datastore { + +/** + * Class containing the free list for a single buffer type id. + * + * This consists of a stack of buffer free lists, + * where the newest attached is used when getting an EntryRef for reuse. + */ +class FreeList { +private: + std::vector<BufferFreeList*> _free_lists; + +public: + FreeList(); + ~FreeList(); + void attach(BufferFreeList& buf_list); + void detach(BufferFreeList& buf_list); + + bool empty() const { return _free_lists.empty(); } + size_t size() const { return _free_lists.size(); } + EntryRef pop_entry() { + return _free_lists.back()->pop_entry(); + } +}; + +} diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store.hpp b/vespalib/src/vespa/vespalib/datastore/unique_store.hpp index cbb8369e1f2..37a56bf2561 100644 --- a/vespalib/src/vespa/vespalib/datastore/unique_store.hpp +++ b/vespalib/src/vespa/vespalib/datastore/unique_store.hpp @@ -94,17 +94,17 @@ private: btree::NoAggregated, EntryComparatorWrapper, DictionaryTraits>; - using UniqueStoreRemapper<RefT>::_compacting_buffer; + using UniqueStoreRemapper<RefT>::_filter; using UniqueStoreRemapper<RefT>::_mapping; - DataStoreBase &_dataStore; IUniqueStoreDictionary &_dict; ICompactable &_store; - std::vector<uint32_t> _bufferIdsToCompact; + std::unique_ptr<CompactingBuffers> _compacting_buffers; void allocMapping() { _mapping.resize(RefT::numBuffers()); - for (const auto bufferId : _bufferIdsToCompact) { - BufferState &state = _dataStore.getBufferState(bufferId); + auto& data_store = _compacting_buffers->get_store(); + for (const auto bufferId : _compacting_buffers->get_buffer_ids()) { + BufferState &state = data_store.getBufferState(bufferId); _mapping[bufferId].resize(state.get_used_arrays()); } } @@ -122,34 +122,30 @@ private: } void fillMapping() { - _dict.move_keys(*this, _compacting_buffer); + _dict.move_keys(*this, _filter); } public: - CompactionContext(DataStoreBase &dataStore, - IUniqueStoreDictionary &dict, + CompactionContext(IUniqueStoreDictionary &dict, ICompactable &store, - std::vector<uint32_t> bufferIdsToCompact) - : UniqueStoreRemapper<RefT>(), + std::unique_ptr<CompactingBuffers> compacting_buffers) + : UniqueStoreRemapper<RefT>(compacting_buffers->make_entry_ref_filter()), ICompactable(), - _dataStore(dataStore), _dict(dict), _store(store), - _bufferIdsToCompact(std::move(bufferIdsToCompact)) + _compacting_buffers(std::move(compacting_buffers)) { - if (!_bufferIdsToCompact.empty()) { - _compacting_buffer.add_buffers(_bufferIdsToCompact); + if (!_compacting_buffers->empty()) { allocMapping(); fillMapping(); } } void done() override { - _dataStore.finishCompact(_bufferIdsToCompact); - _bufferIdsToCompact.clear(); + _compacting_buffers->finish(); } ~CompactionContext() override { - assert(_bufferIdsToCompact.empty()); + assert(_compacting_buffers->empty()); } }; @@ -159,11 +155,11 @@ template <typename EntryT, typename RefT, typename Compare, typename Allocator> std::unique_ptr<typename UniqueStore<EntryT, RefT, Compare, Allocator>::Remapper> UniqueStore<EntryT, RefT, Compare, Allocator>::compact_worst(CompactionSpec compaction_spec, const CompactionStrategy& compaction_strategy) { - std::vector<uint32_t> bufferIdsToCompact = _store.startCompactWorstBuffers(compaction_spec, compaction_strategy); - if (bufferIdsToCompact.empty()) { + auto compacting_buffers = _store.start_compact_worst_buffers(compaction_spec, compaction_strategy); + if (compacting_buffers->empty()) { return std::unique_ptr<Remapper>(); } else { - return std::make_unique<uniquestore::CompactionContext<RefT>>(_store, *_dict, _allocator, std::move(bufferIdsToCompact)); + return std::make_unique<uniquestore::CompactionContext<RefT>>(*_dict, _allocator, std::move(compacting_buffers)); } } diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store_remapper.h b/vespalib/src/vespa/vespalib/datastore/unique_store_remapper.h index 4babd6204c7..174c74a62d2 100644 --- a/vespalib/src/vespa/vespalib/datastore/unique_store_remapper.h +++ b/vespalib/src/vespa/vespalib/datastore/unique_store_remapper.h @@ -19,11 +19,11 @@ public: using RefType = RefT; protected: - EntryRefFilter _compacting_buffer; + EntryRefFilter _filter; std::vector<std::vector<EntryRef, allocator_large<EntryRef>>> _mapping; public: - UniqueStoreRemapper() - : _compacting_buffer(RefT::numBuffers(), RefT::offset_bits), + UniqueStoreRemapper(EntryRefFilter&& filter) + : _filter(std::move(filter)), _mapping() { } @@ -41,13 +41,13 @@ public: void remap(vespalib::ArrayRef<AtomicEntryRef> refs) const { for (auto &atomic_ref : refs) { auto ref = atomic_ref.load_relaxed(); - if (ref.valid() && _compacting_buffer.has(ref)) { + if (ref.valid() && _filter.has(ref)) { atomic_ref.store_release(remap(ref)); } } } - const EntryRefFilter& get_entry_ref_filter() const noexcept { return _compacting_buffer; } + const EntryRefFilter& get_entry_ref_filter() const noexcept { return _filter; } virtual void done() = 0; }; |