From d253b9a057072c5dbe9b988dbfaeff12567517f4 Mon Sep 17 00:00:00 2001 From: Valerij Fredriksen Date: Fri, 12 Feb 2021 13:14:07 +0100 Subject: Define SyncClient --- .../node/admin/maintenance/sync/SyncClient.java | 22 ++++++ .../node/admin/maintenance/sync/SyncFileInfo.java | 83 ++++++++++++++++++++++ .../sync/ZstdCompressingInputStream.java | 75 +++++++++++++++++++ .../node/admin/maintenance/sync/package-info.java | 8 +++ .../admin/maintenance/sync/SyncFileInfoTest.java | 79 ++++++++++++++++++++ .../sync/ZstdCompressingInputStreamTest.java | 45 ++++++++++++ 6 files changed, 312 insertions(+) create mode 100644 node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncClient.java create mode 100644 node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfo.java create mode 100644 node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/ZstdCompressingInputStream.java create mode 100644 node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/package-info.java create mode 100644 node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfoTest.java create mode 100644 node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/ZstdCompressingInputStreamTest.java (limited to 'node-admin') diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncClient.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncClient.java new file mode 100644 index 00000000000..64684a4b3e7 --- /dev/null +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncClient.java @@ -0,0 +1,22 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.hosted.node.admin.maintenance.sync; + +import com.yahoo.vespa.hosted.node.admin.component.TaskContext; + +import java.util.List; + +/** + * @author freva + */ +public interface SyncClient { + + /** + * Syncs the given files, will only upload each file once. + * + * @param context context used to log which files were synced + * @param syncFileInfos list of files and their metadata to sync + * @param limit max number of files to upload for this invocation, to avoid blocking for too long + * @return true iff any files were uploaded + */ + boolean sync(TaskContext context, List syncFileInfos, int limit); +} diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfo.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfo.java new file mode 100644 index 00000000000..a7f2d97984f --- /dev/null +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfo.java @@ -0,0 +1,83 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.hosted.node.admin.maintenance.sync; + +import com.yahoo.config.provision.ApplicationId; +import com.yahoo.config.provision.HostName; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +/** + * @author freva + */ +public class SyncFileInfo { + + private final String bucketName; + private final Path srcPath; + private final Path destPath; + private final boolean compressWithZstd; + + private SyncFileInfo(String bucketName, Path srcPath, Path destPath, boolean compressWithZstd) { + this.bucketName = bucketName; + this.srcPath = srcPath; + this.destPath = destPath; + this.compressWithZstd = compressWithZstd; + } + + public String bucketName() { + return bucketName; + } + + public Path srcPath() { + return srcPath; + } + + public Path destPath() { + return destPath; + } + + public InputStream inputStream() throws IOException { + InputStream is = Files.newInputStream(srcPath); + if (compressWithZstd) return new ZstdCompressingInputStream(is, 4 << 20); + return is; + } + + + public static SyncFileInfo tenantVespaLog(String bucketName, ApplicationId applicationId, HostName hostName, Path vespaLogFile) { + return new SyncFileInfo(bucketName, vespaLogFile, destination(applicationId, hostName, "logs/vespa", vespaLogFile, ".zst"), true); + } + + public static SyncFileInfo tenantAccessLog(String bucketName, ApplicationId applicationId, HostName hostName, Path accessLogFile) { + return new SyncFileInfo(bucketName, accessLogFile, destination(applicationId, hostName, "logs/access", accessLogFile, null), false); + } + + public static SyncFileInfo infrastructureVespaLog(String bucketName, HostName hostName, Path vespaLogFile) { + return new SyncFileInfo(bucketName, vespaLogFile, destination(null, hostName, "logs/vespa", vespaLogFile, ".zst"), true); + } + + public static SyncFileInfo infrastructureAccessLog(String bucketName, HostName hostName, Path accessLogFile) { + return new SyncFileInfo(bucketName, accessLogFile, destination(null, hostName, "logs/access", accessLogFile, null), false); + } + + private static Path destination(ApplicationId app, HostName hostName, String dir, Path filename, String extension) { + StringBuilder sb = new StringBuilder(50).append('/'); + + if (app == null) sb.append("infrastructure"); + else sb.append(app.tenant().value()).append('.').append(app.application().value()).append('.').append(app.instance().value()); + + sb.append('/'); + for (char c: hostName.value().toCharArray()) { + if (c == '.') break; + sb.append(c); + } + + sb.append('/').append(dir).append('/').append(filename.getFileName().toString()); + + if (extension != null) sb.append(extension); + + return Paths.get(sb.toString()); + } +} diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/ZstdCompressingInputStream.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/ZstdCompressingInputStream.java new file mode 100644 index 00000000000..0cf8eaa9983 --- /dev/null +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/ZstdCompressingInputStream.java @@ -0,0 +1,75 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.hosted.node.admin.maintenance.sync; + +import com.yahoo.compress.ZstdCompressor; + +import java.io.IOException; +import java.io.InputStream; + +/** + * InputStream that outputs given InputStream compressed with the ZStandard. + * + * @author freva + */ +public class ZstdCompressingInputStream extends InputStream { + + public static final int DEFAULT_INPUT_BUFFER_SIZE = 8 * 1024; + static final ZstdCompressor compressor = new ZstdCompressor(); + + private final InputStream is; + private final byte[] inputBuffer; + private final byte[] outputBuffer; + + private int outputPosition = 0; + private int outputLength = 0; + private boolean isClosed = false; + + public ZstdCompressingInputStream(InputStream is, int inputBufferSize) { + this.is = is; + this.inputBuffer = new byte[inputBufferSize]; + this.outputBuffer = new byte[ZstdCompressor.getMaxCompressedLength(inputBufferSize)]; + } + + public ZstdCompressingInputStream(InputStream is) { + this(is, DEFAULT_INPUT_BUFFER_SIZE); + } + + @Override + public int read() throws IOException { + throwIfClosed(); + + if (outputPosition >= outputLength) { + int readLength = is.read(inputBuffer); + if (readLength == -1) + return -1; + + outputLength = compressor.compress(inputBuffer, 0, readLength, outputBuffer, 0, outputBuffer.length); + outputPosition = 0; + } + + return Byte.toUnsignedInt(outputBuffer[outputPosition++]); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int first = read(); + if (first == -1) return -1; + + b[off++] = (byte) first; + len = Math.min(Math.min(len, outputLength - outputPosition), b.length - off); + System.arraycopy(outputBuffer, outputPosition, b, off, len); + outputPosition += len; + return len + 1; + } + + @Override + public void close() throws IOException { + throwIfClosed(); + is.close(); + isClosed = true; + } + + private void throwIfClosed() { + if (isClosed) throw new IllegalArgumentException("Input stream is already closed"); + } +} diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/package-info.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/package-info.java new file mode 100644 index 00000000000..fc197450492 --- /dev/null +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/package-info.java @@ -0,0 +1,8 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * @author freva + */ +@ExportPackage +package com.yahoo.vespa.hosted.node.admin.maintenance.sync; + +import com.yahoo.osgi.annotation.ExportPackage; \ No newline at end of file diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfoTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfoTest.java new file mode 100644 index 00000000000..0d596a46d77 --- /dev/null +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfoTest.java @@ -0,0 +1,79 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.hosted.node.admin.maintenance.sync; + +import com.yahoo.config.provision.ApplicationId; +import com.yahoo.config.provision.HostName; +import com.yahoo.vespa.test.file.TestFileSystem; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.nio.file.FileSystem; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +/** + * @author freva + */ +public class SyncFileInfoTest { + + private static final FileSystem fileSystem = TestFileSystem.create(); + + private static final String bucket = "logs-region-acdf21"; + private static final ApplicationId application = ApplicationId.from("tenant", "application", "instance"); + private static final HostName hostname = HostName.from("h12352a.env.region-1.vespa.domain.example"); + private static final Path accessLogPath = fileSystem.getPath("/opt/vespa/logs/qrs/access.json-20210212.zst"); + private static final Path vespaLogPath = fileSystem.getPath("/opt/vespa/logs/vespa.log-2021-02-12"); + + @Test + public void tenant_access_log() { + SyncFileInfo sfi = SyncFileInfo.tenantAccessLog(bucket, application, hostname, accessLogPath); + assertEquals(Paths.get("/tenant.application.instance/h12352a/logs/access/access.json-20210212.zst"), sfi.destPath()); + assertEquals(bucket, sfi.bucketName()); + assertNotEquals(ZstdCompressingInputStream.class, getInputStreamType(sfi)); + } + + @Test + public void tenant_vespa_log() { + SyncFileInfo sfi = SyncFileInfo.tenantVespaLog(bucket, application, hostname, vespaLogPath); + assertEquals(Paths.get("/tenant.application.instance/h12352a/logs/vespa/vespa.log-2021-02-12.zst"), sfi.destPath()); + assertEquals(ZstdCompressingInputStream.class, getInputStreamType(sfi)); + } + + @Test + public void infra_access_log() { + SyncFileInfo sfi = SyncFileInfo.infrastructureAccessLog(bucket, hostname, accessLogPath); + assertEquals(Paths.get("/infrastructure/h12352a/logs/access/access.json-20210212.zst"), sfi.destPath()); + assertNotEquals(ZstdCompressingInputStream.class, getInputStreamType(sfi)); + } + + @Test + public void infra_vespa_log() { + SyncFileInfo sfi = SyncFileInfo.infrastructureVespaLog(bucket, hostname, vespaLogPath); + assertEquals(Paths.get("/infrastructure/h12352a/logs/vespa/vespa.log-2021-02-12.zst"), sfi.destPath()); + assertEquals(ZstdCompressingInputStream.class, getInputStreamType(sfi)); + } + + @BeforeClass + public static void setup() throws IOException { + Files.createDirectories(vespaLogPath.getParent()); + Files.createFile(vespaLogPath); + Files.createDirectories(accessLogPath.getParent()); + Files.createFile(accessLogPath); + } + + private static Class getInputStreamType(SyncFileInfo syncFileInfo) { + try (InputStream inputStream = syncFileInfo.inputStream()) { + return inputStream.getClass(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + +} \ No newline at end of file diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/ZstdCompressingInputStreamTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/ZstdCompressingInputStreamTest.java new file mode 100644 index 00000000000..be0df437b7f --- /dev/null +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/ZstdCompressingInputStreamTest.java @@ -0,0 +1,45 @@ +package com.yahoo.vespa.hosted.node.admin.maintenance.sync; + +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Random; + +import static com.yahoo.vespa.hosted.node.admin.maintenance.sync.ZstdCompressingInputStream.compressor; +import static org.junit.Assert.assertArrayEquals; + +/** + * @author freva + */ +public class ZstdCompressingInputStreamTest { + + @Test + public void compression_test() throws Exception { + Random rnd = new Random(); + byte[] data = new byte[(int) (100_000 * (10 + rnd.nextDouble()))]; + rnd.nextBytes(data); + assertCompression(data, 1 << 14); + } + + private static void assertCompression(byte[] data, int bufferSize) { + ByteArrayInputStream bais = new ByteArrayInputStream(data); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ZstdCompressingInputStream zcis = new ZstdCompressingInputStream(bais, bufferSize)) { + byte[] buffer = new byte[bufferSize]; + for (int nRead; (nRead = zcis.read(buffer, 0, buffer.length)) != -1; ) + baos.write(buffer, 0, nRead); + baos.flush(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + byte[] compressedData = baos.toByteArray(); + byte[] decompressedData = new byte[data.length]; + compressor.decompress(compressedData, 0, compressedData.length, decompressedData, 0, decompressedData.length); + + assertArrayEquals(data, decompressedData); + } +} -- cgit v1.2.3