summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--application/src/main/java/com/yahoo/application/Application.java9
-rw-r--r--document/src/test/java/com/yahoo/document/json/DocumentUpdateJsonSerializerTest.java1
-rw-r--r--document/src/test/java/com/yahoo/document/json/JsonTestHelper.java3
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java53
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java43
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java49
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java4
7 files changed, 93 insertions, 69 deletions
diff --git a/application/src/main/java/com/yahoo/application/Application.java b/application/src/main/java/com/yahoo/application/Application.java
index 22c3a94bedc..cfcce72487b 100644
--- a/application/src/main/java/com/yahoo/application/Application.java
+++ b/application/src/main/java/com/yahoo/application/Application.java
@@ -44,6 +44,13 @@ import java.util.*;
@Beta
public final class Application implements AutoCloseable {
+ /**
+ * This system property is set to "true" upon creation of an Application.
+ * This is useful for components which are created by dependendy injection which needs to modify
+ * their behavior to function without reliance on any processes outside the JVM.
+ */
+ public static final String vespaLocalProperty = "vespa.local";
+
private final JDisc container;
private final List<ContentCluster> contentClusters;
private final Path path;
@@ -51,7 +58,7 @@ public final class Application implements AutoCloseable {
// For internal use only
Application(Path path, Networking networking, boolean deletePathWhenClosing) {
- System.setProperty("vespa.local", "true");
+ System.setProperty(vespaLocalProperty, "true");
this.path = path;
this.deletePathWhenClosing = deletePathWhenClosing;
contentClusters = ContentCluster.fromPath(path);
diff --git a/document/src/test/java/com/yahoo/document/json/DocumentUpdateJsonSerializerTest.java b/document/src/test/java/com/yahoo/document/json/DocumentUpdateJsonSerializerTest.java
index 53e7ed701a7..b3deae547ab 100644
--- a/document/src/test/java/com/yahoo/document/json/DocumentUpdateJsonSerializerTest.java
+++ b/document/src/test/java/com/yahoo/document/json/DocumentUpdateJsonSerializerTest.java
@@ -26,6 +26,7 @@ import static com.yahoo.document.json.JsonTestHelper.inputJson;
* @author Vegard Sjonfjell
*/
public class DocumentUpdateJsonSerializerTest {
+
final static DocumentTypeManager types = new DocumentTypeManager();
final static JsonFactory parserFactory = new JsonFactory();
final static DocumentType docType = new DocumentType("doctype");
diff --git a/document/src/test/java/com/yahoo/document/json/JsonTestHelper.java b/document/src/test/java/com/yahoo/document/json/JsonTestHelper.java
index 45128e6d02a..09e6a74e68a 100644
--- a/document/src/test/java/com/yahoo/document/json/JsonTestHelper.java
+++ b/document/src/test/java/com/yahoo/document/json/JsonTestHelper.java
@@ -1,5 +1,6 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.document.json;
+
import com.google.common.base.Joiner;
import static org.hamcrest.MatcherAssert.assertThat;
import static uk.co.datumedge.hamcrest.json.SameJSONAs.sameJSONAs;
@@ -8,6 +9,7 @@ import static uk.co.datumedge.hamcrest.json.SameJSONAs.sameJSONAs;
* @author Vegard Sjonfjell
*/
public class JsonTestHelper {
+
/**
* Convenience method to input JSON without escaping double quotes and newlines
* Each parameter represents a line of JSON encoded data
@@ -23,4 +25,5 @@ public class JsonTestHelper {
public static void assertJsonEquals(String inputJson, String expectedJson) {
assertThat(inputJson, sameJSONAs(expectedJson));
}
+
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java
index bd191eccbb8..0a57a700276 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java
@@ -7,6 +7,7 @@ import com.yahoo.document.select.parser.ParseException;
import com.yahoo.documentapi.*;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.messagebus.MessageBus;
+import com.yahoo.messagebus.NetworkMessageBus;
import com.yahoo.messagebus.RPCMessageBus;
import com.yahoo.messagebus.network.Network;
import com.yahoo.messagebus.network.local.LocalNetwork;
@@ -19,22 +20,18 @@ import java.util.concurrent.ScheduledExecutorService;
/**
* This class implements the {@link DocumentAccess} interface using message bus for communication.
*
- * @author <a href="mailto:einarmr@yahoo-inc.com">Einar Rosenvinge</a>
+ * @author Einar Rosenvinge
* @author bratseth
*/
public class MessageBusDocumentAccess extends DocumentAccess {
- // either
- private final RPCMessageBus bus;
- // ... or
- private final MessageBus messageBus;
- private final Network network;
- // ... TODO: Do that cleanly
+ private final NetworkMessageBus bus;
private final MessageBusParams params;
// TODO: make pool size configurable? ScheduledExecutorService is not dynamic
- private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(
- Runtime.getRuntime().availableProcessors(), ThreadFactoryFactory.getDaemonThreadFactory("mbus.access.scheduler"));
+ private final ScheduledExecutorService scheduledExecutorService =
+ Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(),
+ ThreadFactoryFactory.getDaemonThreadFactory("mbus.access.scheduler"));
/**
* Creates a new document access using default values for all parameters.
@@ -54,17 +51,12 @@ public class MessageBusDocumentAccess extends DocumentAccess {
try {
com.yahoo.messagebus.MessageBusParams mbusParams = new com.yahoo.messagebus.MessageBusParams(params.getMessageBusParams());
mbusParams.addProtocol(new DocumentProtocol(getDocumentTypeManager(), params.getProtocolConfigId(), params.getLoadTypes()));
- if (System.getProperty("vespa.local", "false").equals("true")) { // TODO: Hackety hack ... see Application
- bus = null;
- network = new LocalNetwork(new LocalWire());
- messageBus = new MessageBus(network, mbusParams);
+ if (System.getProperty("vespa.local", "false").equals("true")) { // set by Application when running locally
+ LocalNetwork network = new LocalNetwork();
+ bus = new NetworkMessageBus(network, new MessageBus(network, mbusParams));
}
else {
- bus = new RPCMessageBus(mbusParams,
- params.getRPCNetworkParams(),
- params.getRoutingConfigId());
- network = null;
- messageBus = null;
+ bus = new RPCMessageBus(mbusParams, params.getRPCNetworkParams(), params.getRoutingConfigId());
}
}
catch (Exception e) {
@@ -73,13 +65,12 @@ public class MessageBusDocumentAccess extends DocumentAccess {
}
private MessageBus messageBus() {
- return bus != null ? bus.getMessageBus() : messageBus;
+ return bus.getMessageBus();
}
@Override
public void shutdown() {
super.shutdown();
- if (bus != null)
bus.destroy();
scheduledExecutorService.shutdownNow();
}
@@ -121,34 +112,20 @@ public class MessageBusDocumentAccess extends DocumentAccess {
throw new UnsupportedOperationException("Subscriptions not supported.");
}
- /**
- * Returns the internal message bus object so that clients can use it directly.
- *
- * @return The internal message bus.
- */
- public MessageBus getMessageBus() {
- return messageBus();
- }
+ /** Returns the internal message bus object so that clients can use it directly. */
+ public MessageBus getMessageBus() { return messageBus(); }
/**
* Returns the network layer of the internal message bus object so that clients can use it directly. This may seem
* abit arbitrary, but the fact is that the RPCNetwork actually implements the IMirror API as well as exposing the
* SystemState object.
- *
- * @return The network layer.
*/
- public Network getNetwork() {
- return bus != null ? bus.getRPCNetwork() : network;
- }
+ public Network getNetwork() { return bus.getNetwork(); }
/**
* Returns the parameter object that controls the underlying message bus. Changes to these parameters do not affect
* previously created sessions.
- *
- * @return The parameter object.
*/
- public MessageBusParams getParams() {
- return params;
- }
+ public MessageBusParams getParams() { return params; }
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java
new file mode 100644
index 00000000000..24e177f1fbf
--- /dev/null
+++ b/messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java
@@ -0,0 +1,43 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus;
+
+import com.yahoo.messagebus.network.Network;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * The combination of a messagebus and a network over which it may send data.
+ *
+ * @author bratseth
+ */
+public class NetworkMessageBus {
+
+ private final Network network;
+ private final MessageBus messageBus;
+
+ private final AtomicBoolean destroyed = new AtomicBoolean(false);
+
+ public NetworkMessageBus(Network network, MessageBus messageBus) {
+ this.network = network;
+ this.messageBus = messageBus;
+ }
+
+ /** Returns the contained message bus object */
+ public MessageBus getMessageBus() { return messageBus; }
+
+ /** Returns the network of this as a Network */
+ public Network getNetwork() { return network; }
+
+ /**
+ * Irreversibly destroys the content of this.
+ *
+ * @return whether this destroyed anything, or if it was already destroyed
+ */
+ public boolean destroy() {
+ if ( destroyed.getAndSet(true)) return false;
+
+ getMessageBus().destroy();
+ return true;
+ }
+
+}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java
index d767e197b11..cfa50a35549 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java
@@ -3,6 +3,7 @@ package com.yahoo.messagebus;
import com.yahoo.log.LogLevel;
import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.Network;
import com.yahoo.messagebus.network.rpc.RPCNetwork;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
@@ -17,12 +18,9 @@ import java.util.logging.Logger;
*
* @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
*/
-public class RPCMessageBus {
+public class RPCMessageBus extends NetworkMessageBus {
private static final Logger log = Logger.getLogger(RPCMessageBus.class.getName());
- private final AtomicBoolean destroyed = new AtomicBoolean(false);
- private final MessageBus mbus;
- private final RPCNetwork net;
private final ConfigAgent configAgent;
/**
@@ -33,9 +31,16 @@ public class RPCMessageBus {
* @param routingCfgId The config id for message bus routing specs.
*/
public RPCMessageBus(MessageBusParams mbusParams, RPCNetworkParams rpcParams, String routingCfgId) {
- net = new RPCNetwork(rpcParams);
- mbus = new MessageBus(net, mbusParams);
- configAgent = new ConfigAgent(routingCfgId != null ? routingCfgId : "client", mbus);
+ this(mbusParams, new RPCNetwork(rpcParams), routingCfgId);
+ }
+
+ private RPCMessageBus(MessageBusParams mbusParams, RPCNetwork network, String routingCfgId) {
+ this(new MessageBus(network, mbusParams), network, routingCfgId);
+ }
+
+ private RPCMessageBus(MessageBus messageBus, RPCNetwork network, String routingCfgId) {
+ super(network, messageBus);
+ configAgent = new ConfigAgent(routingCfgId != null ? routingCfgId : "client", messageBus);
configAgent.subscribe();
}
@@ -80,33 +85,17 @@ public class RPCMessageBus {
* Sets the destroyed flag to true. The very first time this method is called, it cleans up all its dependencies.
* Even if you retain a reference to this object, all of its content is allowed to be garbage collected.
*
- * @return True if content existed and was destroyed.
+ * @return true if content existed and was destroyed.
*/
+ @Override
public boolean destroy() {
- if (!destroyed.getAndSet(true)) {
+ boolean destroyed = super.destroy();
+ if (destroyed)
configAgent.shutdown();
- mbus.destroy();
- return true;
- }
- return false;
+ return destroyed;
}
- /**
- * Returns the contained message bus object.
- *
- * @return Message bus.
- */
- public MessageBus getMessageBus() {
- return mbus;
- }
-
- /**
- * Returns the contained rpc network object.
- *
- * @return RPC network.
- */
- public RPCNetwork getRPCNetwork() {
- return net;
- }
+ /** Returns the network of this as a RPCNetwork */
+ public RPCNetwork getRPCNetwork() { return (RPCNetwork)getNetwork(); }
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
index bbbb49e4370..78cf352cfbf 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
@@ -32,6 +32,10 @@ public class LocalNetwork implements Network {
private final String hostId;
private volatile NetworkOwner owner;
+ public LocalNetwork() {
+ this(new LocalWire());
+ }
+
public LocalNetwork(LocalWire wire) {
this.wire = wire;
this.hostId = wire.newHostId();