aboutsummaryrefslogtreecommitdiffstats
path: root/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LocalServicePolicy.java
blob: e835cc4e5e097dc84f7701cb17029a07324b2b47 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.documentapi.messagebus.protocol;

import com.yahoo.jrt.slobrok.api.Mirror;
import com.yahoo.messagebus.routing.Hop;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.routing.RoutingContext;
import com.yahoo.messagebus.routing.VerbatimDirective;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * This policy implements the logic to prefer local services that match a slobrok pattern.
 *
 * <p>Resolved recipients are cached per hop string and refreshed whenever the slobrok mirror reports a new update
 * count. Access to the cache is serialized through {@link #getRecipient(RoutingContext)}.</p>
 *
 * @author Simon Thoresen Hult
 */
public class LocalServicePolicy implements DocumentProtocolRoutingPolicy {

    /** Prefix that a connection spec must carry for an address to be extracted from it. */
    private static final String TCP_PREFIX = "tcp/";

    /** Explicitly configured local address, or null to derive it from the message bus connection spec. */
    private final String localAddress;

    /** Cache of resolved local recipients, keyed by the string form of the hop the policy occurs in. */
    private final Map<String, CacheEntry> cache = new HashMap<>();

    /**
     * Constructs a policy that will choose local services that match the slobrok pattern in which this policy occurred.
     * If no local service can be found, this policy simply returns the asterisk to allow the network to choose any.
     *
     * @param param The address to use for this. If null or empty, the address is instead extracted from the
     *              connection spec of the message bus at selection time.
     */
    LocalServicePolicy(String param) {
        localAddress = (param != null && param.length() > 0) ? param : null;
    }

    /**
     * Adds a single child route to the context, equal to the current route but with its first hop replaced by the
     * recipient chosen by this policy.
     *
     * @param ctx The routing context.
     */
    public void select(RoutingContext ctx) {
        Route route = new Route(ctx.getRoute());
        route.setHop(0, getRecipient(ctx));
        ctx.addChild(route);
    }

    /**
     * Delegates merging of replies to {@link DocumentProtocol#merge(RoutingContext)}.
     *
     * @param ctx The routing context.
     */
    public void merge(RoutingContext ctx) {
        DocumentProtocol.merge(ctx);
    }

    /**
     * Returns the appropriate recipient hop for the given routing context. This method provides synchronized access to
     * the internal cache.
     *
     * <p>If no local recipient is known, a copy of the current hop is returned with the directive of this policy
     * replaced by a verbatim "*". Otherwise the known local recipients are cycled through in round-robin order.</p>
     *
     * @param ctx The routing context.
     * @return The recipient hop to use.
     */
    private synchronized Hop getRecipient(RoutingContext ctx) {
        CacheEntry entry = update(ctx);
        if (entry.recipients.isEmpty()) {
            Hop hop = new Hop(ctx.getRoute().getHop(0));
            hop.setDirective(ctx.getDirectiveIndex(), new VerbatimDirective("*"));
            return hop;
        }
        // Advance the round-robin cursor, wrapping around (also covers a recipient list that shrank on refresh).
        if (++entry.offset >= entry.recipients.size()) {
            entry.offset = 0;
        }
        return new Hop(entry.recipients.get(entry.offset));
    }

    /**
     * Updates and returns the cache entry for the given routing context. This method assumes that synchronization is
     * handled outside of it.
     *
     * <p>The entry is rebuilt only when the update count of the mirror differs from the one recorded in the entry.
     * If the local address cannot be determined, no service is considered local and the recipient list is left
     * empty, so that the caller falls back to letting the network choose.</p>
     *
     * @param ctx The routing context.
     * @return The updated cache entry.
     */
    private CacheEntry update(RoutingContext ctx) {
        String key = getCacheKey(ctx);
        CacheEntry entry = cache.get(key);
        if (entry == null) {
            entry = new CacheEntry();
            cache.put(key, entry);
        }
        int upd = ctx.getMirror().updates();
        if (entry.generation != upd) {
            entry.generation = upd;
            entry.recipients.clear();

            String self = localAddress != null ? localAddress : toAddress(ctx.getMessageBus().getConnectionSpec());
            // Without a known local address nothing can be matched; skip the lookup instead of dereferencing null.
            if (self != null) {
                List<Mirror.Entry> arr = ctx.getMirror().lookup(ctx.getHopPrefix() + "*" + ctx.getHopSuffix());
                for (Mirror.Entry item : arr) {
                    if (self.equals(toAddress(item.getSpecString()))) {
                        entry.recipients.add(Hop.parse(item.getName()));
                    }
                }
            }
        }
        return entry;
    }

    /**
     * Returns a cache key for this instance of the policy. Because behaviour is based on the hop in which the policy
     * occurs, the cache key is the hop string itself.
     *
     * @param ctx The routing context.
     * @return The cache key.
     */
    private String getCacheKey(RoutingContext ctx) {
        return ctx.getRoute().getHop(0).toString();
    }

    /**
     * Defines the necessary cache data. Declared static because it needs no reference to the enclosing policy.
     */
    private static final class CacheEntry {
        /** Hops of the services whose address equals the local address, as of {@link #generation}. */
        private final List<Hop> recipients = new ArrayList<>();
        /** Update count of the mirror at the time {@link #recipients} was last rebuilt. */
        private int generation = 0;
        /** Index into {@link #recipients} of the most recently returned recipient. */
        private int offset = 0;
    }

    /**
     * Searches the given connection spec for a hostname or IP address. The address is the non-empty text between the
     * leading "tcp/" and the first colon. If an address is not found, this method returns null.
     *
     * @param connection The connection spec to search, may be null.
     * @return The address, may be null.
     */
    private static String toAddress(String connection) {
        if (connection == null || !connection.startsWith(TCP_PREFIX)) {
            return null;
        }
        int start = TCP_PREFIX.length();
        int pos = connection.indexOf(':');
        // Require at least one character between the prefix and the colon.
        if (pos > start) {
            return connection.substring(start, pos);
        }
        return null;
    }

    /**
     * Does nothing; this policy holds no resources that need to be released.
     */
    public void destroy() {
    }
}