summaryrefslogtreecommitdiffstats
path: root/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java
blob: efeaacf225b4e5c443ec59afa33133c38143cb0f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config;

import com.yahoo.config.subscription.ConfigSourceSet;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Transport;
import com.yahoo.log.LogLevel;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.logging.Logger;

/**
 * A pool of JRT connections to a config source (either a config server or a config proxy).
 * The current connection is chosen randomly when calling {#link #setNewCurrentConnection}
 * (since the connection is chosen randomly, it might end up using the same connection again,
 * and it will always do so if there is only one source).
 * The current connection is available with {@link #getCurrent()}.
 * When calling {@link #setError(Connection, int)}, {#link #setNewCurrentConnection} will always be called.
 *
 * @author Gunnar Gauslaa Bergem
 * @author hmusum
 */
public class JRTConnectionPool implements ConnectionPool {

    private static final Logger log = Logger.getLogger(JRTConnectionPool.class.getName());

    private final Supervisor supervisor = new Supervisor(new Transport());
    private final Map<String, JRTConnection> connections = new LinkedHashMap<>();

    // The config sources used by this connection pool.
    private ConfigSourceSet sourceSet = null;

    // The current connection used by this connection pool.
    private volatile JRTConnection currentConnection;

    public JRTConnectionPool(ConfigSourceSet sourceSet) {
        addSources(sourceSet);
    }

    public JRTConnectionPool(List<String> addresses) {
        this(new ConfigSourceSet(addresses));
    }

    public void addSources(ConfigSourceSet sourceSet) {
        this.sourceSet = sourceSet;
        synchronized (connections) {
            for (String address : sourceSet.getSources()) {
                connections.put(address, new JRTConnection(address, supervisor));
            }
        }
        setNewCurrentConnection();
    }

    /**
     * Returns the current JRTConnection instance
     *
     * @return a JRTConnection
     */
    public synchronized JRTConnection getCurrent() {
        return currentConnection;
    }

    /**
     * Returns and set the current JRTConnection instance by randomly choosing
     * from the available sources (this means that you might end up using
     * the same connection).
     *
     * @return a JRTConnection
     */
    public synchronized JRTConnection setNewCurrentConnection() {
        List<JRTConnection> sources = getSources();
        currentConnection = sources.get(ThreadLocalRandom.current().nextInt(0, sources.size()));
        if (log.isLoggable(LogLevel.DEBUG)) {
            log.log(LogLevel.DEBUG, "Choosing new connection: " + currentConnection);
        }
        return currentConnection;
    }

    List<JRTConnection> getSources() {
        List<JRTConnection> ret = new ArrayList<>();
        synchronized (connections) {
            for (JRTConnection source : connections.values()) {
                ret.add(source);
            }
        }
        return ret;
    }

    ConfigSourceSet getSourceSet() {
        return sourceSet;
    }

    @Override
    public void setError(Connection connection, int errorCode) {
        connection.setError(errorCode);
        setNewCurrentConnection();
    }

    public JRTConnectionPool updateSources(List<String> addresses) {
        ConfigSourceSet newSources = new ConfigSourceSet(addresses);
        return updateSources(newSources);
    }

    public JRTConnectionPool updateSources(ConfigSourceSet sourceSet) {
        synchronized (connections) {
            for (JRTConnection conn : connections.values()) {
                conn.getTarget().close();
            }
            connections.clear();
            addSources(sourceSet);
        }
        return this;
    }

    public String getAllSourceAddresses() {
        StringBuilder sb = new StringBuilder();
        synchronized (connections) {
            for (JRTConnection conn : connections.values()) {
                sb.append(conn.getAddress());
                sb.append(",");
            }
        }
        // Remove trailing ","
        sb.deleteCharAt(sb.length() - 1);
        return sb.toString();
    }

    public String toString() {
        StringBuilder sb = new StringBuilder();
        synchronized (connections) {
            for (JRTConnection conn : connections.values()) {
                sb.append(conn.toString());
                sb.append("\n");
            }
        }
        return sb.toString();
    }

    public void close() {
        supervisor.transport().shutdown().join();
    }

    @Override
    public int getSize() {
        synchronized (connections) {
            return connections.size();
        }
    }

    @Override
    public Supervisor getSupervisor() {
        return supervisor;
    }

}