diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-09-25 10:37:50 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-09-25 10:37:50 +0200 |
commit | 17c6dc3a23badb17f468fe8b38bda57eab717a4c (patch) | |
tree | 9f8c1b332df340051dc7f2cff1caca3e5d804d2b | |
parent | f6d6f0db9f6dab4a48aea33dd8c41b28ff624ad5 (diff) |
Eliminate config self-subscription from MessageBusDocumentAccess
4 files changed, 45 insertions, 9 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/core/documentapi/DocumentAccessProvider.java b/container-core/src/main/java/com/yahoo/container/core/documentapi/DocumentAccessProvider.java index 1fc156d4e90..deabcbad9b4 100644 --- a/container-core/src/main/java/com/yahoo/container/core/documentapi/DocumentAccessProvider.java +++ b/container-core/src/main/java/com/yahoo/container/core/documentapi/DocumentAccessProvider.java @@ -1,6 +1,7 @@ package com.yahoo.container.core.documentapi; import com.google.inject.Inject; +import com.yahoo.cloud.config.SlobroksConfig; import com.yahoo.component.AbstractComponent; import com.yahoo.container.di.componentgraph.Provider; import com.yahoo.document.config.DocumentmanagerConfig; @@ -20,6 +21,7 @@ import com.yahoo.documentapi.VisitorSession; import com.yahoo.documentapi.messagebus.MessageBusDocumentAccess; import com.yahoo.documentapi.messagebus.MessageBusParams; import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet; +import com.yahoo.messagebus.MessagebusConfig; import com.yahoo.vespa.config.content.LoadTypeConfig; /** @@ -33,8 +35,9 @@ public class DocumentAccessProvider extends AbstractComponent implements Provide @Inject // TODO jonmv: Have Slobrok and RPC config injected as well. - public DocumentAccessProvider(DocumentmanagerConfig documentmanagerConfig, LoadTypeConfig loadTypeConfig) { - this.access = new LazyWrapper(documentmanagerConfig, loadTypeConfig); + public DocumentAccessProvider(DocumentmanagerConfig documentmanagerConfig, LoadTypeConfig loadTypeConfig, + SlobroksConfig slobroksConfig, MessagebusConfig messagebusConfig) { + this.access = new LazyWrapper(documentmanagerConfig, loadTypeConfig, slobroksConfig, messagebusConfig); } @Override @@ -50,18 +53,21 @@ public class DocumentAccessProvider extends AbstractComponent implements Provide public static class LazyWrapper extends DocumentAccess { - private final DocumentmanagerConfig documentmanagerConfig; - private final LoadTypeConfig loadTypeConfig; + private final MessageBusParams parameters; private final Object monitor = new Object(); private DocumentAccess delegate = null; private boolean shutDown = false; private LazyWrapper(DocumentmanagerConfig documentmanagerConfig, - LoadTypeConfig loadTypeConfig) { + LoadTypeConfig loadTypeConfig, + SlobroksConfig slobroksConfig, + MessagebusConfig messagebusConfig) { super(new DocumentAccessParams().setDocumentmanagerConfig(documentmanagerConfig)); - this.documentmanagerConfig = documentmanagerConfig; - this.loadTypeConfig = loadTypeConfig; + this.parameters = new MessageBusParams(new LoadTypeSet(loadTypeConfig)); + this.parameters.setDocumentmanagerConfig(documentmanagerConfig); + this.parameters.getRPCNetworkParams().setSlobroksConfig(slobroksConfig); + this.parameters.getMessageBusParams().setMessageBusConfig(messagebusConfig); } private DocumentAccess delegate() { @@ -70,7 +76,7 @@ public class DocumentAccessProvider extends AbstractComponent implements Provide if (shutDown) throw new IllegalStateException("This document access has been shut down"); - delegate = new MessageBusDocumentAccess((MessageBusParams) new MessageBusParams(new LoadTypeSet(loadTypeConfig)).setDocumentmanagerConfig(documentmanagerConfig)); + delegate = new MessageBusDocumentAccess(parameters); } return delegate; } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java index c60cd9cc378..3e35e9bd12a 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java @@ -60,7 +60,9 @@ public class MessageBusDocumentAccess extends DocumentAccess { bus = new NetworkMessageBus(network, new MessageBus(network, mbusParams)); } else { - bus = new RPCMessageBus(mbusParams, params.getRPCNetworkParams(), params.getRoutingConfigId()); + bus = params.getRPCNetworkParams().getSlobroksConfig() != null && mbusParams.getMessageBusConfig() != null + ? new RPCMessageBus(mbusParams, params.getRPCNetworkParams()) // prefer without self-subscription if config is set + : new RPCMessageBus(mbusParams, params.getRPCNetworkParams(), params.getRoutingConfigId()); } } catch (Exception e) { diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBusParams.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBusParams.java index 7f55401cf43..2c8382ab715 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBusParams.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBusParams.java @@ -19,6 +19,7 @@ public class MessageBusParams { private RetryPolicy retryPolicy; private int maxPendingCount; private int maxPendingSize; + private MessagebusConfig config; /** * Constructs a new instance of this parameter object with default values for all members. @@ -27,6 +28,7 @@ public class MessageBusParams { retryPolicy = new RetryTransientErrorsPolicy(); maxPendingCount = 1024; maxPendingSize = 128 * 1024 * 1024; + config = null; } /** @@ -39,6 +41,7 @@ public class MessageBusParams { retryPolicy = params.retryPolicy; maxPendingCount = params.maxPendingCount; maxPendingSize = params.maxPendingSize; + config = params.config; } /** @@ -143,4 +146,14 @@ public class MessageBusParams { this.maxPendingSize = maxSize; return this; } + + public MessagebusConfig getMessageBusConfig() { + return config; + } + + public MessageBusParams setMessageBusConfig(MessagebusConfig config) { + this.config = config; + return this; + } + } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java index 5ea278c410b..f5bf03c5420 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java @@ -21,6 +21,21 @@ public class RPCMessageBus extends NetworkMessageBus { /** * Constructs a new instance of this class. * + * @param mbusParams A complete set of message bus parameters, including messagebus config. + * @param rpcParams A complete set of network parameters, including rpc network config. + */ + public RPCMessageBus(MessageBusParams mbusParams, RPCNetworkParams rpcParams) { + this(mbusParams, new RPCNetwork(rpcParams)); + } + + private RPCMessageBus(MessageBusParams mbusParams, RPCNetwork network) { + super(network, new MessageBus(network, mbusParams)); + configAgent = new ConfigAgent(mbusParams.getMessageBusConfig(), getMessageBus()); + } + + /** + * Constructs a new instance of this class. + * * @param mbusParams A complete set of message bus parameters. * @param rpcParams A complete set of network parameters. * @param routingCfgId The config id for message bus routing specs. |