aboutsummaryrefslogtreecommitdiffstats
path: root/container-core
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-10-21 17:48:19 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-10-21 17:48:19 +0200
commit9e2cbe370e498fe0a89c8abae2a8b23344cf3f6a (patch)
treefaf8464e04acf1a943a2b751f3745a2c74c1dc29 /container-core
parent56c3fc7c2a3b7e317e79593aa56ed2d03472cbde (diff)
Revert "Merge pull request #19686 from vespa-engine/jonmv/revert-streamed-visits"
This reverts commit 56c3fc7c2a3b7e317e79593aa56ed2d03472cbde, reversing changes made to 367dae08c390833a54c1bae11282df5a7e056d16.
Diffstat (limited to 'container-core')
-rw-r--r--container-core/abi-spec.json13
-rw-r--r--container-core/src/main/java/com/yahoo/container/jdisc/MaxPendingContentChannelOutputStream.java92
-rw-r--r--container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java80
-rw-r--r--container-core/src/test/java/com/yahoo/container/jdisc/ThreadedRequestHandlerTestCase.java3
4 files changed, 105 insertions, 83 deletions
diff --git a/container-core/abi-spec.json b/container-core/abi-spec.json
index 02d43104a3f..8c0f3e5fd80 100644
--- a/container-core/abi-spec.json
+++ b/container-core/abi-spec.json
@@ -762,6 +762,19 @@
],
"fields": []
},
+ "com.yahoo.container.jdisc.MaxPendingContentChannelOutputStream": {
+ "superClass": "com.yahoo.container.jdisc.ContentChannelOutputStream",
+ "interfaces": [],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public void <init>(com.yahoo.jdisc.handler.ContentChannel, long)",
+ "public void send(java.nio.ByteBuffer)",
+ "public void flush()"
+ ],
+ "fields": []
+ },
"com.yahoo.container.jdisc.MetricConsumerFactory": {
"superClass": "java.lang.Object",
"interfaces": [],
diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/MaxPendingContentChannelOutputStream.java b/container-core/src/main/java/com/yahoo/container/jdisc/MaxPendingContentChannelOutputStream.java
new file mode 100644
index 00000000000..aec4eeecd7b
--- /dev/null
+++ b/container-core/src/main/java/com/yahoo/container/jdisc/MaxPendingContentChannelOutputStream.java
@@ -0,0 +1,92 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.container.jdisc;
+
+import com.yahoo.jdisc.handler.CompletionHandler;
+import com.yahoo.jdisc.handler.ContentChannel;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * @author baldersheim
+ */
+public class MaxPendingContentChannelOutputStream extends ContentChannelOutputStream {
+
+ private final long maxPending;
+ private final AtomicLong sent = new AtomicLong(0);
+ private final AtomicLong acked = new AtomicLong(0);
+
+ public MaxPendingContentChannelOutputStream(ContentChannel endpoint, long maxPending) {
+ super(endpoint);
+ this.maxPending = maxPending;
+ }
+
+ private long pendingBytes() {
+ return sent.get() - acked.get();
+ }
+
+ private class TrackCompletion implements CompletionHandler {
+
+ private final long written;
+ private final AtomicBoolean replied = new AtomicBoolean(false);
+
+ TrackCompletion(long written) {
+ this.written = written;
+ sent.addAndGet(written);
+ }
+
+ @Override
+ public void completed() {
+ if (!replied.getAndSet(true)) {
+ acked.addAndGet(written);
+ }
+ }
+
+ @Override
+ public void failed(Throwable t) {
+ if (!replied.getAndSet(true)) {
+ acked.addAndGet(written);
+ }
+ }
+
+ }
+
+ @Override
+ public void send(ByteBuffer src) throws IOException {
+ try {
+ stallWhilePendingAbove(maxPending);
+ }
+ catch (InterruptedException ignored) {
+ throw new InterruptedIOException("Interrupted waiting for IO");
+ }
+ CompletionHandler pendingTracker = new TrackCompletion(src.remaining());
+ try {
+ send(src, pendingTracker);
+ }
+ catch (Throwable throwable) {
+ pendingTracker.failed(throwable);
+ throw throwable;
+ }
+ }
+
+ private void stallWhilePendingAbove(long pending) throws InterruptedException {
+ while (pendingBytes() > pending) {
+ Thread.sleep(1);
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ super.flush();
+ try {
+ stallWhilePendingAbove(0);
+ }
+ catch (InterruptedException e) {
+ throw new InterruptedIOException("Interrupted waiting for IO");
+ }
+ }
+
+}
diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java b/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java
index 0bfe4afe07d..0c3c1e2120b 100644
--- a/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java
+++ b/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java
@@ -10,9 +10,6 @@ import com.yahoo.jdisc.handler.ContentChannel;
import com.yahoo.jdisc.handler.UnsafeContentInputStream;
import com.yahoo.jdisc.handler.ResponseHandler;
-import java.io.InterruptedIOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.io.IOException;
@@ -253,81 +250,4 @@ public abstract class ThreadedHttpRequestHandler extends ThreadedRequestHandler
}
- /**
- * @author baldersheim
- */
- static class MaxPendingContentChannelOutputStream extends ContentChannelOutputStream {
- private final long maxPending;
- private final AtomicLong sent = new AtomicLong(0);
- private final AtomicLong acked = new AtomicLong(0);
-
- public MaxPendingContentChannelOutputStream(ContentChannel endpoint, long maxPending) {
- super(endpoint);
- this.maxPending = maxPending;
- }
-
- private long pendingBytes() {
- return sent.get() - acked.get();
- }
-
- private class TrackCompletion implements CompletionHandler {
-
- private final long written;
- private final AtomicBoolean replied = new AtomicBoolean(false);
-
- TrackCompletion(long written) {
- this.written = written;
- sent.addAndGet(written);
- }
-
- @Override
- public void completed() {
- if ( ! replied.getAndSet(true)) {
- acked.addAndGet(written);
- }
- }
-
- @Override
- public void failed(Throwable t) {
- if ( ! replied.getAndSet(true)) {
- acked.addAndGet(written);
- }
- }
- }
-
- @Override
- public void send(ByteBuffer src) throws IOException {
- try {
- stallWhilePendingAbove(maxPending);
- } catch (InterruptedException ignored) {
- throw new InterruptedIOException("Interrupted waiting for IO");
- }
- CompletionHandler pendingTracker = new TrackCompletion(src.remaining());
- try {
- send(src, pendingTracker);
- } catch (Throwable throwable) {
- pendingTracker.failed(throwable);
- throw throwable;
- }
- }
-
- private void stallWhilePendingAbove(long pending) throws InterruptedException {
- while (pendingBytes() > pending) {
- Thread.sleep(1);
- }
- }
-
- @Override
- public void flush() throws IOException {
- super.flush();
- try {
- stallWhilePendingAbove(0);
- }
- catch (InterruptedException e) {
- throw new InterruptedIOException("Interrupted waiting for IO");
- }
- }
-
- }
-
}
diff --git a/container-core/src/test/java/com/yahoo/container/jdisc/ThreadedRequestHandlerTestCase.java b/container-core/src/test/java/com/yahoo/container/jdisc/ThreadedRequestHandlerTestCase.java
index d1036ce0e45..a9b16799aea 100644
--- a/container-core/src/test/java/com/yahoo/container/jdisc/ThreadedRequestHandlerTestCase.java
+++ b/container-core/src/test/java/com/yahoo/container/jdisc/ThreadedRequestHandlerTestCase.java
@@ -1,7 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.container.jdisc;
-import com.yahoo.container.jdisc.ThreadedHttpRequestHandler.MaxPendingContentChannelOutputStream;
import com.yahoo.jdisc.Request;
import com.yahoo.jdisc.Response;
import com.yahoo.jdisc.application.ContainerBuilder;
@@ -20,11 +19,9 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.Phaser;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.*;