summaryrefslogtreecommitdiffstats
path: root/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java
blob: edb426c6392253589be3329d8dfb9052455a170c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.documentapi.messagebus;

import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.document.select.parser.ParseException;
import com.yahoo.documentapi.*;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.messagebus.MessageBus;
import com.yahoo.messagebus.NetworkMessageBus;
import com.yahoo.messagebus.RPCMessageBus;
import com.yahoo.messagebus.network.Network;
import com.yahoo.messagebus.network.local.LocalNetwork;
import com.yahoo.messagebus.network.local.LocalWire;
import com.yahoo.messagebus.routing.RoutingTable;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

/**
 * This class implements the {@link DocumentAccess} interface using message bus for communication.
 *
 * @author Einar Rosenvinge
 * @author bratseth
 */
public class MessageBusDocumentAccess extends DocumentAccess {

    private final NetworkMessageBus bus;

    private final MessageBusParams params;
    // TODO: make pool size configurable? ScheduledExecutorService is not dynamic
    private final ScheduledExecutorService scheduledExecutorService =
            Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(),
                                             ThreadFactoryFactory.getDaemonThreadFactory("mbus.access.scheduler"));

    /**
     * Creates a new document access using default values for all parameters.
     */
    public MessageBusDocumentAccess() {
        this(new MessageBusParams());
    }

    /**
     * Creates a new document access using the supplied parameters.
     *
     * @param params All parameters for construction.
     */
    public MessageBusDocumentAccess(MessageBusParams params) {
        super(params);
        this.params = params;
        try {
            com.yahoo.messagebus.MessageBusParams mbusParams = new com.yahoo.messagebus.MessageBusParams(params.getMessageBusParams());
            mbusParams.addProtocol(new DocumentProtocol(getDocumentTypeManager(), params.getProtocolConfigId(), params.getLoadTypes()));
            if (System.getProperty("vespa.local", "false").equals("true")) { // set by Application when running locally
                LocalNetwork network = new LocalNetwork();
                bus = new NetworkMessageBus(network, new MessageBus(network, mbusParams));
            }
            else {
                bus = new RPCMessageBus(mbusParams, params.getRPCNetworkParams(), params.getRoutingConfigId());
            }
        }
        catch (Exception e) {
            throw new DocumentAccessException(e);
        }
    }

    private MessageBus messageBus() {
        return bus.getMessageBus();
    }

    @Override
    public void shutdown() {
        super.shutdown();
        bus.destroy();
        scheduledExecutorService.shutdownNow();
    }

    @Override
    public MessageBusSyncSession createSyncSession(SyncParameters parameters) {
        return new MessageBusSyncSession(parameters, messageBus(), this.params);
    }

    @Override
    public MessageBusAsyncSession createAsyncSession(AsyncParameters parameters) {
        return new MessageBusAsyncSession(parameters, messageBus(), this.params);
    }

    @Override
    public MessageBusVisitorSession createVisitorSession(VisitorParameters params) throws ParseException, IllegalArgumentException {
        MessageBusVisitorSession session = MessageBusVisitorSession.createForMessageBus(
                bus.getMessageBus(), scheduledExecutorService, params);
        session.start();
        return session;
    }

    @Override
    public MessageBusVisitorDestinationSession createVisitorDestinationSession(VisitorDestinationParameters params) {
        return new MessageBusVisitorDestinationSession(params, bus.getMessageBus());
    }

    @Override
    public SubscriptionSession createSubscription(SubscriptionParameters parameters) {
        throw new UnsupportedOperationException("Subscriptions not supported.");
    }

    @Override
    public SubscriptionSession openSubscription(SubscriptionParameters parameters) {
        throw new UnsupportedOperationException("Subscriptions not supported.");
    }

    /** Returns the internal message bus object so that clients can use it directly. */
    public MessageBus getMessageBus() { return messageBus(); }

    /**
     * Returns the network layer of the internal message bus object so that clients can use it directly. This may seem
     * abit arbitrary, but the fact is that the RPCNetwork actually implements the IMirror API as well as exposing the
     * SystemState object.
     */
    public Network getNetwork() { return bus.getNetwork(); }

    /**
     * Returns the parameter object that controls the underlying message bus. Changes to these parameters do not affect
     * previously created sessions.
     */
    public MessageBusParams getParams() { return params; }

}