aboutsummaryrefslogtreecommitdiffstats
path: root/jrt/src/com/yahoo/jrt/Supervisor.java
blob: 65deea0dc619b70acfebb03c62e41f898c95f702 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.jrt;

import java.util.HashMap;
import java.util.concurrent.atomic.AtomicReference;

/**
 * A Supervisor keeps a method repository and handles dispatching of
 * incoming invocation requests. Each end-point of a connection is
 * represented by a {@link Target} object and each {@link Target} is
 * associated with a single Supervisor that handles the invocation
 * requests obtained from that {@link Target}. Note that RPC
 * invocations can be performed both ways across a connection, so even
 * the client side of a connection has RPC server capabilities.
 */
public class Supervisor {

    private final Transport         transport;
    // Serializes the writers of methodMap (addMethod/removeMethod); readers never take it.
    private final Object            methodMapLock = new Object();
    // Currently published method table. Writers build a modified copy and publish it
    // with setRelease; this class never modifies a map after it has been published.
    private final AtomicReference<HashMap<String, Method>> methodMap = new AtomicReference<>(new HashMap<>());
    // NOTE(review): the three settings below are plain (non-volatile) fields written by
    // public setters and read by package-private getters without synchronization.
    // Presumably they are configured before connections start using them -- confirm.
    private int                     maxInputBufferSize  = 64*1024;
    private int                     maxOutputBufferSize = 64*1024;
    private boolean                 dropEmptyBuffers = false;

    /**
     * Creates a new Supervisor based on the given {@link Transport}
     *
     * @param transport object performing low-level operations for this Supervisor
     */
    public Supervisor(Transport transport) {
        this.transport = transport;
        // Presumably registers the built-in methods with this Supervisor; the
        // created object is not kept here. See MandatoryMethods for details.
        new MandatoryMethods(this);
    }

    /**
     * Drops empty buffers. This will reduce memory footprint for idle
     * connections at the cost of extra allocations when buffer space
     * is needed again.
     *
     * @param value true means drop empty buffers
     * @return this Supervisor, to allow chaining
     */
    public Supervisor setDropEmptyBuffers(boolean value) {
        dropEmptyBuffers = value;
        return this;
    }

    /**
     * Tells whether empty buffers should be dropped.
     *
     * @return the value last given to {@link #setDropEmptyBuffers}, false by default
     */
    boolean getDropEmptyBuffers() { return dropEmptyBuffers; }

    /**
     * Sets maximum input buffer size. This value will only affect
     * connections that use a common input buffer when decoding
     * incoming packets. Note that this value is not an absolute
     * max. The buffer will still grow larger than this value if
     * needed to decode big packets. However, when the buffer becomes
     * larger than this value, it will be shrunk back when possible.
     *
     * @param bytes buffer size in bytes. 0 means unlimited.
     */
    public void setMaxInputBufferSize(int bytes) {
        maxInputBufferSize = bytes;
    }

    /**
     * Obtains the maximum input buffer size.
     *
     * @return the configured size in bytes, 64 KiB by default
     */
    int getMaxInputBufferSize() { return maxInputBufferSize; }

    /**
     * Sets maximum output buffer size. This value will only affect
     * connections that use a common output buffer when encoding
     * outgoing packets. Note that this value is not an absolute
     * max. The buffer will still grow larger than this value if needed
     * to encode big packets. However, when the buffer becomes larger
     * than this value, it will be shrunk back when possible.
     *
     * @param bytes buffer size in bytes. 0 means unlimited.
     */
    public void setMaxOutputBufferSize(int bytes) {
        maxOutputBufferSize = bytes;
    }

    /**
     * Obtains the maximum output buffer size.
     *
     * @return the configured size in bytes, 64 KiB by default
     */
    int getMaxOutputBufferSize() { return maxOutputBufferSize; }

    /**
     * Obtains the method map for this Supervisor. The returned object
     * is the currently published snapshot, shared with every other
     * reader; {@link #addMethod} and {@link #removeMethod} replace it
     * with a modified copy instead of changing it in place. Callers
     * must therefore treat the returned map as read-only.
     *
     * @return the method map
     */
    HashMap<String, Method> methodMap() {
        return methodMap.getAcquire();
    }

    /**
     * Obtains the underlying Transport object.
     *
     * @return underlying Transport object
     */
    public Transport transport() {
        return transport;
    }

    /**
     * Adds a method to the set of methods held by this Supervisor. A
     * method already registered under the same name is replaced.
     *
     * @param method the method to add
     */
    public void addMethod(Method method) {
        synchronized (methodMapLock) {
            // Copy, modify the private copy, then publish it as the new snapshot.
            HashMap<String, Method> newMap = new HashMap<>(methodMap());
            newMap.put(method.name(), method);
            methodMap.setRelease(newMap);
        }
    }

    /**
     * Removes a method from the set of methods held by this
     * Supervisor. Use this if you know exactly which method to remove
     * and not only the name. Nothing happens unless the given object
     * itself (compared by identity) is the one currently registered
     * under its name.
     *
     * @param method the method to remove
     */
    public void removeMethod(Method method) {
        synchronized (methodMapLock) {
            String name = method.name();
            HashMap<String, Method> current = methodMap();
            // Check before copying: a removal that would not change anything
            // should neither allocate a new table nor publish one.
            if (current.get(name) != method) {
                return;
            }
            HashMap<String, Method> newMap = new HashMap<>(current);
            newMap.remove(name);
            methodMap.setRelease(newMap);
        }
    }

    /**
     * Connects to the given address. The new {@link Target} will be
     * associated with this Supervisor and will have no application
     * context (null is passed on as context).
     *
     * @param spec where to connect
     * @return Target representing our end of the connection
     * @see #connect(com.yahoo.jrt.Spec, java.lang.Object)
     */
    public Target connect(Spec spec) {
        return transport.connect(this, spec, null);
    }

    /**
     * Connects to the given address. The new {@link Target} will be
     * associated with this Supervisor and will have 'context' as
     * application context.
     *
     * @param spec where to connect
     * @param context application context for the Target
     * @return Target representing our end of the connection
     * @see Target#getContext
     */
    public Target connect(Spec spec, Object context) {
        return transport.connect(this, spec, context);
    }

    /**
     * Listens to the given address.
     *
     * @param spec the address to listen to
     * @return active object accepting new connections that will be
     * associated with this Supervisor
     * @throws ListenFailedException propagated from the underlying
     * transport when it cannot listen to the given address
     */
    public Acceptor listen(Spec spec) throws ListenFailedException {
        return transport.listen(this, spec);
    }

    /**
     * This method is invoked each time we write a packet. This method
     * is empty and only used for testing through sub-classing.
     *
     * @param info information about the written packet
     */
    void writePacket(PacketInfo info) {}

    /**
     * This method is invoked each time we read a packet. This method
     * is empty and only used for testing through sub-classing.
     *
     * @param info information about the read packet
     */
    void readPacket(PacketInfo info) {}

    /**
     * Handles a packet received on one of the connections associated
     * with this Supervisor. This method is invoked for all packets
     * not handled by a {@link ReplyHandler}. Packets that are not
     * requests are silently ignored.
     *
     * @param conn where the packet came from
     * @param packet the packet
     */
    void handlePacket(Connection conn, Packet packet) {
        if (packet.packetCode() != Packet.PCODE_REQUEST) {
            return;
        }
        RequestPacket rp = (RequestPacket) packet;
        Request req = new Request(rp.methodName(), rp.parameters());
        // The lookup yields null when no method is registered under this name.
        // NOTE(review): InvocationServer is presumably responsible for answering
        // such requests with an error -- confirm there.
        Method method = methodMap().get(req.methodName());
        new InvocationServer(conn, req, method,
                             packet.requestId(),
                             packet.noReply()).invoke();
    }

}