From 5481b852bfa87af50d55d5063567cf8621170151 Mon Sep 17 00:00:00 2001 From: Harald Musum Date: Wed, 5 Oct 2022 12:19:50 +0200 Subject: Use correct logger and allow executor access from subclasses --- .../java/com/yahoo/search/logging/AbstractThreadedLogger.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java b/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java index 5c14109b26e..1df236a037f 100644 --- a/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java +++ b/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java @@ -2,23 +2,22 @@ package com.yahoo.search.logging; -import org.slf4j.LoggerFactory; - import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; abstract class AbstractThreadedLogger implements Logger { - private final static org.slf4j.Logger log = LoggerFactory.getLogger(AbstractThreadedLogger.class); + private final static java.util.logging.Logger log = java.util.logging.Logger.getLogger(AbstractThreadedLogger.class.getName()); final static int DEFAULT_MAX_THREADS = 1; final static int DEFAULT_QUEUE_SIZE = 1000; - private final WorkerThreadExecutor executor; + protected final WorkerThreadExecutor executor; AbstractThreadedLogger() { this(DEFAULT_MAX_THREADS, DEFAULT_QUEUE_SIZE); @@ -68,13 +67,13 @@ abstract class AbstractThreadedLogger implements Logger { try { super.run(); } catch (Exception e) { - log.error(String.format("Error while sending logger entry: %s", e), e); + log.log(Level.SEVERE, String.format("Error while sending logger entry: %s", e), e); } } } - private static class WorkerThreadExecutor implements Executor { + protected static class WorkerThreadExecutor implements Executor { protected final ThreadPoolExecutor executor; -- cgit v1.2.3 From 6c26f5a1ad176569a7e3c95be35318d41f69ac14 Mon Sep 17 00:00:00 2001 From: Harald Musum Date: Wed, 5 Oct 2022 12:22:19 +0200 Subject: Add a fromJson method to LoggerEntry (use when reading spooler files) --- .../java/com/yahoo/search/logging/LoggerEntry.java | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java b/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java index 44eec6e64a0..692de3e6cef 100644 --- a/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java +++ b/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java @@ -6,7 +6,8 @@ import com.fasterxml.jackson.core.JsonEncoding; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; import com.yahoo.search.Query; - +import com.yahoo.slime.JsonDecoder; +import com.yahoo.slime.Slime; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UncheckedIOException; @@ -73,6 +74,19 @@ public class LoggerEntry { } } + // TODO: Rename method here and above? (serialize/deserialize) + // TODO: Use Slime or Jackson for both + public static LoggerEntry fromJson(byte[] content) throws IOException { + var decoder = new JsonDecoder(); + var slime = decoder.decode(new Slime(), content); + + var timestamp = slime.get().field("timestamp").asLong(); + var query = new Query(slime.get().field("query").asString()); + var blob = slime.get().field("blob").asData(); + + return new LoggerEntry(new Builder().timestamp(timestamp).query(query).blob(blob)); + } + public static class Builder { private final Logger logger; @@ -81,6 +95,9 @@ public class LoggerEntry { private Query query; private ByteBuffer blob; + // For testing + public Builder() { this(entry -> false); } + public Builder(Logger logger) { this.logger = logger; } @@ -107,5 +124,4 @@ public class LoggerEntry { } - } -- cgit v1.2.3 From e7f127984207ffeacef5c5a18d0bfae976b98e6a Mon Sep 17 00:00:00 2001 From: Harald Musum Date: Wed, 5 Oct 2022 12:22:55 +0200 Subject: Add very simple start of spooler for entries --- .../search/logging/AbstractSpoolingLogger.java | 64 +++++++- .../java/com/yahoo/search/logging/Spooler.java | 175 +++++++++++++++++++++ .../java/com/yahoo/search/logging/SpoolerTest.java | 107 +++++++++++++ 3 files changed, 342 insertions(+), 4 deletions(-) create mode 100644 container-search/src/main/java/com/yahoo/search/logging/Spooler.java create mode 100644 container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java diff --git a/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java b/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java index cfcbf2cab45..5cb3aa24f98 100644 --- a/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java +++ b/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java @@ -1,12 +1,68 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - package com.yahoo.search.logging; -abstract class AbstractSpoolingLogger extends AbstractThreadedLogger { +import com.yahoo.concurrent.DaemonThreadFactory; +import java.io.IOException; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; + +/** + * Abstract class that deals with storing event entries on disk and making sure all stored + * entries are eventually sent + * + * @author hmusum + */ +abstract class AbstractSpoolingLogger extends AbstractThreadedLogger implements Runnable { + + protected static final java.util.logging.Logger log = java.util.logging.Logger.getLogger(Spooler.class.getName()); + + private static final ScheduledExecutorService executorService = + new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("AbstractSpoolingLogger-send-")); + + protected final Spooler spooler; + + public AbstractSpoolingLogger() { + this(new Spooler()); + } + + public AbstractSpoolingLogger(Spooler spooler) { + this.spooler = spooler; + executorService.scheduleWithFixedDelay(this, 0, 10L, TimeUnit.MILLISECONDS); + } + + public void run() { + try { + var entries = spooler.processFiles(); + log.log(Level.INFO, "Entries: " + entries.size()); + entries.forEach(this::transport); + } catch (IOException e) { + e.printStackTrace(); + } + } @Override - protected void dequeue(LoggerEntry entry) { - // Todo: add to spooler etc + public boolean send(LoggerEntry entry) { + log.log(Level.INFO, "Sending"); + try { + executor.execute(() -> spooler.write(entry)); + } catch (RejectedExecutionException e) { + return false; + } + return true; + } + + // TODO Call from a component or make this class a component + public void shutdown() { + executorService.shutdown(); + try { + if ( ! executorService.awaitTermination(10, TimeUnit.SECONDS)) + log.log(Level.WARNING, "Timeout elapsed waiting for termination"); + } catch (InterruptedException e) { + log.log(Level.WARNING, "Failure when waiting for termination: " + e.getMessage()); + } } } diff --git a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java new file mode 100644 index 00000000000..8fa51a0b47c --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java @@ -0,0 +1,175 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.logging; + +import ai.vespa.validation.Validation; +import com.yahoo.vespa.defaults.Defaults; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.stream.Stream; + +/** + * Spooler that will write an entry to a file and read files that are ready to be sent + * + * @author hmusum + */ +public class Spooler { + + private static final java.util.logging.Logger log = java.util.logging.Logger.getLogger(Spooler.class.getName()); + private static final Path defaultSpoolPath = Path.of(Defaults.getDefaults().underVespaHome("var/spool/vespa/events")); + private static final Comparator ordering = new TimestampCompare(); + + private Path processingPath; + private Path readyPath; + private Path failuresPath; + private Path successesPath; + + AtomicInteger fileCounter = new AtomicInteger(1); + + private final Path spoolPath; + + public Spooler() { + this(defaultSpoolPath); + } + + public Spooler(Path spoolPath) { + this.spoolPath = spoolPath; + createDirs(spoolPath); + } + + public boolean write(LoggerEntry entry) { + writeEntry(entry); + + return true; + } + + public List processFiles() throws IOException { + List files = listFilesInPath(readyPath); + + if (files.size() == 0) { + log.log(Level.INFO, "No files in ready path " + readyPath.toFile().getAbsolutePath()); + return List.of(); + } + log.log(Level.FINE, "Files in ready path: " + files.size()); + + List fileList = getFiles(files, 50); // TODO + if ( ! fileList.isEmpty()) { + return processFiles(fileList); + } + + return List.of(); + } + + List listFilesInPath(Path path) throws IOException { + List files; + System.out.println("Path " + path + " exists: " + path.toFile().exists()); + try (Stream stream = Files.list(path)) { + files = stream.toList(); + // TODO: Or check if stream is empty + } catch (NoSuchFileException e) { + return List.of(); // No files, this is OK + } + return files; + } + + public ArrayList processFiles(List files) throws IOException { + ArrayList entries = new ArrayList<>(); + for (File f : files) { + log.log(Level.INFO, "Found file " + f); + var content = Files.readAllBytes(f.toPath()); + var entry = LoggerEntry.fromJson(content); + Path file = f.toPath(); + Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName()); + Files.move(file, target); + entries.add(entry); + } + return entries; + } + + public Path processingPath() { return processingPath; } + public Path readyPath() { return readyPath; } + public Path successesPath() { return successesPath; } + public Path failuresPath() { return failuresPath; } + + List getDirectories(File[] files) { + List fileList = new ArrayList<>(); + + for (File f : files) { + if (f.isDirectory()) { + fileList.add(f); + } + } + + Collections.sort(fileList); + return fileList; + } + + List getFiles(List files, int count) { + Validation.requireAtLeast(count, "count must be a positive number", 1); + List fileList = new ArrayList<>(); + + for (Path p : files) { + File f = p.toFile(); + if (!f.isDirectory()) { + fileList.add(f); + } + + // Grab only some files + if (fileList.size() > count) { + break; + } + } + + fileList.sort(ordering); + return fileList; + } + + private void writeEntry(LoggerEntry entry) { + String fileName = String.valueOf(fileCounter); + Path file = spoolPath.resolve(processingPath).resolve(fileName); + try { + Files.writeString(file, entry.toJson(), StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE); + Path target = spoolPath.resolve(readyPath).resolve(file.relativize(file)).resolve(fileName); + log.log(Level.INFO, "Moving file from " + file + " to " + target); + Files.move(file, target); + fileCounter.addAndGet(1); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void createDirs(Path spoolerPath) { + processingPath = createDir(spoolerPath.resolve("processing")); + readyPath = createDir(spoolerPath.resolve("ready")); + failuresPath = createDir(spoolerPath.resolve("failures")); + successesPath = createDir(spoolerPath.resolve("successes")); + } + + private static Path createDir(Path path) { + File file = path.toFile(); + if (file.exists() && file.canRead() && file.canWrite()) { + log.log(Level.INFO, "Directory " + path + " already exists"); + } else if (file.mkdirs()) { + log.log(Level.INFO, "Created " + path); + } else { + log.log(Level.WARNING, "Could not create " + path + ", please check permissions"); + } + return path; + } + + private static class TimestampCompare implements Comparator { + public int compare(File a, File b) { + return (int) (a.lastModified() - b.lastModified()); + } + } + +} diff --git a/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java b/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java new file mode 100644 index 00000000000..abd3b27b70b --- /dev/null +++ b/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java @@ -0,0 +1,107 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.logging; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class SpoolerTest { + + @TempDir + Path tempDir; + + @Test + public void testSpoolingLogger() throws IOException { + Path spoolDir = tempDir.resolve("spool"); + + Spooler spooler = new Spooler(spoolDir); + + TestLogger logger = new TestLogger(spooler); + assertTrue(logger.newEntry() + .blob("Yo entry".getBytes()) + .send()); + assertTrue(logger.newEntry() + .blob("Yo entry 2".getBytes()) + .send()); + + Path readyPath = spooler.readyPath(); + Path readyFile1 = readyPath.resolve("1"); + waitUntilFileExists(readyFile1); + Path readyFile2 = readyPath.resolve("2"); + waitUntilFileExists(readyFile2); + + // Check content after being moved to ready path + String content = Files.readString(readyFile1); + assertTrue(content.contains(Base64.getEncoder().encodeToString("Yo entry".getBytes()))); + assertTrue(Files.readString(readyFile2).contains(Base64.getEncoder().encodeToString("Yo entry 2".getBytes()))); + + // Process files (read, transport files) + logger.manualRun(); + assertEquals(2, logger.entriesSent()); + + // No files in processing or ready, 2 files in successes + assertEquals(0, spooler.listFilesInPath(spooler.processingPath()).size()); + assertEquals(0, spooler.listFilesInPath(readyPath).size()); + assertEquals(2, spooler.listFilesInPath(spooler.successesPath()).size()); + assertEquals(0, spooler.listFilesInPath(spooler.failuresPath()).size()); + } + + private void waitUntilFileExists(Path path) { + Instant end = Instant.now().plus(Duration.ofSeconds(1)); + while (!path.toFile().exists() && Instant.now().isBefore(end)) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + assertTrue(path.toFile().exists()); + } + + + private static class TestLogger extends AbstractSpoolingLogger { + + private final List entriesSent = new ArrayList<>(); + + public TestLogger(Spooler spooler) { + super(spooler); + } + + @Override + void transport(LoggerEntry entry) { + System.out.println("Called transport()"); + entriesSent.add(entry); + } + + @Override + public void run() { + // Do nothing, use manualRun + } + + @Override + public boolean send(LoggerEntry entry) { + return spooler.write(entry); + } + + public void manualRun() { + super.run(); + } + + int entriesSent() { + return entriesSent.size(); + } + + } + +} -- cgit v1.2.3