summaryrefslogtreecommitdiffstats
path: root/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCTargetPool.java
blob: a695060649aecb480f9e68914439c8194ef768b9 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.network.rpc;

import com.yahoo.jrt.Spec;
import com.yahoo.jrt.Supervisor;
import com.yahoo.concurrent.SystemTimer;
import com.yahoo.concurrent.Timer;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * Class used to reuse targets for the same address when sending messages over the rpc network.
 *
 * @author <a href="mailto:havardpe@yahoo-inc.com">Haavard Pettersen</a>
 */
public class RPCTargetPool {

    private final Map<String, Entry> targets = new HashMap<String, Entry>();
    private final Timer timer;
    private final long expireMillis;

    /**
     * Constructs a new instance of this class, and registers the {@link SystemTimer} for detecting and closing
     * connections that have expired according to the given parameter.
     *
     * @param expireSecs The number of seconds until an idle connection is closed.
     */
    public RPCTargetPool(double expireSecs) {
        this(SystemTimer.INSTANCE, expireSecs);
    }

    /**
     * Constructs a new instance of this class, using the given {@link Timer} for detecting and closing connections that
     * have expired according to the second paramter.
     *
     * @param timer      The timer to use for connection expiration.
     * @param expireSecs The number of seconds until an idle connection is closed.
     */
    public RPCTargetPool(Timer timer, double expireSecs) {
        this.timer = timer;
        this.expireMillis = (long)(expireSecs * 1000);
    }

    /**
     * Closes all unused target connections. Unless the force argument is true, this method will allow a grace period
     * for all connections after last use before it starts closing them. This allows the most recently used connections
     * to stay open.
     *
     * @param force Whether or not to force flush.
     */
    public synchronized void flushTargets(boolean force) {
        Iterator<Entry> it = targets.values().iterator();
        long currentTime = timer.milliTime();
        long expireTime = currentTime - expireMillis;
        while (it.hasNext()) {
            Entry entry = it.next();
            RPCTarget target = entry.target;
            if (target.getJRTTarget().isValid()) {
                if (target.getRefCount() > 1) {
                    entry.lastUse = currentTime;
                    continue; // someone is using this
                }
                if (!force) {
                    if (entry.lastUse > expireTime) {
                        continue; // not sufficiently idle
                    }
                }
            }
            target.subRef();
            it.remove();
        }
    }

    /**
     * This method will return a target for the given address. If a target does not currently exist for the given
     * address, it will be created and added to the internal map. Each target is also reference counted so that an
     * active target is never expired.
     *
     * @param orb     The supervisor to use to connect to the target.
     * @param address The address to resolve to a target.
     * @return A target for the given address.
     */
    public RPCTarget getTarget(Supervisor orb, RPCServiceAddress address) {
        Spec spec = address.getConnectionSpec();
        String key = spec.toString();
        RPCTarget ret;
        synchronized (this) {
            Entry entry = targets.get(key);
            if (entry != null) {
                if (entry.target.getJRTTarget().isValid()) {
                    entry.target.addRef();
                    entry.lastUse = timer.milliTime();
                    return entry.target;
                }
                entry.target.subRef();
                targets.remove(key);
            }
            ret = new RPCTarget(spec, orb);
            targets.put(key, new Entry(ret, timer.milliTime()));
        }
        ret.addRef();
        return ret;
    }


    /**
     * Returns the number of targets currently contained in this.
     *
     * @return The size of the internal map.
     */
    public synchronized int size() {
        return targets.size();
    }

    /**
     * Implements a helper class holds the necessary reference and timestamp of a target. The lastUse member is updated
     * when a call to {@link RPCTargetPool#flushTargets(boolean)} iterates over an active target.
     */
    private static class Entry {

        final RPCTarget target;
        long lastUse = 0;

        Entry(RPCTarget target, long lastUse) {
            this.target = target;
            this.lastUse = lastUse;
        }
    }
}