From c9a8248b49109bd4f3abde4c0c492efb04707eab Mon Sep 17 00:00:00 2001 From: Bjørn Christian Seime Date: Tue, 19 Jan 2021 16:08:59 +0100 Subject: Implement an output stream compressing with zstd --- .../java/com/yahoo/compress/ZstdOuputStream.java | 88 ++++++++++++++++++++++ .../com/yahoo/compress/ZstdOuputStreamTest.java | 48 ++++++++++++ 2 files changed, 136 insertions(+) create mode 100644 vespajlib/src/main/java/com/yahoo/compress/ZstdOuputStream.java create mode 100644 vespajlib/src/test/java/com/yahoo/compress/ZstdOuputStreamTest.java (limited to 'vespajlib') diff --git a/vespajlib/src/main/java/com/yahoo/compress/ZstdOuputStream.java b/vespajlib/src/main/java/com/yahoo/compress/ZstdOuputStream.java new file mode 100644 index 00000000000..e81bcf6a465 --- /dev/null +++ b/vespajlib/src/main/java/com/yahoo/compress/ZstdOuputStream.java @@ -0,0 +1,88 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.compress; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * @author bjorncs + */ +public class ZstdOuputStream extends OutputStream { + + private final ZstdCompressor compressor = new ZstdCompressor(); + + public static final int DEFAULT_INPUT_BUFFER_SIZE = 8*1024; + + private final OutputStream out; + private final byte[] inputBuffer; + private final byte[] outputBuffer; + private int inputPosition = 0; + private boolean isClosed = false; + + public ZstdOuputStream(OutputStream out, int inputBufferSize) { + this.out = out; + this.inputBuffer = new byte[inputBufferSize]; + this.outputBuffer = new byte[ZstdCompressor.getMaxCompressedLength(inputBufferSize)]; + } + + public ZstdOuputStream(OutputStream out) { + this(out, DEFAULT_INPUT_BUFFER_SIZE); + } + + @Override + public void write(int b) throws IOException { + throwIfClosed(); + inputBuffer[inputPosition++] = (byte) b; + flushIfFull(); + } + + @Override + public void write(byte[] b) throws IOException { + throwIfClosed(); + write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + throwIfClosed(); + int end = off + len; + while (off < end) { + int copyLength = Math.min(end - off, inputBuffer.length - inputPosition); + System.arraycopy(b, off, inputBuffer, inputPosition, copyLength); + off += copyLength; + inputPosition += copyLength; + flushIfFull(); + } + } + + @Override + public void flush() throws IOException { + flushInternal(); + out.flush(); + } + + @Override + public void close() throws IOException { + throwIfClosed(); + flush(); + out.close(); + isClosed = true; + } + + private void flushInternal() throws IOException { + throwIfClosed(); + int compressedLength = compressor.compress(inputBuffer, 0, inputPosition, outputBuffer, 0, outputBuffer.length); + out.write(outputBuffer, 0, compressedLength); + inputPosition = 0; + } + + private void flushIfFull() throws IOException { + if (inputPosition == inputBuffer.length) { + flushInternal(); + } + } + + private void throwIfClosed() { + if (isClosed) throw new IllegalArgumentException("Output stream is already closed"); + } +} diff --git a/vespajlib/src/test/java/com/yahoo/compress/ZstdOuputStreamTest.java b/vespajlib/src/test/java/com/yahoo/compress/ZstdOuputStreamTest.java new file mode 100644 index 00000000000..5d35eb10215 --- /dev/null +++ b/vespajlib/src/test/java/com/yahoo/compress/ZstdOuputStreamTest.java @@ -0,0 +1,48 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.compress; + +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * @author bjorncs + */ +class ZstdOuputStreamTest { + + @Test + void output_stream_compresses_input() throws IOException { + byte[] inputData = "The quick brown fox jumps over the lazy dog".getBytes(); + ByteArrayOutputStream arrayOut = new ByteArrayOutputStream(); + try (ZstdOuputStream zstdOut = new ZstdOuputStream(arrayOut, 12)) { + zstdOut.write(inputData[0]); + zstdOut.write(inputData, 1, inputData.length - 1); + } + byte[] compressedData = arrayOut.toByteArray(); + ZstdCompressor compressor = new ZstdCompressor(); + byte[] decompressedData = new byte[inputData.length]; + compressor.decompress(compressedData, 0, compressedData.length, decompressedData, 0, decompressedData.length); + assertArrayEquals(inputData, decompressedData); + } + + @Test + void compressed_size_is_less_than_uncompressed() throws IOException { + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < 100; i++) { + builder.append("The quick brown fox jumps over the lazy dog").append('\n'); + } + byte[] inputData = builder.toString().getBytes(); + ByteArrayOutputStream arrayOut = new ByteArrayOutputStream(); + try (ZstdOuputStream zstdOut = new ZstdOuputStream(arrayOut)) { + zstdOut.write(inputData); + } + int compressedSize = arrayOut.toByteArray().length; + assertTrue( + compressedSize < inputData.length, + () -> "Compressed size is " + compressedSize + " while uncompressed size is " + inputData.length); + } +} -- cgit v1.2.3