1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.documentapi.messagebus;
import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.document.select.parser.ParseException;
import com.yahoo.documentapi.*;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.messagebus.MessageBus;
import com.yahoo.messagebus.NetworkMessageBus;
import com.yahoo.messagebus.RPCMessageBus;
import com.yahoo.messagebus.network.Network;
import com.yahoo.messagebus.network.local.LocalNetwork;
import com.yahoo.messagebus.network.local.LocalWire;
import com.yahoo.messagebus.routing.RoutingTable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/**
 * This class implements the {@link DocumentAccess} interface using message bus for communication.
 *
 * @author Einar Rosenvinge
 * @author bratseth
 */
public class MessageBusDocumentAccess extends DocumentAccess {

    /** The message bus, bundled with its network layer, that all sessions created here communicate over. */
    private final NetworkMessageBus bus;

    /** The parameters this instance was constructed with; also handed to sync and async sessions. */
    private final MessageBusParams params;

    // Scheduler handed to visitor sessions. Its core pool size is the number of available processors.
    // TODO: make pool size configurable? ScheduledExecutorService is not dynamic
    private final ScheduledExecutorService scheduledExecutorService =
            Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(),
                                             ThreadFactoryFactory.getDaemonThreadFactory("mbus.access.scheduler"));

    /**
     * Creates a new document access using default values for all parameters.
     */
    public MessageBusDocumentAccess() {
        this(new MessageBusParams());
    }

    /**
     * Creates a new document access using the supplied parameters.
     *
     * @param params All parameters for construction.
     * @throws DocumentAccessException wrapping any exception raised while setting up the message bus.
     */
    public MessageBusDocumentAccess(MessageBusParams params) {
        super(params);
        this.params = params;
        try {
            // Work on a copy of the supplied message bus parameters, so that registering the document
            // protocol below does not modify the caller's object.
            com.yahoo.messagebus.MessageBusParams mbusParams =
                    new com.yahoo.messagebus.MessageBusParams(params.getMessageBusParams());
            mbusParams.addProtocol(new DocumentProtocol(getDocumentTypeManager(),
                                                        params.getProtocolConfigId(),
                                                        params.getLoadTypes()));
            if (System.getProperty("vespa.local", "false").equals("true")) { // set by Application when running locally
                LocalNetwork network = new LocalNetwork();
                bus = new NetworkMessageBus(network, new MessageBus(network, mbusParams));
            }
            else {
                bus = new RPCMessageBus(mbusParams, params.getRPCNetworkParams(), params.getRoutingConfigId());
            }
        }
        catch (Exception e) {
            throw new DocumentAccessException(e);
        }
    }

    /** Returns the message bus owned by this; the single access point used by the session factories below. */
    private MessageBus messageBus() {
        return bus.getMessageBus();
    }

    /**
     * Shuts down the superclass state, then destroys the message bus, then stops the scheduler.
     * Each step is attempted even if an earlier one throws, so that a failure in one step does not
     * leave the remaining resources unreleased. An exception from a step still propagates to the caller
     * (if several steps throw, the last exception thrown is the one that propagates).
     */
    @Override
    public void shutdown() {
        try {
            super.shutdown();
        }
        finally {
            try {
                bus.destroy();
            }
            finally {
                scheduledExecutorService.shutdownNow();
            }
        }
    }

    /**
     * Creates a synchronous session on the message bus of this, using the parameters this was constructed with.
     *
     * @param parameters The parameters of the session.
     * @return The created session.
     */
    @Override
    public MessageBusSyncSession createSyncSession(SyncParameters parameters) {
        return new MessageBusSyncSession(parameters, messageBus(), this.params);
    }

    /**
     * Creates an asynchronous session on the message bus of this, using the parameters this was constructed with.
     *
     * @param parameters The parameters of the session.
     * @return The created session.
     */
    @Override
    public MessageBusAsyncSession createAsyncSession(AsyncParameters parameters) {
        return new MessageBusAsyncSession(parameters, messageBus(), this.params);
    }

    /**
     * Creates a visitor session on the message bus of this, backed by the shared scheduler, and starts it
     * before returning it.
     *
     * @param params The parameters of the visitor session.
     * @return The created session, already started.
     * @throws ParseException propagated from session creation; presumably raised when the document
     *                        selection in the parameters cannot be parsed.
     * @throws IllegalArgumentException propagated from session creation.
     */
    @Override
    public MessageBusVisitorSession createVisitorSession(VisitorParameters params) throws ParseException, IllegalArgumentException {
        MessageBusVisitorSession session =
                MessageBusVisitorSession.createForMessageBus(messageBus(), scheduledExecutorService, params);
        session.start();
        return session;
    }

    /**
     * Creates a visitor destination session on the message bus of this.
     *
     * @param params The parameters of the destination session.
     * @return The created session.
     */
    @Override
    public MessageBusVisitorDestinationSession createVisitorDestinationSession(VisitorDestinationParameters params) {
        return new MessageBusVisitorDestinationSession(params, messageBus());
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always.
     */
    @Override
    public SubscriptionSession createSubscription(SubscriptionParameters parameters) {
        throw new UnsupportedOperationException("Subscriptions not supported.");
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always.
     */
    @Override
    public SubscriptionSession openSubscription(SubscriptionParameters parameters) {
        throw new UnsupportedOperationException("Subscriptions not supported.");
    }

    /** Returns the internal message bus object so that clients can use it directly. */
    public MessageBus getMessageBus() { return messageBus(); }

    /**
     * Returns the network layer of the internal message bus object so that clients can use it directly. This may seem
     * a bit arbitrary, but the fact is that the RPCNetwork actually implements the IMirror API as well as exposing the
     * SystemState object.
     */
    public Network getNetwork() { return bus.getNetwork(); }

    /**
     * Returns the parameter object that controls the underlying message bus. Changes to these parameters do not affect
     * previously created sessions.
     */
    public MessageBusParams getParams() { return params; }

}
|