aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-09-25 10:37:50 +0200
committerJon Marius Venstad <venstad@gmail.com>2020-09-25 10:37:50 +0200
commit17c6dc3a23badb17f468fe8b38bda57eab717a4c (patch)
tree9f8c1b332df340051dc7f2cff1caca3e5d804d2b
parentf6d6f0db9f6dab4a48aea33dd8c41b28ff624ad5 (diff)
Eliminate config self-subscription from MessageBusDocumentAccess
-rw-r--r--container-core/src/main/java/com/yahoo/container/core/documentapi/DocumentAccessProvider.java22
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java4
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/MessageBusParams.java13
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java15
4 files changed, 45 insertions, 9 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/core/documentapi/DocumentAccessProvider.java b/container-core/src/main/java/com/yahoo/container/core/documentapi/DocumentAccessProvider.java
index 1fc156d4e90..deabcbad9b4 100644
--- a/container-core/src/main/java/com/yahoo/container/core/documentapi/DocumentAccessProvider.java
+++ b/container-core/src/main/java/com/yahoo/container/core/documentapi/DocumentAccessProvider.java
@@ -1,6 +1,7 @@
package com.yahoo.container.core.documentapi;
import com.google.inject.Inject;
+import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.component.AbstractComponent;
import com.yahoo.container.di.componentgraph.Provider;
import com.yahoo.document.config.DocumentmanagerConfig;
@@ -20,6 +21,7 @@ import com.yahoo.documentapi.VisitorSession;
import com.yahoo.documentapi.messagebus.MessageBusDocumentAccess;
import com.yahoo.documentapi.messagebus.MessageBusParams;
import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet;
+import com.yahoo.messagebus.MessagebusConfig;
import com.yahoo.vespa.config.content.LoadTypeConfig;
/**
@@ -33,8 +35,9 @@ public class DocumentAccessProvider extends AbstractComponent implements Provide
@Inject
// TODO jonmv: Have Slobrok and RPC config injected as well.
- public DocumentAccessProvider(DocumentmanagerConfig documentmanagerConfig, LoadTypeConfig loadTypeConfig) {
- this.access = new LazyWrapper(documentmanagerConfig, loadTypeConfig);
+ public DocumentAccessProvider(DocumentmanagerConfig documentmanagerConfig, LoadTypeConfig loadTypeConfig,
+ SlobroksConfig slobroksConfig, MessagebusConfig messagebusConfig) {
+ this.access = new LazyWrapper(documentmanagerConfig, loadTypeConfig, slobroksConfig, messagebusConfig);
}
@Override
@@ -50,18 +53,21 @@ public class DocumentAccessProvider extends AbstractComponent implements Provide
public static class LazyWrapper extends DocumentAccess {
- private final DocumentmanagerConfig documentmanagerConfig;
- private final LoadTypeConfig loadTypeConfig;
+ private final MessageBusParams parameters;
private final Object monitor = new Object();
private DocumentAccess delegate = null;
private boolean shutDown = false;
private LazyWrapper(DocumentmanagerConfig documentmanagerConfig,
- LoadTypeConfig loadTypeConfig) {
+ LoadTypeConfig loadTypeConfig,
+ SlobroksConfig slobroksConfig,
+ MessagebusConfig messagebusConfig) {
super(new DocumentAccessParams().setDocumentmanagerConfig(documentmanagerConfig));
- this.documentmanagerConfig = documentmanagerConfig;
- this.loadTypeConfig = loadTypeConfig;
+ this.parameters = new MessageBusParams(new LoadTypeSet(loadTypeConfig));
+ this.parameters.setDocumentmanagerConfig(documentmanagerConfig);
+ this.parameters.getRPCNetworkParams().setSlobroksConfig(slobroksConfig);
+ this.parameters.getMessageBusParams().setMessageBusConfig(messagebusConfig);
}
private DocumentAccess delegate() {
@@ -70,7 +76,7 @@ public class DocumentAccessProvider extends AbstractComponent implements Provide
if (shutDown)
throw new IllegalStateException("This document access has been shut down");
- delegate = new MessageBusDocumentAccess((MessageBusParams) new MessageBusParams(new LoadTypeSet(loadTypeConfig)).setDocumentmanagerConfig(documentmanagerConfig));
+ delegate = new MessageBusDocumentAccess(parameters);
}
return delegate;
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java
index c60cd9cc378..3e35e9bd12a 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java
@@ -60,7 +60,9 @@ public class MessageBusDocumentAccess extends DocumentAccess {
bus = new NetworkMessageBus(network, new MessageBus(network, mbusParams));
}
else {
- bus = new RPCMessageBus(mbusParams, params.getRPCNetworkParams(), params.getRoutingConfigId());
+ bus = params.getRPCNetworkParams().getSlobroksConfig() != null && mbusParams.getMessageBusConfig() != null
+ ? new RPCMessageBus(mbusParams, params.getRPCNetworkParams()) // prefer without self-subscription if config is set
+ : new RPCMessageBus(mbusParams, params.getRPCNetworkParams(), params.getRoutingConfigId());
}
}
catch (Exception e) {
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBusParams.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBusParams.java
index 7f55401cf43..2c8382ab715 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBusParams.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBusParams.java
@@ -19,6 +19,7 @@ public class MessageBusParams {
private RetryPolicy retryPolicy;
private int maxPendingCount;
private int maxPendingSize;
+ private MessagebusConfig config;
/**
* Constructs a new instance of this parameter object with default values for all members.
@@ -27,6 +28,7 @@ public class MessageBusParams {
retryPolicy = new RetryTransientErrorsPolicy();
maxPendingCount = 1024;
maxPendingSize = 128 * 1024 * 1024;
+ config = null;
}
/**
@@ -39,6 +41,7 @@ public class MessageBusParams {
retryPolicy = params.retryPolicy;
maxPendingCount = params.maxPendingCount;
maxPendingSize = params.maxPendingSize;
+ config = params.config;
}
/**
@@ -143,4 +146,14 @@ public class MessageBusParams {
this.maxPendingSize = maxSize;
return this;
}
+
+ public MessagebusConfig getMessageBusConfig() {
+ return config;
+ }
+
+ public MessageBusParams setMessageBusConfig(MessagebusConfig config) {
+ this.config = config;
+ return this;
+ }
+
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java
index 5ea278c410b..f5bf03c5420 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java
@@ -21,6 +21,21 @@ public class RPCMessageBus extends NetworkMessageBus {
/**
* Constructs a new instance of this class.
*
+ * @param mbusParams A complete set of message bus parameters, including messagebus config.
+ * @param rpcParams A complete set of network parameters, including rpc network config.
+ */
+ public RPCMessageBus(MessageBusParams mbusParams, RPCNetworkParams rpcParams) {
+ this(mbusParams, new RPCNetwork(rpcParams));
+ }
+
+ private RPCMessageBus(MessageBusParams mbusParams, RPCNetwork network) {
+ super(network, new MessageBus(network, mbusParams));
+ configAgent = new ConfigAgent(mbusParams.getMessageBusConfig(), getMessageBus());
+ }
+
+ /**
+ * Constructs a new instance of this class.
+ *
* @param mbusParams A complete set of message bus parameters.
* @param rpcParams A complete set of network parameters.
* @param routingCfgId The config id for message bus routing specs.