From 2d700964107f60381de9091e724fcc316f36f4d7 Mon Sep 17 00:00:00 2001 From: Bjørn Christian Seime Date: Tue, 19 Jan 2021 13:29:20 +0100 Subject: Add zstd support to Compressor Introduce zstandard compression using airlift aircompressor - a pure Java implementation. --- .../java/com/yahoo/compress/CompressionType.java | 5 +++- .../main/java/com/yahoo/compress/Compressor.java | 15 +++++++++++ .../java/com/yahoo/compress/ZstdCompressor.java | 30 ++++++++++++++++++++++ 3 files changed, 49 insertions(+), 1 deletion(-) create mode 100644 vespajlib/src/main/java/com/yahoo/compress/ZstdCompressor.java (limited to 'vespajlib/src/main') diff --git a/vespajlib/src/main/java/com/yahoo/compress/CompressionType.java b/vespajlib/src/main/java/com/yahoo/compress/CompressionType.java index c3e42895a5e..36deb318ae8 100644 --- a/vespajlib/src/main/java/com/yahoo/compress/CompressionType.java +++ b/vespajlib/src/main/java/com/yahoo/compress/CompressionType.java @@ -11,7 +11,8 @@ public enum CompressionType { // Do not change the type->ordinal association. The gap is due to historic types no longer supported. 
NONE((byte) 0), INCOMPRESSIBLE((byte) 5), - LZ4((byte) 6); + LZ4((byte) 6), + ZSTD((byte) 7); private byte code; @@ -38,6 +39,8 @@ public enum CompressionType { return INCOMPRESSIBLE; case ((byte) 6): return LZ4; + case ((byte) 7): + return ZSTD; default: throw new IllegalArgumentException("Unknown compression type ordinal " + value); } diff --git a/vespajlib/src/main/java/com/yahoo/compress/Compressor.java b/vespajlib/src/main/java/com/yahoo/compress/Compressor.java index fb5da192f36..3220b81a3a9 100644 --- a/vespajlib/src/main/java/com/yahoo/compress/Compressor.java +++ b/vespajlib/src/main/java/com/yahoo/compress/Compressor.java @@ -18,6 +18,7 @@ import java.util.Random; */ public class Compressor { + private final ZstdCompressor zstdCompressor = new ZstdCompressor(); private final CompressionType type; private final int level; private final double compressionThresholdFactor; @@ -91,6 +92,11 @@ public class Compressor { if (compressedData.length + 8 >= dataSize * compressionThresholdFactor) return new Compression(CompressionType.INCOMPRESSIBLE, dataSize, data); return new Compression(CompressionType.LZ4, dataSize, compressedData); + case ZSTD: + int dataLength = uncompressedSize.orElse(data.length); + if (dataLength < compressMinSizeBytes) return new Compression(CompressionType.INCOMPRESSIBLE, dataLength, data); + byte[] compressed = zstdCompressor.compress(data, 0, dataLength); + return new Compression(CompressionType.ZSTD, dataLength, compressed); default: throw new IllegalArgumentException(requestedCompression + " is not supported"); } @@ -130,6 +136,15 @@ public class Compressor { if (expectedCompressedSize.isPresent() && compressedSize != expectedCompressedSize.get()) throw new IllegalStateException("Compressed size mismatch. Expected " + compressedSize + ". 
Got " + expectedCompressedSize.get()); return uncompressedLZ4Data; + case ZSTD: + int compressedLength = expectedCompressedSize.orElseThrow(() -> new IllegalArgumentException("Zstd decompressor requires input size")); + byte[] decompressedData = zstdCompressor.decompress(compressedData, compressedDataOffset, compressedLength); + expectedCompressedSize.ifPresent(expectedSize -> { + if (compressedData.length != expectedSize) { + throw new IllegalStateException("Compressed size mismatch. Expected " + expectedSize + ". Got " + compressedData.length); + } + }); + return decompressedData; default: throw new IllegalArgumentException(compression + " is not supported"); } diff --git a/vespajlib/src/main/java/com/yahoo/compress/ZstdCompressor.java b/vespajlib/src/main/java/com/yahoo/compress/ZstdCompressor.java new file mode 100644 index 00000000000..58a01df5b09 --- /dev/null +++ b/vespajlib/src/main/java/com/yahoo/compress/ZstdCompressor.java @@ -0,0 +1,30 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.compress; + +import java.util.Arrays; + +/** + * Frame based Zstd compressor (https://github.com/facebook/zstd) + * Implemented based on https://github.com/airlift/aircompressor - a pure Java implementation (no JNI). 
+ * + * @author bjorncs + */ +public class ZstdCompressor { + + private static final io.airlift.compress.zstd.ZstdCompressor compressor = new io.airlift.compress.zstd.ZstdCompressor(); + private static final io.airlift.compress.zstd.ZstdDecompressor decompressor = new io.airlift.compress.zstd.ZstdDecompressor(); + + public byte[] compress(byte[] input, int inputOffset, int inputLength) { + int maxCompressedLength = compressor.maxCompressedLength(inputLength); + byte[] output = new byte[maxCompressedLength]; + int compressedLength = compressor.compress(input, inputOffset, inputLength, output, 0, maxCompressedLength); + return Arrays.copyOf(output, compressedLength); + } + + public byte[] decompress(byte[] input, int inputOffset, int inputLength) { + int decompressedLength = (int) io.airlift.compress.zstd.ZstdDecompressor.getDecompressedSize(input, inputOffset, inputLength); + byte[] output = new byte[decompressedLength]; + decompressor.decompress(input, inputOffset, inputLength, output, 0, decompressedLength); + return output; + } +} -- cgit v1.2.3 From 095ba218f5dd1b57ef606c02393d43ae2e6e3c3d Mon Sep 17 00:00:00 2001 From: Bjørn Christian Seime Date: Tue, 19 Jan 2021 13:55:33 +0100 Subject: Extend ZstdCompressor with more low-level API methods --- .../java/com/yahoo/compress/ZstdCompressor.java | 24 ++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) (limited to 'vespajlib/src/main') diff --git a/vespajlib/src/main/java/com/yahoo/compress/ZstdCompressor.java b/vespajlib/src/main/java/com/yahoo/compress/ZstdCompressor.java index 58a01df5b09..5d15b102ad6 100644 --- a/vespajlib/src/main/java/com/yahoo/compress/ZstdCompressor.java +++ b/vespajlib/src/main/java/com/yahoo/compress/ZstdCompressor.java @@ -15,16 +15,32 @@ public class ZstdCompressor { private static final io.airlift.compress.zstd.ZstdDecompressor decompressor = new io.airlift.compress.zstd.ZstdDecompressor(); public byte[] compress(byte[] input, int inputOffset, int inputLength) { - int 
maxCompressedLength = compressor.maxCompressedLength(inputLength); + int maxCompressedLength = getMaxCompressedLength(inputLength); byte[] output = new byte[maxCompressedLength]; - int compressedLength = compressor.compress(input, inputOffset, inputLength, output, 0, maxCompressedLength); + int compressedLength = compress(input, inputOffset, inputLength, output, 0, maxCompressedLength); return Arrays.copyOf(output, compressedLength); } + public int compress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength) { + return compressor.compress(input, inputOffset, inputLength, output, outputOffset, maxOutputLength); + } + public byte[] decompress(byte[] input, int inputOffset, int inputLength) { - int decompressedLength = (int) io.airlift.compress.zstd.ZstdDecompressor.getDecompressedSize(input, inputOffset, inputLength); + int decompressedLength = getDecompressedLength(input, inputOffset, inputLength); byte[] output = new byte[decompressedLength]; - decompressor.decompress(input, inputOffset, inputLength, output, 0, decompressedLength); + decompress(input, inputOffset, inputLength, output, 0, decompressedLength); return output; } + + public int decompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength) { + return decompressor.decompress(input, inputOffset, inputLength, output, outputOffset, maxOutputLength); + } + + public static int getMaxCompressedLength(int uncompressedLength) { + return compressor.maxCompressedLength(uncompressedLength); + } + + public static int getDecompressedLength(byte[] input, int inputOffset, int inputLength) { + return (int) io.airlift.compress.zstd.ZstdDecompressor.getDecompressedSize(input, inputOffset, inputLength); + } } -- cgit v1.2.3 From 86507230475ff6964bcb98adda345d00867ce024 Mon Sep 17 00:00:00 2001 From: Bjørn Christian Seime Date: Tue, 19 Jan 2021 16:08:59 +0100 Subject: Implement an output stream compressing with zstd --- 
.../java/com/yahoo/compress/ZstdOuputStream.java | 88 ++++++++++++++++++++++ .../com/yahoo/compress/ZstdOuputStreamTest.java | 30 ++++++++ 2 files changed, 118 insertions(+) create mode 100644 vespajlib/src/main/java/com/yahoo/compress/ZstdOuputStream.java create mode 100644 vespajlib/src/test/java/com/yahoo/compress/ZstdOuputStreamTest.java (limited to 'vespajlib/src/main') diff --git a/vespajlib/src/main/java/com/yahoo/compress/ZstdOuputStream.java b/vespajlib/src/main/java/com/yahoo/compress/ZstdOuputStream.java new file mode 100644 index 00000000000..e81bcf6a465 --- /dev/null +++ b/vespajlib/src/main/java/com/yahoo/compress/ZstdOuputStream.java @@ -0,0 +1,88 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.compress; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * @author bjorncs + */ +public class ZstdOuputStream extends OutputStream { + + private final ZstdCompressor compressor = new ZstdCompressor(); + + public static final int DEFAULT_INPUT_BUFFER_SIZE = 8*1024; + + private final OutputStream out; + private final byte[] inputBuffer; + private final byte[] outputBuffer; + private int inputPosition = 0; + private boolean isClosed = false; + + public ZstdOuputStream(OutputStream out, int inputBufferSize) { + this.out = out; + this.inputBuffer = new byte[inputBufferSize]; + this.outputBuffer = new byte[ZstdCompressor.getMaxCompressedLength(inputBufferSize)]; + } + + public ZstdOuputStream(OutputStream out) { + this(out, DEFAULT_INPUT_BUFFER_SIZE); + } + + @Override + public void write(int b) throws IOException { + throwIfClosed(); + inputBuffer[inputPosition++] = (byte) b; + flushIfFull(); + } + + @Override + public void write(byte[] b) throws IOException { + throwIfClosed(); + write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + throwIfClosed(); + int end = off + len; + while (off < end) { + 
int copyLength = Math.min(end - off, inputBuffer.length - inputPosition); + System.arraycopy(b, off, inputBuffer, inputPosition, copyLength); + off += copyLength; + inputPosition += copyLength; + flushIfFull(); + } + } + + @Override + public void flush() throws IOException { + flushInternal(); + out.flush(); + } + + @Override + public void close() throws IOException { + if (isClosed) return; // closing an already-closed stream is a no-op, per java.io.Closeable + flush(); + out.close(); + isClosed = true; + } + + private void flushInternal() throws IOException { + throwIfClosed(); + int compressedLength = compressor.compress(inputBuffer, 0, inputPosition, outputBuffer, 0, outputBuffer.length); + out.write(outputBuffer, 0, compressedLength); + inputPosition = 0; + } + + private void flushIfFull() throws IOException { + if (inputPosition == inputBuffer.length) { + flushInternal(); + } + } + + private void throwIfClosed() { + if (isClosed) throw new IllegalArgumentException("Output stream is already closed"); + } +} diff --git a/vespajlib/src/test/java/com/yahoo/compress/ZstdOuputStreamTest.java b/vespajlib/src/test/java/com/yahoo/compress/ZstdOuputStreamTest.java new file mode 100644 index 00000000000..5c7c9ca0e1f --- /dev/null +++ b/vespajlib/src/test/java/com/yahoo/compress/ZstdOuputStreamTest.java @@ -0,0 +1,30 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+package com.yahoo.compress; + +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; + +/** + * @author bjorncs + */ +class ZstdOuputStreamTest { + + @Test + void output_stream_compresses_input() throws IOException { + byte[] inputData = "The quick brown fox jumps over the lazy dog".getBytes(); + ByteArrayOutputStream arrayOut = new ByteArrayOutputStream(); + try (ZstdOuputStream zstdOut = new ZstdOuputStream(arrayOut, 12)) { + zstdOut.write(inputData[0]); + zstdOut.write(inputData, 1, inputData.length - 1); + } + byte[] compressedData = arrayOut.toByteArray(); + ZstdCompressor compressor = new ZstdCompressor(); + byte[] decompressedData = new byte[inputData.length]; + compressor.decompress(compressedData, 0, compressedData.length, decompressedData, 0, decompressedData.length); + assertArrayEquals(inputData, decompressedData); + } +} \ No newline at end of file -- cgit v1.2.3 From f3de2f00775760223b6b69919bfb4fa76e99ac76 Mon Sep 17 00:00:00 2001 From: Bjørn Christian Seime Date: Tue, 19 Jan 2021 16:14:36 +0100 Subject: Document pitfall with decompress() --- vespajlib/src/main/java/com/yahoo/compress/ZstdCompressor.java | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'vespajlib/src/main') diff --git a/vespajlib/src/main/java/com/yahoo/compress/ZstdCompressor.java b/vespajlib/src/main/java/com/yahoo/compress/ZstdCompressor.java index 5d15b102ad6..72ccb730db7 100644 --- a/vespajlib/src/main/java/com/yahoo/compress/ZstdCompressor.java +++ b/vespajlib/src/main/java/com/yahoo/compress/ZstdCompressor.java @@ -25,6 +25,11 @@ public class ZstdCompressor { return compressor.compress(input, inputOffset, inputLength, output, outputOffset, maxOutputLength); } + /** + * Note: + * Implementation assumes single frame (since {@link #getDecompressedLength(byte[], int, int)} only includes the first frame) + * The {@link #decompress(byte[], int, int, byte[], 
int, int)} overload will try to decompress all frames, causing the output buffer to overflow. + */ public byte[] decompress(byte[] input, int inputOffset, int inputLength) { int decompressedLength = getDecompressedLength(input, inputOffset, inputLength); byte[] output = new byte[decompressedLength]; -- cgit v1.2.3