1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
import com.yahoo.messagebus.*;
import com.yahoo.messagebus.test.*;
import com.yahoo.config.*;
import com.yahoo.messagebus.routing.*;
import com.yahoo.messagebus.network.*;
import com.yahoo.messagebus.network.rpc.*;
import java.util.Arrays;
import java.util.logging.*;
public class JavaServer implements MessageHandler, ReplyHandler {
private static Logger log = Logger.getLogger(JavaServer.class.getName());
private IntermediateSession session;
private String name;
public JavaServer(RPCMessageBus mb, String name) {
session = mb.getMessageBus().createIntermediateSession("session", true, this, this);
this.name = name;
}
public void handleMessage(Message msg) {
msg.getTrace().trace(1, name + " (message)", false);
if (msg.getRoute() == null || !msg.getRoute().hasHops()) {
System.out.println("**** Server '" + name + "' replying.");
Reply reply = new EmptyReply();
msg.swapState(reply);
handleReply(reply);
} else {
System.out.println("**** Server '" + name + "' forwarding message.");
session.forward(msg);
}
}
public void handleReply(Reply reply) {
reply.getTrace().trace(1, name + " (reply)", false);
session.forward(reply);
}
public static void main(String[] args) {
if (args.length != 1) {
System.err.println("usage: JavaServer <service prefix>");
System.exit(1);
}
String name = args[0];
SimpleProtocol protocol = new SimpleProtocol();
protocol.addPolicyFactory("All", new SimpleProtocol.PolicyFactory() {
@Override
public RoutingPolicy create(String param) {
return new AllPolicy();
}
});
try {
RPCMessageBus mb = new RPCMessageBus(
Arrays.<Protocol>asList(protocol),
new RPCNetworkParams()
.setIdentity(new Identity(name))
.setSlobrokConfigId("file:slobrok.cfg"),
"file:routing.cfg");
JavaServer server = new JavaServer(mb, name);
System.out.printf("java server started name=%s\n", name);
while (true) {
Thread.sleep(1000);
}
} catch (Exception e) {
log.log(Level.SEVERE, "JAVA-SERVER: Failed", e);
System.exit(1);
}
}
private static class AllPolicy implements RoutingPolicy {
@Override
public void select(RoutingContext ctx) {
ctx.addChildren(ctx.getMatchedRecipients());
}
@Override
public void merge(RoutingContext ctx) {
EmptyReply ret = new EmptyReply();
for (RoutingNodeIterator it = ctx.getChildIterator();
it.isValid(); it.next()) {
Reply reply = it.getReplyRef();
for (int i = 0; i < reply.getNumErrors(); ++i) {
ret.addError(reply.getError(i));
}
}
ctx.setReply(ret);
}
@Override
public void destroy() {
}
}
}
|