aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
blob: 75751b1e2ce9d88ad7b1bbd47a4fa8c38227d8e0 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus;

import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.routing.RoutingTable;

import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * A session supporting sending new messages.
 *
 * <p>Messages are handed to a {@link Sequencer} after this session has pushed
 * both the application's reply handler and itself onto the message's handler
 * stack. The session counts messages that have been accepted but not yet
 * replied to, consults the optional {@link ThrottlePolicy} before accepting a
 * message, and keeps a queue of messages whose callers are blocked in
 * {@link #sendBlocking(Message)}.</p>
 *
 * @author Simon Thoresen Hult
 */
public final class SourceSession implements ReplyHandler, MessageBus.SendBlockedMessages {

    /** Set exactly once by {@link #destroy()}; checked by {@link #trySend()} and {@link #handleReply(Reply)}. */
    private final AtomicBoolean destroyed = new AtomicBoolean(false);
    /** Released once the session is closed and no messages are pending; awaited by {@link #close()}. */
    private final CountDownLatch done = new CountDownLatch(1);
    /** Token that lets only one thread at a time drain {@link #blockedQ}. */
    private final AtomicBoolean sendingBlockedToken = new AtomicBoolean(false);
    /** Guards {@link #closed}, updates of {@link #pendingCount} and calls into {@link #throttlePolicy}. */
    private final Object lock = new Object();
    private final MessageBus mbus;
    private final Sequencer sequencer;
    private final ReplyHandler replyHandler;
    /** May be null, in which case no throttling is applied. */
    private final ThrottlePolicy throttlePolicy;
    /** Default message timeout in seconds, applied to messages that carry no remaining time. */
    private volatile double timeout;  // volatile only for tests
    /** Number of accepted messages without a reply. Written under {@link #lock}, read without it. */
    private volatile int pendingCount = 0;
    private volatile boolean closed = false;
    /** Messages whose senders are waiting in {@link #sendBlocking(Message)}. Guarded by synchronizing on the queue itself. */
    private final Deque<BlockedMessage> blockedQ = new LinkedList<>();

    /**
     * Simple depth counter used per thread to bound nested invocations of
     * {@link #sendBlockedMessages()}.
     * NOTE(review): presumably protects against deep recursion when replies are
     * delivered synchronously on the sending thread - confirm.
     */
    private final static class Counter {
        private int count = 0;
        void inc() { count ++; }
        void dec() { count --; }
        /** Returns true once the depth exceeds 5. */
        boolean enough() { return count > 5; }
    }
    private static final ThreadLocal<Counter> sendBlockedRecurseLevel = ThreadLocal.withInitial(Counter::new);

    /**
     * The default constructor requires values for all final member variables
     * of this. It expects all arguments but the {@link SourceSessionParams} to
     * be proper, so no checks are performed. The constructor is declared
     * package private since only {@link MessageBus} is supposed to instantiate
     * it. As its last step the new session registers itself with the message bus.
     *
     * @param mbus   the message bus that created this instance
     * @param params a parameter object that holds configuration parameters
     * @throws NullPointerException if the parameters do not contain a reply handler
     */
    SourceSession(MessageBus mbus, SourceSessionParams params) {
        this.mbus = mbus;
        sequencer = new Sequencer(mbus, mbus.messenger());
        if (!params.hasReplyHandler()) {
            throw new NullPointerException("Reply handler is null.");
        }
        replyHandler = params.getReplyHandler();
        throttlePolicy = params.getThrottlePolicy();
        timeout = params.getTimeout();
        mbus.register(this);
    }

    /**
     * Sets the destroyed flag to true. The very first time this method is
     * called, it cleans up all its dependencies.  Even if you retain a
     * reference to this object, all of its content is allowed to be garbage
     * collected.
     *
     * @return true if content existed and was destroyed.
     */
    public boolean destroy() {
        if (destroyed.getAndSet(true)) {
            return false;
        }
        synchronized (lock) {
            closed = true;
        }
        sequencer.destroy();
        mbus.sync();
        return true;
    }

    /**
     * Reject all new messages and wait until no messages are pending. Before
     * returning, this method calls {@link #destroy()}. If the calling thread is
     * interrupted while waiting, the interrupt flag is restored and the session
     * is destroyed without waiting any further.
     */
    public void close() {
        synchronized (lock) {
            closed = true;
        }
        // Nothing in flight: release the latch ourselves, since no reply will arrive to do it.
        if (pendingCount == 0) {
            done.countDown();
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        destroy();
    }

    /**
     * <p>Sends a new message. Calling this immediately causes one of three
     * possible results:</p>
     * <ul><li>A result is returned indicating that the message is accepted. In
     * this case, a reply to the message is guaranteed to be produced on this
     * session within a timeout limit. That reply may indicate either success or
     * failure.</li>
     * <li>A result is returned indicating that the message is not
     * accepted. This is a <i>transient failure</i>, retrying the same operation
     * after some wait period should cause it to be accepted.</li>
     * <li>An exception is thrown, indicating a non-transient error which is not
     * expected to be fixed before some corrective action is taken.</li> </ul>
     *
     * <p>A source client should typically do some equivalent of:</p>
     * <code>
     * do {
     *     Result result = sourceSession.send(message);
     *     if (!result.isAccepted())
     *         // Do something else or wait a while
     * } while (!result.isAccepted());
     * </code>
     *
     * @param message the message to send
     * @return the result of <i>initiating</i> sending of this message
     */
    public Result send(Message message) {
        return sendInternal(updateTiming(message));
    }

    /**
     * Stamps the message with the current time and, if it has no time
     * remaining, assigns it this session's default timeout.
     *
     * @param msg the message to update
     * @return the same message instance, to allow chaining
     */
    private Message updateTiming(Message msg) {
        msg.setTimeReceivedNow();
        if (msg.getTimeRemaining() <= 0) {
            // Convert seconds to milliseconds BEFORE narrowing to long, so that
            // fractional-second timeouts (e.g. 0.5) are not truncated to zero.
            msg.setTimeRemaining((long)(timeout * 1000.0));
        }
        return msg;
    }

    /**
     * Attempts to hand the message to the sequencer without touching its timing.
     *
     * @param message the message to send
     * @return {@link Result#ACCEPTED}, or an error result with
     *         {@link ErrorCode#SEND_QUEUE_CLOSED} if this session is closed, or
     *         {@link ErrorCode#SEND_QUEUE_FULL} if the throttle policy refuses the message
     */
    private Result sendInternal(Message message) {
        synchronized (lock) {
            if (closed) {
                return new Result(ErrorCode.SEND_QUEUE_CLOSED, "Source session is closed.");
            }
            if (throttlePolicy != null) {
                if (! throttlePolicy.canSend(message, pendingCount)) {
                    return new Result(ErrorCode.SEND_QUEUE_FULL,
                            "Too much pending data (" + pendingCount + " messages).");
                }
                // The application's handler goes on the stack before the policy sees the message.
                message.pushHandler(replyHandler);
                throttlePolicy.processMessage(message);
            } else {
                message.pushHandler(replyHandler);
            }
            ++pendingCount;
        }
        if (message.getTrace().shouldTrace(TraceLevel.COMPONENT)) {
            message.getTrace().trace(TraceLevel.COMPONENT,
                                     "Source session accepted a " + message.getApproxSize() + " byte message. " +
                                     pendingCount + " message(s) now pending.");
        }
        // This session is pushed last, so it receives the reply first (see handleReply).
        message.pushHandler(this);
        sequencer.handleMessage(message);
        return Result.ACCEPTED;
    }

    /**
     * Tries to send queued blocked messages and expires those that have timed
     * out while waiting.
     *
     * @return false if this session has been destroyed, true otherwise
     */
    @Override
    public boolean trySend() {
        if (destroyed.get()) return false;
        sendBlockedMessages();
        expireStalledBlockedMessages();
        return true;
    }

    /**
     * A message whose sender is parked in {@link #sendBlocking(Message)} until
     * a final result for the send attempt is available.
     */
    private class BlockedMessage {
        private final Message msg;
        /** The final send result; null until the waiting sender may proceed. */
        private Result result = null;
        BlockedMessage(Message msg) {
            this.msg = msg;
        }

        /** Stores the result and wakes the thread waiting in {@link #waitComplete()}. */
        private void notifyComplete(Result result) {
            synchronized (this) {
                this.result = result;
                notify();
            }
        }

        /**
         * Completes this entry with a TIMEOUT error if the message has expired,
         * and passes a corresponding timeout reply to the session's reply handler.
         *
         * @return true if the message had expired and the waiter was notified
         */
        boolean notifyIfExpired() {
            if (msg.isExpired()) {
                Error error = new Error(ErrorCode.TIMEOUT, "Timed out in sendQ");
                notifyComplete(new Result(error));
                replyHandler.handleReply(createSendTimedOutReply(msg, error));
                return true;
            }
            return false;
        }

        /**
         * Expires the message if it has timed out, otherwise attempts to send it.
         *
         * @return false if the send queue is still full and the message must
         *         stay queued; true if this entry has been completed
         */
        boolean sendOrExpire() {
            if ( ! notifyIfExpired() ) {
                Result res = sendInternal(msg);
                if ( ! isSendQFull(res) ) {
                    notifyComplete(res);
                } else {
                    return false;
                }
            }
            return true;
        }

        /**
         * Blocks the calling thread until a result has been set.
         *
         * @return the final result of the send attempt
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        Result waitComplete() throws InterruptedException {
            synchronized (this) {
                // Loop to guard against wakeups that happen before a result is set.
                while (result == null) {
                    this.wait();
                }
            }
            return result;
        }
    }

    /**
     * Builds an empty reply carrying the given error for a message that timed
     * out before it could be sent, and swaps the message's state into the reply.
     */
    private Reply createSendTimedOutReply(Message msg, Error error) {
        Reply reply = new EmptyReply();
        reply.setMessage(msg);
        reply.addError(error);
        msg.swapState(reply);
        return reply;
    }

    /** Returns whether the given result is a rejection with error code {@link ErrorCode#SEND_QUEUE_FULL}. */
    static private boolean isSendQFull(Result res) {
        return !res.isAccepted() && (res.getError().getCode() == ErrorCode.SEND_QUEUE_FULL);
    }

    /**
     * This is a blocking proxy to the {@link #send(Message)} method. If the
     * first attempt is rejected because the send queue is full, the message is
     * queued and this method blocks until a later attempt yields a result other
     * than "send queue full", or until the message expires while queued. In the
     * latter case a result with a TIMEOUT error is returned and a timeout reply
     * is passed to the session's reply handler.
     *
     * @param msg the message to send
     * @return the result of initiating send
     * @throws InterruptedException thrown if the calling thread is interrupted
     */
    public Result sendBlocking(Message msg) throws InterruptedException {
        Result res = send(msg);
        if (isSendQFull(res)) {
            BlockedMessage blockedMessage = new BlockedMessage(msg);
            synchronized (blockedQ) {
                blockedQ.add(blockedMessage);
            }
            res = blockedMessage.waitComplete();
        }
        return res;
    }

    /** Removes, and completes with a timeout, every queued message that has expired. */
    private void expireStalledBlockedMessages() {
        synchronized (blockedQ) {
            blockedQ.removeIf(BlockedMessage::notifyIfExpired);
        }
    }

    /** Removes and returns the head of the blocked queue, or null if it is empty. */
    private BlockedMessage getNextBlockedMessage() {
        synchronized (blockedQ) {
            return blockedQ.poll();
        }
    }

    /**
     * Sends queued blocked messages in order until the queue is empty or a
     * message is rejected because the send queue is full. Returns immediately
     * if the per-thread nesting limit is exceeded or if another invocation
     * currently holds the sending token.
     */
    private void sendBlockedMessages() {
        Counter recurselevel = sendBlockedRecurseLevel.get();
        if (recurselevel.enough()) return;
        boolean someoneElseIsTakingCareOfIt = sendingBlockedToken.getAndSet(true);
        if (someoneElseIsTakingCareOfIt) return;
        try {
            recurselevel.inc();
            BlockedMessage msg = getNextBlockedMessage();
            for (boolean success = true; success && msg != null; ) {
                success = msg.sendOrExpire();
                if (!success) {
                    // Failed sending, put it back at the head of the Q.
                    synchronized (blockedQ) {
                        blockedQ.addFirst(msg);
                    }
                } else {
                    msg = getNextBlockedMessage();
                }
            }
        } finally {
            recurselevel.dec();
            sendingBlockedToken.set(false);
        }
    }

    /**
     * Receives the reply for a message sent through this session. Updates the
     * pending count and the throttle policy, gives blocked messages a chance to
     * be sent, and then forwards the reply to the next handler on its stack.
     * If the session has been destroyed the reply is discarded instead and no
     * bookkeeping is performed.
     *
     * @param reply the reply to handle
     */
    @Override
    public void handleReply(Reply reply) {
        if (destroyed.get()) {
            reply.discard();
            return;
        }
        boolean lastPendingAfterClose;
        synchronized (lock) {
            --pendingCount;
            if (throttlePolicy != null) {
                throttlePolicy.processReply(reply);
            }
            lastPendingAfterClose = (closed && pendingCount == 0);
        }
        sendBlockedMessages();
        if (reply.getTrace().shouldTrace(TraceLevel.COMPONENT)) {
            reply.getTrace().trace(TraceLevel.COMPONENT,
                                   "Source session received reply. " + pendingCount + " message(s) now pending.");
        }
        ReplyHandler handler = reply.popHandler();
        handler.handleReply(reply);
        // Release close() only after the final reply has been delivered to its handler.
        if (lastPendingAfterClose) {
            this.done.countDown();
        }
    }

    /**
     * This is a convenience function to assign a given route to the given
     * message, and then pass it to the other {@link #send(Message)} method of
     * this session.
     *
     * @param msg   the message to send
     * @param route the route to assign to the message
     * @return the immediate result of the attempt to send this message
     */
    public Result send(Message msg, Route route) {
        return send(msg.setRoute(route));
    }

    /**
     * This is a convenience method to call {@link
     * #send(Message,String,boolean)} with a <code>false</code> value for the
     * 'parseIfNotFound' parameter.
     *
     * @param msg       the message to send
     * @param routeName the route to assign to the message
     * @return the immediate result of the attempt to send this message
     */
    public Result send(Message msg, String routeName) {
        return send(msg, routeName, false);
    }

    /**
     * This is a convenience function to assign a named route to the given
     * message, and then pass it to the other {@link #send(Message)} method of
     * this session. If the route could not be found this methods returns with
     * an appropriate error, unless the 'parseIfNotFound' argument is true. In
     * that case, the route name is passed through to the Route factory method
     * {@link Route#parse}.
     *
     * @param msg             the message to send
     * @param routeName       the route to assign to the message
     * @param parseIfNotFound whether or not to parse routeName as a route if it could not be found
     * @return the immediate result of the attempt to send this message
     */
    public Result send(Message msg, String routeName, boolean parseIfNotFound) {
        boolean found = false;
        RoutingTable table = mbus.getRoutingTable(msg.getProtocol().toString());
        if (table != null) {
            Route route = table.getRoute(routeName);
            if (route != null) {
                // Assign a copy, not the instance held by the routing table.
                msg.setRoute(new Route(route));
                found = true;
            } else if ( ! parseIfNotFound) {
                return new Result(ErrorCode.ILLEGAL_ROUTE,
                                  "Route '" + routeName + "' not found for protocol '" + msg.getProtocol() + "'.");
            }
        } else if ( ! parseIfNotFound) {
            return new Result(ErrorCode.ILLEGAL_ROUTE,
                              "Protocol '" + msg.getProtocol() + "' has no routing table.");
        }
        if ( ! found) {
            msg.setRoute(Route.parse(routeName));
        }
        return send(msg);
    }

    /** Returns the reply handler of this session */
    public ReplyHandler getReplyHandler() {
        return replyHandler;
    }

    /** Returns the number of messages sent that have not been replied to yet */
    public int getPendingCount() {
        return pendingCount;
    }

    /**
     * Sets the number of seconds a message can be attempted sent until it times out.
     * Fractional seconds are honoured with millisecond resolution.
     *
     * @param timeout the number of seconds allowed.
     * @return this, to allow chaining.
     */
    public SourceSession setTimeout(double timeout) {
        this.timeout = timeout;
        return this;
    }

    /** Returns the throttle policy of this session, or null if none was configured. */
    public ThrottlePolicy getThrottlePolicy() {
        return throttlePolicy;
    }

}