diff options
author | Bjørn Christian Seime <bjorncs@yahooinc.com> | 2023-07-10 10:21:38 +0200 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@yahooinc.com> | 2023-07-10 11:43:46 +0200 |
commit | 53767322abaf206ec6297d199acf21defb14dd51 (patch) | |
tree | dc2dba5a869f9f3ccd7b1d6ffe7706c8b8bbde25 /vespa-feed-client | |
parent | d097d374cbc284b2a0a8459f47e87f9b83c68034 (diff) |
Compress proactively
Diffstat (limited to 'vespa-feed-client')
-rw-r--r-- | vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java | 64 |
1 files changed, 14 insertions, 50 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java index 7fac977ca51..46eb0ca151b 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java @@ -9,8 +9,8 @@ import org.eclipse.jetty.client.MultiplexConnectionPool; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; -import org.eclipse.jetty.client.util.AbstractRequestContent; import org.eclipse.jetty.client.util.BufferingResponseListener; +import org.eclipse.jetty.client.util.BytesRequestContent; import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpMethod; @@ -18,7 +18,6 @@ import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http2.client.HTTP2Client; import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2; import org.eclipse.jetty.io.ClientConnector; -import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.HttpCookieStore; import org.eclipse.jetty.util.Pool; import org.eclipse.jetty.util.Promise; @@ -33,10 +32,8 @@ import java.io.UncheckedIOException; import java.net.Inet4Address; import java.net.InetSocketAddress; import java.net.URI; -import java.nio.ByteBuffer; import java.time.Duration; import java.util.List; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -81,9 +78,19 @@ class JettyCluster implements Cluster { .idleTimeout(IDLE_TIMEOUT.toMillis(), MILLISECONDS) .timeout(reqTimeoutMillis, MILLISECONDS); if (req.body() != null) { - FeedContent content = new FeedContent(compression, req.body()); - content.contentEncoding().ifPresent(ce -> jettyReq.headers(hs -> hs.add(ce))); - jettyReq.body(content); + boolean shouldCompress = compression == gzip || compression == auto && req.body().length > 512; + byte[] bytes; + if (shouldCompress) { + ByteArrayOutputStream buffer = new ByteArrayOutputStream(1 << 10); + try (GZIPOutputStream zip = new GZIPOutputStream(buffer)) { + zip.write(req.body()); + } catch (IOException e) { throw new UncheckedIOException(e); } + bytes = buffer.toByteArray(); + jettyReq.headers(hs -> hs.add(HttpHeader.CONTENT_ENCODING, "gzip")); + } else { + bytes = req.body(); + } + jettyReq.body(new BytesRequestContent(APPLICATION_JSON.asString(), bytes)); } jettyReq.send(new BufferingResponseListener() { @Override @@ -183,49 +190,6 @@ class JettyCluster implements Cluster { Endpoint(URI uri) { this.uri = String.format("%s://%s:%s", uri.getScheme(), uri.getHost(), portOf(uri)); } } - private static class FeedContent extends AbstractRequestContent { - final Compression compression; - final byte[] body; - - FeedContent(Compression compression, byte[] body) { - super(APPLICATION_JSON.asString()); - this.compression = compression; - this.body = body; - } - - @Override public boolean isReproducible() { return true; } - @Override public long getLength() { return shouldCompress() ? -1 : body.length; } - Optional<HttpField> contentEncoding() { - return shouldCompress() ? Optional.of(new HttpField(HttpHeader.CONTENT_ENCODING, "gzip")) : Optional.empty(); - } - - @Override - public Subscription newSubscription(Consumer consumer, boolean emitInitialContent) { - return new SubscriptionImpl(consumer, emitInitialContent); - } - - boolean shouldCompress() { return compression == gzip || compression == auto && body.length > 512; } - - class SubscriptionImpl extends AbstractSubscription { - SubscriptionImpl(Consumer consumer, boolean emitInitialContent) { super(consumer, emitInitialContent); } - - @Override - protected boolean produceContent(Producer producer) { - byte[] bytes; - if (shouldCompress()) { - ByteArrayOutputStream buffer = new ByteArrayOutputStream(1 << 10); - try (GZIPOutputStream zip = new GZIPOutputStream(buffer)) { - zip.write(body); - } catch (IOException e) { throw new UncheckedIOException(e); } - bytes = buffer.toByteArray(); - } else { - bytes = body; - } - return producer.produce(ByteBuffer.wrap(bytes), true, Callback.NOOP); - } - } - } - private static class Ipv4PreferringResolver extends AbstractLifeCycle implements SocketAddressResolver { final HttpClient client; |