aboutsummaryrefslogtreecommitdiffstats
path: root/jdisc_core/src/main/java/com/yahoo/jdisc/handler/ThreadedRequestHandler.java
blob: 6e2895f118bec9baf47fcf50ca57513fea5332f0 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.jdisc.handler;

import com.yahoo.jdisc.Request;
import com.yahoo.jdisc.ResourceReference;
import com.yahoo.jdisc.Response;

import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * <p>This class implements a {@link RequestHandler} with a synchronous {@link #handleRequest(Request,
 * BufferedContentChannel, ResponseHandler)} API for handling {@link Request}s. An Executor is provided at construction
 * time, and all Requests are automatically scheduled for processing on that Executor.</p>
 *
 * <p>A very simple echo handler could be implemented like this:</p>
 * <pre>
 * class MyRequestHandler extends ThreadedRequestHandler {
 *
 *     &#64;Inject
 *     MyRequestHandler(Executor executor) {
 *         super(executor);
 *     }
 *
 *     &#64;Override
 *     protected void handleRequest(Request request, ReadableContentChannel requestContent, ResponseHandler handler) {
 *         ContentWriter responseContent = ResponseDispatch.newInstance(Response.Status.OK).connectWriter(handler);
 *         try {
 *             for (ByteBuffer buf : requestContent) {
 *                 responseContent.write(buf);
 *             }
 *         } catch (RuntimeException e) {
 *             requestContent.failed(e);
 *             throw e;
 *         } finally {
 *             responseContent.close();
 *         }
 *     }
 * }
 * </pre>
 *
 * @author Simon Thoresen Hult
 */
public abstract class ThreadedRequestHandler extends AbstractRequestHandler {

    private final Executor executor;

    /** Timeout in milliseconds applied to handled requests; values less than or equal to zero mean "none". */
    private volatile long timeout = 0;

    /**
     * <p>Creates a handler that schedules every incoming {@link Request} on the given Executor.</p>
     *
     * @param executor The Executor on which requests are processed.
     * @throws NullPointerException If <em>executor</em> is null.
     */
    protected ThreadedRequestHandler(Executor executor) {
        this.executor = Objects.requireNonNull(executor, "executor");
    }

    /**
     * <p>Sets the timeout that this ThreadedRequestHandler sets on all handled {@link Request}s. If the
     * <em>timeout</em> value is less than or equal to zero, no timeout will be applied.</p>
     *
     * <p>The value is stored with millisecond precision, so a positive timeout shorter than one millisecond is
     * truncated to zero and therefore disables the timeout.</p>
     *
     * @param timeout The allocated amount of time.
     * @param unit    The time unit of the <em>timeout</em> argument.
     */
    public final void setTimeout(long timeout, TimeUnit unit) {
        this.timeout = unit.toMillis(timeout);
    }

    /**
     * <p>Returns the timeout that this ThreadedRequestHandler sets on all handled {@link Request}s.</p>
     *
     * @param unit The unit to use for the return value.
     * @return The timeout in the appropriate unit.
     */
    public final long getTimeout(TimeUnit unit) {
        return unit.convert(timeout, TimeUnit.MILLISECONDS);
    }

    /**
     * <p>Applies the configured timeout (if any) to the {@link Request}, and schedules it for processing on the
     * Executor given at construction time. The returned channel buffers the request content until the scheduled
     * task consumes it.</p>
     *
     * <p>If the Executor rejects the task, the reference that was taken on the Request for the task is released
     * before the {@link RejectedExecutionException} is rethrown to the caller.</p>
     *
     * @param request         The Request to handle.
     * @param responseHandler The handler to pass the corresponding {@link Response} to.
     * @return The channel to write the Request's content to.
     */
    @Override
    public final ContentChannel handleRequest(Request request, ResponseHandler responseHandler) {
        // Read the volatile field once, so that the check and the value applied to the request cannot disagree
        // if setTimeout() is called concurrently.
        long timeoutMillis = timeout;
        if (timeoutMillis > 0) {
            request.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        BufferedContentChannel content = new BufferedContentChannel();
        RequestTask task = new RequestTask(request, content, responseHandler);
        try {
            executor.execute(task);
        } catch (RejectedExecutionException e) {
            // The task will never run, so the request reference it holds would otherwise never be released.
            try {
                task.requestReference.close();
            } catch (RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        return content;
    }

    /**
     * <p>Override this method if you want to access the {@link Request}'s content using a {@link
     * BufferedContentChannel}. If you do not override this method, it will call {@link #handleRequest(Request,
     * ReadableContentChannel, ResponseHandler)}.</p>
     *
     * @param request         The Request to handle.
     * @param requestContent  The content of the Request.
     * @param responseHandler The handler to pass the corresponding {@link Response} to.
     */
    protected void handleRequest(Request request, BufferedContentChannel requestContent,
                                 ResponseHandler responseHandler)
    {
        ReadableContentChannel readable = requestContent.toReadable();
        try {
            handleRequest(request, readable, responseHandler);
        } finally {
            while (readable.read() != null) {} // consume all ignored content
        }
    }

    /**
     * <p>Implement this method if you want to access the {@link Request}'s content using a {@link
     * ReadableContentChannel}. If you do not override this method, it will call {@link #handleRequest(Request,
     * ContentInputStream, ResponseHandler)}.</p>
     *
     * @param request         The Request to handle.
     * @param requestContent  The content of the Request.
     * @param responseHandler The handler to pass the corresponding {@link Response} to.
     */
    protected void handleRequest(Request request, ReadableContentChannel requestContent,
                                 ResponseHandler responseHandler)
    {
        ContentInputStream inputStream = requestContent.toStream();
        try {
            handleRequest(request, inputStream, responseHandler);
        } finally {
            while (inputStream.read() >= 0) {} // consume all ignored content
        }
    }

    /**
     * <p>Implement this method if you want to access the {@link Request}'s content using a {@link ContentInputStream}.
     * If you do not override this method, it will dispatch a {@link Response} to the {@link ResponseHandler} with a
     * <code>Response.Status.NOT_IMPLEMENTED</code> status.</p>
     *
     * @param request         The Request to handle.
     * @param requestContent  The content of the Request.
     * @param responseHandler The handler to pass the corresponding {@link Response} to.
     */
    @SuppressWarnings("UnusedParameters")
    protected void handleRequest(Request request, ContentInputStream requestContent,
                                 ResponseHandler responseHandler)
    {
        ResponseDispatch.newInstance(Response.Status.NOT_IMPLEMENTED).dispatch(responseHandler);
    }

    /**
     * The unit of work scheduled on the Executor for each Request. It takes a reference on the Request when it is
     * created, and releases that reference once {@link #run()} has completed.
     */
    private class RequestTask implements Runnable {

        final Request request;
        final BufferedContentChannel content;
        final ResponseHandler responseHandler;
        private final ResourceReference requestReference;

        RequestTask(Request request, BufferedContentChannel content, ResponseHandler responseHandler) {
            this.request = request;
            this.content = content;
            this.responseHandler = responseHandler;
            this.requestReference = request.refer();
        }

        @Override
        public void run() {
            // try-with-resources releases the request reference even if the handler throws.
            try (final ResourceReference ref = requestReference) {
                ThreadedRequestHandler.this.handleRequest(request, content, responseHandler);
                consumeRequestContent();
            }
        }

        /** Drains the request content if the handler returned without connecting the buffered channel. */
        private void consumeRequestContent() {
            if (content.isConnected()) return;
            try {
                ReadableContentChannel requestContent = content.toReadable();
                while (requestContent.read() != null) {
                    // consume all ignored content
                }
            } catch (IllegalStateException ignored) {
                // Deliberately ignored: draining is best-effort. NOTE(review): presumably thrown when the channel
                // gets connected after the isConnected() check above - confirm against BufferedContentChannel.
            }
        }
    }
}