diff options
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/logging/Spooler.java | 53 | ||||
-rw-r--r-- | container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java | 99 |
2 files changed, 119 insertions, 33 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java index 46f7fbb0b3c..862be3848f2 100644 --- a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java +++ b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java @@ -5,7 +5,6 @@ import ai.vespa.validation.Validation; import com.yahoo.vespa.defaults.Defaults; import java.io.File; import java.io.IOException; -import java.io.UncheckedIOException; import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; @@ -16,6 +15,8 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -53,21 +54,24 @@ public class Spooler { private final Clock clock; private final AtomicReference<Instant> firstWriteTimestamp = new AtomicReference<>(); private final boolean keepSuccessFiles; + private final int maxFailures; + private final Map<File, Integer> failures = new ConcurrentHashMap<>(); public Spooler(Clock clock) { - this(clock, false); + this(clock, false, 1000); } - public Spooler(Clock clock, boolean keepSuccessFiles) { - this(defaultSpoolPath, defaultMaxEntriesPerFile, clock, keepSuccessFiles); + public Spooler(Clock clock, boolean keepSuccessFiles, int maxFailures) { + this(defaultSpoolPath, defaultMaxEntriesPerFile, clock, keepSuccessFiles, maxFailures); } - public Spooler(Path spoolPath, int maxEntriesPerFile, Clock clock, boolean keepSuccessFiles) { + Spooler(Path spoolPath, int maxEntriesPerFile, Clock clock, boolean keepSuccessFiles, int maxFailures) { this.spoolPath = spoolPath; this.maxEntriesPerFile = maxEntriesPerFile; this.clock = clock; this.fileNameBase.set(newFileNameBase(clock)); this.keepSuccessFiles = keepSuccessFiles; + this.maxFailures = maxFailures; firstWriteTimestamp.set(Instant.EPOCH); createDirs(spoolPath); } @@ -113,26 +117,39 @@ public class Spooler { log.log(Level.FINE, "Read entry " + entry + " from " + f); succcess = transport.apply(entry); if (! succcess) { - log.log(Level.WARNING, "unsuccessful call to transport() for " + entry); + throw new RuntimeException("Unable to process file " + f + ": unsuccessful call to transport() for " + entry); } - }; - } catch (IOException e) { - throw new UncheckedIOException("Unable to process file " + f.toPath(), e); - // TODO: Move to failures path + } + failures.remove(f); + } catch (Exception e) { + handleFailure(f); } finally { if (succcess && keepSuccessFiles) { - Path file = f.toPath(); - Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName()); - try { - Files.move(file, target); - } catch (IOException e) { - log.log(Level.SEVERE, "Unable to move processed file " + file + " to " + target, e); - } + moveProcessedFile(f, successesPath); } } } } + private void handleFailure(File file) { + failures.putIfAbsent(file, 0); + var failCount = failures.compute(file, (f, count) -> count + 1); + if (failCount > maxFailures) { + log.log(Level.WARNING, "Unable to process file " + file + " after trying " + maxFailures + " times, moving it to " + failuresPath); + moveProcessedFile(file, failuresPath); + } + } + + private void moveProcessedFile(File f, Path path) { + Path file = f.toPath(); + Path target = spoolPath.resolve(path).resolve(f.toPath().relativize(file)).resolve(f.getName()); + try { + Files.move(file, target); + } catch (IOException e) { + log.log(Level.SEVERE, "Unable to move processed file " + file + " to " + target, e); + } + } + public Path processingPath() { return processingPath; } public Path readyPath() { return readyPath; } public Path successesPath() { return successesPath; } @@ -178,6 +195,8 @@ public class Spooler { switchFileIfNeeded(file, fileName); } + Map<File, Integer> failures() { return failures; } + private synchronized void switchFileIfNeeded(Path file, String fileName) throws IOException { if (file.toFile().exists() && (entryCounter.get() >= maxEntriesPerFile || firstWriteTimestamp.get().plus(maxDelayAfterFirstWrite).isBefore(clock.instant()))) { diff --git a/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java b/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java index eb5d7f85481..dc8f82e4d5a 100644 --- a/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java +++ b/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java @@ -27,8 +27,7 @@ public class SpoolerTest { public void testSpoolingLogger() throws IOException { Path spoolDir = tempDir.resolve("spool"); - int maxEntriesPerFile = 1; - Spooler spooler = new Spooler(spoolDir, maxEntriesPerFile, clock, true); + Spooler spooler = createSpooler(spoolDir, 1); TestLogger logger = new TestLogger(spooler); assertTrue(sendEntry(logger, "Yo entry")); @@ -53,14 +52,15 @@ public class SpoolerTest { assertReadyFiles(spooler, 0); assertSuccessFiles(spooler, 2); assertFailureFiles(spooler, 0); + + assertTrue(spooler.failures().isEmpty(), spooler.failures().toString()); } @Test public void testSpoolingManyEntriesPerFile() throws IOException { Path spoolDir = tempDir.resolve("spool"); - int maxEntriesPerFile = 2; - Spooler spooler = new Spooler(spoolDir, maxEntriesPerFile, clock, true); + Spooler spooler = createSpooler(spoolDir, 2); TestLogger logger = new TestLogger(spooler); assertTrue(sendEntry(logger, "Yo entry")); @@ -111,8 +111,8 @@ public class SpoolerTest { @Test public void failingToTransportIsRetried() throws IOException { Path spoolDir = tempDir.resolve("spool"); - Spooler spooler = new Spooler(spoolDir, 1, clock, true); - FailingToTransportSecondEntryLogger logger = new FailingToTransportSecondEntryLogger(spooler); + Spooler spooler = createSpooler(spoolDir, 1, true, 2); + FailingToTransportNthEntryLogger logger = new FailingToTransportNthEntryLogger(spooler, 2); assertTrue(sendEntry(logger, "Yo entry")); logger.manualRun(); // Success for first message @@ -121,17 +121,47 @@ public class SpoolerTest { assertTrue(sendEntry(logger, "Yo entry 2")); logger.manualRun(); // Failure for second message, so still just 1 file in successes path assertEquals(1, spooler.listFilesInPath(spooler.successesPath()).size()); + assertEquals(0, spooler.listFilesInPath(spooler.failuresPath()).size()); logger.manualRun(); // Success when retrying second message, so 2 files in successes path assertEquals(2, spooler.listFilesInPath(spooler.successesPath()).size()); } @Test + public void failingToTransportGivesUpAfterNTries() throws IOException { + Path spoolDir = tempDir.resolve("spool"); + Spooler spooler = createSpooler(spoolDir, 1, true, 2); + FailingToTransportAfterNEntriesLogger logger = new FailingToTransportAfterNEntriesLogger(spooler, 2); + + assertTrue(sendEntry(logger, "Yo entry")); + assertEquals(1, spooler.listFilesInPath(spooler.readyPath()).size()); + logger.manualRun(); // Success for first message + assertEquals(1, spooler.listFilesInPath(spooler.successesPath()).size()); + assertEquals(0, spooler.listFilesInPath(spooler.failuresPath()).size()); + + assertTrue(sendEntry(logger, "Yo entry 2")); + assertEquals(1, spooler.listFilesInPath(spooler.readyPath()).size()); + logger.manualRun(); // Failure for second message, so still just 1 file in successes path + assertEquals(1, spooler.listFilesInPath(spooler.successesPath()).size()); + assertEquals(0, spooler.listFilesInPath(spooler.failuresPath()).size()); + + logger.manualRun(); // Fails again, but should be retried + assertEquals(1, spooler.listFilesInPath(spooler.readyPath()).size()); + assertEquals(1, spooler.listFilesInPath(spooler.successesPath()).size()); + assertEquals(0, spooler.listFilesInPath(spooler.failuresPath()).size()); + + logger.manualRun(); // Fails again, should be moved to failures path + assertEquals(0, spooler.listFilesInPath(spooler.readyPath()).size()); + assertEquals(1, spooler.listFilesInPath(spooler.successesPath()).size()); + assertEquals(1, spooler.listFilesInPath(spooler.failuresPath()).size()); + } + + @Test public void noSuccessFiles() throws IOException { Path spoolDir = tempDir.resolve("spool"); boolean keepSuccessFiles = false; - Spooler spooler = new Spooler(spoolDir, 1, clock, keepSuccessFiles); - FailingToTransportSecondEntryLogger logger = new FailingToTransportSecondEntryLogger(spooler); + Spooler spooler = createSpooler(spoolDir, 1, keepSuccessFiles, 2); + FailingToTransportNthEntryLogger logger = new FailingToTransportNthEntryLogger(spooler, 2); assertTrue(sendEntry(logger, "Yo entry")); logger.manualRun(); // Success for first message @@ -163,6 +193,14 @@ public class SpoolerTest { assertTrue(content.contains(Base64.getEncoder().encodeToString(expectedContent.getBytes()))); } + private static Spooler createSpooler(Path spoolDir, int maxEntriesPerFile) { + return new Spooler(spoolDir, maxEntriesPerFile, clock, true, 1000); + } + + private static Spooler createSpooler(Path spoolDir, int maxEntriesPerFile, boolean keepSuccessFiles, int maxFailures) { + return new Spooler(spoolDir, maxEntriesPerFile, clock, keepSuccessFiles, maxFailures); + } + private static class TestLogger extends AbstractSpoolingLogger { private final List<LoggerEntry> entriesSent = new ArrayList<>(); @@ -178,9 +216,7 @@ public class SpoolerTest { } @Override - public void run() { - // Do nothing, use manualRun - } + public void run() {} // do nothing, call manualRun() to do something @Override public boolean send(LoggerEntry entry) { @@ -198,12 +234,14 @@ public class SpoolerTest { } - private static class FailingToTransportSecondEntryLogger extends AbstractSpoolingLogger { + private static class FailingToTransportNthEntryLogger extends AbstractSpoolingLogger { private int transportCount = 0; + private final int entriesToFail; - public FailingToTransportSecondEntryLogger(Spooler spooler) { + public FailingToTransportNthEntryLogger(Spooler spooler, int entriesToFail) { super(spooler); + this.entriesToFail = entriesToFail; } @Override @@ -215,14 +253,43 @@ public class SpoolerTest { @Override public boolean transport(LoggerEntry entry) { transportCount++; - return transportCount != 2; + return transportCount != entriesToFail; + } + + @Override + public void run() {} // do nothing, call manualRun() to do something + + public void manualRun() { + super.run(); + } + + } + + private static class FailingToTransportAfterNEntriesLogger extends AbstractSpoolingLogger { + + private int transportCount = 0; + private final int entriesToFailAfter; + + public FailingToTransportAfterNEntriesLogger(Spooler spooler, int entriesToFailAfter) { + super(spooler); + this.entriesToFailAfter = entriesToFailAfter; } @Override - public void run() { - // do nothing + public boolean send(LoggerEntry entry) { + spooler.write(entry); + return true; } + @Override + public boolean transport(LoggerEntry entry) { + transportCount++; + return transportCount < entriesToFailAfter; + } + + @Override + public void run() {} // do nothing, call manualRun() to do something + public void manualRun() { super.run(); } |