aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java
blob: 4e10a72c858cb86f21a4c48d1803fad0fd3f2a4c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Sequencing is implemented as a message handler that is configured in a source session in that session's chain of
 * linked message handlers. Each message that carries a sequencing id is queued in an internal list of messages for that
 * id, and messages are only sent when they are at the front of their list. When a reply arrives, the current front of
 * the list is removed and the next message, if any, is sent.
 *
 * <p>Access to the internal sequencing state is guarded by this object's monitor.</p>
 *
 * @author Simon Thoresen Hult
 */
public class Sequencer implements MessageHandler, ReplyHandler {

    /** Per-thread marker that a send started by {@link #sendNextInSequence(long)} is currently on the stack. */
    private static final ThreadLocal<Boolean> isSending = ThreadLocal.withInitial(() -> Boolean.FALSE);

    private final AtomicBoolean destroyed = new AtomicBoolean(false);
    private final MessageHandler sender;

    /**
     * Sequencing state, guarded by {@code this}. A key is present while a message with that sequence id is in
     * flight; the value stays {@code null} until a second message with the same id has to wait behind it.
     */
    private final Map<Long, Queue<Message>> seqMap = new HashMap<>();

    /** Optional messenger used to hand off follow-up sends; may be {@code null}. */
    private final Messenger msn;

    /**
     * Constructs a new sequencer on top of the given async sender.
     *
     * @param sender The underlying sender.
     * @param msn    The messenger to which follow-up sends are handed when a send is already in progress on the
     *               calling thread, or {@code null} to always send directly.
     */
    public Sequencer(MessageHandler sender, Messenger msn) {
        this.sender = sender;
        this.msn = msn;
    }

    /**
     * Constructs a new sequencer on top of the given async sender, without a messenger.
     *
     * @param sender The underlying sender.
     */
    public Sequencer(MessageHandler sender) {
        this(sender, null);
    }

    /**
     * Sets the destroyed flag to true. The very first time this method is called, it discards every queued message
     * and clears the sequencing state. Even if you retain a reference to this object, all of its content is allowed
     * to be garbage collected.
     *
     * @return True if this call performed the destruction, false if the sequencer was already destroyed.
     */
    public boolean destroy() {
        if (destroyed.getAndSet(true)) {
            return false;
        }
        synchronized (this) {
            for (Queue<Message> queue : seqMap.values()) {
                if (queue == null) {
                    continue; // id in flight with nothing waiting behind it
                }
                for (Message msg : queue) {
                    msg.discard();
                }
            }
            seqMap.clear();
        }
        return true;
    }

    /**
     * Filter a message against the current sequencing state. If this method returns true, the message has been cleared
     * for sending and its sequencing information has been added to the state. If this method returns false, the
     * message must not be sent by the caller: it has either been queued for later sending due to sequencing
     * restrictions, or been discarded because the sequencer was destroyed in the meantime. This method also sets the
     * sequence id as message context.
     *
     * @param msg the message to filter
     * @return true if the message is cleared for immediate sending, false if it was queued or discarded
     */
    private boolean filter(Message msg) {
        long seqId = msg.getSequenceId();
        msg.setContext(seqId);
        synchronized (this) {
            // destroy() raises the flag before it takes this lock, so re-checking here guarantees that a message
            // is never added to the state after destroy() has already swept it.
            if (destroyed.get()) {
                msg.discard();
                return false;
            }
            if (!seqMap.containsKey(seqId)) {
                seqMap.put(seqId, null); // mark id as in flight; a queue is only allocated on demand
                return true;
            }
            // The key exists but may still map to null; computeIfAbsent installs a queue in that case.
            Queue<Message> queue = seqMap.computeIfAbsent(seqId, k -> new LinkedList<>());
            if (msg.getTrace().shouldTrace(TraceLevel.COMPONENT)) {
                msg.getTrace().trace(TraceLevel.COMPONENT,
                                     "Sequencer queued message with sequence id '" + seqId + "'.");
            }
            queue.add(msg);
            return false;
        }
    }

    /**
     * Internal method for forwarding a sequenced message to the underlying sender. This sequencer is pushed as a
     * handler on the message before it is forwarded.
     *
     * @param msg The message to forward.
     */
    private void sequencedSend(Message msg) {
        if (msg.getTrace().shouldTrace(TraceLevel.COMPONENT)) {
            msg.getTrace().trace(TraceLevel.COMPONENT,
                                 "Sequencer sending message with sequence id '" + msg.getContext() + "'.");
        }
        msg.pushHandler(this);
        sender.handleMessage(msg);
    }

    /**
     * All messages pass through this handler when being sent by the owning source session. In case the message has no
     * sequencing-id, it is simply passed through to the next handler in the chain. Sequenced messages are sent only if
     * there is no queue for their id, otherwise they are queued. Messages arriving after {@link #destroy()} are
     * discarded.
     *
     * @param msg the message to send.
     */
    @Override
    public void handleMessage(Message msg) {
        if (destroyed.get()) {
            msg.discard();
            return;
        }
        if (!msg.hasSequenceId()) {
            sender.handleMessage(msg); // unsequenced
            return;
        }
        if (filter(msg)) {
            sequencedSend(msg);
        }
    }

    /**
     * Lookup the sequencing id of an incoming reply to pop the front of the corresponding queue, and then send the next
     * message in line, if any. The reply is afterwards passed on to the next handler popped from it. Replies arriving
     * after {@link #destroy()} are discarded.
     *
     * @param reply The reply received.
     */
    @Override
    public void handleReply(Reply reply) {
        if (destroyed.get()) {
            reply.discard();
            return;
        }
        long seqId = (Long)reply.getContext(); // non-sequenced messages do not enter here
        if (reply.getTrace().shouldTrace(TraceLevel.COMPONENT)) {
            reply.getTrace().trace(TraceLevel.COMPONENT,
                                   "Sequencer received reply with sequence id '" + seqId + "'.");
        }
        sendNextInSequence(seqId);
        ReplyHandler handler = reply.popHandler();
        handler.handleReply(reply);
    }

    /**
     * Messenger task that performs a deferred sequenced send. The message reference is dropped once it has been
     * sent, so that {@link #destroy()} only discards a message that was never sent.
     */
    private class SequencedSendTask implements Messenger.Task {

        private Message msg;

        SequencedSendTask(Message msg) {
            this.msg = msg;
        }

        @Override
        public void run() {
            sequencedSend(msg);
            msg = null;
        }

        @Override
        public void destroy() {
            if (msg != null) {
                msg.discard();
            }
        }
    }

    /**
     * Releases the head of the given sequence and sends the next queued message, if there is one. When nothing is
     * waiting, the sequence id is removed from the state altogether.
     *
     * <p>If this thread is already inside a send started by this method and a messenger is available, the message is
     * handed to the messenger instead of being sent directly, to break possibly very long recursion.</p>
     *
     * @param seqId The sequence id whose in-flight message has been replied to.
     */
    private void sendNextInSequence(long seqId) {
        Message next = null;
        synchronized (this) {
            Queue<Message> queue = seqMap.get(seqId);
            if (queue == null || queue.isEmpty()) {
                seqMap.remove(seqId);
            } else {
                next = queue.remove();
            }
        }
        if (next == null) {
            return;
        }
        boolean wasSending = isSending.get();
        if (wasSending && (msn != null)) {
            // Dispatch in another thread to break possibly very long recursion. The flag is left untouched
            // because the outer send that set it is still on this thread's stack and will restore it.
            msn.enqueue(new SequencedSendTask(next));
            return;
        }
        isSending.set(Boolean.TRUE);
        try {
            sequencedSend(next);
        } finally {
            // Restore rather than clear, so that a nested call does not wipe the outer frame's marker, and so
            // that an exception from the sender cannot leave the flag stuck on this thread.
            isSending.set(wasSending);
        }
    }

}