diff options
author | Harald Musum <musum@yahooinc.com> | 2022-10-15 13:44:31 +0200 |
---|---|---|
committer | Harald Musum <musum@yahooinc.com> | 2022-10-15 13:44:31 +0200 |
commit | 682be49b09b9b5d2585c79f62d75305daa8054bd (patch) | |
tree | aa9671dae9e751616b4986a581bec7d704540130 /container-search/src | |
parent | e4443f6c23edcd520ca97a453086a22bc8f94131 (diff) |
Write several entries into files using JSONL file format
Diffstat (limited to 'container-search/src')
3 files changed, 108 insertions, 40 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java b/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java index 692de3e6cef..69f37226dae 100644 --- a/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java +++ b/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java @@ -8,6 +8,7 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.yahoo.search.Query; import com.yahoo.slime.JsonDecoder; import com.yahoo.slime.Slime; +import com.yahoo.text.Utf8; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UncheckedIOException; @@ -76,13 +77,13 @@ public class LoggerEntry { // TODO: Rename method here and above? (serialize/deserialize) // TODO: Use Slime or Jackson for both - public static LoggerEntry fromJson(byte[] content) throws IOException { + public static LoggerEntry fromJson(String content) throws IOException { var decoder = new JsonDecoder(); - var slime = decoder.decode(new Slime(), content); + var slime = decoder.decode(new Slime(), Utf8.toBytes(content)); var timestamp = slime.get().field("timestamp").asLong(); var query = new Query(slime.get().field("query").asString()); - var blob = slime.get().field("blob").asData(); + var blob = slime.get().field("blob").asString(); return new LoggerEntry(new Builder().timestamp(timestamp).query(query).blob(blob)); } @@ -118,6 +119,13 @@ public class LoggerEntry { return this; } + public Builder blob(String blob) { + byte[] bytes = Utf8.toBytes(blob); + this.blob = ByteBuffer.allocate(bytes.length); + this.blob.put(bytes); + return this; + } + public boolean send() { return logger.send(new LoggerEntry(this)); } diff --git a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java index 0b50985d974..5533320c907 100644 --- a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java +++ b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java @@ -5,12 +5,12 @@ import ai.vespa.validation.Validation; import com.yahoo.vespa.defaults.Defaults; import java.io.File; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -19,7 +19,8 @@ import java.util.logging.Level; import java.util.stream.Stream; /** - * Spooler that will write an entry to a file and read files that are ready to be sent + * Spooler that will write an entry to a file and read files that are ready to be sent. + * Files are written in JSON Lines text file format. * * @author hmusum */ @@ -28,22 +29,26 @@ public class Spooler { private static final java.util.logging.Logger log = java.util.logging.Logger.getLogger(Spooler.class.getName()); private static final Path defaultSpoolPath = Path.of(Defaults.getDefaults().underVespaHome("var/spool/vespa/events")); private static final Comparator<File> ordering = new TimestampCompare(); + private static final int defaultMaxEntriesPerFile = 100; private Path processingPath; private Path readyPath; private Path failuresPath; private Path successesPath; + AtomicInteger entryCounter = new AtomicInteger(1); AtomicInteger fileCounter = new AtomicInteger(1); private final Path spoolPath; + private final int maxEntriesPerFile; public Spooler() { - this(defaultSpoolPath); + this(defaultSpoolPath, defaultMaxEntriesPerFile); } - public Spooler(Path spoolPath) { + public Spooler(Path spoolPath, int maxEntriesPerFile) { this.spoolPath = spoolPath; + this.maxEntriesPerFile = maxEntriesPerFile; createDirs(spoolPath); } @@ -77,16 +82,33 @@ public class Spooler { return files; } - public void processFiles(List<File> files, Function<LoggerEntry, Boolean> transport) throws IOException { + public void processFiles(List<File> files, Function<LoggerEntry, Boolean> transport) { for (File f : files) { - log.log(Level.INFO, "Found file " + f); - var content = Files.readAllBytes(f.toPath()); - var entry = LoggerEntry.fromJson(content); - - if (transport.apply(entry)) { - Path file = f.toPath(); - Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName()); - Files.move(file, target); + log.log(Level.FINE, "Processing file " + f); + boolean succcess = false; + try { + List<String> lines = Files.readAllLines(f.toPath()); + for (String line : lines) { + LoggerEntry entry = LoggerEntry.fromJson(line); + log.log(Level.FINE, "Read entry " + entry + " from " + f); + succcess = transport.apply(entry); + if (! succcess) { + log.log(Level.WARNING, "unsuccessful call to transport() for " + entry); + } + }; + } catch (IOException e) { + throw new UncheckedIOException("Unable to process file " + f.toPath(), e); + // TODO: Move to failures path + } finally { + if (succcess) { + Path file = f.toPath(); + Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName()); + try { + Files.move(file, target); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } } } } @@ -96,19 +118,6 @@ public class Spooler { public Path successesPath() { return successesPath; } public Path failuresPath() { return failuresPath; } - List<File> getDirectories(File[] files) { - List<File> fileList = new ArrayList<>(); - - for (File f : files) { - if (f.isDirectory()) { - fileList.add(f); - } - } - - Collections.sort(fileList); - return fileList; - } - List<File> getFiles(List<Path> files, int count) { Validation.requireAtLeast(count, "count must be a positive number", 1); List<File> fileList = new ArrayList<>(); @@ -133,11 +142,16 @@ public class Spooler { String fileName = String.valueOf(fileCounter); Path file = spoolPath.resolve(processingPath).resolve(fileName); try { - Files.writeString(file, entry.toJson(), StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE); + log.log(Level.INFO, "Writing entry " + entryCounter.get() + " (" + entry.toJson() + ") to " + fileName); + Files.writeString(file, entry.toJson() + "\n", StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE); Path target = spoolPath.resolve(readyPath).resolve(file.relativize(file)).resolve(fileName); - log.log(Level.INFO, "Moving file from " + file + " to " + target); - Files.move(file, target); - fileCounter.addAndGet(1); + + if (entryCounter.incrementAndGet() > maxEntriesPerFile) { + log.log(Level.INFO, "Moving file from " + file + " to " + target); + Files.move(file, target); + entryCounter.set(1); + fileCounter.incrementAndGet(); + } } catch (IOException e) { throw new RuntimeException(e); } diff --git a/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java b/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java index af371b7da93..2899f5b67a0 100644 --- a/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java +++ b/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java @@ -24,7 +24,8 @@ public class SpoolerTest { public void testSpoolingLogger() throws IOException { Path spoolDir = tempDir.resolve("spool"); - Spooler spooler = new Spooler(spoolDir); + int maxEntriesPerFile = 1; + Spooler spooler = new Spooler(spoolDir, maxEntriesPerFile); TestLogger logger = new TestLogger(spooler); assertTrue(sendEntry(logger, "Yo entry")); @@ -45,16 +46,61 @@ public class SpoolerTest { assertEquals(2, logger.entriesSent()); // No files in processing or ready, 2 files in successes - assertEquals(0, spooler.listFilesInPath(spooler.processingPath()).size()); - assertEquals(0, spooler.listFilesInPath(readyPath).size()); - assertEquals(2, spooler.listFilesInPath(spooler.successesPath()).size()); - assertEquals(0, spooler.listFilesInPath(spooler.failuresPath()).size()); + assertProcessedFiles(spooler, 0); + assertReadyFiles(spooler, 0); + assertSuccessFiles(spooler, 2); + assertFailureFiles(spooler, 0); + } + + @Test + public void testSpoolingManyEntriesPerFile() throws IOException { + Path spoolDir = tempDir.resolve("spool"); + + int maxEntriesPerFile = 2; + Spooler spooler = new Spooler(spoolDir, maxEntriesPerFile); + + TestLogger logger = new TestLogger(spooler); + assertTrue(sendEntry(logger, "Yo entry")); + assertTrue(sendEntry(logger, "Yo entry 2")); + + Path readyPath = spooler.readyPath(); + Path readyFile1 = readyPath.resolve("1"); + waitUntilFileExists(readyFile1); + + // Check content after being moved to ready path + assertContent(readyFile1, "Yo entry"); + + // Process files (read, transport files) + logger.manualRun(); + assertEquals(2, logger.entriesSent()); + + // No files in processing or ready, 1 file in successes + assertProcessedFiles(spooler, 0); + assertReadyFiles(spooler, 0); + assertSuccessFiles(spooler, 1); + assertFailureFiles(spooler, 0); + } + + private void assertProcessedFiles(Spooler spooler, int expected) throws IOException { + assertEquals(expected, spooler.listFilesInPath(spooler.processingPath()).size()); + } + + private void assertReadyFiles(Spooler spooler, int expected) throws IOException { + assertEquals(expected, spooler.listFilesInPath(spooler.readyPath()).size()); + } + + private void assertSuccessFiles(Spooler spooler, int expected) throws IOException { + assertEquals(expected, spooler.listFilesInPath(spooler.successesPath()).size()); + } + + private void assertFailureFiles(Spooler spooler, int expected) throws IOException { + assertEquals(expected, spooler.listFilesInPath(spooler.failuresPath()).size()); } @Test public void failingToTransportIsRetried() throws IOException { Path spoolDir = tempDir.resolve("spool"); - Spooler spooler = new Spooler(spoolDir); + Spooler spooler = new Spooler(spoolDir, 1); FailingToTransportSecondEntryLogger logger = new FailingToTransportSecondEntryLogger(spooler); assertTrue(sendEntry(logger, "Yo entry")); @@ -85,7 +131,7 @@ public class SpoolerTest { } } - assertTrue(path.toFile().exists()); + assertTrue(path.toFile().exists(), path.toFile() + " does not exits"); } private void assertContent(Path file, String expectedContent) throws IOException { |