aboutsummaryrefslogtreecommitdiffstats
path: root/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ExternPolicy.java
blob: 3c64832b7f43984957da4428211b53af9ca9bd89 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.documentapi.messagebus.protocol;

import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Transport;
import com.yahoo.jrt.slobrok.api.Mirror;
import com.yahoo.jrt.slobrok.api.SlobrokList;
import com.yahoo.messagebus.ErrorCode;
import com.yahoo.messagebus.routing.Hop;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.routing.RoutingContext;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * This policy implements the necessary logic to communicate with an external Vespa application and resolve its list of
 * recipients using that other application's slobrok servers.
 *
 * <p>The policy is configured through a single string parameter on the form {@code <spec>;<pattern>}, where
 * {@code <spec>} is a comma-separated list of slobrok connection specs and {@code <pattern>} is a lookup pattern on
 * the form {@code <service>/<session>}. If the parameter can not be parsed, the instance is still created, but
 * {@link #getError()} returns a description of the problem and {@link #select(RoutingContext)} reports that error
 * for every message.</p>
 *
 * @author Simon Thoresen Hult
 */
public class ExternPolicy implements DocumentProtocolRoutingPolicy {

    // The following four fields are only assigned when the constructor argument parsed successfully; they remain
    // null whenever 'error' is non-null.
    private Supervisor orb = null;
    private Mirror mirror = null;
    private String pattern = null;
    private String session = null;

    // Description of why construction failed, or null if the policy is usable.
    private final String error;

    // Round-robin counter and cached lookup state; only touched from the synchronized getRecipient() path.
    private int offset = 0;
    private int generation = 0;
    private final List<Hop> recipients = new ArrayList<>();

    // Guards against destroy() being invoked more than once.
    private final AtomicBoolean destroyed = new AtomicBoolean(false);

    /**
     * Constructs a new instance of this policy. The argument given is the connection spec to the slobrok to use for
     * resolving recipients, as well as the pattern to use when querying. This constructor does _not_ wait for the
     * mirror to become ready.
     *
     * <p>This constructor never throws on malformed input; instead the problem is recorded and made available
     * through {@link #getError()}.</p>
     *
     * @param arg The parameter string on the form {@code <spec>[,<spec>...];<service>/<session>}.
     */
    public ExternPolicy(String arg) {
        if (arg == null || arg.length() == 0) {
            error = "Expected parameter, got empty string.";
            return;
        }
        // Split on the first ';' only, so that the pattern part is kept intact.
        String[] args = arg.split(";", 2);
        if (args.length != 2 || args[0].length() == 0 || args[1].length() == 0) {
            error = "Expected parameter on the form '<spec>;<pattern>', got '" + arg + "'.";
            return;
        }
        int pos = args[1].lastIndexOf('/');
        if (pos < 0) {
            error = "Expected pattern on the form '<service>/<session>', got '" + args[1] + "'.";
            return;
        }
        SlobrokList slobroks = new SlobrokList();
        slobroks.setup(args[0].split(","));
        pattern = args[1];
        // Keep the leading '/' so the session can be appended directly to a resolved spec string in update().
        session = pattern.substring(pos);
        orb = new Supervisor(new Transport("externpolicy"));
        orb.setDropEmptyBuffers(true);
        mirror = new Mirror(orb, slobroks);
        error = null;
    }

    /**
     * This is a safety mechanism to allow the constructor to fail and signal that it can not be used.
     *
     * @return The error string, or null if no error.
     */
    public String getError() {
        return error;
    }

    /**
     * Returns the slobrok mirror used by this policy to resolve external recipients.
     *
     * @return The external mirror, or null if construction failed (see {@link #getError()}).
     */
    public Mirror getMirror() {
        return mirror;
    }

    /**
     * Returns the appropriate recipient hop, cycling through the currently known recipients in round-robin order.
     * This method provides synchronized access to the internal mirror.
     *
     * @return The recipient hop to use, or null if no recipients are currently known.
     */
    private synchronized Hop getRecipient() {
        update();
        if (recipients.isEmpty()) {
            return null;
        }
        int offset = ++this.offset & Integer.MAX_VALUE; // mask signed bit because of modulo
        // Hand out a copy so that the cached hop is never shared with a route.
        return new Hop(recipients.get(offset % recipients.size()));
    }

    /**
     * Updates the list of matching recipients by querying the extern slobrok. The lookup is only repeated when the
     * update counter reported by the mirror differs from the one seen at the previous refresh. Must only be called
     * while holding this object's monitor, since it mutates the recipient cache.
     */
    private void update() {
        int upd = mirror.updates();
        if (generation != upd) {
            generation = upd;
            recipients.clear();
            List<Mirror.Entry> arr = mirror.lookup(pattern);
            for (Mirror.Entry entry : arr) {
                // 'session' starts with '/', giving '<spec>/<session>'.
                recipients.add(Hop.parse(entry.getSpecString() + session));
            }
        }
    }

    /**
     * Selects a single recipient for the message in the given context by replacing the first hop of its route with
     * a resolved hop. Sets an error on the context instead if this policy failed to construct, if the mirror is not
     * yet ready, or if no recipient matches the pattern.
     *
     * @param ctx The routing context of the message being routed.
     */
    @Override
    public void select(RoutingContext ctx) {
        if (error != null) {
            ctx.setError(DocumentProtocol.ERROR_POLICY_FAILURE, error);
        } else if (mirror.ready()) {
            Hop hop = getRecipient();
            if (hop != null) {
                Route route = new Route(ctx.getRoute());
                route.setHop(0, hop);
                ctx.addChild(route);
            } else {
                ctx.setError(ErrorCode.NO_ADDRESS_FOR_SERVICE,
                             "Could not resolve any recipients from '" + pattern + "'.");
            }
        } else {
            ctx.setError(ErrorCode.APP_TRANSIENT_ERROR, "Extern slobrok not ready.");
        }
    }

    /**
     * Merges the replies of the given context by delegating to {@link DocumentProtocol#merge(RoutingContext)}.
     *
     * @param ctx The routing context whose child replies should be merged.
     */
    @Override
    public void merge(RoutingContext ctx) {
        DocumentProtocol.merge(ctx);
    }

    /**
     * Shuts down the slobrok mirror and the transport owned by this policy. If the constructor failed, no such
     * resources were created and this method only records that the policy has been destroyed.
     *
     * @throws RuntimeException if this policy has already been destroyed.
     */
    @Override
    public void destroy() {
        if (destroyed.getAndSet(true)) throw new RuntimeException("Already destroyed");
        // Both fields are null when the constructor bailed out with an error; there is nothing to release then.
        try {
            if (mirror != null) {
                mirror.shutdown();
            }
        } finally {
            // Always stop the transport, even if shutting down the mirror failed, to avoid leaking its thread.
            if (orb != null) {
                orb.transport().shutdown().join();
            }
        }
    }
}