aboutsummaryrefslogtreecommitdiffstats
path: root/jdisc_core/src/main/java/com/yahoo/jdisc/handler/ReadableContentChannel.java
blob: ab51e91191092d1c6c9e15123e1fc30929334bab (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.jdisc.handler;

import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Queue;

/**
 * <p>This class implements a {@link ContentChannel} that has a blocking <em>read</em> interface. Use this class if you
 * intend to consume the content of the ContentChannel yourself. If you intend to forward the content to another
 * ContentChannel, use {@link BufferedContentChannel} instead. If you <em>might</em> want to consume the content, return
 * a {@link BufferedContentChannel} up front, and {@link BufferedContentChannel#connectTo(ContentChannel) connect} that
 * to a ReadableContentChannel at the point where you decide to consume the data.</p>
 *
 * @author Simon Thoresen Hult
 */
public final class ReadableContentChannel implements ContentChannel, Iterable<ByteBuffer> {

    private final Object lock = new Object();
    private Queue<Entry> queue = new LinkedList<>();
    private boolean closed = false;

    @Override
    public void write(ByteBuffer buf, CompletionHandler handler) {
        Objects.requireNonNull(buf, "buf");
        synchronized (lock) {
            if (closed || queue == null) {
                throw new IllegalStateException(this + " is closed");
            }
            queue.add(new Entry(buf, handler));
            lock.notifyAll();
        }
    }

    @Override
    public void close(CompletionHandler handler) {
        synchronized (lock) {
            if (closed || queue == null) {
                throw new IllegalStateException(this + " is already closed");
            }
            closed = true;
            queue.add(new Entry(null, handler));
            lock.notifyAll();
        }
    }

    @Override
    public Iterator<ByteBuffer> iterator() {
        return new MyIterator();
    }

    /**
     * <p>Returns a lower-bound estimate on the number of bytes available to be {@link #read()} without blocking. If
     * the returned number is larger than zero, the next call to {@link #read()} is guaranteed to not block.</p>
     *
     * @return The number of bytes available to be read without blocking.
     */
    public int available() {
        Entry entry;
        synchronized (lock) {
            if (queue == null) {
                return 0;
            }
            entry = queue.peek();
        }
        if (entry == null || entry.buf == null) {
            return 0;
        }
        return entry.buf.remaining();
    }

    /**
     * <p>Returns the next ByteBuffer in the internal queue. Before returning, this method calls {@link
     * CompletionHandler#completed()} on the {@link CompletionHandler} that was submitted along with the ByteBuffer. If
     * there are no ByteBuffers in the queue, this method waits indefinitely for either {@link
     * #write(ByteBuffer, CompletionHandler)} or {@link #close(CompletionHandler)} to be called. Once closed and the
     * internal queue drained, this method returns null.</p>
     *
     * @return The next ByteBuffer in queue, or null if this ReadableContentChannel is closed.
     * @throws IllegalStateException If the current thread is interrupted while waiting.
     */
    public ByteBuffer read() {
        Entry entry;
        synchronized (lock) {
            try {
                while (queue != null && queue.isEmpty()) {
                    lock.wait();
                }
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            if (queue == null) {
                return null;
            }
            entry = queue.poll();
            if (entry.buf == null) {
                queue = null;
            }
        }
        if (entry.handler != null) {
            entry.handler.completed();
        }
        return entry.buf;
    }

    /**
     * <p>This method calls {@link CompletionHandler#failed(Throwable)} on all pending {@link CompletionHandler}s, and
     * blocks all future operations to this ContentChannel (i.e. calls to {@link #write(ByteBuffer, CompletionHandler)}
     * and {@link #close(CompletionHandler)} throw IllegalStateExceptions).</p>
     *
     * <p>This method will also notify any thread waiting in {@link #read()}.</p>
     *
     * @param t The Throwable to pass to all pending CompletionHandlers.
     * @throws IllegalStateException If this method is called more than once.
     */
    public void failed(Throwable t) {
        Queue<Entry> queue;
        synchronized (lock) {
            if ((queue = this.queue) == null) {
                throw new IllegalStateException();
            }
            this.queue = null;
            lock.notifyAll();
        }
        for (Entry entry : queue) {
            entry.handler.failed(t);
        }
    }

    /**
     * <p>Creates a {@link ContentInputStream} that wraps this ReadableContentChannel.</p>
     *
     * @return The new ContentInputStream that wraps this.
     */
    public ContentInputStream toStream() {
        return new ContentInputStream(this);
    }

    private class MyIterator implements Iterator<ByteBuffer> {

        ByteBuffer next;

        @Override
        public boolean hasNext() {
            if (next != null) {
                return true;
            }
            next = read();
            return next != null;
        }

        @Override
        public ByteBuffer next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            ByteBuffer ret = next;
            next = null;
            return ret;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    private static class Entry {

        final ByteBuffer buf;
        final CompletionHandler handler;

        Entry(ByteBuffer buf, CompletionHandler handler) {
            this.handler = handler;
            this.buf = buf;
        }
    }
}