summaryrefslogtreecommitdiffstats
path: root/container-core
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-08-31 17:19:44 +0200
committerJon Marius Venstad <venstad@gmail.com>2020-08-31 17:19:44 +0200
commitefd1227b1e839691832f56a5d252e7c11a496b25 (patch)
tree8f9e9e439a47324507d8109622e13b6605e04c1e /container-core
parent6b5474b527417e41f6d8ef2ef922899c5accc2c5 (diff)
Mak log handler wait for network IO on every flush
Diffstat (limited to 'container-core')
-rw-r--r--container-core/src/main/java/com/yahoo/container/handler/LogHandler.java53
-rw-r--r--container-core/src/test/java/com/yahoo/container/handler/LogHandlerTest.java22
2 files changed, 58 insertions, 17 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java b/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java
index b2a156862eb..4b23eafaa9c 100644
--- a/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java
+++ b/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java
@@ -3,13 +3,19 @@ package com.yahoo.container.handler;
import com.google.inject.Inject;
import com.yahoo.container.core.LogHandlerConfig;
+import com.yahoo.container.jdisc.AsyncHttpResponse;
+import com.yahoo.container.jdisc.ContentChannelOutputStream;
import com.yahoo.container.jdisc.HttpRequest;
-import com.yahoo.container.jdisc.HttpResponse;
import com.yahoo.container.jdisc.ThreadedHttpRequestHandler;
+import com.yahoo.jdisc.handler.CompletionHandler;
+import com.yahoo.jdisc.handler.ContentChannel;
+import java.io.IOException;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.logging.Level;
@@ -28,25 +34,58 @@ public class LogHandler extends ThreadedHttpRequestHandler {
}
@Override
- public HttpResponse handle(HttpRequest request) {
-
+ public AsyncHttpResponse handle(HttpRequest request) {
Instant from = Optional.ofNullable(request.getProperty("from"))
.map(Long::valueOf).map(Instant::ofEpochMilli).orElse(Instant.MIN);
Instant to = Optional.ofNullable(request.getProperty("to"))
.map(Long::valueOf).map(Instant::ofEpochMilli).orElse(Instant.MAX);
-
Optional<String> hostname = Optional.ofNullable(request.getProperty("hostname"));
- return new HttpResponse(200) {
+ return new AsyncHttpResponse(200) {
@Override
- public void render(OutputStream outputStream) {
+ public void render(OutputStream output, ContentChannel networkChannel, CompletionHandler handler) {
try {
- logReader.writeLogs(outputStream, from, to, hostname);
+ OutputStream blockingOutput = new BlockingFlushContentChannelOutputStream(networkChannel);
+ logReader.writeLogs(blockingOutput, from, to, hostname);
+ blockingOutput.close();
}
catch (Throwable t) {
log.log(Level.WARNING, "Failed reading logs from " + from + " to " + to, t);
}
+ finally {
+ networkChannel.close(handler);
+ }
}
};
}
+
+
+ private static class BlockingFlushContentChannelOutputStream extends ContentChannelOutputStream {
+
+ private final ContentChannel channel;
+
+ public BlockingFlushContentChannelOutputStream(ContentChannel endpoint) {
+ super(endpoint);
+ this.channel = endpoint;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ super.flush();
+ CountDownLatch latch = new CountDownLatch(1);
+ channel.write(ByteBuffer.allocate(0), // :'(
+ new CompletionHandler() {
+ @Override public void completed() { latch.countDown(); }
+ @Override public void failed(Throwable t) { latch.countDown(); }
+ });
+ try {
+ latch.await();
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted waiting for underlying IO to complete", e);
+ }
+ }
+
+ }
+
}
diff --git a/container-core/src/test/java/com/yahoo/container/handler/LogHandlerTest.java b/container-core/src/test/java/com/yahoo/container/handler/LogHandlerTest.java
index 97aa8864eae..afe57579a97 100644
--- a/container-core/src/test/java/com/yahoo/container/handler/LogHandlerTest.java
+++ b/container-core/src/test/java/com/yahoo/container/handler/LogHandlerTest.java
@@ -1,17 +1,19 @@
// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.container.handler;
+import com.yahoo.container.jdisc.AsyncHttpResponse;
import com.yahoo.container.jdisc.HttpRequest;
-import com.yahoo.container.jdisc.HttpResponse;
+import com.yahoo.jdisc.handler.ReadableContentChannel;
+import com.yahoo.yolean.Exceptions;
import org.junit.Test;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.Executor;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
@@ -24,20 +26,20 @@ public class LogHandlerTest {
{
String uri = "http://myhost.com:1111/logs?from=1000&to=2000";
- HttpResponse response = logHandler.handle(HttpRequest.createTestRequest(uri, com.yahoo.jdisc.http.HttpRequest.Method.GET));
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- response.render(bos);
+ AsyncHttpResponse response = logHandler.handle(HttpRequest.createTestRequest(uri, com.yahoo.jdisc.http.HttpRequest.Method.GET));
+ ReadableContentChannel out = new ReadableContentChannel();
+ new Thread(() -> Exceptions.uncheck(() -> response.render(null, out, null))).start();
String expectedResponse = "newer log";
- assertEquals(expectedResponse, bos.toString());
+ assertEquals(expectedResponse, new String(out.toStream().readAllBytes(), UTF_8));
}
{
String uri = "http://myhost.com:1111/logs?from=0&to=1000";
- HttpResponse response = logHandler.handle(HttpRequest.createTestRequest(uri, com.yahoo.jdisc.http.HttpRequest.Method.GET));
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- response.render(bos);
+ AsyncHttpResponse response = logHandler.handle(HttpRequest.createTestRequest(uri, com.yahoo.jdisc.http.HttpRequest.Method.GET));
+ ReadableContentChannel out = new ReadableContentChannel();
+ new Thread(() -> Exceptions.uncheck(() -> response.render(null, out, null))).start();
String expectedResponse = "older log";
- assertEquals(expectedResponse, bos.toString());
+ assertEquals(expectedResponse, new String(out.toStream().readAllBytes(), UTF_8));
}
}