diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java |
Publish
Diffstat (limited to 'container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java')
-rw-r--r-- | container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java | 71 |
1 files changed, 71 insertions, 0 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java new file mode 100644 index 00000000000..99a4960d25d --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java @@ -0,0 +1,71 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.container.jdisc.messagebus; + +import com.google.inject.Inject; +import com.yahoo.container.di.componentgraph.Provider; +import com.yahoo.container.jdisc.config.SessionConfig; +import com.yahoo.jdisc.ReferencedResource; +import com.yahoo.messagebus.AllPassThrottlePolicy; +import com.yahoo.messagebus.IntermediateSessionParams; +import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.jdisc.MbusClient; +import com.yahoo.messagebus.shared.SharedIntermediateSession; +import com.yahoo.messagebus.shared.SharedSourceSession; +import com.yahoo.messagebus.ThrottlePolicy; +import com.yahoo.messagebus.Message; + +import com.yahoo.messagebus.Reply; + +/** + * @author tonytv + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class MbusClientProvider implements Provider<MbusClient> { + + private final MbusClient client; + + private static MbusClient createSourceClient( + SessionCache sessionCache, + SessionConfig sessionConfig, + boolean setAllPassThrottlePolicy) { + final SourceSessionParams sourceSessionParams = new SourceSessionParams(); + if (setAllPassThrottlePolicy) { + sourceSessionParams.setThrottlePolicy(new AllPassThrottlePolicy()); + } + try (ReferencedResource<SharedSourceSession> ref = sessionCache.retainSource(sourceSessionParams)) { + return new MbusClient(ref.getResource()); + } + } + + @Inject + public MbusClientProvider(SessionCache sessionCache, SessionConfig sessionConfig) { + switch (sessionConfig.type()) { + case INTERMEDIATE: + final IntermediateSessionParams intermediateSessionParams = + MbusServerProvider.createIntermediateSessionParams(true, sessionConfig.name()); + try (final ReferencedResource<SharedIntermediateSession> ref = + sessionCache.retainIntermediate(intermediateSessionParams)) { + client = new MbusClient(ref.getResource()); + } + break; + case SOURCE: + client = createSourceClient(sessionCache, sessionConfig, false); + break; + case INTERNAL: + client = createSourceClient(sessionCache, sessionConfig, true); + break; + default: + throw new IllegalArgumentException("Unknown session type: " + sessionConfig.type()); + } + } + + @Override + public MbusClient get() { + return client; + } + + @Override + public void deconstruct() { + client.release(); + } +} |