diff options
author | Jon Marius Venstad <jonmv@users.noreply.github.com> | 2020-08-31 19:30:45 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-31 19:30:45 +0200 |
commit | f6792aac5a511e0e67cb5fe1c989bff4c786cbab (patch) | |
tree | a7232a94802477d7e37796267044b3670a8a35b0 | |
parent | 924ba9becee4f608db610f402422efd71ab6073c (diff) | |
parent | f1554688a4d125392241d48eb8eef334463e1a27 (diff) |
Merge pull request #14213 from vespa-engine/jonmv/log-handler-with-blocking-flush
Jonmv/log handler with blocking flush
4 files changed, 61 insertions, 23 deletions
diff --git a/container-core/abi-spec.json b/container-core/abi-spec.json index aa2e5ccfa5f..9292a946e82 100644 --- a/container-core/abi-spec.json +++ b/container-core/abi-spec.json @@ -114,7 +114,8 @@ ], "methods": [ "public void <init>(java.util.concurrent.Executor, com.yahoo.container.core.LogHandlerConfig)", - "public com.yahoo.container.jdisc.HttpResponse handle(com.yahoo.container.jdisc.HttpRequest)" + "public com.yahoo.container.jdisc.AsyncHttpResponse handle(com.yahoo.container.jdisc.HttpRequest)", + "public bridge synthetic com.yahoo.container.jdisc.HttpResponse handle(com.yahoo.container.jdisc.HttpRequest)" ], "fields": [] }, diff --git a/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java b/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java index b2a156862eb..4b23eafaa9c 100644 --- a/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java +++ b/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java @@ -3,13 +3,19 @@ package com.yahoo.container.handler; import com.google.inject.Inject; import com.yahoo.container.core.LogHandlerConfig; +import com.yahoo.container.jdisc.AsyncHttpResponse; +import com.yahoo.container.jdisc.ContentChannelOutputStream; import com.yahoo.container.jdisc.HttpRequest; -import com.yahoo.container.jdisc.HttpResponse; import com.yahoo.container.jdisc.ThreadedHttpRequestHandler; +import com.yahoo.jdisc.handler.CompletionHandler; +import com.yahoo.jdisc.handler.ContentChannel; +import java.io.IOException; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.time.Instant; import java.util.Optional; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.logging.Level; @@ -28,25 +34,58 @@ public class LogHandler extends ThreadedHttpRequestHandler { } @Override - public HttpResponse handle(HttpRequest request) { - + public AsyncHttpResponse handle(HttpRequest request) { Instant from = Optional.ofNullable(request.getProperty("from")) .map(Long::valueOf).map(Instant::ofEpochMilli).orElse(Instant.MIN); Instant to = Optional.ofNullable(request.getProperty("to")) .map(Long::valueOf).map(Instant::ofEpochMilli).orElse(Instant.MAX); - Optional<String> hostname = Optional.ofNullable(request.getProperty("hostname")); - return new HttpResponse(200) { + return new AsyncHttpResponse(200) { @Override - public void render(OutputStream outputStream) { + public void render(OutputStream output, ContentChannel networkChannel, CompletionHandler handler) { try { - logReader.writeLogs(outputStream, from, to, hostname); + OutputStream blockingOutput = new BlockingFlushContentChannelOutputStream(networkChannel); + logReader.writeLogs(blockingOutput, from, to, hostname); + blockingOutput.close(); } catch (Throwable t) { log.log(Level.WARNING, "Failed reading logs from " + from + " to " + to, t); } + finally { + networkChannel.close(handler); + } } }; } + + + private static class BlockingFlushContentChannelOutputStream extends ContentChannelOutputStream { + + private final ContentChannel channel; + + public BlockingFlushContentChannelOutputStream(ContentChannel endpoint) { + super(endpoint); + this.channel = endpoint; + } + + @Override + public void flush() throws IOException { + super.flush(); + CountDownLatch latch = new CountDownLatch(1); + channel.write(ByteBuffer.allocate(0), // :'( + new CompletionHandler() { + @Override public void completed() { latch.countDown(); } + @Override public void failed(Throwable t) { latch.countDown(); } + }); + try { + latch.await(); + } + catch (InterruptedException e) { + throw new RuntimeException("Interrupted waiting for underlying IO to complete", e); + } + } + + } + } diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/ContentChannelOutputStream.java b/container-core/src/main/java/com/yahoo/container/jdisc/ContentChannelOutputStream.java index 9c48955bf4c..329889e70c0 100644 --- a/container-core/src/main/java/com/yahoo/container/jdisc/ContentChannelOutputStream.java +++ b/container-core/src/main/java/com/yahoo/container/jdisc/ContentChannelOutputStream.java @@ -53,11 +53,7 @@ public class ContentChannelOutputStream extends OutputStream implements Writable public void close() throws IOException { // the endpoint is closed in a finally{} block inside AbstractHttpRequestHandler // this class should be possible to close willynilly as it is exposed to plug-ins - try { - buffer.flush(); - } catch (RuntimeException e) { - throw new IOException(Exceptions.toMessageString(e), e); - } + flush(); } /** diff --git a/container-core/src/test/java/com/yahoo/container/handler/LogHandlerTest.java b/container-core/src/test/java/com/yahoo/container/handler/LogHandlerTest.java index 97aa8864eae..afe57579a97 100644 --- a/container-core/src/test/java/com/yahoo/container/handler/LogHandlerTest.java +++ b/container-core/src/test/java/com/yahoo/container/handler/LogHandlerTest.java @@ -1,17 +1,19 @@ // Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.container.handler; +import com.yahoo.container.jdisc.AsyncHttpResponse; import com.yahoo.container.jdisc.HttpRequest; -import com.yahoo.container.jdisc.HttpResponse; +import com.yahoo.jdisc.handler.ReadableContentChannel; +import com.yahoo.yolean.Exceptions; import org.junit.Test; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.time.Instant; import java.util.Optional; import java.util.concurrent.Executor; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; @@ -24,20 +26,20 @@ public class LogHandlerTest { { String uri = "http://myhost.com:1111/logs?from=1000&to=2000"; - HttpResponse response = logHandler.handle(HttpRequest.createTestRequest(uri, com.yahoo.jdisc.http.HttpRequest.Method.GET)); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - response.render(bos); + AsyncHttpResponse response = logHandler.handle(HttpRequest.createTestRequest(uri, com.yahoo.jdisc.http.HttpRequest.Method.GET)); + ReadableContentChannel out = new ReadableContentChannel(); + new Thread(() -> Exceptions.uncheck(() -> response.render(null, out, null))).start(); String expectedResponse = "newer log"; - assertEquals(expectedResponse, bos.toString()); + assertEquals(expectedResponse, new String(out.toStream().readAllBytes(), UTF_8)); } { String uri = "http://myhost.com:1111/logs?from=0&to=1000"; - HttpResponse response = logHandler.handle(HttpRequest.createTestRequest(uri, com.yahoo.jdisc.http.HttpRequest.Method.GET)); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - response.render(bos); + AsyncHttpResponse response = logHandler.handle(HttpRequest.createTestRequest(uri, com.yahoo.jdisc.http.HttpRequest.Method.GET)); + ReadableContentChannel out = new ReadableContentChannel(); + new Thread(() -> Exceptions.uncheck(() -> response.render(null, out, null))).start(); String expectedResponse = "older log"; - assertEquals(expectedResponse, bos.toString()); + assertEquals(expectedResponse, new String(out.toStream().readAllBytes(), UTF_8)); } } |