summaryrefslogtreecommitdiffstats
path: root/container-core
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-02-17 17:29:51 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2021-02-17 17:39:12 +0100
commit6856c7448dcdae692f60aa65a79ec34c984b0586 (patch)
tree70305bdd01a82e8fcf6d3bd984ce2bb59cf50ea4 /container-core
parent1e4291ddeeb9072b49e64ef962f61a28dd256b9b (diff)
Track sent and acked bytes to ensure a smooth flow of bytes not exceeding maxpending by more than
than a small byte buffer.
Diffstat (limited to 'container-core')
-rw-r--r--container-core/src/main/java/com/yahoo/container/handler/LogHandler.java56
-rw-r--r--container-core/src/main/java/com/yahoo/container/handler/LogReader.java6
-rw-r--r--container-core/src/main/java/com/yahoo/container/jdisc/ContentChannelOutputStream.java20
-rw-r--r--container-core/src/test/java/com/yahoo/container/jdisc/LoggingTestCase.java2
4 files changed, 61 insertions, 23 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java b/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java
index 4b23eafaa9c..ad03f8405ac 100644
--- a/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java
+++ b/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java
@@ -15,13 +15,14 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Optional;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
public class LogHandler extends ThreadedHttpRequestHandler {
private final LogReader logReader;
+ private static final long MB = 1024*1024;
@Inject
public LogHandler(Executor executor, LogHandlerConfig config) {
@@ -45,7 +46,7 @@ public class LogHandler extends ThreadedHttpRequestHandler {
@Override
public void render(OutputStream output, ContentChannel networkChannel, CompletionHandler handler) {
try {
- OutputStream blockingOutput = new BlockingFlushContentChannelOutputStream(networkChannel);
+ OutputStream blockingOutput = new MaxPendingContentChannelOutputStream(networkChannel, 1*MB);
logReader.writeLogs(blockingOutput, from, to, hostname);
blockingOutput.close();
}
@@ -60,26 +61,57 @@ public class LogHandler extends ThreadedHttpRequestHandler {
}
- private static class BlockingFlushContentChannelOutputStream extends ContentChannelOutputStream {
-
+ private static class MaxPendingContentChannelOutputStream extends ContentChannelOutputStream {
private final ContentChannel channel;
+ private final long maxPending;
+ private AtomicLong sent = new AtomicLong(0);
+ private AtomicLong acked = new AtomicLong(0);
- public BlockingFlushContentChannelOutputStream(ContentChannel endpoint) {
+ public MaxPendingContentChannelOutputStream(ContentChannel endpoint, long maxPending) {
super(endpoint);
this.channel = endpoint;
+ this.maxPending = maxPending;
+ }
+
+ private long pendingBytes() {
+ return sent.get() - acked.get();
+ }
+
+ private class TrackCompletition implements CompletionHandler {
+ final long written;
+ TrackCompletition(long written) {
+ this.written = written;
+ sent.addAndGet(written);
+ }
+ @Override
+ public void completed() {
+ acked.addAndGet(written);
+ }
+
+ @Override
+ public void failed(Throwable t) {
+ acked.addAndGet(written);
+ }
+ }
+ @Override
+ public void send(ByteBuffer src) throws IOException {
+ try {
+ stallWhilePendingAbove(maxPending);
+ } catch (InterruptedException e) {}
+ send(src, new TrackCompletition(src.remaining()));
+ }
+
+ private void stallWhilePendingAbove(long pending) throws InterruptedException {
+ while (pendingBytes() > pending) {
+ Thread.sleep(1);
+ }
}
@Override
public void flush() throws IOException {
super.flush();
- CountDownLatch latch = new CountDownLatch(1);
- channel.write(ByteBuffer.allocate(0), // :'(
- new CompletionHandler() {
- @Override public void completed() { latch.countDown(); }
- @Override public void failed(Throwable t) { latch.countDown(); }
- });
try {
- latch.await();
+ stallWhilePendingAbove(0);
}
catch (InterruptedException e) {
throw new RuntimeException("Interrupted waiting for underlying IO to complete", e);
diff --git a/container-core/src/main/java/com/yahoo/container/handler/LogReader.java b/container-core/src/main/java/com/yahoo/container/handler/LogReader.java
index a7fcb8a9f71..af83f97fd9b 100644
--- a/container-core/src/main/java/com/yahoo/container/handler/LogReader.java
+++ b/container-core/src/main/java/com/yahoo/container/handler/LogReader.java
@@ -72,16 +72,10 @@ class LogReader {
Iterator<LineWithTimestamp> lines = Iterators.mergeSorted(logLineIterators,
Comparator.comparingDouble(LineWithTimestamp::timestamp));
- long charsWritten = 0;
while (lines.hasNext()) {
String line = lines.next().line();
writer.write(line);
writer.newLine();
- charsWritten += line.length();
- if (charsWritten > 0x100000) {
- writer.flush();
- charsWritten = 0;
- }
}
}
catch (IOException e) {
diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/ContentChannelOutputStream.java b/container-core/src/main/java/com/yahoo/container/jdisc/ContentChannelOutputStream.java
index 1d4c20efe5e..270da0c4ab0 100644
--- a/container-core/src/main/java/com/yahoo/container/jdisc/ContentChannelOutputStream.java
+++ b/container-core/src/main/java/com/yahoo/container/jdisc/ContentChannelOutputStream.java
@@ -12,7 +12,6 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
-import java.util.logging.Level;
import java.util.logging.Logger;
/**
@@ -125,9 +124,13 @@ public class ContentChannelOutputStream extends OutputStream implements Writable
@Override
public void send(ByteBuffer src) throws IOException {
// Don't do a buffer.flush() from here, this method is used by the buffer itself
+ send(src, null);
+ }
+
+ protected void send(ByteBuffer src, CompletionHandler completionHandler) throws IOException {
try {
byteBufferData += src.remaining();
- endpoint.write(src, new LoggingCompletionHandler());
+ endpoint.write(src, new LoggingCompletionHandler(completionHandler));
} catch (RuntimeException e) {
throw new IOException(Exceptions.toMessageString(e), e);
}
@@ -138,10 +141,16 @@ public class ContentChannelOutputStream extends OutputStream implements Writable
return buffer.appended() + byteBufferData;
}
- class LoggingCompletionHandler implements CompletionHandler {
-
+ private class LoggingCompletionHandler implements CompletionHandler {
+ private final CompletionHandler nested;
+ LoggingCompletionHandler(CompletionHandler nested) {
+ this.nested = nested;
+ }
@Override
public void completed() {
+ if (nested != null) {
+ nested.completed();
+ }
}
@Override
@@ -158,6 +167,9 @@ public class ContentChannelOutputStream extends OutputStream implements Writable
if (log.isLoggable(logLevel)) {
log.log(logLevel, "Got exception when writing to client: " + Exceptions.toMessageString(t));
}
+ if (nested != null) {
+ nested.failed(t);
+ }
}
}
diff --git a/container-core/src/test/java/com/yahoo/container/jdisc/LoggingTestCase.java b/container-core/src/test/java/com/yahoo/container/jdisc/LoggingTestCase.java
index 22e55eb5291..bdaf6f7919f 100644
--- a/container-core/src/test/java/com/yahoo/container/jdisc/LoggingTestCase.java
+++ b/container-core/src/test/java/com/yahoo/container/jdisc/LoggingTestCase.java
@@ -96,7 +96,7 @@ public class LoggingTestCase {
}
@Test
- public final void testFailed() throws IOException, InterruptedException {
+ public final void testFailed() throws IOException {
stream.send(createData());
stream.send(createData());
stream.send(createData());