summaryrefslogtreecommitdiffstats
path: root/container-core/src/main/java/com/yahoo/container/jdisc/MaxPendingContentChannelOutputStream.java
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-10-21 17:44:23 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-10-21 17:44:23 +0200
commit90c4fa8c527f99146287f77ce7157a7ec1594df7 (patch)
treed3c65ce6715b87bfa587dba250b7445ad2a65cc0 /container-core/src/main/java/com/yahoo/container/jdisc/MaxPendingContentChannelOutputStream.java
parent3cf7e6d9d2aab508924da8ef9f1a8f1b10f31453 (diff)
Revert "Merge pull request #19664 from vespa-engine/jonmv/streaming-doc-v1-visit"
This reverts commit b5d4b42caabf2c41fcbd8b21814819aae77dc7aa, reversing changes made to 9abe019606f2367b05e4e13d796de65dddf7c449.
Diffstat (limited to 'container-core/src/main/java/com/yahoo/container/jdisc/MaxPendingContentChannelOutputStream.java')
-rw-r--r--container-core/src/main/java/com/yahoo/container/jdisc/MaxPendingContentChannelOutputStream.java92
1 files changed, 0 insertions, 92 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/MaxPendingContentChannelOutputStream.java b/container-core/src/main/java/com/yahoo/container/jdisc/MaxPendingContentChannelOutputStream.java
deleted file mode 100644
index aec4eeecd7b..00000000000
--- a/container-core/src/main/java/com/yahoo/container/jdisc/MaxPendingContentChannelOutputStream.java
+++ /dev/null
@@ -1,92 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.container.jdisc;
-
-import com.yahoo.jdisc.handler.CompletionHandler;
-import com.yahoo.jdisc.handler.ContentChannel;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * @author baldersheim
- */
-public class MaxPendingContentChannelOutputStream extends ContentChannelOutputStream {
-
- private final long maxPending;
- private final AtomicLong sent = new AtomicLong(0);
- private final AtomicLong acked = new AtomicLong(0);
-
- public MaxPendingContentChannelOutputStream(ContentChannel endpoint, long maxPending) {
- super(endpoint);
- this.maxPending = maxPending;
- }
-
- private long pendingBytes() {
- return sent.get() - acked.get();
- }
-
- private class TrackCompletion implements CompletionHandler {
-
- private final long written;
- private final AtomicBoolean replied = new AtomicBoolean(false);
-
- TrackCompletion(long written) {
- this.written = written;
- sent.addAndGet(written);
- }
-
- @Override
- public void completed() {
- if (!replied.getAndSet(true)) {
- acked.addAndGet(written);
- }
- }
-
- @Override
- public void failed(Throwable t) {
- if (!replied.getAndSet(true)) {
- acked.addAndGet(written);
- }
- }
-
- }
-
- @Override
- public void send(ByteBuffer src) throws IOException {
- try {
- stallWhilePendingAbove(maxPending);
- }
- catch (InterruptedException ignored) {
- throw new InterruptedIOException("Interrupted waiting for IO");
- }
- CompletionHandler pendingTracker = new TrackCompletion(src.remaining());
- try {
- send(src, pendingTracker);
- }
- catch (Throwable throwable) {
- pendingTracker.failed(throwable);
- throw throwable;
- }
- }
-
- private void stallWhilePendingAbove(long pending) throws InterruptedException {
- while (pendingBytes() > pending) {
- Thread.sleep(1);
- }
- }
-
- @Override
- public void flush() throws IOException {
- super.flush();
- try {
- stallWhilePendingAbove(0);
- }
- catch (InterruptedException e) {
- throw new InterruptedIOException("Interrupted waiting for IO");
- }
- }
-
-}