summaryrefslogtreecommitdiffstats
path: root/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/OOSClient.java
blob: 8b16fd44cee58b84c6501ef9920e260185407619 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.network.rpc;

import com.yahoo.jrt.*;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;

/**
 * This class keeps track of OOS information obtained from a single server. This class is used by the OOSManager class.
 * Note that since this class is only used inside the transport thread it has no synchronization. Using it directly will
 * lead to race conditions and possible crashes.
 *
 * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
 */
public class OOSClient implements Runnable, RequestWaiter {

    private Supervisor orb;
    private Target target = null;
    private Request request = null;
    private boolean requestDone = false;
    private Spec spec;
    private Task task;
    private List<String> oosList = new ArrayList<String>();
    private int requestGen = 0;
    private int listGen = 0;
    private int dumpGen = 0;
    private boolean shutdown = false;

    /**
     * Create a new OOSClient polling oos information from the given server.
     *
     * @param orb  The object used for RPC operations.
     * @param spec The fnet connect spec for oos server.
     */
    public OOSClient(Supervisor orb, Spec spec) {
        this.orb = orb;
        this.spec = spec;

        task = this.orb.transport().createTask(this);
        task.scheduleNow();
    }

    /**
     * Handle a server reply.
     */
    private void handleReply() {
        if (!request.checkReturnTypes("Si")) {
            if (target != null) {
                target.close();
                target = null;
            }
            task.schedule(1.0);
            return;
        }

        Values ret = request.returnValues();
        int retGen = ret.get(1).asInt32();
        if (requestGen != retGen) {
            List<String> oos = new ArrayList<String>();
            oos.addAll(Arrays.asList(ret.get(0).asStringArray()));
            oosList = oos;
            requestGen = retGen;
            listGen = retGen;
        }
        task.schedule(0.1);
    }

    /**
     * Handle server (re)connect.
     */
    private void handleConnect() {
        if (target == null) {
            target = orb.connect(spec);
            requestGen = 0;
        }
    }

    /**
     * Handle server invocation.
     */
    private void handleInvoke() {
        if (target == null) {
            throw new IllegalStateException("Attempting to invoke a request on a null target.");
        }
        request = new Request("fleet.getOOSList");
        request.parameters().add(new Int32Value(requestGen));
        request.parameters().add(new Int32Value(60000));
        target.invokeAsync(request, 70.0, this);
    }

    /**
     * Implements runnable. Performs overall server poll logic.
     */
    public void run() {
        if (shutdown) {
            task.kill();
            if (target != null) {
                target.close();
            }
        } else if (requestDone) {
            requestDone = false;
            handleReply();
        } else {
            handleConnect();
            handleInvoke();
        }
    }

    /**
     * Shut down this OOS client. Invoking this method will take down any active connections and block further activity
     * from this object.
     */
    public void shutdown() {
        shutdown = true;
        task.scheduleNow();
    }

    /**
     * From FRT_IRequestWait, picks up server replies.
     *
     * @param request The request that has completed.
     */
    public void handleRequestDone(Request request) {
        if (request != this.request || requestDone) {
            throw new IllegalStateException("Multiple invocations of RequestDone().");
        }
        requestDone = true;
        task.scheduleNow();
    }

    /**
     * Obtain the connect spec of the OOS server this client is talking to.
     *
     * @return OOS server connect spec
     */
    public Spec getSpec() {
        return spec;
    }

    /**
     * Check if this client has changed. A client has changed if it  has obtain now information after the dumpState
     * method was last invoked.
     *
     * @return True is this client has changed.
     */
    public boolean isChanged() {
        return listGen != dumpGen;
    }

    /**
     * Returns whether or not this client has receieved any reply at all from the server it is connected to.
     *
     * @return True if initial request has returned.
     */
    public boolean isReady() {
        return listGen != 0;
    }

    /**
     * Dump the current oos information known by this client into the given string set.
     *
     * @param dst The object used to aggregate oos information.
     */
    public void dumpState(Set<String> dst) {
        dst.addAll(oosList);
        dumpGen = listGen;
    }
}