aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus/src/main/java/com/yahoo/messagebus/Message.java
blob: 7fd7018070b9d4375128b0339377e462c4e7fbd6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus;

import com.yahoo.concurrent.SystemTimer;
import com.yahoo.messagebus.routing.Route;

/**
 * A message is a {@link Routable} that is not a reply. On top of the routable state it carries the route it is to be
 * sent along, the timing information used to decide whether it has expired, and a counter telling which retry the
 * message is currently on (see {@link #getRetry} for details). Sequencing is disabled by default; subclasses opt in
 * by overriding {@link #hasSequenceId()} and {@link #getSequenceId()}.
 *
 * @author Simon Thoresen Hult
 */
public abstract class Message extends Routable {

    // Route to send this message along; stays null until setRoute() or swapState() supplies one.
    private Route route = null;
    // Timestamp, in milliseconds since epoch, of when message bus last saw this message.
    private long timeReceived = 0;
    // Milliseconds left before timeout, counted from timeReceived.
    private long timeRemaining = 0;
    // Whether this message may be resent; resending is allowed unless explicitly disabled.
    private boolean retryEnabled = true;
    // Number of times the sending of this message has been retried.
    private int retry = 0;

    /**
     * <p>Exchanges the state of this message with that of the given routable. The state owned by the superclass is
     * always exchanged; the message specific state (route, timing and retry settings) is exchanged only when the
     * argument is itself a message.</p>
     *
     * @param rhs The routable to swap state with.
     */
    @Override
    public void swapState(Routable rhs) {
        super.swapState(rhs);
        if (!(rhs instanceof Message)) {
            return; // nothing message specific to exchange
        }
        final Message other = (Message)rhs;

        final Route heldRoute = this.route;
        this.route = other.route;
        other.route = heldRoute;

        final long heldTimeReceived = this.timeReceived;
        this.timeReceived = other.timeReceived;
        other.timeReceived = heldTimeReceived;

        final long heldTimeRemaining = this.timeRemaining;
        this.timeRemaining = other.timeRemaining;
        other.timeRemaining = heldTimeRemaining;

        final boolean heldRetryEnabled = this.retryEnabled;
        this.retryEnabled = other.retryEnabled;
        other.retryEnabled = heldRetryEnabled;

        final int heldRetry = this.retry;
        this.retry = other.retry;
        other.retry = heldRetry;
    }

    /**
     * <p>Returns the route of this message.</p>
     *
     * @return The route, or null if none is currently set.
     */
    public Route getRoute() {
        return this.route;
    }

    /**
     * <p>Sets a new route for this message. The message keeps its own copy of the given route, created through the
     * copy constructor of {@link Route}, rather than the instance passed in.</p>
     *
     * @param route The route to copy.
     * @return This, to allow chaining.
     */
    public Message setRoute(Route route) {
        final Route copy = new Route(route);
        this.route = copy;
        return this;
    }

    /**
     * <p>Returns the timestamp of when message bus last saw this message. To find out whether the message has
     * expired, call {@link #isExpired()} rather than inspecting this value.</p>
     *
     * @return The timestamp this was last seen.
     */
    public long getTimeReceived() {
        return this.timeReceived;
    }

    /**
     * <p>Sets the timestamp of when message bus last saw this message, given in milliseconds since epoch. Message bus
     * touches this automatically whenever it encounters a new message, so you should never need to call this method
     * yourself. See {@link #isExpired()} for how to determine whether or not a message has expired.</p>
     *
     * @param timeReceived The time received in milliseconds.
     * @return This, to allow chaining.
     */
    public Message setTimeReceived(long timeReceived) {
        this.timeReceived = timeReceived;
        return this;
    }

    /**
     * <p>Convenience method that reads the current time and passes it to {@link #setTimeReceived(long)}.</p>
     *
     * @return This, to allow chaining.
     */
    public Message setTimeReceivedNow() {
        final long now = SystemTimer.INSTANCE.milliTime();
        return setTimeReceived(now);
    }

    /**
     * <p>Returns the stored number of milliseconds that remain before this message times out. Only the network layer
     * updates this value, so it is not current. To determine message expiration, use {@link #isExpired()}
     * instead.</p>
     *
     * @return The remaining time in milliseconds.
     */
    public long getTimeRemaining() {
        return this.timeRemaining;
    }

    /**
     * <p>Sets the number of milliseconds that remain before this message times out. See {@link #isExpired()} for how
     * to determine whether or not a message has expired.</p>
     *
     * @param timeRemaining The number of milliseconds until expiration.
     * @return This, to allow chaining.
     */
    public Message setTimeRemaining(long timeRemaining) {
        this.timeRemaining = timeRemaining;
        return this;
    }

    /**
     * <p>Returns the number of milliseconds that remain, as of right now, before this message times out. The result
     * is derived from {@link #getTimeReceived()}, {@link #getTimeRemaining()} and the current time. Whenever message
     * bus transmits a message, a new remaining time is calculated and serialized as <code>timeRemaining =
     * timeRemaining - (currentTime - timeReceived)</code>. Since this only accounts for the time spent by the
     * application above message bus, the remaining time is an over-estimate.</p>
     *
     * @return The remaining time in milliseconds.
     */
    public long getTimeRemainingNow() {
        // Time that has passed since message bus last saw this message.
        final long elapsed = SystemTimer.INSTANCE.milliTime() - this.timeReceived;
        return this.timeRemaining - elapsed;
    }

    /**
     * <p>Tells whether or not this message has expired.</p>
     *
     * @return True if {@link #getTimeRemainingNow()} is less than or equal to zero.
     */
    public boolean isExpired() {
        final long remaining = getTimeRemainingNow();
        return remaining <= 0;
    }

    /**
     * <p>Tells whether or not this message carries a sequence identifier that should be respected, i.e. whether or
     * not the message requires sequencing. This implementation always answers false.</p>
     *
     * @return True to enable sequencing.
     * @see #getSequenceId()
     */
    public boolean hasSequenceId() {
        return false;
    }

    /**
     * <p>Returns the identifier used to order messages. Any two messages with the same sequence id are ensured to
     * arrive at the recipient in the order they were sent by the client. The value is respected only when {@link
     * #hasSequenceId()} returns true. This implementation always returns 0.</p>
     *
     * @return The sequence identifier.
     */
    public long getSequenceId() {
        return 0;
    }

    /**
     * <p>Tells whether or not this message carries a sequence bucket that should be respected, i.e. whether or not
     * the message requires bucket-level sequencing. This implementation always answers false.</p>
     *
     * @return True to enable bucket sequencing.
     * @see #getBucketSequence()
     */
    public boolean hasBucketSequence() {
        return false;
    }

    /**
     * <p>Returns the identifier used to order message buckets. Any two messages with the same bucket sequence are
     * ensured to arrive at the NEXT peer in the order they were sent by THIS peer. The value is respected only when
     * {@link #hasBucketSequence()} returns true. This implementation always returns 0.</p>
     *
     * @return The bucket sequence.
     */
    public long getBucketSequence() {
        return 0;
    }

    /**
     * <p>Returns the approximate size of this message object in bytes, which lets messagebus track the size of the
     * send queue in memory usage as well as item count. The default is 1; override this method to enable message
     * size tracking.</p>
     *
     * @return 1
     */
    public int getApproxSize() {
        return 1;
    }

    /**
     * <p>Sets whether or not this message can be resent.</p>
     *
     * @param enabled Resendable flag.
     */
    public void setRetryEnabled(boolean enabled) {
        this.retryEnabled = enabled;
    }

    /**
     * <p>Tells whether or not this message can be resent.</p>
     *
     * @return True if this can be resent.
     */
    public boolean getRetryEnabled() {
        return this.retryEnabled;
    }

    /**
     * <p>Returns how many times the sending of this message has been retried. The count is exposed so that clients
     * may implement their own logic to control resending.</p>
     *
     * @return The retry count.
     * @see Reply#setRetryDelay This method can be used to request resending that differs from the default.
     */
    public int getRetry() {
        return this.retry;
    }

    /**
     * <p>Sets how many times the sending of this message has been retried. Modifying this only makes sense BEFORE
     * the message is sent, since the value is not serialized back into any reply that the message may create.</p>
     *
     * @param retry The retry count.
     * @return This, to allow chaining.
     */
    public Message setRetry(int retry) {
        this.retry = retry;
        return this;
    }
}