diff options
Diffstat (limited to 'container-core/src/main/java/com/yahoo/container/handler/LogHandler.java')
-rw-r--r-- | container-core/src/main/java/com/yahoo/container/handler/LogHandler.java | 53 |
1 files changed, 46 insertions, 7 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java b/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java index b2a156862eb..4b23eafaa9c 100644 --- a/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java +++ b/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java @@ -3,13 +3,19 @@ package com.yahoo.container.handler; import com.google.inject.Inject; import com.yahoo.container.core.LogHandlerConfig; +import com.yahoo.container.jdisc.AsyncHttpResponse; +import com.yahoo.container.jdisc.ContentChannelOutputStream; import com.yahoo.container.jdisc.HttpRequest; -import com.yahoo.container.jdisc.HttpResponse; import com.yahoo.container.jdisc.ThreadedHttpRequestHandler; +import com.yahoo.jdisc.handler.CompletionHandler; +import com.yahoo.jdisc.handler.ContentChannel; +import java.io.IOException; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.time.Instant; import java.util.Optional; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.logging.Level; @@ -28,25 +34,58 @@ public class LogHandler extends ThreadedHttpRequestHandler { } @Override - public HttpResponse handle(HttpRequest request) { - + public AsyncHttpResponse handle(HttpRequest request) { Instant from = Optional.ofNullable(request.getProperty("from")) .map(Long::valueOf).map(Instant::ofEpochMilli).orElse(Instant.MIN); Instant to = Optional.ofNullable(request.getProperty("to")) .map(Long::valueOf).map(Instant::ofEpochMilli).orElse(Instant.MAX); - Optional<String> hostname = Optional.ofNullable(request.getProperty("hostname")); - return new HttpResponse(200) { + return new AsyncHttpResponse(200) { @Override - public void render(OutputStream outputStream) { + public void render(OutputStream output, ContentChannel networkChannel, CompletionHandler handler) { try { - logReader.writeLogs(outputStream, from, to, hostname); + OutputStream blockingOutput = new BlockingFlushContentChannelOutputStream(networkChannel); + logReader.writeLogs(blockingOutput, from, to, hostname); + blockingOutput.close(); } catch (Throwable t) { log.log(Level.WARNING, "Failed reading logs from " + from + " to " + to, t); } + finally { + networkChannel.close(handler); + } } }; } + + + private static class BlockingFlushContentChannelOutputStream extends ContentChannelOutputStream { + + private final ContentChannel channel; + + public BlockingFlushContentChannelOutputStream(ContentChannel endpoint) { + super(endpoint); + this.channel = endpoint; + } + + @Override + public void flush() throws IOException { + super.flush(); + CountDownLatch latch = new CountDownLatch(1); + channel.write(ByteBuffer.allocate(0), // :'( + new CompletionHandler() { + @Override public void completed() { latch.countDown(); } + @Override public void failed(Throwable t) { latch.countDown(); } + }); + try { + latch.await(); + } + catch (InterruptedException e) { + throw new RuntimeException("Interrupted waiting for underlying IO to complete", e); + } + } + + } + } |