diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/logging/Spooler.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/logging/Spooler.java | 175 |
1 files changed, 175 insertions, 0 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java new file mode 100644 index 00000000000..8fa51a0b47c --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java @@ -0,0 +1,175 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.logging; + +import ai.vespa.validation.Validation; +import com.yahoo.vespa.defaults.Defaults; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.stream.Stream; + +/** + * Spooler that will write an entry to a file and read files that are ready to be sent + * + * @author hmusum + */ +public class Spooler { + + private static final java.util.logging.Logger log = java.util.logging.Logger.getLogger(Spooler.class.getName()); + private static final Path defaultSpoolPath = Path.of(Defaults.getDefaults().underVespaHome("var/spool/vespa/events")); + private static final Comparator<File> ordering = new TimestampCompare(); + + private Path processingPath; + private Path readyPath; + private Path failuresPath; + private Path successesPath; + + AtomicInteger fileCounter = new AtomicInteger(1); + + private final Path spoolPath; + + public Spooler() { + this(defaultSpoolPath); + } + + public Spooler(Path spoolPath) { + this.spoolPath = spoolPath; + createDirs(spoolPath); + } + + public boolean write(LoggerEntry entry) { + writeEntry(entry); + + return true; + } + + public List<LoggerEntry> processFiles() throws IOException { + List<Path> files = listFilesInPath(readyPath); + + if (files.size() == 0) { + log.log(Level.INFO, "No files in ready path " + readyPath.toFile().getAbsolutePath()); + return List.of(); + } + log.log(Level.FINE, "Files in ready path: " + files.size()); + + List<File> fileList = getFiles(files, 50); // TODO + if ( ! fileList.isEmpty()) { + return processFiles(fileList); + } + + return List.of(); + } + + List<Path> listFilesInPath(Path path) throws IOException { + List<Path> files; + System.out.println("Path " + path + " exists: " + path.toFile().exists()); + try (Stream<Path> stream = Files.list(path)) { + files = stream.toList(); + // TODO: Or check if stream is empty + } catch (NoSuchFileException e) { + return List.of(); // No files, this is OK + } + return files; + } + + public ArrayList<LoggerEntry> processFiles(List<File> files) throws IOException { + ArrayList<LoggerEntry> entries = new ArrayList<>(); + for (File f : files) { + log.log(Level.INFO, "Found file " + f); + var content = Files.readAllBytes(f.toPath()); + var entry = LoggerEntry.fromJson(content); + Path file = f.toPath(); + Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName()); + Files.move(file, target); + entries.add(entry); + } + return entries; + } + + public Path processingPath() { return processingPath; } + public Path readyPath() { return readyPath; } + public Path successesPath() { return successesPath; } + public Path failuresPath() { return failuresPath; } + + List<File> getDirectories(File[] files) { + List<File> fileList = new ArrayList<>(); + + for (File f : files) { + if (f.isDirectory()) { + fileList.add(f); + } + } + + Collections.sort(fileList); + return fileList; + } + + List<File> getFiles(List<Path> files, int count) { + Validation.requireAtLeast(count, "count must be a positive number", 1); + List<File> fileList = new ArrayList<>(); + + for (Path p : files) { + File f = p.toFile(); + if (!f.isDirectory()) { + fileList.add(f); + } + + // Grab only some files + if (fileList.size() > count) { + break; + } + } + + fileList.sort(ordering); + return fileList; + } + + private void writeEntry(LoggerEntry entry) { + String fileName = String.valueOf(fileCounter); + Path file = spoolPath.resolve(processingPath).resolve(fileName); + try { + Files.writeString(file, entry.toJson(), StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE); + Path target = spoolPath.resolve(readyPath).resolve(file.relativize(file)).resolve(fileName); + log.log(Level.INFO, "Moving file from " + file + " to " + target); + Files.move(file, target); + fileCounter.addAndGet(1); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void createDirs(Path spoolerPath) { + processingPath = createDir(spoolerPath.resolve("processing")); + readyPath = createDir(spoolerPath.resolve("ready")); + failuresPath = createDir(spoolerPath.resolve("failures")); + successesPath = createDir(spoolerPath.resolve("successes")); + } + + private static Path createDir(Path path) { + File file = path.toFile(); + if (file.exists() && file.canRead() && file.canWrite()) { + log.log(Level.INFO, "Directory " + path + " already exists"); + } else if (file.mkdirs()) { + log.log(Level.INFO, "Created " + path); + } else { + log.log(Level.WARNING, "Could not create " + path + ", please check permissions"); + } + return path; + } + + private static class TimestampCompare implements Comparator<File> { + public int compare(File a, File b) { + return (int) (a.lastModified() - b.lastModified()); + } + } + +} |