aboutsummaryrefslogtreecommitdiffstats
path: root/jrt/src/com/yahoo/jrt/Acceptor.java
blob: 14b35c5893fb7de6e3b6e15ef096eb297be89c03 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.jrt;

import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * A class used to listen on a network socket. A separate thread is
 * used to accept connections and register them with the underlying
 * transport thread. To create an acceptor you need to invoke the
 * {@link Supervisor#listen listen} method in the {@link Supervisor}
 * class.
 */
public class Acceptor {

    private class Run implements Runnable {
        public void run() {
            try {
                Acceptor.this.run();
            } catch (Throwable problem) {
                parent.handleFailure(problem, Acceptor.this);
            }
        }
    }

    private final static Logger log = Logger.getLogger(Acceptor.class.getName());

    private final Thread         thread = new Thread(new Run(), "<jrt-acceptor>");
    private final CountDownLatch shutdownGate = new CountDownLatch(1);
    private final Transport      parent;
    private final Supervisor     owner;

    private final ServerSocketChannel serverChannel;

    Acceptor(Transport parent, Supervisor owner, Spec spec) throws ListenFailedException {
        this.parent = parent;
        this.owner  = owner;

        if (spec.malformed())
            throw new ListenFailedException("Malformed spec '" + spec + "'");

        serverChannel = createServerSocketChannel(spec);

        thread.setDaemon(true);
        thread.start();
    }

    private static ServerSocketChannel createServerSocketChannel(Spec spec) throws ListenFailedException {
        ServerSocketChannel serverChannel = null;
        try {
            serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(true);
            if (spec.port() != 0) {
                serverChannel.socket().setReuseAddress(true);
            }
            serverChannel.socket().bind(spec.resolveAddress(), 500);
        } catch (Exception e) {
            if (serverChannel != null) {
                try { serverChannel.socket().close(); } catch (Exception ignore) {}
            }
            throw new ListenFailedException("Failed to listen to " + spec, e);
        }
        return serverChannel;
    }

    /**
     * Obtain the local port number this Acceptor is listening to. If
     * this Acceptor is no longer listening (it has been shut down),
     * -1 will be returned.
     *
     * @return listening port, or -1 if not listening
     **/
    public int port() {
        if (!serverChannel.isOpen()) {
            return -1;
        }
        return serverChannel.socket().getLocalPort();
    }

    /**
     * Obtain the Spec for the local port and host interface this Acceptor
     * is listening to.  If this Acceptor is no longer listening (it has
     * been shut down), null will be returned.
     *
     * @return listening spec, or null if not listening
     */
    public Spec spec() {
        if ( ! serverChannel.isOpen()) {
            return null;
        }
        return new Spec(serverChannel.socket().getInetAddress().getHostName(),
                        serverChannel.socket().getLocalPort());
    }

    private void run() {
        while (serverChannel.isOpen()) {
            try {
                TransportThread tt = parent.selectThread();
                tt.addConnection(new Connection(tt, owner, serverChannel.accept(), parent.getTcpNoDelay()));
                tt.sync();
            } catch (ClosedChannelException ignore) {
            } catch (Exception e) {
                log.log(Level.WARNING, "Error accepting connection", e);
            }
        }
        while (true) {
            try {
                shutdownGate.await();
                return;
            } catch (InterruptedException ignore) {}
        }
    }

    /**
     * Initiate controlled shutdown of the acceptor thread
     *
     * @return this object, to enable chaining with {@link #join join}
     **/
    public Acceptor shutdown() {
        try {
            serverChannel.socket().close();
        } catch (Exception e1) {
            log.log(Level.WARNING, "Error closing server socket", e1);
            Thread.yield(); // throw some salt over the shoulder
            try {
                serverChannel.socket().close();
            } catch (Exception e2) {
                log.log(Level.WARNING, "Error closing server socket", e2);
                Thread.yield(); // throw some salt over the shoulder
                try {
                    serverChannel.socket().close();
                } catch (Exception e3) {
                    log.log(Level.WARNING, "Error closing server socket", e3);
                    throw new Error("Error closing server socket 3 times", e3);
                }
            }
        } finally {
            shutdownGate.countDown();
        }
        return this;
    }

    /**
     * Wait for the acceptor thread to finish
     **/
    public void join() {
        while (true) {
            try {
                thread.join();
                return;
            } catch (InterruptedException ignore) {}
        }
    }
}