summaryrefslogtreecommitdiffstats
path: root/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-core/src/main/java/com/yahoo/container/handler/LogHandler.java')
-rw-r--r--container-core/src/main/java/com/yahoo/container/handler/LogHandler.java53
1 files changed, 46 insertions, 7 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java b/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java
index b2a156862eb..4b23eafaa9c 100644
--- a/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java
+++ b/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java
@@ -3,13 +3,19 @@ package com.yahoo.container.handler;
import com.google.inject.Inject;
import com.yahoo.container.core.LogHandlerConfig;
+import com.yahoo.container.jdisc.AsyncHttpResponse;
+import com.yahoo.container.jdisc.ContentChannelOutputStream;
import com.yahoo.container.jdisc.HttpRequest;
-import com.yahoo.container.jdisc.HttpResponse;
import com.yahoo.container.jdisc.ThreadedHttpRequestHandler;
+import com.yahoo.jdisc.handler.CompletionHandler;
+import com.yahoo.jdisc.handler.ContentChannel;
+import java.io.IOException;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.logging.Level;
@@ -28,25 +34,58 @@ public class LogHandler extends ThreadedHttpRequestHandler {
}
@Override
- public HttpResponse handle(HttpRequest request) {
-
+ public AsyncHttpResponse handle(HttpRequest request) {
Instant from = Optional.ofNullable(request.getProperty("from"))
.map(Long::valueOf).map(Instant::ofEpochMilli).orElse(Instant.MIN);
Instant to = Optional.ofNullable(request.getProperty("to"))
.map(Long::valueOf).map(Instant::ofEpochMilli).orElse(Instant.MAX);
-
Optional<String> hostname = Optional.ofNullable(request.getProperty("hostname"));
- return new HttpResponse(200) {
+ return new AsyncHttpResponse(200) {
@Override
- public void render(OutputStream outputStream) {
+ public void render(OutputStream output, ContentChannel networkChannel, CompletionHandler handler) {
try {
- logReader.writeLogs(outputStream, from, to, hostname);
+ OutputStream blockingOutput = new BlockingFlushContentChannelOutputStream(networkChannel);
+ logReader.writeLogs(blockingOutput, from, to, hostname);
+ blockingOutput.close();
}
catch (Throwable t) {
log.log(Level.WARNING, "Failed reading logs from " + from + " to " + to, t);
}
+ finally {
+ networkChannel.close(handler);
+ }
}
};
}
+
+
+ private static class BlockingFlushContentChannelOutputStream extends ContentChannelOutputStream {
+
+ private final ContentChannel channel;
+
+ public BlockingFlushContentChannelOutputStream(ContentChannel endpoint) {
+ super(endpoint);
+ this.channel = endpoint;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ super.flush();
+ CountDownLatch latch = new CountDownLatch(1);
+ channel.write(ByteBuffer.allocate(0), // :'(
+ new CompletionHandler() {
+ @Override public void completed() { latch.countDown(); }
+ @Override public void failed(Throwable t) { latch.countDown(); }
+ });
+ try {
+ latch.await();
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted waiting for underlying IO to complete", e);
+ }
+ }
+
+ }
+
}