aboutsummaryrefslogtreecommitdiffstats
path: root/jdisc_core/src/main/java/com/yahoo/jdisc/handler/BufferedContentChannel.java
blob: 76ebc230f8d745409386e021986194155958d9f4 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.jdisc.handler;

import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;

/**
 * This class implements an unlimited, non-blocking content queue. All {@link ContentChannel} methods are implemented
 * by pushing to a thread-safe internal queue. All of the queued calls are forwarded to another ContentChannel when
 * {@link #connectTo(ContentChannel)} is called. Once connected, this class becomes a non-buffering proxy for the
 * connected ContentChannel.
 *
 * @author Simon Thoresen Hult
 */
public final class BufferedContentChannel implements ContentChannel {

    private final Object lock = new Object();
    private List<Entry> queue = new LinkedList<>();
    private ContentChannel content = null;
    private boolean closed = false;
    private CompletionHandler closeCompletion = null;

    /**
     * <p>Connects this BufferedContentChannel to a ContentChannel. First, this method forwards all queued calls to the
     * connected ContentChannel. Once this method has been called, all future calls to {@link #write(ByteBuffer,
     * CompletionHandler)} and {@link #close(CompletionHandler)} are synchronously forwarded to the connected
     * ContentChannel.</p>
     *
     * @param content The ContentChannel to connect to.
     * @throws NullPointerException  If the <em>content</em> argument is null.
     * @throws IllegalStateException If another ContentChannel has already been connected.
     */
    public void connectTo(ContentChannel content) {
        Objects.requireNonNull(content, "content");
        boolean closed;
        List<Entry> queue;
        synchronized (lock) {
            if (this.content != null || this.queue == null) {
                throw new IllegalStateException();
            }
            closed = this.closed;
            queue = this.queue;
            this.queue = null;
        }
        for (Entry entry : queue) {
            content.write(entry.buf, entry.handler);
        }
        if (closed) {
            content.close(closeCompletion);
        }
        synchronized (lock) {
            this.content = content;
            lock.notifyAll();
        }
    }

    /**
     * <p>Returns whether or not {@link #connectTo(ContentChannel)} has been called. Even if this method returns false,
     * calling {@link #connectTo(ContentChannel)} might still throw an IllegalStateException if there is a race.</p>
     *
     * @return True if {@link #connectTo(ContentChannel)} has been called.
     */
    public boolean isConnected() {
        synchronized (lock) {
            return content != null;
        }
    }

    /**
     * <p>Creates a {@link ReadableContentChannel} and {@link #connectTo(ContentChannel) connects} to it. </p>
     *
     * @return The new ReadableContentChannel that this connected to.
     */
    public ReadableContentChannel toReadable() {
        ReadableContentChannel ret = new ReadableContentChannel();
        connectTo(ret);
        return ret;
    }

    /**
     * <p>Creates a {@link ContentInputStream} and {@link #connectTo(ContentChannel) connects} to its internal
     * ContentChannel.</p>
     *
     * @return The new ContentInputStream that this connected to.
     */
    public ContentInputStream toStream() {
        return toReadable().toStream();
    }

    @Override
    public void write(ByteBuffer buf, CompletionHandler handler) {
        ContentChannel content;
        synchronized (lock) {
            if (closed) {
                throw new IllegalStateException();
            }
            if (queue != null) {
                queue.add(new Entry(buf, handler));
                return;
            }
            try {
                while (this.content == null) {
                    lock.wait(); // waiting for connectTo()
                }
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            if (closed) {
                throw new IllegalStateException();
            }
            content = this.content;
        }
        content.write(buf, handler);
    }

    @Override
    public void close(CompletionHandler handler) {
        ContentChannel content;
        synchronized (lock) {
            if (closed) {
                throw new IllegalStateException();
            }
            if (queue != null) {
                closed = true;
                closeCompletion = handler;
                return;
            }
            try {
                while (this.content == null) {
                    lock.wait(); // waiting for connectTo()
                }
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            if (closed) {
                throw new IllegalStateException();
            }
            closed = true;
            content = this.content;
        }
        content.close(handler);
    }

    private static class Entry {

        final ByteBuffer buf;
        final CompletionHandler handler;

        Entry(ByteBuffer buf, CompletionHandler handler) {
            this.handler = handler;
            this.buf = buf;
        }
    }

}