summaryrefslogtreecommitdiffstats
path: root/node-admin
diff options
context:
space:
mode:
authorValerij Fredriksen <valerijf@verizonmedia.com>2021-02-12 13:14:07 +0100
committerValerij Fredriksen <valerijf@verizonmedia.com>2021-02-15 11:46:13 +0100
commitd253b9a057072c5dbe9b988dbfaeff12567517f4 (patch)
tree61f114a8a03c2a4ee37dc887c502f478affd3d1b /node-admin
parent0c8fd7914be9847cfcf5cfe692454cfed3b85e17 (diff)
Define SyncClient
Diffstat (limited to 'node-admin')
-rw-r--r--node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncClient.java22
-rw-r--r--node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfo.java83
-rw-r--r--node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/ZstdCompressingInputStream.java75
-rw-r--r--node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/package-info.java8
-rw-r--r--node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfoTest.java79
-rw-r--r--node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/ZstdCompressingInputStreamTest.java45
6 files changed, 312 insertions, 0 deletions
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncClient.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncClient.java
new file mode 100644
index 00000000000..64684a4b3e7
--- /dev/null
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncClient.java
@@ -0,0 +1,22 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.hosted.node.admin.maintenance.sync;
+
+import com.yahoo.vespa.hosted.node.admin.component.TaskContext;
+
+import java.util.List;
+
+/**
+ * @author freva
+ */
+public interface SyncClient {
+
+ /**
+ * Syncs the given files, will only upload each file once.
+ *
+ * @param context context used to log which files were synced
+ * @param syncFileInfos list of files and their metadata to sync
+ * @param limit max number of files to upload for this invocation, to avoid blocking for too long
+ * @return true iff any files were uploaded
+ */
+ boolean sync(TaskContext context, List<SyncFileInfo> syncFileInfos, int limit);
+}
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfo.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfo.java
new file mode 100644
index 00000000000..a7f2d97984f
--- /dev/null
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfo.java
@@ -0,0 +1,83 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.hosted.node.admin.maintenance.sync;
+
+import com.yahoo.config.provision.ApplicationId;
+import com.yahoo.config.provision.HostName;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+/**
+ * @author freva
+ */
+public class SyncFileInfo {
+
+ private final String bucketName;
+ private final Path srcPath;
+ private final Path destPath;
+ private final boolean compressWithZstd;
+
+ private SyncFileInfo(String bucketName, Path srcPath, Path destPath, boolean compressWithZstd) {
+ this.bucketName = bucketName;
+ this.srcPath = srcPath;
+ this.destPath = destPath;
+ this.compressWithZstd = compressWithZstd;
+ }
+
+ public String bucketName() {
+ return bucketName;
+ }
+
+ public Path srcPath() {
+ return srcPath;
+ }
+
+ public Path destPath() {
+ return destPath;
+ }
+
+ public InputStream inputStream() throws IOException {
+ InputStream is = Files.newInputStream(srcPath);
+ if (compressWithZstd) return new ZstdCompressingInputStream(is, 4 << 20);
+ return is;
+ }
+
+
+ public static SyncFileInfo tenantVespaLog(String bucketName, ApplicationId applicationId, HostName hostName, Path vespaLogFile) {
+ return new SyncFileInfo(bucketName, vespaLogFile, destination(applicationId, hostName, "logs/vespa", vespaLogFile, ".zst"), true);
+ }
+
+ public static SyncFileInfo tenantAccessLog(String bucketName, ApplicationId applicationId, HostName hostName, Path accessLogFile) {
+ return new SyncFileInfo(bucketName, accessLogFile, destination(applicationId, hostName, "logs/access", accessLogFile, null), false);
+ }
+
+ public static SyncFileInfo infrastructureVespaLog(String bucketName, HostName hostName, Path vespaLogFile) {
+ return new SyncFileInfo(bucketName, vespaLogFile, destination(null, hostName, "logs/vespa", vespaLogFile, ".zst"), true);
+ }
+
+ public static SyncFileInfo infrastructureAccessLog(String bucketName, HostName hostName, Path accessLogFile) {
+ return new SyncFileInfo(bucketName, accessLogFile, destination(null, hostName, "logs/access", accessLogFile, null), false);
+ }
+
+ private static Path destination(ApplicationId app, HostName hostName, String dir, Path filename, String extension) {
+ StringBuilder sb = new StringBuilder(50).append('/');
+
+ if (app == null) sb.append("infrastructure");
+ else sb.append(app.tenant().value()).append('.').append(app.application().value()).append('.').append(app.instance().value());
+
+ sb.append('/');
+ for (char c: hostName.value().toCharArray()) {
+ if (c == '.') break;
+ sb.append(c);
+ }
+
+ sb.append('/').append(dir).append('/').append(filename.getFileName().toString());
+
+ if (extension != null) sb.append(extension);
+
+ return Paths.get(sb.toString());
+ }
+}
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/ZstdCompressingInputStream.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/ZstdCompressingInputStream.java
new file mode 100644
index 00000000000..0cf8eaa9983
--- /dev/null
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/ZstdCompressingInputStream.java
@@ -0,0 +1,75 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.hosted.node.admin.maintenance.sync;
+
+import com.yahoo.compress.ZstdCompressor;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * InputStream that outputs given InputStream compressed with the ZStandard.
+ *
+ * @author freva
+ */
+public class ZstdCompressingInputStream extends InputStream {
+
+ public static final int DEFAULT_INPUT_BUFFER_SIZE = 8 * 1024;
+ static final ZstdCompressor compressor = new ZstdCompressor();
+
+ private final InputStream is;
+ private final byte[] inputBuffer;
+ private final byte[] outputBuffer;
+
+ private int outputPosition = 0;
+ private int outputLength = 0;
+ private boolean isClosed = false;
+
+ public ZstdCompressingInputStream(InputStream is, int inputBufferSize) {
+ this.is = is;
+ this.inputBuffer = new byte[inputBufferSize];
+ this.outputBuffer = new byte[ZstdCompressor.getMaxCompressedLength(inputBufferSize)];
+ }
+
+ public ZstdCompressingInputStream(InputStream is) {
+ this(is, DEFAULT_INPUT_BUFFER_SIZE);
+ }
+
+ @Override
+ public int read() throws IOException {
+ throwIfClosed();
+
+ if (outputPosition >= outputLength) {
+ int readLength = is.read(inputBuffer);
+ if (readLength == -1)
+ return -1;
+
+ outputLength = compressor.compress(inputBuffer, 0, readLength, outputBuffer, 0, outputBuffer.length);
+ outputPosition = 0;
+ }
+
+ return Byte.toUnsignedInt(outputBuffer[outputPosition++]);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ int first = read();
+ if (first == -1) return -1;
+
+ b[off++] = (byte) first;
+ len = Math.min(Math.min(len, outputLength - outputPosition), b.length - off);
+ System.arraycopy(outputBuffer, outputPosition, b, off, len);
+ outputPosition += len;
+ return len + 1;
+ }
+
+ @Override
+ public void close() throws IOException {
+ throwIfClosed();
+ is.close();
+ isClosed = true;
+ }
+
+ private void throwIfClosed() {
+ if (isClosed) throw new IllegalArgumentException("Input stream is already closed");
+ }
+}
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/package-info.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/package-info.java
new file mode 100644
index 00000000000..fc197450492
--- /dev/null
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/package-info.java
@@ -0,0 +1,8 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/**
+ * @author freva
+ */
+@ExportPackage
+package com.yahoo.vespa.hosted.node.admin.maintenance.sync;
+
+import com.yahoo.osgi.annotation.ExportPackage; \ No newline at end of file
diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfoTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfoTest.java
new file mode 100644
index 00000000000..0d596a46d77
--- /dev/null
+++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/SyncFileInfoTest.java
@@ -0,0 +1,79 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.hosted.node.admin.maintenance.sync;
+
+import com.yahoo.config.provision.ApplicationId;
+import com.yahoo.config.provision.HostName;
+import com.yahoo.vespa.test.file.TestFileSystem;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.FileSystem;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * @author freva
+ */
+public class SyncFileInfoTest {
+
+ private static final FileSystem fileSystem = TestFileSystem.create();
+
+ private static final String bucket = "logs-region-acdf21";
+ private static final ApplicationId application = ApplicationId.from("tenant", "application", "instance");
+ private static final HostName hostname = HostName.from("h12352a.env.region-1.vespa.domain.example");
+ private static final Path accessLogPath = fileSystem.getPath("/opt/vespa/logs/qrs/access.json-20210212.zst");
+ private static final Path vespaLogPath = fileSystem.getPath("/opt/vespa/logs/vespa.log-2021-02-12");
+
+ @Test
+ public void tenant_access_log() {
+ SyncFileInfo sfi = SyncFileInfo.tenantAccessLog(bucket, application, hostname, accessLogPath);
+ assertEquals(Paths.get("/tenant.application.instance/h12352a/logs/access/access.json-20210212.zst"), sfi.destPath());
+ assertEquals(bucket, sfi.bucketName());
+ assertNotEquals(ZstdCompressingInputStream.class, getInputStreamType(sfi));
+ }
+
+ @Test
+ public void tenant_vespa_log() {
+ SyncFileInfo sfi = SyncFileInfo.tenantVespaLog(bucket, application, hostname, vespaLogPath);
+ assertEquals(Paths.get("/tenant.application.instance/h12352a/logs/vespa/vespa.log-2021-02-12.zst"), sfi.destPath());
+ assertEquals(ZstdCompressingInputStream.class, getInputStreamType(sfi));
+ }
+
+ @Test
+ public void infra_access_log() {
+ SyncFileInfo sfi = SyncFileInfo.infrastructureAccessLog(bucket, hostname, accessLogPath);
+ assertEquals(Paths.get("/infrastructure/h12352a/logs/access/access.json-20210212.zst"), sfi.destPath());
+ assertNotEquals(ZstdCompressingInputStream.class, getInputStreamType(sfi));
+ }
+
+ @Test
+ public void infra_vespa_log() {
+ SyncFileInfo sfi = SyncFileInfo.infrastructureVespaLog(bucket, hostname, vespaLogPath);
+ assertEquals(Paths.get("/infrastructure/h12352a/logs/vespa/vespa.log-2021-02-12.zst"), sfi.destPath());
+ assertEquals(ZstdCompressingInputStream.class, getInputStreamType(sfi));
+ }
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ Files.createDirectories(vespaLogPath.getParent());
+ Files.createFile(vespaLogPath);
+ Files.createDirectories(accessLogPath.getParent());
+ Files.createFile(accessLogPath);
+ }
+
+ private static Class<? extends InputStream> getInputStreamType(SyncFileInfo syncFileInfo) {
+ try (InputStream inputStream = syncFileInfo.inputStream()) {
+ return inputStream.getClass();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+} \ No newline at end of file
diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/ZstdCompressingInputStreamTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/ZstdCompressingInputStreamTest.java
new file mode 100644
index 00000000000..be0df437b7f
--- /dev/null
+++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/maintenance/sync/ZstdCompressingInputStreamTest.java
@@ -0,0 +1,45 @@
+package com.yahoo.vespa.hosted.node.admin.maintenance.sync;
+
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Random;
+
+import static com.yahoo.vespa.hosted.node.admin.maintenance.sync.ZstdCompressingInputStream.compressor;
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * @author freva
+ */
+public class ZstdCompressingInputStreamTest {
+
+ @Test
+ public void compression_test() throws Exception {
+ Random rnd = new Random();
+ byte[] data = new byte[(int) (100_000 * (10 + rnd.nextDouble()))];
+ rnd.nextBytes(data);
+ assertCompression(data, 1 << 14);
+ }
+
+ private static void assertCompression(byte[] data, int bufferSize) {
+ ByteArrayInputStream bais = new ByteArrayInputStream(data);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (ZstdCompressingInputStream zcis = new ZstdCompressingInputStream(bais, bufferSize)) {
+ byte[] buffer = new byte[bufferSize];
+ for (int nRead; (nRead = zcis.read(buffer, 0, buffer.length)) != -1; )
+ baos.write(buffer, 0, nRead);
+ baos.flush();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
+ byte[] compressedData = baos.toByteArray();
+ byte[] decompressedData = new byte[data.length];
+ compressor.decompress(compressedData, 0, compressedData.length, decompressedData, 0, decompressedData.length);
+
+ assertArrayEquals(data, decompressedData);
+ }
+}