summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@yahooinc.com>2023-07-10 10:21:38 +0200
committerBjørn Christian Seime <bjorncs@yahooinc.com>2023-07-10 11:43:46 +0200
commit53767322abaf206ec6297d199acf21defb14dd51 (patch)
treedc2dba5a869f9f3ccd7b1d6ffe7706c8b8bbde25 /vespa-feed-client
parentd097d374cbc284b2a0a8459f47e87f9b83c68034 (diff)
Compress proactively
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java64
1 files changed, 14 insertions, 50 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java
index 7fac977ca51..46eb0ca151b 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java
@@ -9,8 +9,8 @@ import org.eclipse.jetty.client.MultiplexConnectionPool;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
-import org.eclipse.jetty.client.util.AbstractRequestContent;
import org.eclipse.jetty.client.util.BufferingResponseListener;
+import org.eclipse.jetty.client.util.BytesRequestContent;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
@@ -18,7 +18,6 @@ import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
import org.eclipse.jetty.io.ClientConnector;
-import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.HttpCookieStore;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.Promise;
@@ -33,10 +32,8 @@ import java.io.UncheckedIOException;
import java.net.Inet4Address;
import java.net.InetSocketAddress;
import java.net.URI;
-import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -81,9 +78,19 @@ class JettyCluster implements Cluster {
.idleTimeout(IDLE_TIMEOUT.toMillis(), MILLISECONDS)
.timeout(reqTimeoutMillis, MILLISECONDS);
if (req.body() != null) {
- FeedContent content = new FeedContent(compression, req.body());
- content.contentEncoding().ifPresent(ce -> jettyReq.headers(hs -> hs.add(ce)));
- jettyReq.body(content);
+ boolean shouldCompress = compression == gzip || compression == auto && req.body().length > 512;
+ byte[] bytes;
+ if (shouldCompress) {
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream(1 << 10);
+ try (GZIPOutputStream zip = new GZIPOutputStream(buffer)) {
+ zip.write(req.body());
+ } catch (IOException e) { throw new UncheckedIOException(e); }
+ bytes = buffer.toByteArray();
+ jettyReq.headers(hs -> hs.add(HttpHeader.CONTENT_ENCODING, "gzip"));
+ } else {
+ bytes = req.body();
+ }
+ jettyReq.body(new BytesRequestContent(APPLICATION_JSON.asString(), bytes));
}
jettyReq.send(new BufferingResponseListener() {
@Override
@@ -183,49 +190,6 @@ class JettyCluster implements Cluster {
Endpoint(URI uri) { this.uri = String.format("%s://%s:%s", uri.getScheme(), uri.getHost(), portOf(uri)); }
}
- private static class FeedContent extends AbstractRequestContent {
- final Compression compression;
- final byte[] body;
-
- FeedContent(Compression compression, byte[] body) {
- super(APPLICATION_JSON.asString());
- this.compression = compression;
- this.body = body;
- }
-
- @Override public boolean isReproducible() { return true; }
- @Override public long getLength() { return shouldCompress() ? -1 : body.length; }
- Optional<HttpField> contentEncoding() {
- return shouldCompress() ? Optional.of(new HttpField(HttpHeader.CONTENT_ENCODING, "gzip")) : Optional.empty();
- }
-
- @Override
- public Subscription newSubscription(Consumer consumer, boolean emitInitialContent) {
- return new SubscriptionImpl(consumer, emitInitialContent);
- }
-
- boolean shouldCompress() { return compression == gzip || compression == auto && body.length > 512; }
-
- class SubscriptionImpl extends AbstractSubscription {
- SubscriptionImpl(Consumer consumer, boolean emitInitialContent) { super(consumer, emitInitialContent); }
-
- @Override
- protected boolean produceContent(Producer producer) {
- byte[] bytes;
- if (shouldCompress()) {
- ByteArrayOutputStream buffer = new ByteArrayOutputStream(1 << 10);
- try (GZIPOutputStream zip = new GZIPOutputStream(buffer)) {
- zip.write(body);
- } catch (IOException e) { throw new UncheckedIOException(e); }
- bytes = buffer.toByteArray();
- } else {
- bytes = body;
- }
- return producer.produce(ByteBuffer.wrap(bytes), true, Callback.NOOP);
- }
- }
- }
-
private static class Ipv4PreferringResolver extends AbstractLifeCycle implements SocketAddressResolver {
final HttpClient client;