summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHarald Musum <musum@verizonmedia.com>2022-10-19 09:35:20 +0200
committerGitHub <noreply@github.com>2022-10-19 09:35:20 +0200
commit0f2271b74221560e61063c13faf92d141e1991c0 (patch)
tree5887fff189b15fc577bd5c3f31b84c6c5c2254d9
parent733cb216bc1519e34749a5e358f7e12c962c96d6 (diff)
parentad237aba91d05c81d28be2bce185e1eab2ba9bd0 (diff)
Merge pull request #24500 from vespa-engine/hmusum/switch-to-new-file-based-on-time
Switch to a new file if some time after first write has passed
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java4
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/Spooler.java48
-rw-r--r--container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java17
3 files changed, 53 insertions, 16 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java b/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java
index c9c6546711a..3569a28f563 100644
--- a/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java
+++ b/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java
@@ -3,6 +3,7 @@ package com.yahoo.search.logging;
import com.yahoo.concurrent.DaemonThreadFactory;
import java.io.IOException;
+import java.time.Clock;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -25,7 +26,7 @@ public abstract class AbstractSpoolingLogger extends AbstractThreadedLogger impl
protected final Spooler spooler;
public AbstractSpoolingLogger() {
- this(new Spooler());
+ this(new Spooler(Clock.systemUTC()));
}
public AbstractSpoolingLogger(Spooler spooler) {
@@ -35,6 +36,7 @@ public abstract class AbstractSpoolingLogger extends AbstractThreadedLogger impl
public void run() {
try {
+ spooler.switchFileIfNeeded();
spooler.processFiles(this::transport);
} catch (IOException e) {
e.printStackTrace();
diff --git a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
index 2193d414458..e80437de0d3 100644
--- a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
+++ b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
@@ -10,10 +10,14 @@ import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.stream.Stream;
@@ -30,25 +34,32 @@ public class Spooler {
private static final Path defaultSpoolPath = Path.of(Defaults.getDefaults().underVespaHome("var/spool/vespa/events"));
private static final Comparator<File> ordering = new TimestampCompare();
private static final int defaultMaxEntriesPerFile = 100;
+ // Max delay between first write to a file and when we should close file and move it for further processing
+ static final Duration maxDelayAfterFirstWrite = Duration.ofSeconds(10);
private Path processingPath;
private Path readyPath;
private Path failuresPath;
private Path successesPath;
+ // Count of entries for the file that is currently being written to
AtomicInteger entryCounter = new AtomicInteger(1);
AtomicInteger fileCounter = new AtomicInteger(1);
private final Path spoolPath;
private final int maxEntriesPerFile;
+ private final Clock clock;
+ private final AtomicReference<Instant> firstWriteTimestamp = new AtomicReference<>();
- public Spooler() {
- this(defaultSpoolPath, defaultMaxEntriesPerFile);
+ public Spooler(Clock clock) {
+ this(defaultSpoolPath, defaultMaxEntriesPerFile, clock);
}
- public Spooler(Path spoolPath, int maxEntriesPerFile) {
+ public Spooler(Path spoolPath, int maxEntriesPerFile, Clock clock) {
this.spoolPath = spoolPath;
this.maxEntriesPerFile = maxEntriesPerFile;
+ this.clock = clock;
+ firstWriteTimestamp.set(Instant.EPOCH);
createDirs(spoolPath);
}
@@ -106,7 +117,7 @@ public class Spooler {
try {
Files.move(file, target);
} catch (IOException e) {
- throw new UncheckedIOException(e);
+ log.log(Level.SEVERE, "Unable to move processed file " + file + " to " + target);
}
}
}
@@ -144,19 +155,32 @@ public class Spooler {
try {
log.log(Level.FINE, "Writing entry " + entryCounter.get() + " (" + entry.serialize() + ") to file " + fileName);
Files.writeString(file, entry.serialize() + "\n", StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
- Path target = spoolPath.resolve(readyPath).resolve(file.relativize(file)).resolve(fileName);
-
- if (entryCounter.incrementAndGet() > maxEntriesPerFile) {
- log.log(Level.INFO, "Moving file from " + file + " to " + target);
- Files.move(file, target);
- entryCounter.set(1);
- fileCounter.incrementAndGet();
- }
+ firstWriteTimestamp.compareAndExchange(Instant.EPOCH, clock.instant());
+ entryCounter.incrementAndGet();
+ switchFileIfNeeded(file, fileName);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
+ void switchFileIfNeeded() throws IOException {
+ String fileName = String.valueOf(fileCounter);
+ Path file = spoolPath.resolve(processingPath).resolve(fileName);
+ switchFileIfNeeded(file, fileName);
+ }
+
+ private void switchFileIfNeeded(Path file, String fileName) throws IOException {
+ if (file.toFile().exists()
+ && (entryCounter.get() > maxEntriesPerFile || firstWriteTimestamp.get().plus(maxDelayAfterFirstWrite).isBefore(clock.instant()))) {
+ Path target = spoolPath.resolve(readyPath).resolve(file.relativize(file)).resolve(fileName);
+ log.log(Level.INFO, "Finished writing file " + file + " moving it to " + target);
+ Files.move(file, target);
+ entryCounter.set(1);
+ fileCounter.incrementAndGet();
+ firstWriteTimestamp.set(Instant.EPOCH);
+ }
+ }
+
private void createDirs(Path spoolerPath) {
processingPath = createDir(spoolerPath.resolve("processing"));
readyPath = createDir(spoolerPath.resolve("ready"));
diff --git a/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java b/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java
index 2899f5b67a0..d37a44148b0 100644
--- a/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java
+++ b/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.logging;
+import com.yahoo.test.ManualClock;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
@@ -17,6 +18,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class SpoolerTest {
+ private static final ManualClock clock = new ManualClock();
+
@TempDir
Path tempDir;
@@ -25,7 +28,7 @@ public class SpoolerTest {
Path spoolDir = tempDir.resolve("spool");
int maxEntriesPerFile = 1;
- Spooler spooler = new Spooler(spoolDir, maxEntriesPerFile);
+ Spooler spooler = new Spooler(spoolDir, maxEntriesPerFile, clock);
TestLogger logger = new TestLogger(spooler);
assertTrue(sendEntry(logger, "Yo entry"));
@@ -57,7 +60,7 @@ public class SpoolerTest {
Path spoolDir = tempDir.resolve("spool");
int maxEntriesPerFile = 2;
- Spooler spooler = new Spooler(spoolDir, maxEntriesPerFile);
+ Spooler spooler = new Spooler(spoolDir, maxEntriesPerFile, clock);
TestLogger logger = new TestLogger(spooler);
assertTrue(sendEntry(logger, "Yo entry"));
@@ -79,6 +82,14 @@ public class SpoolerTest {
assertReadyFiles(spooler, 0);
assertSuccessFiles(spooler, 1);
assertFailureFiles(spooler, 0);
+
+ // Write 1 entry and advance time, so that file will be processed even if
+ // maxEntriesPerFile is 2 and there is only 1 entry in file
+ assertTrue(sendEntry(logger, "Yo entry 3"));
+ clock.advance(Duration.ofMinutes(1));
+ logger.manualRun();
+ assertEquals(3, logger.entriesSent());
+ assertSuccessFiles(spooler, 2);
}
private void assertProcessedFiles(Spooler spooler, int expected) throws IOException {
@@ -100,7 +111,7 @@ public class SpoolerTest {
@Test
public void failingToTransportIsRetried() throws IOException {
Path spoolDir = tempDir.resolve("spool");
- Spooler spooler = new Spooler(spoolDir, 1);
+ Spooler spooler = new Spooler(spoolDir, 1, clock);
FailingToTransportSecondEntryLogger logger = new FailingToTransportSecondEntryLogger(spooler);
assertTrue(sendEntry(logger, "Yo entry"));