From 90c4fa8c527f99146287f77ce7157a7ec1594df7 Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Thu, 21 Oct 2021 17:44:23 +0200 Subject: Revert "Merge pull request #19664 from vespa-engine/jonmv/streaming-doc-v1-visit" This reverts commit b5d4b42caabf2c41fcbd8b21814819aae77dc7aa, reversing changes made to 9abe019606f2367b05e4e13d796de65dddf7c449. --- .../MaxPendingContentChannelOutputStream.java | 92 ---------------------- 1 file changed, 92 deletions(-) delete mode 100644 container-core/src/main/java/com/yahoo/container/jdisc/MaxPendingContentChannelOutputStream.java (limited to 'container-core/src/main/java/com/yahoo/container/jdisc/MaxPendingContentChannelOutputStream.java') diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/MaxPendingContentChannelOutputStream.java b/container-core/src/main/java/com/yahoo/container/jdisc/MaxPendingContentChannelOutputStream.java deleted file mode 100644 index aec4eeecd7b..00000000000 --- a/container-core/src/main/java/com/yahoo/container/jdisc/MaxPendingContentChannelOutputStream.java +++ /dev/null @@ -1,92 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.container.jdisc; - -import com.yahoo.jdisc.handler.CompletionHandler; -import com.yahoo.jdisc.handler.ContentChannel; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - -/** - * @author baldersheim - */ -public class MaxPendingContentChannelOutputStream extends ContentChannelOutputStream { - - private final long maxPending; - private final AtomicLong sent = new AtomicLong(0); - private final AtomicLong acked = new AtomicLong(0); - - public MaxPendingContentChannelOutputStream(ContentChannel endpoint, long maxPending) { - super(endpoint); - this.maxPending = maxPending; - } - - private long pendingBytes() { - return sent.get() - acked.get(); - } - - private class TrackCompletion implements CompletionHandler { - - private final long written; - private final AtomicBoolean replied = new AtomicBoolean(false); - - TrackCompletion(long written) { - this.written = written; - sent.addAndGet(written); - } - - @Override - public void completed() { - if (!replied.getAndSet(true)) { - acked.addAndGet(written); - } - } - - @Override - public void failed(Throwable t) { - if (!replied.getAndSet(true)) { - acked.addAndGet(written); - } - } - - } - - @Override - public void send(ByteBuffer src) throws IOException { - try { - stallWhilePendingAbove(maxPending); - } - catch (InterruptedException ignored) { - throw new InterruptedIOException("Interrupted waiting for IO"); - } - CompletionHandler pendingTracker = new TrackCompletion(src.remaining()); - try { - send(src, pendingTracker); - } - catch (Throwable throwable) { - pendingTracker.failed(throwable); - throw throwable; - } - } - - private void stallWhilePendingAbove(long pending) throws InterruptedException { - while (pendingBytes() > pending) { - Thread.sleep(1); - } - } - - @Override - public void flush() throws IOException { - super.flush(); - try { - stallWhilePendingAbove(0); - } - catch (InterruptedException e) { - throw new InterruptedIOException("Interrupted waiting for IO"); - } - } - -} -- cgit v1.2.3