// Source path: root/jrt/src/com/yahoo/jrt/Supervisor.java
// Blob: d4168e977434e1cf279ed76b096399de925b5bfe
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.jrt;

import java.util.HashMap;
import java.util.concurrent.atomic.AtomicReference;

/**
 * A Supervisor keeps a method repository and handles dispatching of
 * incoming invocation requests. Each end-point of a connection is
 * represented by a {@link Target} object and each {@link Target} is
 * associated with a single Supervisor that handles the invocation
 * requests obtained from that {@link Target}. Note that RPC
 * invocations can be performed both ways across a connection, so even
 * the client side of a connection has RPC server capabilities.
 *
 * <p>The method repository is kept as a map snapshot that is replaced,
 * never modified in place: writers build a modified copy while holding
 * a lock and then publish it, readers simply pick up the currently
 * published snapshot. The remaining mutable settings (session handler
 * and buffer size limits) are volatile so that a value set by one
 * thread is visible to whichever thread later reads it.</p>
 **/
public class Supervisor {

    private final Transport         transport;
    // The fields below are written by the public setters and read by
    // the session callbacks; volatile guarantees visibility when those
    // happen on different threads.
    // NOTE(review): the callbacks are presumably invoked from transport
    // threads - confirm against Transport/Connection.
    private volatile SessionHandler sessionHandler = null;
    // Serializes writers of methodMap; readers never take this lock.
    private final Object            methodMapLock = new Object();
    private final AtomicReference<HashMap<String, Method>> methodMap = new AtomicReference<>(new HashMap<>());
    private volatile int            maxInputBufferSize  = 0;
    private volatile int            maxOutputBufferSize = 0;

    /**
     * Create a new Supervisor based on the given {@link Transport}
     *
     * @param transport object performing low-level operations for
     * this Supervisor
     **/
    public Supervisor(Transport transport) {
        this.transport = transport;
        // NOTE(review): presumably registers the built-in methods with
        // this Supervisor; MandatoryMethods is defined elsewhere.
        new MandatoryMethods(this);
    }

    /**
     * Set maximum input buffer size. This value will only affect
     * connections that use a common input buffer when decoding
     * incoming packets. Note that this value is not an absolute
     * max. The buffer will still grow larger than this value if
     * needed to decode big packets. However, when the buffer becomes
     * larger than this value, it will be shrunk back when possible.
     * The value is handed to a connection in {@link #sessionInit}, so
     * it applies to targets initialized after this call.
     *
     * @param bytes buffer size in bytes. 0 means unlimited.
     **/
    public void setMaxInputBufferSize(int bytes) {
        maxInputBufferSize = bytes;
    }

    /**
     * Set maximum output buffer size. This value will only affect
     * connections that use a common output buffer when encoding
     * outgoing packets. Note that this value is not an absolute
     * max. The buffer will still grow larger than this value if needed
     * to encode big packets. However, when the buffer becomes larger
     * than this value, it will be shrunk back when possible.
     * The value is handed to a connection in {@link #sessionInit}, so
     * it applies to targets initialized after this call.
     *
     * @param bytes buffer size in bytes. 0 means unlimited.
     **/
    public void setMaxOutputBufferSize(int bytes) {
        maxOutputBufferSize = bytes;
    }

    /**
     * Obtain the method map for this Supervisor. The returned object
     * is the currently published snapshot itself, not a copy; it is
     * shared with other readers and must not be modified by the
     * caller. Later calls to {@link #addMethod} or {@link
     * #removeMethod} publish a new map and do not alter a snapshot
     * that has already been returned.
     *
     * @return the method map
     **/
    HashMap<String, Method> methodMap() {
        return methodMap.getAcquire();
    }

    /**
     * Obtain the underlying Transport object.
     *
     * @return underlying Transport object
     **/
    public Transport transport() {
        return transport;
    }

    /**
     * Set the session handler for this Supervisor. Passing null
     * disables session callbacks, since the handler is only invoked
     * when it is non-null.
     *
     * @param handler the session handler
     **/
    public void setSessionHandler(SessionHandler handler) {
        sessionHandler = handler;
    }

    /**
     * Add a method to the set of methods held by this Supervisor. A
     * method already registered under the same name is replaced.
     *
     * @param method the method to add
     **/
    public void addMethod(Method method) {
        synchronized (methodMapLock) {
            // Never touch the published map; modify a private copy and
            // publish it when it is complete.
            HashMap<String, Method> newMap = new HashMap<>(methodMap());
            newMap.put(method.name(), method);
            methodMap.setRelease(newMap);
        }
    }

    /**
     * Remove a method from the set of methods held by this
     * Supervisor. Use this if you know exactly which method to remove
     * and not only the name. Nothing happens unless the given object
     * is the one currently registered under its name (identity
     * comparison).
     *
     * @param method the method to remove
     **/
    public void removeMethod(Method method) {
        synchronized (methodMapLock) {
            HashMap<String, Method> current = methodMap();
            String name = method.name();
            // Check first so the map is only copied when something is
            // actually going to be removed.
            if (current.get(name) == method) {
                HashMap<String, Method> newMap = new HashMap<>(current);
                newMap.remove(name);
                methodMap.setRelease(newMap);
            }
        }
    }

    /**
     * Connect to the given address. The new {@link Target} will be
     * associated with this Supervisor.
     *
     * @return Target representing our end of the connection
     * @param spec where to connect
     * @see #connect(com.yahoo.jrt.Spec, java.lang.Object)
     **/
    public Target connect(Spec spec) {
        return transport.connect(this, spec, null);
    }

    /**
     * Connect to the given address. The new {@link Target} will be
     * associated with this Supervisor and will have 'context' as
     * application context.
     *
     * @return Target representing our end of the connection
     * @param spec where to connect
     * @param context application context for the Target
     * @see Target#getContext
     **/
    public Target connect(Spec spec, Object context) {
        return transport.connect(this, spec, context);
    }

    /**
     * Listen to the given address.
     *
     * @return active object accepting new connections that will be
     * associated with this Supervisor
     * @param spec the address to listen to
     * @throws ListenFailedException propagated from the underlying
     * {@link Transport}
     **/
    public Acceptor listen(Spec spec) throws ListenFailedException {
        return transport.listen(this, spec);
    }

    /**
     * Convenience method for connecting to a peer, invoking a method
     * and disconnecting. The target is closed even if the invocation
     * throws.
     *
     * @param spec the address to connect to
     * @param req the invocation request
     * @param timeout request timeout in seconds
     **/
    public void invokeBatch(Spec spec, Request req, double timeout) {
        Target target = connect(spec);
        try {
            target.invokeSync(req, timeout);
        } finally {
            target.close();
        }
    }

    /**
     * This method is invoked when a new target is created. If the
     * target is a {@link Connection}, the configured buffer size
     * limits are applied to it before the session handler (if any) is
     * notified.
     *
     * @param target the target
     **/
    void sessionInit(Target target) {
        if (target instanceof Connection) {
            Connection conn = (Connection) target;
            conn.setMaxInputSize(maxInputBufferSize);
            conn.setMaxOutputSize(maxOutputBufferSize);
        }
        // Read the field once so the null check and the call use the
        // same handler even if it is replaced concurrently.
        SessionHandler handler = sessionHandler;
        if (handler != null) {
            handler.handleSessionInit(target);
        }
    }

    /**
     * This method is invoked when a target establishes a connection
     * with its peer
     *
     * @param target the target
     **/
    void sessionLive(Target target) {
        SessionHandler handler = sessionHandler;
        if (handler != null) {
            handler.handleSessionLive(target);
        }
    }

    /**
     * This method is invoked when a target becomes invalid
     *
     * @param target the target
     **/
    void sessionDown(Target target) {
        SessionHandler handler = sessionHandler;
        if (handler != null) {
            handler.handleSessionDown(target);
        }
    }

    /**
     * This method is invoked when a target is invalid and no more
     * invocations are active
     *
     * @param target the target
     **/
    void sessionFini(Target target) {
        SessionHandler handler = sessionHandler;
        if (handler != null) {
            handler.handleSessionFini(target);
        }
    }

    /**
     * This method is invoked each time we write a packet. This method
     * is empty and only used for testing through sub-classing.
     *
     * @param info information about the written packet
     **/
    void writePacket(PacketInfo info) {}

    /**
     * This method is invoked each time we read a packet. This method
     * is empty and only used for testing through sub-classing.
     *
     * @param info information about the read packet
     **/
    void readPacket(PacketInfo info) {}

    /**
     * Handle a packet received on one of the connections associated
     * with this Supervisor. This method is invoked for all packets
     * not handled by a {@link ReplyHandler}. Packets that are not
     * requests are silently ignored.
     *
     * @param conn where the packet came from
     * @param packet the packet
     **/
    void handlePacket(Connection conn, Packet packet) {
        if (packet.packetCode() != Packet.PCODE_REQUEST) {
            return;
        }
        RequestPacket rp = (RequestPacket) packet;
        Request req = new Request(rp.methodName(), rp.parameters());
        // The lookup yields null when no method is registered under the
        // requested name; the result is passed on unchanged.
        // NOTE(review): InvocationServer presumably handles the
        // unknown-method case - confirm.
        Method method = methodMap().get(req.methodName());
        new InvocationServer(conn, req, method,
                             packet.requestId(),
                             packet.noReply()).invoke();
    }
}