aboutsummaryrefslogtreecommitdiffstats
path: root/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
blob: 2464a272ed11d0f22429ff10e28ca1b59bb43dd0 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.container.jdisc.messagebus;

import com.yahoo.component.AbstractComponent;
import com.yahoo.component.annotation.Inject;
import com.yahoo.container.jdisc.ContainerMbusConfig;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.jdisc.ReferencedResource;
import com.yahoo.jdisc.References;
import com.yahoo.jdisc.ResourceReference;
import com.yahoo.jdisc.SharedResource;
import com.yahoo.messagebus.ConfigAgent;
import com.yahoo.messagebus.DynamicThrottlePolicy;
import com.yahoo.messagebus.IntermediateSessionParams;
import com.yahoo.messagebus.MessageBus;
import com.yahoo.messagebus.MessageBusParams;
import com.yahoo.messagebus.MessagebusConfig;
import com.yahoo.messagebus.Protocol;
import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.messagebus.ThrottlePolicy;
import com.yahoo.messagebus.network.NetworkMultiplexer;
import com.yahoo.messagebus.shared.SharedIntermediateSession;
import com.yahoo.messagebus.shared.SharedMessageBus;
import com.yahoo.messagebus.shared.SharedSourceSession;
import com.yahoo.yolean.concurrent.Memoized;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Class to encapsulate access to slobrok sessions.
 *
 * @author Steinar Knutsen
 * @author Einar Rosenvinge
 */
// TODO jonmv: Remove this: only used with more than one entry by FeedHandlerV3, where only timeout varies.
// rant: This whole construct is because DI at one point didn't exist, so getting hold of a shared resource
//       or session was hard(?), and one resorted to routing through the Container, using URIs, to the correct
//       MbusClient, with or without throttling. This introduced the problem of ownership during shutdown,
//       which was solved with manual reference counting. This is all much better solved with DI, which (now)
//       owns everything, and does component shutdown in reverse construction order, which is always right.
//       So for the sake of everyone's mental health, this should all just be removed now! I suspect this is
//       even the case for Request; we can track in handlers, and warn when requests have been misplaced.
public final class SessionCache extends AbstractComponent {

    private static final Logger log = Logger.getLogger(SessionCache.class.getName());

    // Holder for the shared message bus; read through bus() and closed in deconstruct().
    private final Memoized<SharedMessageBus, RuntimeException> messageBus;

    // Monitor passed to intermediatesCreator.retain(...) to guard the intermediates registry.
    private final Object intermediateLock = new Object();
    // Registry of intermediate sessions, keyed by the session name taken from the params.
    private final Map<String, SharedIntermediateSession> intermediates = new HashMap<>();
    private final IntermediateSessionCreator intermediatesCreator = new IntermediateSessionCreator();

    // Monitor passed to sourcesCreator.retain(...) to guard the sources registry.
    private final Object sourceLock = new Object();
    // Registry of source sessions, keyed by timeout and throttle policy signature.
    private final Map<SourceSessionKey, SharedSourceSession> sources = new HashMap<>();
    private final SourceSessionCreator sourcesCreator = new SourceSessionCreator();

    /**
     * Injection constructor: takes the network from the given provider and delegates
     * to the supplier-based constructor, passing {@code nets::net} as the supplier.
     *
     * @param nets                provider whose {@code net()} method supplies the network multiplexer
     * @param containerMbusConfig container message bus config
     * @param documentTypeManager document type manager handed on to the delegate constructor
     * @param messagebusConfig    message bus config
     */
    @Inject
    public SessionCache(NetworkMultiplexerProvider nets,
                        ContainerMbusConfig containerMbusConfig,
                        DocumentTypeManager documentTypeManager,
                        MessagebusConfig messagebusConfig) {
        this(nets::net, containerMbusConfig, documentTypeManager, messagebusConfig);
    }

    /**
     * Creates a session cache that speaks the {@link DocumentProtocol}, built here from
     * the given document type manager, and delegates to the protocol-based constructor.
     *
     * @param net                 supplier of the network multiplexer
     * @param containerMbusConfig container message bus config
     * @param documentTypeManager used to construct the {@link DocumentProtocol}
     * @param messagebusConfig    message bus config
     */
    public SessionCache(Supplier<NetworkMultiplexer> net,
                        ContainerMbusConfig containerMbusConfig,
                        DocumentTypeManager documentTypeManager,
                        MessagebusConfig messagebusConfig) {
        this(net, containerMbusConfig, messagebusConfig, new DocumentProtocol(documentTypeManager));
    }

    /**
     * Creates a session cache for an arbitrary protocol.
     *
     * The message bus is not built here: a factory that calls
     * {@link #createSharedMessageBus} with {@code net.get()} is handed to the memoizing
     * holder, together with {@code SharedMessageBus::release} as its cleanup action.
     *
     * @param net                 supplier of the network multiplexer, read inside the factory
     * @param containerMbusConfig container message bus config
     * @param messagebusConfig    message bus config
     * @param protocol            the protocol to register on the message bus
     */
    public SessionCache(Supplier<NetworkMultiplexer> net,
                        ContainerMbusConfig containerMbusConfig,
                        MessagebusConfig messagebusConfig,
                        Protocol protocol) {
        this.messageBus = new Memoized<>(
                () -> createSharedMessageBus(net.get(), containerMbusConfig, messagebusConfig, protocol),
                SharedMessageBus::release);
    }

    /** Closes the memoized message bus holder when this component is deconstructed. */
    @Override
    public void deconstruct() {
        messageBus.close();
    }

    // Lazily create shared message bus: every access goes through the memoizing holder,
    // whose factory was set up in the constructor.
    private SharedMessageBus bus() {
        return messageBus.get();
    }

    /**
     * Builds a message bus on the given network and wraps it in a {@link SharedMessageBus}.
     *
     * @param net              the network multiplexer the bus is created on
     * @param mbusConfig       source of the max pending count limit
     * @param messagebusConfig config handed to the {@link ConfigAgent} for the new bus
     * @param protocol         the protocol registered in the bus parameters
     * @return the new bus, wrapped as a shared resource
     */
    private static SharedMessageBus createSharedMessageBus(NetworkMultiplexer net,
                                                           ContainerMbusConfig mbusConfig,
                                                           MessagebusConfig messagebusConfig,
                                                           Protocol protocol) {
        MessageBusParams params = new MessageBusParams().addProtocol(protocol);
        params.setMaxPendingCount(mbusConfig.maxpendingcount());

        MessageBus messageBus = new MessageBus(net, params);

        // Constructed only for its effect: configures the wrapped MessageBus with a routing table.
        new ConfigAgent(messagebusConfig, messageBus);

        return new SharedMessageBus(messageBus);
    }

    /**
     * Retains the intermediate session registered under the name in {@code p},
     * creating it if needed. The registry is accessed under {@code intermediateLock}.
     *
     * @param p parameters identifying (and, if needed, used to create) the session
     * @return the session together with the reference retained for the caller
     */
    ReferencedResource<SharedIntermediateSession> retainIntermediate(IntermediateSessionParams p) {
        return intermediatesCreator.retain(intermediateLock, intermediates, p);
    }

    /**
     * Retains the source session matching the timeout and throttle policy of {@code p},
     * creating it if needed. The registry is accessed under {@code sourceLock}.
     *
     * @param p parameters identifying (and, if needed, used to create) the session
     * @return the session together with the reference retained for the caller
     */
    public ReferencedResource<SharedSourceSession> retainSource(SourceSessionParams p) {
        return sourcesCreator.retain(sourceLock, sources, p);
    }

    /**
     * Template for the "look up, re-reference or create" logic shared by the session registries.
     *
     * @param <PARAMS>  parameter type sessions are created from
     * @param <KEY>     registry key type derived from the parameters
     * @param <SESSION> the shared session type held in the registry
     */
    private abstract static class SessionCreator<PARAMS, KEY, SESSION extends SharedResource> {

        /** Builds a brand new session from the given parameters. */
        abstract SESSION create(PARAMS p);

        /** Derives the registry key under which a session for the given parameters is stored. */
        abstract KEY buildKey(PARAMS p);

        /** Hook called after an already registered session has been referenced again. */
        abstract void logReuse(SESSION session);

        /**
         * Returns a session for the given parameters along with a reference to it.
         *
         * A registered session is re-referenced; if none is registered, or if
         * re-referencing throws {@link IllegalStateException}, a new session is created
         * and stored under the same key, replacing any previous entry.
         *
         * @param lock     monitor guarding {@code registry}
         * @param registry the key-to-session map to look up in and store to
         * @param p        parameters for key derivation and session creation
         * @return the session and the reference obtained for it
         */
        ReferencedResource<SESSION> retain(Object lock, Map<KEY, SESSION> registry, PARAMS p) {
            KEY key = buildKey(p);
            SESSION retained;
            ResourceReference reference = null;
            boolean reused = false;
            // The monitor spans lookup, re-referencing and creation, which is a long time
            // to hold it, but lookup and store must happen together.
            synchronized (lock) {
                retained = registry.get(key);
                if (retained != null) {
                    try {
                        reference = retained.refer(this);
                        logReuse(retained);
                        reused = true;
                    } catch (IllegalStateException ignored) {
                        // The registered session could not be reused; replace it below.
                    }
                }
                if (!reused) {
                    retained = createAndStore(registry, p, key);
                    reference = References.fromResource(retained);
                }
            }
            return new ReferencedResource<>(retained, reference);
        }

        /** Creates a session and registers it under {@code key}, overwriting any existing entry. */
        SESSION createAndStore(Map<KEY, SESSION> registry, PARAMS p, KEY key) {
            SESSION created = create(p);
            registry.put(key, created);
            return created;
        }

    }

    /** Creates source sessions on the shared message bus, keyed by {@link SourceSessionKey}. */
    private class SourceSessionCreator
            extends SessionCreator<SourceSessionParams, SourceSessionKey, SharedSourceSession> {

        /** Logs at FINE and asks the shared bus for a new source session. */
        @Override
        SharedSourceSession create(SourceSessionParams p) {
            log.fine("Creating new source session.");
            return bus().newSourceSession(p);
        }

        /** The key is derived from the timeout and throttle policy of the params. */
        @Override
        SourceSessionKey buildKey(SourceSessionParams p) {
            return new SourceSessionKey(p);
        }

        /** Logs at FINE that an existing source session is being reused. */
        @Override
        void logReuse(SharedSourceSession session) {
            log.fine("Reusing source session.");
        }
    }

    /** Creates intermediate sessions on the shared message bus, keyed by session name. */
    private class IntermediateSessionCreator
            extends SessionCreator<IntermediateSessionParams, String, SharedIntermediateSession> {

        /** Logs at FINE and asks the shared bus for a new intermediate session. */
        @Override
        SharedIntermediateSession create(IntermediateSessionParams p) {
            // Supplier overload: the message is only concatenated when FINE is actually enabled.
            log.log(Level.FINE, () -> "Creating new intermediate session " + p.getName());
            return bus().newIntermediateSession(p);
        }

        /** The session name is the registry key. */
        @Override
        String buildKey(IntermediateSessionParams p) {
            return p.getName();
        }

        /** Logs at FINE that an existing intermediate session is being reused. */
        @Override
        void logReuse(SharedIntermediateSession session) {
            // Supplier overload: the message is only concatenated when FINE is actually enabled.
            log.log(Level.FINE, () -> "Reusing intermediate session " + session.name());
        }
    }

    /**
     * Base type for throttle policy signatures. Its hash code is that of the runtime
     * class, which subclasses use as the seed for their own hash codes.
     */
    static class ThrottlePolicySignature {

        /** Returns the hash code of this object's runtime class. */
        @Override
        public int hashCode() {
            Class<?> runtimeType = getClass();
            return runtimeType.hashCode();
        }

    }

    /**
     * Value-based signature of a {@link StaticThrottlePolicy}: the max pending count and
     * max pending size read from the policy at construction time. Two signatures are
     * equal when both values match.
     */
    static class StaticThrottlePolicySignature extends ThrottlePolicySignature {

        private final int maxPendingCount;
        private final long maxPendingSize;

        /** Captures the current limits of the given policy. */
        StaticThrottlePolicySignature(final StaticThrottlePolicy policy) {
            maxPendingCount = policy.getMaxPendingCount();
            maxPendingSize = policy.getMaxPendingSize();
        }

        /** Combines the class hash with both captured limits. */
        @Override
        public int hashCode() {
            int prime = 31;
            int result = super.hashCode();
            result = prime * result + maxPendingCount;
            result = prime * result
                    + (int) (maxPendingSize ^ (maxPendingSize >>> 32));
            return result;
        }

        /**
         * Returns true only for another signature of exactly this class with the same limits.
         * Returns false for {@code null}, as the {@code Object.equals} contract requires.
         */
        @Override
        public boolean equals(final Object obj) {
            if (this == obj) {
                return true;
            }
            // The null check must come first: obj.getClass() would otherwise throw.
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            final StaticThrottlePolicySignature other = (StaticThrottlePolicySignature) obj;
            return maxPendingCount == other.maxPendingCount
                   && maxPendingSize == other.maxPendingSize;
        }

    }

    /**
     * Value-based signature of a {@link DynamicThrottlePolicy}: the max pending count and
     * the window size settings read from the policy at construction time. Doubles are
     * compared and hashed by their {@code Double.doubleToLongBits} representation.
     */
    static class DynamicThrottlePolicySignature extends ThrottlePolicySignature {

        private final int maxPending;
        private final double maxWindowSize;
        private final double minWindowSize;
        private final double windowSizeBackoff;
        private final double windowSizeIncrement;

        /** Captures the current settings of the given policy. */
        DynamicThrottlePolicySignature(final DynamicThrottlePolicy policy) {
            maxPending = policy.getMaxPendingCount();
            maxWindowSize = policy.getMaxWindowSize();
            minWindowSize = policy.getMinWindowSize();
            windowSizeBackoff = policy.getWindowSizeBackOff();
            windowSizeIncrement = policy.getWindowSizeIncrement();
        }

        /** Combines the class hash with all captured settings. */
        @Override
        public int hashCode() {
            int prime = 31;
            int result = super.hashCode();
            result = prime * result + maxPending;
            result = prime * result + bitHash(maxWindowSize);
            result = prime * result + bitHash(minWindowSize);
            result = prime * result + bitHash(windowSizeBackoff);
            result = prime * result + bitHash(windowSizeIncrement);
            return result;
        }

        /**
         * Returns true only for another signature of exactly this class with the same settings.
         * Returns false for {@code null}, as the {@code Object.equals} contract requires.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            // The null check must come first: obj.getClass() would otherwise throw.
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            DynamicThrottlePolicySignature other = (DynamicThrottlePolicySignature) obj;
            return maxPending == other.maxPending
                   && sameBits(maxWindowSize, other.maxWindowSize)
                   && sameBits(minWindowSize, other.minWindowSize)
                   && sameBits(windowSizeBackoff, other.windowSizeBackoff)
                   && sameBits(windowSizeIncrement, other.windowSizeIncrement);
        }

        /** Folds the 64-bit representation of a double into an int hash. */
        private static int bitHash(double value) {
            long bits = Double.doubleToLongBits(value);
            return (int) (bits ^ (bits >>> 32));
        }

        /** Compares two doubles by their {@code doubleToLongBits} representation. */
        private static boolean sameBits(double a, double b) {
            return Double.doubleToLongBits(a) == Double.doubleToLongBits(b);
        }

    }

    /**
     * Signature for throttle policies of a type this class has no value-based signature for.
     * Two such signatures are equal only when they wrap the very same policy instance.
     */
    static class UnknownThrottlePolicySignature extends ThrottlePolicySignature {

        private final ThrottlePolicy policy;

        /** Wraps the given policy; the instance itself is the identity of this signature. */
        UnknownThrottlePolicySignature(final ThrottlePolicy policy) {
            this.policy = policy;
        }

        /**
         * Identity-based hash, matching the identity-based {@link #equals(Object)}: equal
         * signatures wrap the same instance and so share this hash, while signatures of
         * different policies no longer all collide on the class hash.
         */
        @Override
        public int hashCode() {
            return 31 * super.hashCode() + System.identityHashCode(policy);
        }

        /** Returns true only for another signature of exactly this class wrapping the same policy instance. */
        @Override
        public boolean equals(Object other) {
            if (other == null) {
                return false;
            }
            if (other.getClass() != getClass()) {
                return false;
            }
            return ((UnknownThrottlePolicySignature) other).policy == policy;
        }
    }

    /**
     * Registry key for source sessions: the session timeout plus a signature of the
     * throttle policy taken from the {@link SourceSessionParams}.
     */
    static class SourceSessionKey {

        private final double timeout;
        // Null when the params carry no throttle policy; equals() and hashCode() allow for that.
        private final ThrottlePolicySignature policy;

        /** Captures the timeout and the throttle policy signature of the given params. */
        SourceSessionKey(SourceSessionParams p) {
            timeout = p.getTimeout();
            policy = createSignature(p.getThrottlePolicy());
        }

        /**
         * Picks the signature type for the given policy by its exact class: value-based for
         * {@link DynamicThrottlePolicy} and {@link StaticThrottlePolicy}, identity-based for
         * any other type (including subclasses of those two).
         *
         * @param policy the throttle policy, possibly {@code null}
         * @return the signature, or {@code null} when no policy is given
         */
        private static ThrottlePolicySignature createSignature(ThrottlePolicy policy) {
            if (policy == null) {
                // No policy: a null signature instead of an NPE on policy.getClass().
                return null;
            }
            Class<?> policyClass = policy.getClass();
            if (policyClass == DynamicThrottlePolicy.class) {
                return new DynamicThrottlePolicySignature((DynamicThrottlePolicy) policy);
            } else if (policyClass == StaticThrottlePolicy.class) {
                return new StaticThrottlePolicySignature((StaticThrottlePolicy) policy);
            } else {
                return new UnknownThrottlePolicySignature(policy);
            }
        }

        @Override
        public String toString() {
            return "SourceSessionKey{" + "timeout=" + timeout + ", policy=" + policy + '}';
        }

        /** Combines the policy signature hash (0 when absent) with the bits of the timeout. */
        @Override
        public int hashCode() {
            int prime = 31;
            int result = 1;
            result = prime * result + ((policy == null) ? 0 : policy.hashCode());
            long timeoutBits = Double.doubleToLongBits(timeout);
            result = prime * result + (int) (timeoutBits ^ (timeoutBits >>> 32));
            return result;
        }

        /** Keys are equal when both the policy signatures (or their absence) and the timeout bits match. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) return true;
            if (obj == null) return false;
            if (getClass() != obj.getClass()) return false;
            SourceSessionKey other = (SourceSessionKey) obj;
            boolean samePolicy = (policy == null) ? other.policy == null : policy.equals(other.policy);
            return samePolicy
                   && Double.doubleToLongBits(timeout) == Double.doubleToLongBits(other.timeout);
        }
    }

}