blob: 7b5b152d65a85321b2eed26df180597dd792bb17 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.container.jdisc.messagebus;
import com.google.inject.Inject;
import com.yahoo.container.di.componentgraph.Provider;
import com.yahoo.container.jdisc.config.SessionConfig;
import com.yahoo.jdisc.ReferencedResource;
import com.yahoo.messagebus.AllPassThrottlePolicy;
import com.yahoo.messagebus.IntermediateSessionParams;
import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.jdisc.MbusClient;
import com.yahoo.messagebus.shared.SharedIntermediateSession;
import com.yahoo.messagebus.shared.SharedSourceSession;
/**
 * Component provider that builds a single {@link MbusClient} at construction time and hands
 * out that same instance from {@link #get()}. Which kind of session backs the client is
 * decided by the session type in the injected {@link SessionConfig}.
 *
 * @author tonytv
 * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
 */
public class MbusClientProvider implements Provider<MbusClient> {

    /** The one client instance this provider owns; released in {@link #deconstruct()}. */
    private final MbusClient client;

    /**
     * Builds the client for the session type named in the config.
     *
     * @param sessionCache  cache the backing session is retained from
     * @param sessionConfig supplies the session type and, for intermediate sessions, the name
     * @throws IllegalArgumentException if the configured session type is not one of
     *                                  INTERMEDIATE, SOURCE or INTERNAL
     */
    @Inject
    public MbusClientProvider(SessionCache sessionCache, SessionConfig sessionConfig) {
        this.client = createClient(sessionCache, sessionConfig);
    }

    /** Returns the client created in the constructor. */
    @Override
    public MbusClient get() {
        return client;
    }

    /** Releases the client created in the constructor. */
    @Override
    public void deconstruct() {
        client.release();
    }

    /** Dispatches on the configured session type to the matching client factory. */
    private static MbusClient createClient(SessionCache sessionCache, SessionConfig sessionConfig) {
        switch (sessionConfig.type()) {
            case INTERMEDIATE:
                return newIntermediateClient(sessionCache, sessionConfig);
            case SOURCE:
                // Plain source session: throttle policy is left as SourceSessionParams creates it.
                return newSourceClient(sessionCache, false);
            case INTERNAL:
                // Internal session: a source session with an all-pass throttle policy installed.
                return newSourceClient(sessionCache, true);
            default:
                throw new IllegalArgumentException("Unknown session type: " + sessionConfig.type());
        }
    }

    /**
     * Creates a client on top of a shared intermediate session whose parameters come from
     * {@code MbusServerProvider.createIntermediateSessionParams(true, name)}.
     */
    private static MbusClient newIntermediateClient(SessionCache sessionCache, SessionConfig sessionConfig) {
        final IntermediateSessionParams params =
                MbusServerProvider.createIntermediateSessionParams(true, sessionConfig.name());
        // The reference obtained from the cache is closed as soon as the client is constructed.
        // NOTE(review): presumably MbusClient keeps its own reference to the session - confirm.
        try (final ReferencedResource<SharedIntermediateSession> sessionRef =
                     sessionCache.retainIntermediate(params)) {
            return new MbusClient(sessionRef.getResource());
        }
    }

    /**
     * Creates a client on top of a shared source session.
     *
     * @param sessionCache cache the source session is retained from
     * @param allPass      when {@code true}, an {@link AllPassThrottlePolicy} is set on the
     *                     session parameters before the session is retained
     */
    private static MbusClient newSourceClient(SessionCache sessionCache, boolean allPass) {
        final SourceSessionParams params = new SourceSessionParams();
        if (allPass) {
            params.setThrottlePolicy(new AllPassThrottlePolicy());
        }
        // The reference obtained from the cache is closed as soon as the client is constructed.
        try (ReferencedResource<SharedSourceSession> sessionRef = sessionCache.retainSource(params)) {
            return new MbusClient(sessionRef.getResource());
        }
    }
}
|