aboutsummaryrefslogtreecommitdiffstats
path: root/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java
blob: 4c21489ded27791d616f1fc845187f0adda0204e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.jdisc;

import com.google.inject.Inject;
import com.yahoo.jdisc.AbstractResource;
import com.yahoo.jdisc.Request;
import com.yahoo.jdisc.ResourceReference;
import com.yahoo.jdisc.Response;
import com.yahoo.jdisc.handler.ContentChannel;
import com.yahoo.jdisc.handler.ResponseHandler;
import com.yahoo.jdisc.service.CurrentContainer;
import com.yahoo.jdisc.service.ServerProvider;
import com.yahoo.messagebus.EmptyReply;
import com.yahoo.messagebus.Error;
import com.yahoo.messagebus.ErrorCode;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.MessageHandler;
import com.yahoo.messagebus.Reply;
import com.yahoo.messagebus.shared.ServerSession;

import java.net.URI;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * @author Simon Thoresen Hult
 */
public final class MbusServer extends AbstractResource implements ServerProvider, MessageHandler {

    private enum State {INITIALIZING, RUNNING, STOPPED}
    private final static Logger log = Logger.getLogger(MbusServer.class.getName());
    private final AtomicReference<State> runState = new AtomicReference<>(State.INITIALIZING);
    private final CurrentContainer container;
    private final ServerSession session;
    private final URI uri;
    private final ResourceReference sessionReference;

    @Inject
    public MbusServer(CurrentContainer container, ServerSession session) {
        this.container = container;
        this.session = session;
        uri = URI.create("mbus://localhost/" + session.name());
        session.setMessageHandler(this);
        sessionReference = session.refer(this);
    }

    @Override
    public void start() {
        log.log(Level.FINE, "Starting message bus server.");
        runState.set(State.RUNNING);
        session.connect();
    }

    @Override
    public void close() {
        log.log(Level.FINE, "Closing message bus server.");
        session.disconnect();
        runState.set(State.STOPPED);
    }

    @Override
    public boolean isMultiplexed() {
        return true;
    }

    @Override
    protected void destroy() {
        log.log(Level.INFO, "Destroying message bus server: " + session.name());
        runState.set(State.STOPPED);
        sessionReference.close();
    }

    @Override
    public void handleMessage(Message msg) {
        State state = runState.get();
        if (state == State.INITIALIZING) {
            dispatchErrorReply(msg, ErrorCode.SESSION_BUSY, "MBusServer not started.");
            return;
        }
        if (state == State.STOPPED) {
            dispatchErrorReply(msg, ErrorCode.NETWORK_SHUTDOWN, "MBusServer has been closed.");
            return;
        }
        if (msg.getTrace().shouldTrace(6)) {
            msg.getTrace().trace(6, "Message received by MbusServer.");
        }
        Request request = null;
        ContentChannel content = null;
        try {
            request = new MbusRequest(container, uri, msg);
            content = request.connect(new ServerResponseHandler(msg));
        } catch (RuntimeException e) {
            dispatchErrorReply(msg, ErrorCode.APP_FATAL_ERROR, e.toString());
        } finally {
            if (request != null) {
                request.release();
            }
        }
        if (content != null) {
            content.close(IgnoredCompletionHandler.INSTANCE);
        }
    }

    public String connectionSpec() {
        return session.connectionSpec();
    }

    private void dispatchErrorReply(Message msg, int errCode, String errMsg) {
        Reply reply = new EmptyReply();
        reply.swapState(msg);
        reply.addError(new Error(errCode, errMsg));
        session.sendReply(reply);
    }

    private class ServerResponseHandler implements ResponseHandler {

        final Message msg;

        ServerResponseHandler(Message msg) {
            this.msg = msg;
        }

        @Override
        public ContentChannel handleResponse(Response response) {
            Reply reply;
            if (response instanceof MbusResponse) {
                reply = ((MbusResponse)response).getReply();
            } else {
                reply = new EmptyReply();
                reply.swapState(msg);
            }
            Error err = StatusCodes.toMbusError(response.getStatus());
            if (err != null) {
                if (err.isFatal()) {
                    if (!reply.hasFatalErrors()) {
                        reply.addError(err);
                    }
                } else {
                    if (!reply.hasErrors()) {
                        reply.addError(err);
                    }
                }
            }
            if (reply.getTrace().shouldTrace(6)) {
                reply.getTrace().trace(6, "Sending reply from MbusServer.");
            }
            session.sendReply(reply);
            return null;
        }
    }
}