aboutsummaryrefslogtreecommitdiffstats
path: root/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java
blob: 991cd83ffa8d0b7629dbaea22ffb68f1fa900422 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.container.handler;

import com.google.inject.Inject;
import com.yahoo.container.core.LogHandlerConfig;
import com.yahoo.container.jdisc.AsyncHttpResponse;
import com.yahoo.container.jdisc.ContentChannelOutputStream;
import com.yahoo.container.jdisc.HttpRequest;
import com.yahoo.container.jdisc.ThreadedHttpRequestHandler;
import com.yahoo.jdisc.handler.CompletionHandler;
import com.yahoo.jdisc.handler.ContentChannel;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;

public class LogHandler extends ThreadedHttpRequestHandler {

    private final LogReader logReader;
    private static final long MB = 1024*1024;

    @Inject
    public LogHandler(Executor executor, LogHandlerConfig config) {
        this(executor, new LogReader(config.logDirectory(), config.logPattern()));
    }

    LogHandler(Executor executor, LogReader logReader) {
        super(executor);
        this.logReader = logReader;
    }

    @Override
    public AsyncHttpResponse handle(HttpRequest request) {
        Instant from = Optional.ofNullable(request.getProperty("from"))
                               .map(Long::valueOf).map(Instant::ofEpochMilli).orElse(Instant.MIN);
        Instant to = Optional.ofNullable(request.getProperty("to"))
                             .map(Long::valueOf).map(Instant::ofEpochMilli).orElse(Instant.MAX);
        Optional<String> hostname = Optional.ofNullable(request.getProperty("hostname"));

        return new AsyncHttpResponse(200) {
            @Override
            public void render(OutputStream output, ContentChannel networkChannel, CompletionHandler handler) {
                try {
                    OutputStream blockingOutput = new MaxPendingContentChannelOutputStream(networkChannel, 1*MB);
                    logReader.writeLogs(blockingOutput, from, to, hostname);
                    blockingOutput.close();
                }
                catch (Throwable t) {
                    log.log(Level.WARNING, "Failed reading logs from " + from + " to " + to, t);
                }
                finally {
                    networkChannel.close(handler);
                }
            }
        };
    }


    private static class MaxPendingContentChannelOutputStream extends ContentChannelOutputStream {
        private final long maxPending;
        private final AtomicLong sent = new AtomicLong(0);
        private final AtomicLong acked = new AtomicLong(0);

        public MaxPendingContentChannelOutputStream(ContentChannel endpoint, long maxPending) {
            super(endpoint);
            this.maxPending = maxPending;
        }

        private long pendingBytes() {
            return sent.get() - acked.get();
        }

        private class TrackCompletition implements CompletionHandler {
            private final long written;
            private final AtomicBoolean replied = new AtomicBoolean(false);
            TrackCompletition(long written) {
                this.written = written;
                sent.addAndGet(written);
            }
            @Override
            public void completed() {
                if (!replied.getAndSet(true)) {
                    acked.addAndGet(written);
                }
            }

            @Override
            public void failed(Throwable t) {
                if (!replied.getAndSet(true)) {
                    acked.addAndGet(written);
                }
            }
        }
        @Override
        public void send(ByteBuffer src) throws IOException {
            try {
                stallWhilePendingAbove(maxPending);
            } catch (InterruptedException ignored) {
                throw new IOException("Interrupted waiting for IO");
            }
            CompletionHandler pendingTracker = new TrackCompletition(src.remaining());
            try {
                send(src, pendingTracker);
            } catch (Throwable throwable) {
                pendingTracker.failed(throwable);
                throw throwable;
            }
        }

        private void stallWhilePendingAbove(long pending) throws InterruptedException {
            while (pendingBytes() > pending) {
                Thread.sleep(1);
            }
        }

        @Override
        public void flush() throws IOException {
            super.flush();
            try {
                stallWhilePendingAbove(0);
            }
            catch (InterruptedException e) {
                throw new IOException("Interrupted waiting for IO");
            }
        }

    }

}