aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
blob: 4b911d7c38e905156d944c57352dc7b4817b87b4 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus;

import com.yahoo.concurrent.SystemTimer;

import java.time.Duration;
import java.util.logging.Level;
import com.yahoo.messagebus.network.Network;
import com.yahoo.messagebus.network.NetworkMultiplexer;
import com.yahoo.messagebus.network.NetworkOwner;
import com.yahoo.messagebus.routing.Resender;
import com.yahoo.messagebus.routing.RetryPolicy;
import com.yahoo.messagebus.routing.RoutingPolicy;
import com.yahoo.messagebus.routing.RoutingSpec;
import com.yahoo.messagebus.routing.RoutingTable;
import com.yahoo.messagebus.routing.RoutingTableSpec;
import com.yahoo.protect.Process;
import com.yahoo.text.Utf8Array;
import com.yahoo.text.Utf8String;
import com.yahoo.vespa.defaults.Defaults;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;

/**
 * <p>A message bus contains the factory for creating sessions to send, receive
 * and forward messages.</p>
 *
 * <p>There are three types of sessions:</p>
 * <ul>
 *     <li>{@link SourceSession Source sessions} send messages and receive replies.</li>
 *     <li>{@link IntermediateSession Intermediate sessions} receive messages on
 *         their way to their final destination, and may decide to forward the messages or reply directly.</li>
 *     <li>{@link DestinationSession Destination sessions} are the final recipients
 *         of messages, and are expected to reply to every one of them, but may not forward messages.</li>
 * </ul>
 *
 * <p>A message bus is configured with one or more {@link Protocol protocols} and
 * a {@link RoutingTable routing table} per protocol. A routing table
 * enumerates the permissible routes from intermediates to destinations and the
 * messaging semantics of each hop.</p>
 *
 * The responsibilities of a message bus are:
 * <ul>
 *     <li>Assign a route to every sent message from its routing table
 *     <li>Deliver every message it <i>accepts</i> to the next hop on its route
 *         <i>or</i> deliver a <i>failure reply</i>.
 *     <li>Deliver replies back to message sources through all the intermediate hops.
 * </ul>
 *
 * A runtime will typically
 * <ul>
 *     <li>Create a message bus implementation and set properties on this implementation once.
 *     <li>Create sessions using that message bus in many places.</li>
 * </ul>
 *
 * @author bratseth
 * @author Simon Thoresen Hult
 */
public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, ReplyHandler {

    /** Logger for this class. */
    private static final Logger log = Logger.getLogger(MessageBus.class.getName());

    /** Set to true by the first call to {@link #destroy()}; also ends the care taker loop. */
    private final AtomicBoolean destroyed = new AtomicBoolean(false);

    /** The protocols known to this bus; routing policies are also looked up through it. */
    private final ProtocolRepository protocolRepository = new ProtocolRepository();

    /** Routing tables keyed by protocol name; holds null until {@link #setupRouting(RoutingSpec)} has run. */
    private final AtomicReference<Map<String, RoutingTable>> tablesRef = new AtomicReference<>();

    /** Handlers of the intermediate and destination sessions created on this bus, keyed by session name. */
    private final Map<String, MessageHandler> sessions = new ConcurrentHashMap<>();

    private final NetworkMultiplexer net;
    private final Messenger msn;

    /** Null when the bus was constructed without a retry policy. */
    private final Resender resender;

    /** Limit for {@link #pendingCount}; pending accounting is switched off when this is not positive. */
    private int maxPendingCount;

    // pendingCount and pendingSize are only modified while holding the monitor of this object.
    private int pendingCount;
    private int pendingSize;

    /** Thread running {@link #sendBlockedMessages()}. */
    private final Thread careTaker = new Thread(this::sendBlockedMessages);

    /** Registered blocked senders, mapped to the time (in milliseconds) at which they were registered. */
    private final Map<SendBlockedMessages, Long> blockedSenders = new ConcurrentHashMap<>();

    /**
     * Callback for a sender that has messages it could not send yet. Once
     * registered on the message bus, the callback is invoked repeatedly until
     * it reports that it is done.
     */
    @FunctionalInterface
    public interface SendBlockedMessages {
        /**
         * Do what you want, but don't block.
         * You will be called regularly until you signal you are done.
         *
         * @return true unless you are done
         */
        boolean trySend();
    }

    /**
     * Registers a blocked sender so that the care taker thread keeps calling
     * its {@link SendBlockedMessages#trySend()} until that returns false.
     * The current time in milliseconds is stored alongside the sender.
     *
     * @param sender The sender to call back.
     */
    public void register(SendBlockedMessages sender) {
        long registeredAtMillis = SystemTimer.INSTANCE.milliTime();
        blockedSenders.put(sender, registeredAtMillis);
    }

    /**
     * Body of the care taker thread. Until this bus is destroyed it
     * repeatedly gives every registered blocked sender a chance to send,
     * forgets the senders that report they are done, and then sleeps for a
     * short interval. The method also returns if the thread is interrupted
     * while sleeping.
     */
    private void sendBlockedMessages() {
        final long sleepMillis = SystemTimer.adjustTimeoutByDetectedHz(Duration.ofMillis(10)).toMillis();
        while (!destroyed.get()) {
            // A sender that returns false is done and is dropped; the others are retried next round.
            blockedSenders.keySet().removeIf(blocked -> !blocked.trySend());
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    /**
     * <p>Convenience constructor that proxies {@link #MessageBus(Network,
     * MessageBusParams)} by adding the given protocols to a default {@link
     * MessageBusParams} object.</p>
     *
     * @param net       The network to associate with.
     * @param protocols The list of protocols to register.
     */
    public MessageBus(Network net, List<Protocol> protocols) {
        this(net, new MessageBusParams().addProtocols(protocols));
    }

    /**
     * <p>Constructs an instance of message bus. This requires a network object
     * that it will associate with. This assignment may not change during the lifetime
     * of this message bus, and this bus will be the single owner of this net.</p>
     *
     * <p>The network is wrapped with {@link NetworkMultiplexer#dedicated(Network)}
     * and construction is delegated to
     * {@link #MessageBus(NetworkMultiplexer, MessageBusParams)}.</p>
     *
     * @param net    The network to associate with.
     * @param params The parameters that control this bus.
     */
    public MessageBus(Network net, MessageBusParams params) {
        this(NetworkMultiplexer.dedicated(net), params);
    }

    /**
     * <p>Constructs an instance of message bus. This requires a network multiplexer
     * that it will associate with. This assignment may not change during the
     * lifetime of this message bus.</p>
     *
     * <p>The constructor registers the protocols from the parameters, attaches
     * this bus to the multiplexer and waits for the network to become ready.
     * It then starts the care taker thread and the messenger; a resender is
     * only created when the parameters carry a retry policy.</p>
     *
     * @param net    The network multiplexer to associate with.
     * @param params The parameters that control this bus.
     * @throws IllegalStateException If the network does not become ready in time. Any exception raised
     *                               while collecting diagnostics is attached as a suppressed exception.
     */
    public MessageBus(NetworkMultiplexer net, MessageBusParams params) {
        maxPendingCount = params.getMaxPendingCount();

        // Add all known protocols to the repository.
        for (int i = 0, len = params.getNumProtocols(); i < len; ++i) {
            protocolRepository.putProtocol(params.getProtocol(i));
        }

        // Attach and start network.
        this.net = net;
        net.attach(this);
        if ( ! net.net().waitUntilReady(180)) { // NOTE(review): presumably 180 seconds - confirm against Network
            var failure = new IllegalStateException("Network failed to become ready in time.");
            try {
                // Best-effort diagnostics: inspect the location broker mirror to narrow down the reason.
                var tmp = net.net().getMirror();
                var mirror = (com.yahoo.jrt.slobrok.api.Mirror) tmp;
                mirror.dumpState();
                if (mirror.ready()) {
                    log.warning("location broker mirror is ready, but network is not");
                } else if (mirror.getIterations() < 2) {
                    // Capture thread and heap dumps for later analysis.
                    Process.dumpThreads();
                    String fn = "var/crash/java_pid." + ProcessHandle.current().pid() + ".hprof";
                    Process.dumpHeap(Defaults.getDefaults().underVespaHome(fn), true);
                } else {
                    failure = new IllegalStateException("No answer from any service location broker, failing startup");
                }
            } catch (Exception e) {
                // The diagnostics must never mask the startup failure itself, but the reason they
                // failed is kept with the exception that is thrown instead of being dropped.
                failure.addSuppressed(e);
            }
            throw failure;
        }

        // Start messenger.
        msn = new Messenger();

        RetryPolicy retryPolicy = params.getRetryPolicy();
        if (retryPolicy != null) {
            resender = new Resender(retryPolicy);
            msn.addRecurrentTask(new ResenderTask(resender));
        } else {
            resender = null;
        }
        careTaker.setDaemon(true);
        careTaker.start();

        msn.start();
    }

    /**
     * <p>Sets the destroyed flag to true. The very first time this method is
     * called, it cleans up all its dependencies. Even if you retain a reference
     * to this object, all of its content is allowed to be garbage
     * collected.</p>
     *
     * <p>The first call waits for the care taker thread to finish. If the
     * calling thread is interrupted while waiting, the cleanup still runs to
     * completion and the interrupt status of the calling thread is restored
     * before this method returns.</p>
     *
     * @return True if content existed and was destroyed.
     */
    public boolean destroy() {
        if (destroyed.getAndSet(true)) {
            return false; // an earlier call already did the cleanup
        }
        boolean interrupted = false;
        try {
            try {
                careTaker.join();
            } catch (InterruptedException e) {
                // Remember the interrupt, but finish the cleanup first; the flag is restored
                // afterwards so it does not disturb the blocking calls made during cleanup.
                interrupted = true;
            }
            protocolRepository.clearPolicyCache();
            net.detach(this);
            msn.destroy();
            if (resender != null) {
                resender.destroy();
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        return true;
    }

    /**
     * <p>Synchronizes with the internal threads by first syncing the messenger
     * and then the network. This has the implicit effect of waiting for all
     * active callbacks. Never invoke this method from a callback, since the
     * thread would then wait for itself forever. It is typically used to
     * untangle during session shutdown.</p>
     */
    public void sync() {
        msn.sync();
        Network network = net.net();
        network.sync();
    }

    /**
     * <p>Creates a source session with default {@link SourceSessionParams},
     * apart from the given reply handler. Delegates to
     * {@link #createSourceSession(SourceSessionParams)}.</p>
     *
     * @param handler The reply handler to receive the replies for the session.
     * @return The created session.
     */
    public SourceSession createSourceSession(ReplyHandler handler) {
        SourceSessionParams defaults = new SourceSessionParams().setReplyHandler(handler);
        return createSourceSession(defaults);
    }

    /**
     * <p>Creates a source session from a copy of the given parameters with the
     * reply handler assigned to that copy. Delegates to
     * {@link #createSourceSession(SourceSessionParams)}.</p>
     *
     * @param handler The reply handler to receive the replies for the session.
     * @param params  The parameters to control the session.
     * @return The created session.
     */
    public SourceSession createSourceSession(ReplyHandler handler, SourceSessionParams params) {
        SourceSessionParams withHandler = new SourceSessionParams(params).setReplyHandler(handler);
        return createSourceSession(withHandler);
    }

    /**
     * <p>Creates a source session on top of this message bus.</p>
     *
     * @param params The parameters to control the session.
     * @return The created session.
     * @throws IllegalStateException If this message bus has been destroyed.
     */
    public SourceSession createSourceSession(SourceSessionParams params) {
        boolean alive = !destroyed.get();
        if (alive) {
            return new SourceSession(this, params);
        }
        throw new IllegalStateException("Object is destroyed.");
    }

    /**
     * <p>Creates an intermediate session from the given name, broadcast flag
     * and handlers, using default values for everything else in
     * {@link IntermediateSessionParams}. Delegates to
     * {@link #createIntermediateSession(IntermediateSessionParams)}.</p>
     *
     * @param name          The local unique name for the created session.
     * @param broadcastName Whether or not to broadcast this session's name on
     *                      the network.
     * @param msgHandler    The handler to receive the messages for the session.
     * @param replyHandler  The handler to receive the replies for the session.
     * @return The created session.
     */
    public IntermediateSession createIntermediateSession(String name,
                                                         boolean broadcastName,
                                                         MessageHandler msgHandler,
                                                         ReplyHandler replyHandler) {
        IntermediateSessionParams sessionParams = new IntermediateSessionParams()
                .setName(name)
                .setBroadcastName(broadcastName)
                .setMessageHandler(msgHandler)
                .setReplyHandler(replyHandler);
        return createIntermediateSession(sessionParams);
    }

    /**
     * <p>Creates an intermediate session on top of this message bus and
     * connects it to the network under the name given by the parameters.</p>
     *
     * @param params The parameters to control the session.
     * @return The created session.
     */
    public synchronized IntermediateSession createIntermediateSession(IntermediateSessionParams params) {
        IntermediateSession created = createDetachedIntermediateSession(params);
        String sessionName = params.getName();
        boolean broadcast = params.getBroadcastName();
        connect(sessionName, broadcast);
        return created;
    }

    /**
     * Creates an intermediate session and records it under its name on this
     * bus, without registering the session on the network; see
     * {@link #connect(String, boolean)} for that step.
     *
     * @param params The parameters to control the session.
     * @return The created session.
     * @throws IllegalStateException    If this message bus has been destroyed.
     * @throws IllegalArgumentException If a session with the same name already exists.
     */
    public synchronized IntermediateSession createDetachedIntermediateSession(IntermediateSessionParams params) {
        if (destroyed.get()) {
            throw new IllegalStateException("Object is destroyed.");
        }
        String sessionName = params.getName();
        if (sessions.containsKey(sessionName)) {
            throw new IllegalArgumentException("Name '" + sessionName + "' is not unique.");
        }
        IntermediateSession created = new IntermediateSession(this, params);
        sessions.put(sessionName, created);
        return created;
    }

    /**
     * <p>Creates a destination session from the given name, broadcast flag
     * and message handler, using default values for everything else in
     * {@link DestinationSessionParams}. Delegates to
     * {@link #createDestinationSession(DestinationSessionParams)}.</p>
     *
     * @param name          The local unique name for the created session.
     * @param broadcastName Whether or not to broadcast this session's name on
     *                      the network.
     * @param handler       The handler to receive the messages for the session.
     * @return The created session.
     */
    public DestinationSession createDestinationSession(String name,
                                                       boolean broadcastName,
                                                       MessageHandler handler) {
        DestinationSessionParams sessionParams = new DestinationSessionParams()
                .setName(name)
                .setBroadcastName(broadcastName)
                .setMessageHandler(handler);
        return createDestinationSession(sessionParams);
    }

    /**
     * <p>Creates a destination session on top of this message bus and
     * connects it to the network under the name given by the parameters.</p>
     *
     * @param params The parameters to control the session.
     * @return The created session.
     */
    public synchronized DestinationSession createDestinationSession(DestinationSessionParams params) {
        DestinationSession created = createDetachedDestinationSession(params);
        String sessionName = params.getName();
        boolean broadcast = params.getBroadcastName();
        connect(sessionName, broadcast);
        return created;
    }

    /**
     * Creates a destination session and records it under its name on this
     * bus, without registering the session on the network; see
     * {@link #connect(String, boolean)} for that step.
     *
     * @param params The parameters to control the session.
     * @return The created session.
     * @throws IllegalStateException    If this message bus has been destroyed.
     * @throws IllegalArgumentException If a session with the same name already exists.
     */
    public synchronized DestinationSession createDetachedDestinationSession(DestinationSessionParams params) {
        if (destroyed.get()) {
            throw new IllegalStateException("Object is destroyed.");
        }
        String sessionName = params.getName();
        if (sessions.containsKey(sessionName)) {
            throw new IllegalArgumentException("Name '" + sessionName + "' is not unique.");
        }
        DestinationSession created = new DestinationSession(this, params);
        sessions.put(sessionName, created);
        return created;
    }

    /**
     * Connects the given session to the network, so it will receive requests.
     * The session is registered on the network multiplexer with this bus as
     * its owner.
     *
     * @param session   The name of the session to register.
     * @param broadcast Whether or not to broadcast the session name on the network.
     */
    public void connect(String session, boolean broadcast) {
        net.registerSession(session, this, broadcast);
    }

    /**
     * <p>This method is invoked by the {@link
     * com.yahoo.messagebus.IntermediateSession#destroy()} to unregister
     * sessions from receiving data from message bus. The session is first
     * unregistered from the network multiplexer and then removed from the
     * local session map.</p>
     *
     * @param name          The name of the session to remove.
     * @param broadcastName Whether or not session name was broadcast.
     */
    public synchronized void unregisterSession(String name, boolean broadcastName) {
        net.unregisterSession(name, this, broadcastName);
        sessions.remove(name);
    }

    /**
     * Tells whether pending messages are counted by this bus.
     *
     * @return True if the maximum pending count is positive.
     */
    private boolean doAccounting() {
        return (maxPendingCount > 0);
    }
    /**
     * <p>Chokes input so that message bus does not blindly accept everything,
     * which protects an application from running out of memory if it fails to
     * choke input itself. When accounting is enabled and the number of pending
     * messages has reached the limit, the message is rejected. Otherwise the
     * message is counted, its approximate size is stored as its context, and
     * this bus is pushed as a handler on the message so that it sees the
     * reply.</p>
     *
     * @param msg The message to count.
     * @return True if the message was accepted, false if it should be rejected.
     */
    private boolean checkPending(Message msg) {
        int approxSize = msg.getApproxSize();

        if (doAccounting()) {
            synchronized (this) {
                if (maxPendingCount > 0 && pendingCount >= maxPendingCount) {
                    return false; // limit reached; the message is left untouched
                }
                pendingCount++;
                pendingSize += approxSize;
            }
        }
        // The size is read back from the reply context when the reply arrives.
        msg.setContext(approxSize);
        msg.pushHandler(this);
        return true;
    }

    /**
     * Accepts a message for sending. A message that has a bucket sequence is
     * answered with a {@link ErrorCode#SEQUENCE_ERROR} reply when a resender
     * is in use; any other message is handed to the messenger together with a
     * new {@link SendProxy}.
     *
     * @param msg The message to send.
     */
    @Override
    public void handleMessage(Message msg) {
        boolean sequencedWithResender = resender != null && msg.hasBucketSequence();
        if (sequencedWithResender) {
            deliverError(msg, ErrorCode.SEQUENCE_ERROR, "Bucket sequences not supported when resender is enabled.");
        } else {
            SendProxy sendProxy = new SendProxy(this, net.net(), resender);
            msn.deliverMessage(msg, sendProxy);
        }
    }

    /**
     * Handles a reply to a message that was accepted by this bus. The reply
     * is discarded if the bus has been destroyed. Otherwise the pending
     * counters are reduced (when accounting is enabled) and the reply is
     * passed on to the next handler popped from the reply.
     *
     * @param reply The reply to handle.
     */
    @Override
    public void handleReply(Reply reply) {
        if (destroyed.get()) {
            reply.discard();
        } else {
            if (doAccounting()) {
                synchronized (this) {
                    pendingCount -= 1;
                    // The context holds the approximate size that was stored when the message was accepted.
                    pendingSize -= (Integer) reply.getContext();
                }
            }
            ReplyHandler next = reply.popHandler();
            deliverReply(reply, next);
        }
    }

    /**
     * Delivers a message to the named local session. An error reply is sent
     * instead when no session with that name exists
     * ({@link ErrorCode#UNKNOWN_SESSION}) or when the message is rejected by
     * the pending check ({@link ErrorCode#SESSION_BUSY}).
     *
     * @param msg     The message to deliver.
     * @param session The name of the session to deliver to.
     */
    @Override
    public void deliverMessage(Message msg, String session) {
        MessageHandler target = sessions.get(session);
        if (target == null) {
            deliverError(msg, ErrorCode.UNKNOWN_SESSION, "Session '" + session + "' does not exist.");
            return;
        }
        if (checkPending(msg)) {
            msn.deliverMessage(msg, target);
            return;
        }
        String busyText = "Session '" + net.net().getConnectionSpec() + "/" + session +
                          "' is busy, try again later.";
        deliverError(msg, ErrorCode.SESSION_BUSY, busyText);
    }

    /**
     * <p>Adds a protocol to the internal repository of protocols, replacing any
     * previous instance of the protocol and clearing the associated routing
     * policy cache. This simply forwards to the protocol repository.</p>
     *
     * @param protocol The protocol to add.
     */
    public void putProtocol(Protocol protocol) {
        protocolRepository.putProtocol(protocol);
    }

    /**
     * Looks up a protocol in the internal repository by its name.
     *
     * @param name The name of the protocol; it is converted to a string for the lookup.
     * @return Whatever the protocol repository returns for that name.
     */
    @Override
    public Protocol getProtocol(Utf8Array name) {
        return protocolRepository.getProtocol(name.toString());
    }

    /**
     * Passes the given reply and handler on to the messenger for delivery.
     *
     * @param reply   The reply to deliver.
     * @param handler The handler that is to receive the reply.
     */
    public void deliverReply(Reply reply, ReplyHandler handler) {
        msn.deliverReply(reply, handler);
    }

    /**
     * Replaces all routing tables of this bus with the ones in the given
     * specification. A table whose protocol is not known to this bus is
     * skipped with a log message. The routing policy cache is cleared after
     * the new tables have been installed.
     *
     * @param spec The routing specification to apply.
     */
    @Override
    public void setupRouting(RoutingSpec spec) {
        Map<String, RoutingTable> fresh = new HashMap<>();
        int numTables = spec.getNumTables();
        for (int idx = 0; idx < numTables; idx++) {
            RoutingTableSpec tableSpec = spec.getTable(idx);
            String protocol = tableSpec.getProtocol();
            if (protocolRepository.hasProtocol(protocol)) {
                fresh.put(protocol, new RoutingTable(tableSpec));
            } else {
                log.log(Level.INFO, "Protocol '" + protocol + "' is not supported, ignoring routing table.");
            }
        }
        // Publish the complete map in one step so readers never see a half-built set of tables.
        tablesRef.set(fresh);
        protocolRepository.clearPolicyCache();
    }

    /**
     * <p>Returns the resender that is running within this message bus.</p>
     *
     * @return The resender, or null if this bus was constructed without a retry policy.
     */
    @Deprecated (forRemoval = true)// Remove on 9
    public Resender getResender() {
        return resender;
    }

    /**
     * <p>Returns the number of messages received that have not been replied to
     * yet. The counter is read while holding the monitor of this bus.</p>
     *
     * @return The pending count.
     */
    @Deprecated // Package private on 9
    public synchronized int getPendingCount() {
        return pendingCount;
    }

    /**
     * <p>Returns the size of messages received that have not been replied to
     * yet. The value is read while holding the monitor of this bus.</p>
     *
     * @return The pending size.
     */
    @Deprecated // Package private on 9
    public synchronized int getPendingSize() {
        return pendingSize;
    }

    /**
     * <p>Sets the maximum number of messages that can be received without being
     * replied to yet. A value that is not positive turns pending accounting
     * off.</p>
     *
     * @param maxCount The max count.
     */
    @Deprecated(forRemoval = true) // Remove on 9
    public void setMaxPendingCount(int maxCount) {
        maxPendingCount = maxCount;
    }

    /**
     * Gets maximum number of messages that can be received without being
     * replied to yet.
     *
     * @return The max count.
     */
    @Deprecated (forRemoval = true)// Remove on 9
    public int getMaxPendingCount() {
        return maxPendingCount;
    }

    /**
     * <p>Formerly set the maximum size of messages that can be received without
     * being replied to yet. This method now does nothing; the argument is
     * ignored.</p>
     *
     * @param maxSize The max size (ignored).
     */
    @Deprecated (forRemoval = true)// Remove on 9
    public void setMaxPendingSize(int maxSize) {

    }

    /**
     * Gets maximum combined size of messages that can be received without
     * being replied to yet. No such limit is enforced by this class, so the
     * method always returns {@link Integer#MAX_VALUE}.
     *
     * @return Always {@link Integer#MAX_VALUE}.
     */
    @Deprecated (forRemoval = true)// Remove on 9
    public int getMaxPendingSize() {
        return Integer.MAX_VALUE;
    }

    /**
     * <p>Returns a named routing table. The result is null if routing has not
     * been set up yet or if there is no table with the given name.</p>
     *
     * @param name The name of the routing table to return.
     * @return The routing table object, or null.
     */
    public RoutingTable getRoutingTable(String name) {
        Map<String, RoutingTable> current = tablesRef.get();
        return current == null ? null : current.get(name);
    }
    /**
     * <p>Returns a named routing table, may return null. The name is converted
     * to a string and looked up with {@link #getRoutingTable(String)}.</p>
     *
     * @param name The name of the routing table to return.
     * @return The routing table object, or null.
     */
    public RoutingTable getRoutingTable(Utf8String name) {

        return getRoutingTable(name.toString());
    }

    /**
     * <p>Returns a routing policy that corresponds to the argument protocol
     * name, policy name and policy parameter. The lookup is forwarded to the
     * protocol repository, which caches and reuses policies once they have
     * been requested.</p>
     *
     * @param protocolName The name of the protocol to invoke {@link Protocol#createPolicy(String,String)} on.
     * @param policyName   The name of the routing policy to retrieve.
     * @param policyParam  The parameter for the routing policy to retrieve.
     * @return A corresponding routing policy, or null.
     */
    public RoutingPolicy getRoutingPolicy(String protocolName, String policyName, String policyParam) {
        return protocolRepository.getRoutingPolicy(protocolName, policyName, policyParam);
    }

    /**
     * <p>Returns a routing policy that corresponds to the argument protocol
     * name, policy name and policy parameter. The protocol name is converted
     * to a string and the lookup is forwarded to the protocol repository,
     * which caches and reuses policies once they have been requested.</p>
     *
     * @param protocolName The name of the protocol to invoke {@link Protocol#createPolicy(String,String)} on.
     * @param policyName   The name of the routing policy to retrieve.
     * @param policyParam  The parameter for the routing policy to retrieve.
     * @return A corresponding routing policy, or null.
     */
    public RoutingPolicy getRoutingPolicy(Utf8String protocolName, String policyName, String policyParam) {
        return protocolRepository.getRoutingPolicy(protocolName.toString(), policyName, policyParam);
    }

    /**
     * <p>Returns the connection spec string for the network layer of this
     * message bus. This is merely a proxy of the same function in the network
     * layer.</p>
     *
     * @return The connection spec string reported by the network.
     */
    public String getConnectionSpec() {
        return net.net().getConnectionSpec();
    }

    /**
     * <p>Answers the given message with an error. An empty reply takes over
     * the state of the message, receives the error, and is then delivered to
     * the handler popped from it.</p>
     *
     * @param msg     The message to reply to.
     * @param errCode The code of the error to set.
     * @param errMsg  The message of the error to set.
     */
    private void deliverError(Message msg, int errCode, String errMsg) {
        Reply failure = new EmptyReply();
        failure.swapState(msg);
        Error error = new Error(errCode, errMsg);
        failure.addError(error);
        ReplyHandler receiver = failure.popHandler();
        deliverReply(failure, receiver);
    }

    /**
     * <p>Messenger task that drives the resender from the messenger thread.
     * The task is only a proxy: destroying the task leaves the resender
     * itself untouched.</p>
     */
    private static class ResenderTask implements Messenger.Task {

        private final Resender resender;

        ResenderTask(Resender resender) {
            this.resender = resender;
        }

        /** Does nothing; the resender is not owned by this task. */
        public void destroy() {
            // empty
        }

        /** Asks the resender to resend whatever it has scheduled. */
        public void run() {
            resender.resendScheduled();
        }

    }

}