summaryrefslogtreecommitdiffstats
path: root/vespajlib
diff options
context:
space:
mode:
author Bjørn Christian Seime <bjorncs@verizonmedia.com> 2021-01-19 18:06:57 +0100
committer GitHub <noreply@github.com> 2021-01-19 18:06:57 +0100
commit 6f326f6cc056dc46f5538a8a186c8420b0379edc (patch)
tree 7efbbf5b4f585edca3c135be7b8b8f59fa22f1e7 /vespajlib
parent 4e6d9b184867553f740a65124fdb2d9c380caf22 (diff)
parent ecd1ac5e05d4e7b2b059af3ca01084c3a3783148 (diff)
Merge pull request #16102 from vespa-engine/bjorncs/zstd-java
Bjorncs/zstd java
Diffstat (limited to 'vespajlib')
-rw-r--r-- vespajlib/src/main/java/com/yahoo/compress/CompressionType.java 5
-rw-r--r-- vespajlib/src/main/java/com/yahoo/compress/Compressor.java 15
-rw-r--r-- vespajlib/src/main/java/com/yahoo/compress/ZstdCompressor.java 51
-rw-r--r-- vespajlib/src/main/java/com/yahoo/compress/ZstdOuputStream.java 88
-rw-r--r-- vespajlib/src/test/java/com/yahoo/compress/CompressorTest.java 27
-rw-r--r-- vespajlib/src/test/java/com/yahoo/compress/ZstdCompressorTest.java 37
-rw-r--r-- vespajlib/src/test/java/com/yahoo/compress/ZstdOuputStreamTest.java 48
7 files changed, 270 insertions, 1 deletions
diff --git a/vespajlib/src/main/java/com/yahoo/compress/CompressionType.java b/vespajlib/src/main/java/com/yahoo/compress/CompressionType.java
index c3e42895a5e..36deb318ae8 100644
--- a/vespajlib/src/main/java/com/yahoo/compress/CompressionType.java
+++ b/vespajlib/src/main/java/com/yahoo/compress/CompressionType.java
@@ -11,7 +11,8 @@ public enum CompressionType {
// Do not change the type->ordinal association. The gap is due to historic types no longer supported.
NONE((byte) 0),
INCOMPRESSIBLE((byte) 5),
- LZ4((byte) 6);
+ LZ4((byte) 6),
+ ZSTD((byte) 7);
private byte code;
@@ -38,6 +39,8 @@ public enum CompressionType {
return INCOMPRESSIBLE;
case ((byte) 6):
return LZ4;
+ case ((byte) 7):
+ return ZSTD;
default:
throw new IllegalArgumentException("Unknown compression type ordinal " + value);
}
diff --git a/vespajlib/src/main/java/com/yahoo/compress/Compressor.java b/vespajlib/src/main/java/com/yahoo/compress/Compressor.java
index fb5da192f36..3220b81a3a9 100644
--- a/vespajlib/src/main/java/com/yahoo/compress/Compressor.java
+++ b/vespajlib/src/main/java/com/yahoo/compress/Compressor.java
@@ -18,6 +18,7 @@ import java.util.Random;
*/
public class Compressor {
+ private final ZstdCompressor zstdCompressor = new ZstdCompressor();
private final CompressionType type;
private final int level;
private final double compressionThresholdFactor;
@@ -91,6 +92,11 @@ public class Compressor {
if (compressedData.length + 8 >= dataSize * compressionThresholdFactor)
return new Compression(CompressionType.INCOMPRESSIBLE, dataSize, data);
return new Compression(CompressionType.LZ4, dataSize, compressedData);
+ case ZSTD:
+ int dataLength = uncompressedSize.orElse(data.length);
+ if (dataLength < compressMinSizeBytes) return new Compression(CompressionType.INCOMPRESSIBLE, dataLength, data);
+ byte[] compressed = zstdCompressor.compress(data, 0, dataLength);
+ return new Compression(CompressionType.ZSTD, dataLength, compressed);
default:
throw new IllegalArgumentException(requestedCompression + " is not supported");
}
@@ -130,6 +136,15 @@ public class Compressor {
if (expectedCompressedSize.isPresent() && compressedSize != expectedCompressedSize.get())
throw new IllegalStateException("Compressed size mismatch. Expected " + compressedSize + ". Got " + expectedCompressedSize.get());
return uncompressedLZ4Data;
+ case ZSTD:
+ int compressedLength = expectedCompressedSize.orElseThrow(() -> new IllegalArgumentException("Zstd decompressor requires input size"));
+ byte[] decompressedData = zstdCompressor.decompress(compressedData, compressedDataOffset, compressedLength);
+ expectedCompressedSize.ifPresent(expectedSize -> {
+ if (compressedData.length != expectedSize) {
+ throw new IllegalStateException("Compressed size mismatch. Expected " + expectedSize + ". Got " + decompressedData.length);
+ }
+ });
+ return decompressedData;
default:
throw new IllegalArgumentException(compression + " is not supported");
}
diff --git a/vespajlib/src/main/java/com/yahoo/compress/ZstdCompressor.java b/vespajlib/src/main/java/com/yahoo/compress/ZstdCompressor.java
new file mode 100644
index 00000000000..72ccb730db7
--- /dev/null
+++ b/vespajlib/src/main/java/com/yahoo/compress/ZstdCompressor.java
@@ -0,0 +1,51 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.compress;
+
+import java.util.Arrays;
+
+/**
+ * Frame based Zstd compressor (https://github.com/facebook/zstd).
+ * Implemented based on https://github.com/airlift/aircompressor - a pure Java implementation (no JNI).
+ *
+ * @author bjorncs
+ */
+public class ZstdCompressor {
+
+    private static final io.airlift.compress.zstd.ZstdCompressor compressor = new io.airlift.compress.zstd.ZstdCompressor();
+    private static final io.airlift.compress.zstd.ZstdDecompressor decompressor = new io.airlift.compress.zstd.ZstdDecompressor();
+
+    /**
+     * Compresses a region of {@code input} into a newly allocated array sized to the compressed result.
+     *
+     * @param input array holding the uncompressed data
+     * @param inputOffset index of the first byte to compress
+     * @param inputLength number of bytes to compress
+     * @return a new array containing exactly the compressed bytes
+     */
+    public byte[] compress(byte[] input, int inputOffset, int inputLength) {
+        int maxCompressedLength = getMaxCompressedLength(inputLength);
+        // Compress into a worst-case sized scratch buffer, then trim to the actual compressed length.
+        byte[] output = new byte[maxCompressedLength];
+        int compressedLength = compress(input, inputOffset, inputLength, output, 0, maxCompressedLength);
+        return Arrays.copyOf(output, compressedLength);
+    }
+
+    /**
+     * Compresses a region of {@code input} into a caller supplied output buffer.
+     *
+     * @return the value returned by the underlying compressor (used by the array overload as the compressed length)
+     */
+    public int compress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength) {
+        return compressor.compress(input, inputOffset, inputLength, output, outputOffset, maxOutputLength);
+    }
+
+    /**
+     * Decompresses a region of {@code input} into a newly allocated array.
+     *
+     * Note:
+     * Implementation assumes single frame (since {@link #getDecompressedLength(byte[], int, int)} only includes the first frame)
+     * The {@link #decompress(byte[], int, int, byte[], int, int)} overload will try to decompress all frames, causing the output buffer to overflow.
+     *
+     * @throws IllegalArgumentException if the decompressed length reported for the input is negative
+     * @throws ArithmeticException if the decompressed length reported for the input does not fit in an {@code int}
+     */
+    public byte[] decompress(byte[] input, int inputOffset, int inputLength) {
+        int decompressedLength = getDecompressedLength(input, inputOffset, inputLength);
+        // Fail with a descriptive error instead of a NegativeArraySizeException from the allocation below.
+        // NOTE(review): presumably a negative value means the frame header carries no content size - confirm against aircompressor.
+        if (decompressedLength < 0) {
+            throw new IllegalArgumentException("Invalid decompressed length reported for input: " + decompressedLength);
+        }
+        byte[] output = new byte[decompressedLength];
+        decompress(input, inputOffset, inputLength, output, 0, decompressedLength);
+        return output;
+    }
+
+    /**
+     * Decompresses a region of {@code input} into a caller supplied output buffer.
+     *
+     * @return the value returned by the underlying decompressor
+     */
+    public int decompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength) {
+        return decompressor.decompress(input, inputOffset, inputLength, output, outputOffset, maxOutputLength);
+    }
+
+    /** Returns the maximum compressed length the underlying compressor reports for the given uncompressed length. */
+    public static int getMaxCompressedLength(int uncompressedLength) {
+        return compressor.maxCompressedLength(uncompressedLength);
+    }
+
+    /**
+     * Returns the decompressed size reported by the underlying library for the given compressed region.
+     *
+     * @throws ArithmeticException if the reported size does not fit in an {@code int}
+     */
+    public static int getDecompressedLength(byte[] input, int inputOffset, int inputLength) {
+        // toIntExact instead of a plain cast: a size above Integer.MAX_VALUE must not silently wrap around.
+        return Math.toIntExact(io.airlift.compress.zstd.ZstdDecompressor.getDecompressedSize(input, inputOffset, inputLength));
+    }
+}
diff --git a/vespajlib/src/main/java/com/yahoo/compress/ZstdOuputStream.java b/vespajlib/src/main/java/com/yahoo/compress/ZstdOuputStream.java
new file mode 100644
index 00000000000..e81bcf6a465
--- /dev/null
+++ b/vespajlib/src/main/java/com/yahoo/compress/ZstdOuputStream.java
@@ -0,0 +1,88 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.compress;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Output stream that buffers written bytes and compresses them with {@link ZstdCompressor}
+ * before forwarding the compressed bytes to the wrapped stream.
+ *
+ * The buffered bytes are compressed in a single call each time the input buffer fills up
+ * and on every {@link #flush()}.
+ *
+ * @author bjorncs
+ */
+public class ZstdOuputStream extends OutputStream {
+
+    private final ZstdCompressor compressor = new ZstdCompressor();
+
+    public static final int DEFAULT_INPUT_BUFFER_SIZE = 8*1024;
+
+    private final OutputStream out;
+    private final byte[] inputBuffer;
+    private final byte[] outputBuffer;
+    private int inputPosition = 0;
+    private boolean isClosed = false;
+
+    /**
+     * @param out stream receiving the compressed bytes
+     * @param inputBufferSize number of uncompressed bytes buffered before each compress call; must be positive
+     * @throws IllegalArgumentException if {@code inputBufferSize} is not positive
+     */
+    public ZstdOuputStream(OutputStream out, int inputBufferSize) {
+        // A zero sized buffer would make write(byte[], int, int) loop forever since no byte could ever be copied.
+        if (inputBufferSize <= 0) {
+            throw new IllegalArgumentException("Input buffer size must be positive, got " + inputBufferSize);
+        }
+        this.out = out;
+        this.inputBuffer = new byte[inputBufferSize];
+        this.outputBuffer = new byte[ZstdCompressor.getMaxCompressedLength(inputBufferSize)];
+    }
+
+    /** Creates a stream using {@link #DEFAULT_INPUT_BUFFER_SIZE} as input buffer size. */
+    public ZstdOuputStream(OutputStream out) {
+        this(out, DEFAULT_INPUT_BUFFER_SIZE);
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        throwIfClosed();
+        inputBuffer[inputPosition++] = (byte) b;
+        flushIfFull();
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        throwIfClosed();
+        write(b, 0, b.length);
+    }
+
+    /**
+     * Copies the given range into the input buffer, compressing and forwarding the buffer every time it fills up.
+     *
+     * @throws IndexOutOfBoundsException if {@code off} or {@code len} is negative, or the range exceeds {@code b}
+     */
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        throwIfClosed();
+        // Reject invalid ranges up front; a negative length would otherwise be silently ignored by the loop below.
+        if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException(
+                    "Invalid range: offset " + off + ", length " + len + ", array length " + b.length);
+        }
+        int end = off + len;
+        while (off < end) {
+            int copyLength = Math.min(end - off, inputBuffer.length - inputPosition);
+            System.arraycopy(b, off, inputBuffer, inputPosition, copyLength);
+            off += copyLength;
+            inputPosition += copyLength;
+            flushIfFull();
+        }
+    }
+
+    /** Compresses and forwards the currently buffered bytes, then flushes the wrapped stream. */
+    @Override
+    public void flush() throws IOException {
+        flushInternal();
+        out.flush();
+    }
+
+    /**
+     * Flushes buffered bytes and closes the wrapped stream.
+     * Closing an already closed stream has no effect, as specified by {@link java.io.Closeable#close()}.
+     */
+    @Override
+    public void close() throws IOException {
+        if (isClosed) return;
+        // try-with-resources closes the wrapped stream even if the final flush fails,
+        // while still reporting the flush failure as the primary exception.
+        try (OutputStream wrapped = out) {
+            flush();
+        } finally {
+            isClosed = true;
+        }
+    }
+
+    // NOTE(review): this also runs when no bytes are buffered, so the output then contains whatever
+    // the compressor produces for zero-length input - confirm this is intended.
+    private void flushInternal() throws IOException {
+        throwIfClosed();
+        int compressedLength = compressor.compress(inputBuffer, 0, inputPosition, outputBuffer, 0, outputBuffer.length);
+        out.write(outputBuffer, 0, compressedLength);
+        inputPosition = 0;
+    }
+
+    private void flushIfFull() throws IOException {
+        if (inputPosition == inputBuffer.length) {
+            flushInternal();
+        }
+    }
+
+    // The exception type is kept as-is so existing callers observe the same failure on write/flush after close.
+    private void throwIfClosed() {
+        if (isClosed) throw new IllegalArgumentException("Output stream is already closed");
+    }
+}
diff --git a/vespajlib/src/test/java/com/yahoo/compress/CompressorTest.java b/vespajlib/src/test/java/com/yahoo/compress/CompressorTest.java
new file mode 100644
index 00000000000..0c6af48deb8
--- /dev/null
+++ b/vespajlib/src/test/java/com/yahoo/compress/CompressorTest.java
@@ -0,0 +1,27 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.compress;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Round-trip test for the ZSTD path of {@link Compressor}.
+ *
+ * @author bjorncs
+ */
+class CompressorTest {
+
+    @Test
+    void compresses_and_decompresses_input_using_zstd() {
+        Compressor zstd = new Compressor(CompressionType.ZSTD);
+        byte[] original = "The quick brown fox jumps over the lazy dog".getBytes();
+
+        // Compress without stating an explicit uncompressed size.
+        Compressor.Compression result = zstd.compress(CompressionType.ZSTD, original, Optional.empty());
+        assertEquals(original.length, result.uncompressedSize());
+
+        // Decompress the produced bytes and verify that the original content is restored.
+        byte[] packed = result.data();
+        byte[] restored = zstd.decompress(CompressionType.ZSTD, packed, 0, original.length, Optional.of(packed.length));
+        assertArrayEquals(original, restored);
+    }
+
+}
diff --git a/vespajlib/src/test/java/com/yahoo/compress/ZstdCompressorTest.java b/vespajlib/src/test/java/com/yahoo/compress/ZstdCompressorTest.java
new file mode 100644
index 00000000000..f38ce4ad953
--- /dev/null
+++ b/vespajlib/src/test/java/com/yahoo/compress/ZstdCompressorTest.java
@@ -0,0 +1,37 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.compress;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests the byte array based API of {@link ZstdCompressor}.
+ *
+ * @author bjorncs
+ */
+class ZstdCompressorTest {
+
+    @Test
+    void compresses_and_decompresses_input() {
+        ZstdCompressor zstd = new ZstdCompressor();
+        byte[] original = "The quick brown fox jumps over the lazy dog".getBytes();
+
+        byte[] packed = zstd.compress(original, 0, original.length);
+        byte[] restored = zstd.decompress(packed, 0, packed.length);
+
+        assertArrayEquals(original, restored);
+    }
+
+    @Test
+    void compressed_size_is_less_than_uncompressed() {
+        // Build a highly repetitive input: the same line repeated 100 times.
+        StringBuilder text = new StringBuilder();
+        int remainingLines = 100;
+        while (remainingLines-- > 0) {
+            text.append("The quick brown fox jumps over the lazy dog").append('\n');
+        }
+        byte[] original = text.toString().getBytes();
+
+        byte[] packed = new ZstdCompressor().compress(original, 0, original.length);
+
+        assertTrue(
+                packed.length < original.length,
+                () -> "Compressed size is " + packed.length + " while uncompressed size is " + original.length);
+    }
+
+}
diff --git a/vespajlib/src/test/java/com/yahoo/compress/ZstdOuputStreamTest.java b/vespajlib/src/test/java/com/yahoo/compress/ZstdOuputStreamTest.java
new file mode 100644
index 00000000000..5d35eb10215
--- /dev/null
+++ b/vespajlib/src/test/java/com/yahoo/compress/ZstdOuputStreamTest.java
@@ -0,0 +1,48 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.compress;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests {@link ZstdOuputStream} by writing to an in-memory stream and inspecting the produced bytes.
+ *
+ * @author bjorncs
+ */
+class ZstdOuputStreamTest {
+
+    @Test
+    void output_stream_compresses_input() throws IOException {
+        byte[] original = "The quick brown fox jumps over the lazy dog".getBytes();
+        ByteArrayOutputStream sink = new ByteArrayOutputStream();
+
+        // Use both the single byte and the range write, with a 12 byte input buffer.
+        try (ZstdOuputStream stream = new ZstdOuputStream(sink, 12)) {
+            stream.write(original[0]);
+            stream.write(original, 1, original.length - 1);
+        }
+
+        byte[] packed = sink.toByteArray();
+        byte[] restored = new byte[original.length];
+        new ZstdCompressor().decompress(packed, 0, packed.length, restored, 0, restored.length);
+        assertArrayEquals(original, restored);
+    }
+
+    @Test
+    void compressed_size_is_less_than_uncompressed() throws IOException {
+        // Build a highly repetitive input: the same line repeated 100 times.
+        StringBuilder text = new StringBuilder();
+        int remainingLines = 100;
+        while (remainingLines-- > 0) {
+            text.append("The quick brown fox jumps over the lazy dog").append('\n');
+        }
+        byte[] original = text.toString().getBytes();
+
+        ByteArrayOutputStream sink = new ByteArrayOutputStream();
+        try (ZstdOuputStream stream = new ZstdOuputStream(sink)) {
+            stream.write(original);
+        }
+
+        int compressedSize = sink.toByteArray().length;
+        assertTrue(
+                compressedSize < original.length,
+                () -> "Compressed size is " + compressedSize + " while uncompressed size is " + original.length);
+    }
+}