summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHarald Musum <musum@yahooinc.com>2023-06-06 13:30:30 +0200
committerHarald Musum <musum@yahooinc.com>2023-06-06 13:30:30 +0200
commit09d00888d1cb60a92ba75b4707262e08987f1bad (patch)
tree599f0fefd708120031c51921e86f55e206623fba
parentd0c8f93a7b9dae9acae144857176c1dd1a375c6f (diff)
Track failures and move file to failures path if more than max failures
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/Spooler.java53
-rw-r--r--container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java99
2 files changed, 119 insertions, 33 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
index 46f7fbb0b3c..862be3848f2 100644
--- a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
+++ b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
@@ -5,7 +5,6 @@ import ai.vespa.validation.Validation;
import com.yahoo.vespa.defaults.Defaults;
import java.io.File;
import java.io.IOException;
-import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
@@ -16,6 +15,8 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -53,21 +54,24 @@ public class Spooler {
private final Clock clock;
private final AtomicReference<Instant> firstWriteTimestamp = new AtomicReference<>();
private final boolean keepSuccessFiles;
+ private final int maxFailures;
+ private final Map<File, Integer> failures = new ConcurrentHashMap<>();
public Spooler(Clock clock) {
- this(clock, false);
+ this(clock, false, 1000);
}
- public Spooler(Clock clock, boolean keepSuccessFiles) {
- this(defaultSpoolPath, defaultMaxEntriesPerFile, clock, keepSuccessFiles);
+ public Spooler(Clock clock, boolean keepSuccessFiles, int maxFailures) {
+ this(defaultSpoolPath, defaultMaxEntriesPerFile, clock, keepSuccessFiles, maxFailures);
}
- public Spooler(Path spoolPath, int maxEntriesPerFile, Clock clock, boolean keepSuccessFiles) {
+ Spooler(Path spoolPath, int maxEntriesPerFile, Clock clock, boolean keepSuccessFiles, int maxFailures) {
this.spoolPath = spoolPath;
this.maxEntriesPerFile = maxEntriesPerFile;
this.clock = clock;
this.fileNameBase.set(newFileNameBase(clock));
this.keepSuccessFiles = keepSuccessFiles;
+ this.maxFailures = maxFailures;
firstWriteTimestamp.set(Instant.EPOCH);
createDirs(spoolPath);
}
@@ -113,26 +117,39 @@ public class Spooler {
log.log(Level.FINE, "Read entry " + entry + " from " + f);
succcess = transport.apply(entry);
if (! succcess) {
- log.log(Level.WARNING, "unsuccessful call to transport() for " + entry);
+ throw new RuntimeException("Unable to process file " + f + ": unsuccessful call to transport() for " + entry);
}
- };
- } catch (IOException e) {
- throw new UncheckedIOException("Unable to process file " + f.toPath(), e);
- // TODO: Move to failures path
+ }
+ failures.remove(f);
+ } catch (Exception e) {
+ handleFailure(f);
} finally {
if (succcess && keepSuccessFiles) {
- Path file = f.toPath();
- Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName());
- try {
- Files.move(file, target);
- } catch (IOException e) {
- log.log(Level.SEVERE, "Unable to move processed file " + file + " to " + target, e);
- }
+ moveProcessedFile(f, successesPath);
}
}
}
}
+ private void handleFailure(File file) {
+ failures.putIfAbsent(file, 0);
+ var failCount = failures.compute(file, (f, count) -> count + 1);
+ if (failCount > maxFailures) {
+ log.log(Level.WARNING, "Unable to process file " + file + " after trying " + maxFailures + " times, moving it to " + failuresPath);
+ moveProcessedFile(file, failuresPath);
+ }
+ }
+
+ private void moveProcessedFile(File f, Path path) {
+ Path file = f.toPath();
+ Path target = spoolPath.resolve(path).resolve(f.toPath().relativize(file)).resolve(f.getName());
+ try {
+ Files.move(file, target);
+ } catch (IOException e) {
+ log.log(Level.SEVERE, "Unable to move processed file " + file + " to " + target, e);
+ }
+ }
+
public Path processingPath() { return processingPath; }
public Path readyPath() { return readyPath; }
public Path successesPath() { return successesPath; }
@@ -178,6 +195,8 @@ public class Spooler {
switchFileIfNeeded(file, fileName);
}
+ Map<File, Integer> failures() { return failures; }
+
private synchronized void switchFileIfNeeded(Path file, String fileName) throws IOException {
if (file.toFile().exists()
&& (entryCounter.get() >= maxEntriesPerFile || firstWriteTimestamp.get().plus(maxDelayAfterFirstWrite).isBefore(clock.instant()))) {
diff --git a/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java b/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java
index eb5d7f85481..dc8f82e4d5a 100644
--- a/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java
+++ b/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java
@@ -27,8 +27,7 @@ public class SpoolerTest {
public void testSpoolingLogger() throws IOException {
Path spoolDir = tempDir.resolve("spool");
- int maxEntriesPerFile = 1;
- Spooler spooler = new Spooler(spoolDir, maxEntriesPerFile, clock, true);
+ Spooler spooler = createSpooler(spoolDir, 1);
TestLogger logger = new TestLogger(spooler);
assertTrue(sendEntry(logger, "Yo entry"));
@@ -53,14 +52,15 @@ public class SpoolerTest {
assertReadyFiles(spooler, 0);
assertSuccessFiles(spooler, 2);
assertFailureFiles(spooler, 0);
+
+ assertTrue(spooler.failures().isEmpty(), spooler.failures().toString());
}
@Test
public void testSpoolingManyEntriesPerFile() throws IOException {
Path spoolDir = tempDir.resolve("spool");
- int maxEntriesPerFile = 2;
- Spooler spooler = new Spooler(spoolDir, maxEntriesPerFile, clock, true);
+ Spooler spooler = createSpooler(spoolDir, 2);
TestLogger logger = new TestLogger(spooler);
assertTrue(sendEntry(logger, "Yo entry"));
@@ -111,8 +111,8 @@ public class SpoolerTest {
@Test
public void failingToTransportIsRetried() throws IOException {
Path spoolDir = tempDir.resolve("spool");
- Spooler spooler = new Spooler(spoolDir, 1, clock, true);
- FailingToTransportSecondEntryLogger logger = new FailingToTransportSecondEntryLogger(spooler);
+ Spooler spooler = createSpooler(spoolDir, 1, true, 2);
+ FailingToTransportNthEntryLogger logger = new FailingToTransportNthEntryLogger(spooler, 2);
assertTrue(sendEntry(logger, "Yo entry"));
logger.manualRun(); // Success for first message
@@ -121,17 +121,47 @@ public class SpoolerTest {
assertTrue(sendEntry(logger, "Yo entry 2"));
logger.manualRun(); // Failure for second message, so still just 1 file in successes path
assertEquals(1, spooler.listFilesInPath(spooler.successesPath()).size());
+ assertEquals(0, spooler.listFilesInPath(spooler.failuresPath()).size());
logger.manualRun(); // Success when retrying second message, so 2 files in successes path
assertEquals(2, spooler.listFilesInPath(spooler.successesPath()).size());
}
@Test
+ public void failingToTransportGivesUpAfterNTries() throws IOException {
+ Path spoolDir = tempDir.resolve("spool");
+ Spooler spooler = createSpooler(spoolDir, 1, true, 2);
+ FailingToTransportAfterNEntriesLogger logger = new FailingToTransportAfterNEntriesLogger(spooler, 2);
+
+ assertTrue(sendEntry(logger, "Yo entry"));
+ assertEquals(1, spooler.listFilesInPath(spooler.readyPath()).size());
+ logger.manualRun(); // Success for first message
+ assertEquals(1, spooler.listFilesInPath(spooler.successesPath()).size());
+ assertEquals(0, spooler.listFilesInPath(spooler.failuresPath()).size());
+
+ assertTrue(sendEntry(logger, "Yo entry 2"));
+ assertEquals(1, spooler.listFilesInPath(spooler.readyPath()).size());
+ logger.manualRun(); // Failure for second message, so still just 1 file in successes path
+ assertEquals(1, spooler.listFilesInPath(spooler.successesPath()).size());
+ assertEquals(0, spooler.listFilesInPath(spooler.failuresPath()).size());
+
+ logger.manualRun(); // Fails again, but should be retried
+ assertEquals(1, spooler.listFilesInPath(spooler.readyPath()).size());
+ assertEquals(1, spooler.listFilesInPath(spooler.successesPath()).size());
+ assertEquals(0, spooler.listFilesInPath(spooler.failuresPath()).size());
+
+ logger.manualRun(); // Fails again, should be moved to failures path
+ assertEquals(0, spooler.listFilesInPath(spooler.readyPath()).size());
+ assertEquals(1, spooler.listFilesInPath(spooler.successesPath()).size());
+ assertEquals(1, spooler.listFilesInPath(spooler.failuresPath()).size());
+ }
+
+ @Test
public void noSuccessFiles() throws IOException {
Path spoolDir = tempDir.resolve("spool");
boolean keepSuccessFiles = false;
- Spooler spooler = new Spooler(spoolDir, 1, clock, keepSuccessFiles);
- FailingToTransportSecondEntryLogger logger = new FailingToTransportSecondEntryLogger(spooler);
+ Spooler spooler = createSpooler(spoolDir, 1, keepSuccessFiles, 2);
+ FailingToTransportNthEntryLogger logger = new FailingToTransportNthEntryLogger(spooler, 2);
assertTrue(sendEntry(logger, "Yo entry"));
logger.manualRun(); // Success for first message
@@ -163,6 +193,14 @@ public class SpoolerTest {
assertTrue(content.contains(Base64.getEncoder().encodeToString(expectedContent.getBytes())));
}
+ private static Spooler createSpooler(Path spoolDir, int maxEntriesPerFile) {
+ return new Spooler(spoolDir, maxEntriesPerFile, clock, true, 1000);
+ }
+
+ private static Spooler createSpooler(Path spoolDir, int maxEntriesPerFile, boolean keepSuccessFiles, int maxFailures) {
+ return new Spooler(spoolDir, maxEntriesPerFile, clock, keepSuccessFiles, maxFailures);
+ }
+
private static class TestLogger extends AbstractSpoolingLogger {
private final List<LoggerEntry> entriesSent = new ArrayList<>();
@@ -178,9 +216,7 @@ public class SpoolerTest {
}
@Override
- public void run() {
- // Do nothing, use manualRun
- }
+ public void run() {} // do nothing, call manualRun() to do something
@Override
public boolean send(LoggerEntry entry) {
@@ -198,12 +234,14 @@ public class SpoolerTest {
}
- private static class FailingToTransportSecondEntryLogger extends AbstractSpoolingLogger {
+ private static class FailingToTransportNthEntryLogger extends AbstractSpoolingLogger {
private int transportCount = 0;
+ private final int entriesToFail;
- public FailingToTransportSecondEntryLogger(Spooler spooler) {
+ public FailingToTransportNthEntryLogger(Spooler spooler, int entriesToFail) {
super(spooler);
+ this.entriesToFail = entriesToFail;
}
@Override
@@ -215,14 +253,43 @@ public class SpoolerTest {
@Override
public boolean transport(LoggerEntry entry) {
transportCount++;
- return transportCount != 2;
+ return transportCount != entriesToFail;
+ }
+
+ @Override
+ public void run() {} // do nothing, call manualRun() to do something
+
+ public void manualRun() {
+ super.run();
+ }
+
+ }
+
+ private static class FailingToTransportAfterNEntriesLogger extends AbstractSpoolingLogger {
+
+ private int transportCount = 0;
+ private final int entriesToFailAfter;
+
+ public FailingToTransportAfterNEntriesLogger(Spooler spooler, int entriesToFailAfter) {
+ super(spooler);
+ this.entriesToFailAfter = entriesToFailAfter;
}
@Override
- public void run() {
- // do nothing
+ public boolean send(LoggerEntry entry) {
+ spooler.write(entry);
+ return true;
}
+ @Override
+ public boolean transport(LoggerEntry entry) {
+ transportCount++;
+ return transportCount < entriesToFailAfter;
+ }
+
+ @Override
+ public void run() {} // do nothing, call manualRun() to do something
+
public void manualRun() {
super.run();
}