diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-09-28 15:00:27 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-09-28 15:00:27 +0200 |
commit | f9dd06806afef90b069f4fa0c99f77beb040851d (patch) | |
tree | ecce12a3af04d80e5d39b0ebce919aa5c0fef81b /documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java | |
parent | 321194bbe4a36d100d92188644d90c6cf4d373c7 (diff) |
Support document-api in application
Diffstat (limited to 'documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java')
-rw-r--r-- | documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java | 40 |
1 files changed, 30 insertions, 10 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java index 818bc204784..bd191eccbb8 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java @@ -9,6 +9,8 @@ import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import com.yahoo.messagebus.MessageBus; import com.yahoo.messagebus.RPCMessageBus; import com.yahoo.messagebus.network.Network; +import com.yahoo.messagebus.network.local.LocalNetwork; +import com.yahoo.messagebus.network.local.LocalWire; import com.yahoo.messagebus.routing.RoutingTable; import java.util.concurrent.Executors; @@ -22,7 +24,13 @@ import java.util.concurrent.ScheduledExecutorService; */ public class MessageBusDocumentAccess extends DocumentAccess { + // either private final RPCMessageBus bus; + // ... or + private final MessageBus messageBus; + private final Network network; + // ... TODO: Do that cleanly + private final MessageBusParams params; // TODO: make pool size configurable? ScheduledExecutorService is not dynamic private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool( @@ -46,32 +54,44 @@ public class MessageBusDocumentAccess extends DocumentAccess { try { com.yahoo.messagebus.MessageBusParams mbusParams = new com.yahoo.messagebus.MessageBusParams(params.getMessageBusParams()); mbusParams.addProtocol(new DocumentProtocol(getDocumentTypeManager(), params.getProtocolConfigId(), params.getLoadTypes())); - bus = new RPCMessageBus(mbusParams, - params.getRPCNetworkParams(), - params.getRoutingConfigId()); + if (System.getProperty("vespa.local", "false").equals("true")) { // TODO: Hackety hack ... see Application + bus = null; + network = new LocalNetwork(new LocalWire()); + messageBus = new MessageBus(network, mbusParams); + } + else { + bus = new RPCMessageBus(mbusParams, + params.getRPCNetworkParams(), + params.getRoutingConfigId()); + network = null; + messageBus = null; + } } catch (Exception e) { throw new DocumentAccessException(e); } } + + private MessageBus messageBus() { + return bus != null ? bus.getMessageBus() : messageBus; + } @Override public void shutdown() { + super.shutdown(); + if (bus != null) bus.destroy(); - if (documentTypeManagerConfig != null) { - documentTypeManagerConfig.close(); - } scheduledExecutorService.shutdownNow(); } @Override public MessageBusSyncSession createSyncSession(SyncParameters parameters) { - return new MessageBusSyncSession(parameters, bus.getMessageBus(), this.params); + return new MessageBusSyncSession(parameters, messageBus(), this.params); } @Override public MessageBusAsyncSession createAsyncSession(AsyncParameters parameters) { - return new MessageBusAsyncSession(parameters, bus.getMessageBus(), this.params); + return new MessageBusAsyncSession(parameters, messageBus(), this.params); } @Override @@ -107,7 +127,7 @@ public class MessageBusDocumentAccess extends DocumentAccess { * @return The internal message bus. */ public MessageBus getMessageBus() { - return bus.getMessageBus(); + return messageBus(); } /** @@ -118,7 +138,7 @@ public class MessageBusDocumentAccess extends DocumentAccess { * @return The network layer. */ public Network getNetwork() { - return bus.getRPCNetwork(); + return bus != null ? bus.getRPCNetwork() : network; } /** |