diff options
Diffstat (limited to 'container-search')
7 files changed, 168 insertions, 76 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java b/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java index 3db854a6888..3569a28f563 100644 --- a/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java +++ b/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java @@ -3,6 +3,7 @@ package com.yahoo.search.logging; import com.yahoo.concurrent.DaemonThreadFactory; import java.io.IOException; +import java.time.Clock; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -15,7 +16,7 @@ import java.util.logging.Level; * * @author hmusum */ -abstract class AbstractSpoolingLogger extends AbstractThreadedLogger implements Runnable { +public abstract class AbstractSpoolingLogger extends AbstractThreadedLogger implements Runnable { protected static final java.util.logging.Logger log = java.util.logging.Logger.getLogger(Spooler.class.getName()); @@ -25,7 +26,7 @@ abstract class AbstractSpoolingLogger extends AbstractThreadedLogger implements protected final Spooler spooler; public AbstractSpoolingLogger() { - this(new Spooler()); + this(new Spooler(Clock.systemUTC())); } public AbstractSpoolingLogger(Spooler spooler) { @@ -35,6 +36,7 @@ abstract class AbstractSpoolingLogger extends AbstractThreadedLogger implements public void run() { try { + spooler.switchFileIfNeeded(); spooler.processFiles(this::transport); } catch (IOException e) { e.printStackTrace(); diff --git a/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java b/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java index 17895a9ce57..ffc916bad65 100644 --- a/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java +++ b/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java @@ -53,7 +53,7 @@ abstract class AbstractThreadedLogger implements Logger { /** * Actually transports the entry to it's destination */ - abstract boolean transport(LoggerEntry entry); + public abstract boolean transport(LoggerEntry entry); private static class WorkerThread extends Thread { diff --git a/container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java b/container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java index f5d8c31be5d..edf78ab6974 100644 --- a/container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java +++ b/container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java @@ -14,8 +14,8 @@ public class LocalDiskLogger extends AbstractThreadedLogger { } @Override - boolean transport(LoggerEntry entry) { - String json = entry.toJson(); + public boolean transport(LoggerEntry entry) { + String json = entry.serialize(); try (FileWriter fw = new FileWriter(logFilePath, true)) { fw.write(json); fw.write(System.getProperty("line.separator")); diff --git a/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java b/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java index 692de3e6cef..e02bb8da8c0 100644 --- a/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java +++ b/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java @@ -2,13 +2,11 @@ package com.yahoo.search.logging; -import com.fasterxml.jackson.core.JsonEncoding; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonGenerator; import com.yahoo.search.Query; -import com.yahoo.slime.JsonDecoder; +import com.yahoo.slime.Cursor; import com.yahoo.slime.Slime; -import java.io.ByteArrayOutputStream; +import com.yahoo.slime.SlimeUtils; +import com.yahoo.text.Utf8; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.ByteBuffer; @@ -52,37 +50,30 @@ public class LoggerEntry { } public String toString() { - return toJson(); + return serialize(); } - public String toJson() { + public String serialize() { try { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8); - g.writeStartObject(); + Slime slime = new Slime(); + Cursor root = slime.setObject(); - g.writeNumberField("timestamp", timestamp == null ? 0 : timestamp); - g.writeStringField("query", queryString()); - g.writeStringField("blob", Base64.getEncoder().encodeToString(blob.array())); - - g.writeEndObject(); - g.close(); - return out.toString(); + root.setLong("timestamp", timestamp == null ? 0 : timestamp); + root.setString("query", queryString()); + root.setString("blob", Base64.getEncoder().encodeToString(blob.array())); + return Utf8.toString(SlimeUtils.toJsonBytes(slime)); // TODO } catch (IOException e) { throw new UncheckedIOException(e); } } - // TODO: Rename method here and above? (serialize/deserialize) - // TODO: Use Slime or Jackson for both - public static LoggerEntry fromJson(byte[] content) throws IOException { - var decoder = new JsonDecoder(); - var slime = decoder.decode(new Slime(), content); + public static LoggerEntry deserialize(String content) throws IOException { + var slime = SlimeUtils.jsonToSlime(content); var timestamp = slime.get().field("timestamp").asLong(); var query = new Query(slime.get().field("query").asString()); - var blob = slime.get().field("blob").asData(); + var blob = slime.get().field("blob").asString(); return new LoggerEntry(new Builder().timestamp(timestamp).query(query).blob(blob)); } @@ -118,6 +109,13 @@ public class LoggerEntry { return this; } + public Builder blob(String blob) { + byte[] bytes = Utf8.toBytes(blob); + this.blob = ByteBuffer.allocate(bytes.length); + this.blob.put(bytes); + return this; + } + public boolean send() { return logger.send(new LoggerEntry(this)); } diff --git a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java index 0b50985d974..e80437de0d3 100644 --- a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java +++ b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java @@ -5,21 +5,26 @@ import ai.vespa.validation.Validation; import com.yahoo.vespa.defaults.Defaults; import java.io.File; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.logging.Level; import java.util.stream.Stream; /** - * Spooler that will write an entry to a file and read files that are ready to be sent + * Spooler that will write an entry to a file and read files that are ready to be sent. + * Files are written in JSON Lines text file format. * * @author hmusum */ @@ -28,22 +33,33 @@ public class Spooler { private static final java.util.logging.Logger log = java.util.logging.Logger.getLogger(Spooler.class.getName()); private static final Path defaultSpoolPath = Path.of(Defaults.getDefaults().underVespaHome("var/spool/vespa/events")); private static final Comparator<File> ordering = new TimestampCompare(); + private static final int defaultMaxEntriesPerFile = 100; + // Max delay between first write to a file and when we should close file and move it for further processing + static final Duration maxDelayAfterFirstWrite = Duration.ofSeconds(10); private Path processingPath; private Path readyPath; private Path failuresPath; private Path successesPath; + // Count of entries for the file that is currently being written to + AtomicInteger entryCounter = new AtomicInteger(1); AtomicInteger fileCounter = new AtomicInteger(1); private final Path spoolPath; + private final int maxEntriesPerFile; + private final Clock clock; + private final AtomicReference<Instant> firstWriteTimestamp = new AtomicReference<>(); - public Spooler() { - this(defaultSpoolPath); + public Spooler(Clock clock) { + this(defaultSpoolPath, defaultMaxEntriesPerFile, clock); } - public Spooler(Path spoolPath) { + public Spooler(Path spoolPath, int maxEntriesPerFile, Clock clock) { this.spoolPath = spoolPath; + this.maxEntriesPerFile = maxEntriesPerFile; + this.clock = clock; + firstWriteTimestamp.set(Instant.EPOCH); createDirs(spoolPath); } @@ -77,16 +93,33 @@ public class Spooler { return files; } - public void processFiles(List<File> files, Function<LoggerEntry, Boolean> transport) throws IOException { + public void processFiles(List<File> files, Function<LoggerEntry, Boolean> transport) { for (File f : files) { - log.log(Level.INFO, "Found file " + f); - var content = Files.readAllBytes(f.toPath()); - var entry = LoggerEntry.fromJson(content); - - if (transport.apply(entry)) { - Path file = f.toPath(); - Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName()); - Files.move(file, target); + log.log(Level.FINE, "Processing file " + f); + boolean succcess = false; + try { + List<String> lines = Files.readAllLines(f.toPath()); + for (String line : lines) { + LoggerEntry entry = LoggerEntry.deserialize(line); + log.log(Level.FINE, "Read entry " + entry + " from " + f); + succcess = transport.apply(entry); + if (! succcess) { + log.log(Level.WARNING, "unsuccessful call to transport() for " + entry); + } + }; + } catch (IOException e) { + throw new UncheckedIOException("Unable to process file " + f.toPath(), e); + // TODO: Move to failures path + } finally { + if (succcess) { + Path file = f.toPath(); + Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName()); + try { + Files.move(file, target); + } catch (IOException e) { + log.log(Level.SEVERE, "Unable to move processed file " + file + " to " + target); + } + } } } } @@ -96,19 +129,6 @@ public class Spooler { public Path successesPath() { return successesPath; } public Path failuresPath() { return failuresPath; } - List<File> getDirectories(File[] files) { - List<File> fileList = new ArrayList<>(); - - for (File f : files) { - if (f.isDirectory()) { - fileList.add(f); - } - } - - Collections.sort(fileList); - return fileList; - } - List<File> getFiles(List<Path> files, int count) { Validation.requireAtLeast(count, "count must be a positive number", 1); List<File> fileList = new ArrayList<>(); @@ -133,16 +153,34 @@ public class Spooler { String fileName = String.valueOf(fileCounter); Path file = spoolPath.resolve(processingPath).resolve(fileName); try { - Files.writeString(file, entry.toJson(), StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE); - Path target = spoolPath.resolve(readyPath).resolve(file.relativize(file)).resolve(fileName); - log.log(Level.INFO, "Moving file from " + file + " to " + target); - Files.move(file, target); - fileCounter.addAndGet(1); + log.log(Level.FINE, "Writing entry " + entryCounter.get() + " (" + entry.serialize() + ") to file " + fileName); + Files.writeString(file, entry.serialize() + "\n", StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE); + firstWriteTimestamp.compareAndExchange(Instant.EPOCH, clock.instant()); + entryCounter.incrementAndGet(); + switchFileIfNeeded(file, fileName); } catch (IOException e) { throw new RuntimeException(e); } } + void switchFileIfNeeded() throws IOException { + String fileName = String.valueOf(fileCounter); + Path file = spoolPath.resolve(processingPath).resolve(fileName); + switchFileIfNeeded(file, fileName); + } + + private void switchFileIfNeeded(Path file, String fileName) throws IOException { + if (file.toFile().exists() + && (entryCounter.get() > maxEntriesPerFile || firstWriteTimestamp.get().plus(maxDelayAfterFirstWrite).isBefore(clock.instant()))) { + Path target = spoolPath.resolve(readyPath).resolve(file.relativize(file)).resolve(fileName); + log.log(Level.INFO, "Finished writing file " + file + " moving it to " + target); + Files.move(file, target); + entryCounter.set(1); + fileCounter.incrementAndGet(); + firstWriteTimestamp.set(Instant.EPOCH); + } + } + private void createDirs(Path spoolerPath) { processingPath = createDir(spoolerPath.resolve("processing")); readyPath = createDir(spoolerPath.resolve("ready")); diff --git a/container-search/src/main/java/com/yahoo/search/rendering/JsonRenderer.java b/container-search/src/main/java/com/yahoo/search/rendering/JsonRenderer.java index 4f55b9946a8..31f99ab1927 100644 --- a/container-search/src/main/java/com/yahoo/search/rendering/JsonRenderer.java +++ b/container-search/src/main/java/com/yahoo/search/rendering/JsonRenderer.java @@ -418,8 +418,7 @@ public class JsonRenderer extends AsynchronousSectionedRenderer<Result> { protected void renderGroupMetadata(GroupId id) throws IOException { if (!(id instanceof ValueGroupId || id instanceof BucketGroupId)) return; - if (id instanceof ValueGroupId) { - ValueGroupId<?> valueId = (ValueGroupId<?>) id; + if (id instanceof ValueGroupId valueId) { generator.writeStringField(GROUPING_VALUE, getIdValue(valueId)); } else { BucketGroupId<?> bucketId = (BucketGroupId<?>) id; @@ -663,7 +662,7 @@ public class JsonRenderer extends AsynchronousSectionedRenderer<Result> { if (key.type() == Type.STRING) { map.put(key.asString(), value); } else { - map.put(key.toString(), value); + map.put(JsonRender.render(key, new StringBuilder(), true).toString(), value); } } return map; @@ -685,7 +684,7 @@ public class JsonRenderer extends AsynchronousSectionedRenderer<Result> { if (item.type() == Type.STRING) { wset.put(item.asString(), weight.asLong()); } else if (settings.jsonWsetsAll) { - wset.put(item.toString(), weight.asLong()); + wset.put(JsonRender.render(item, new StringBuilder(), true).toString(), weight.asLong()); } else { return null; } @@ -753,9 +752,7 @@ public class JsonRenderer extends AsynchronousSectionedRenderer<Result> { } private void renderInspectorDirect(Inspector data) throws IOException { - StringBuilder intermediate = new StringBuilder(); - JsonRender.render(data, intermediate, true); - generator().writeRawValue(intermediate.toString()); + generator().writeRawValue(JsonRender.render(data, new StringBuilder(), true).toString()); } protected void renderFieldContents(Object field) throws IOException { diff --git a/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java b/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java index af371b7da93..b6a1ebc4b55 100644 --- a/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java +++ b/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.logging; +import com.yahoo.test.ManualClock; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import java.io.IOException; @@ -17,6 +18,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class SpoolerTest { + private static final ManualClock clock = new ManualClock(); + @TempDir Path tempDir; @@ -24,7 +27,8 @@ public class SpoolerTest { public void testSpoolingLogger() throws IOException { Path spoolDir = tempDir.resolve("spool"); - Spooler spooler = new Spooler(spoolDir); + int maxEntriesPerFile = 1; + Spooler spooler = new Spooler(spoolDir, maxEntriesPerFile, clock); TestLogger logger = new TestLogger(spooler); assertTrue(sendEntry(logger, "Yo entry")); @@ -45,16 +49,69 @@ public class SpoolerTest { assertEquals(2, logger.entriesSent()); // No files in processing or ready, 2 files in successes - assertEquals(0, spooler.listFilesInPath(spooler.processingPath()).size()); - assertEquals(0, spooler.listFilesInPath(readyPath).size()); - assertEquals(2, spooler.listFilesInPath(spooler.successesPath()).size()); - assertEquals(0, spooler.listFilesInPath(spooler.failuresPath()).size()); + assertProcessedFiles(spooler, 0); + assertReadyFiles(spooler, 0); + assertSuccessFiles(spooler, 2); + assertFailureFiles(spooler, 0); + } + + @Test + public void testSpoolingManyEntriesPerFile() throws IOException { + Path spoolDir = tempDir.resolve("spool"); + + int maxEntriesPerFile = 2; + Spooler spooler = new Spooler(spoolDir, maxEntriesPerFile, clock); + + TestLogger logger = new TestLogger(spooler); + assertTrue(sendEntry(logger, "Yo entry")); + assertTrue(sendEntry(logger, "Yo entry 2")); + + Path readyPath = spooler.readyPath(); + Path readyFile1 = readyPath.resolve("1"); + waitUntilFileExists(readyFile1); + + // Check content after being moved to ready path + assertContent(readyFile1, "Yo entry"); + + // Process files (read, transport files) + logger.manualRun(); + assertEquals(2, logger.entriesSent()); + + // No files in processing or ready, 1 file in successes + assertProcessedFiles(spooler, 0); + assertReadyFiles(spooler, 0); + assertSuccessFiles(spooler, 1); + assertFailureFiles(spooler, 0); + + // Write 1 entry and advance time, so that file will be processed even if + // maxEntriesPerFile is 2 and there is only 1 entry in file + assertTrue(sendEntry(logger, "Yo entry 3")); + clock.advance(Duration.ofMinutes(1)); + logger.manualRun(); + assertEquals(3, logger.entriesSent()); + assertSuccessFiles(spooler, 2); + } + + private void assertProcessedFiles(Spooler spooler, int expected) throws IOException { + assertEquals(expected, spooler.listFilesInPath(spooler.processingPath()).size()); + } + + private void assertReadyFiles(Spooler spooler, int expected) throws IOException { + assertEquals(expected, spooler.listFilesInPath(spooler.readyPath()).size()); + } + + private void assertSuccessFiles(Spooler spooler, int expected) throws IOException { + assertEquals(expected, spooler.listFilesInPath(spooler.successesPath()).size()); + } + + private void assertFailureFiles(Spooler spooler, int expected) throws IOException { + assertEquals(expected, spooler.listFilesInPath(spooler.failuresPath()).size()); } @Test public void failingToTransportIsRetried() throws IOException { Path spoolDir = tempDir.resolve("spool"); - Spooler spooler = new Spooler(spoolDir); + Spooler spooler = new Spooler(spoolDir, 1, clock); FailingToTransportSecondEntryLogger logger = new FailingToTransportSecondEntryLogger(spooler); assertTrue(sendEntry(logger, "Yo entry")); @@ -85,7 +142,7 @@ public class SpoolerTest { } } - assertTrue(path.toFile().exists()); + assertTrue(path.toFile().exists(), path.toFile() + " does not exits"); } private void assertContent(Path file, String expectedContent) throws IOException { @@ -102,7 +159,7 @@ public class SpoolerTest { } @Override - boolean transport(LoggerEntry entry) { + public boolean transport(LoggerEntry entry) { entriesSent.add(entry); return true; } @@ -143,7 +200,7 @@ public class SpoolerTest { } @Override - boolean transport(LoggerEntry entry) { + public boolean transport(LoggerEntry entry) { transportCount++; return transportCount != 2; } |