aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus/src/main/java/com/yahoo/messagebus/ProtocolRepository.java
blob: 6e2c88b3faa3c0df7115b7d87a40216b2de2f5fd (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus;

import com.yahoo.concurrent.CopyOnWriteHashMap;
import java.util.logging.Level;
import com.yahoo.messagebus.routing.RoutingPolicy;
import com.yahoo.text.Utf8String;

import java.util.logging.Logger;

/**
 * Implements a thread-safe repository for protocols and their routing policies. This manages an internal cache of
 * routing policies so that similarly referenced policy directives share the same instance of a policy.
 *
 * @author Simon Thoresen Hult
 */
public class ProtocolRepository {

    private static final Logger log = Logger.getLogger(ProtocolRepository.class.getName());
    private final CopyOnWriteHashMap<String, Protocol> protocols = new CopyOnWriteHashMap<>();
    private final CopyOnWriteHashMap<String, RoutingPolicy> routingPolicyCache = new CopyOnWriteHashMap<>();

    /**
     * Registers a protocol with this repository. This will overwrite any protocol that was registered earlier that has
     * the same name. If this method detects a protocol replacement, it will clear its internal routing policy cache.
     *
     * @param protocol The protocol to register.
     */
    public void putProtocol(Protocol protocol) {
        if (protocols.put(protocol.getName(), protocol) != null) {
            clearPolicyCache();
        }
    }

    /**
     * Returns whether or not this repository contains a protocol with the given name. Given the concurrent nature of
     * things, one should not invoke this method followed by {@link #getProtocol(String)} and expect the return value to
     * be non-null. Instead just get the protocol and compare it to null.
     *
     * @param name The name to check for.
     * @return True if the named protocol is registered.
     */
    public boolean hasProtocol(String name) {
        return protocols.containsKey(name);
    }

    /**
     * Returns the protocol whose name matches the given argument. This method will return null if no such protocol has
     * been registered.
     *
     * @param name The name of the protocol to return.
     * @return The protocol registered, or null.
     */
    public Protocol getProtocol(String name) {
        return protocols.get(name);
    }

    /**
     * Creates and returns a routing policy that matches the given arguments. If a routing policy has been created
     * previously using the exact same parameters, this method will returned that cached instance instead of creating
     * another. Not that when you replace a protocol using {@link #putProtocol(Protocol)} the policy cache is cleared.
     *
     * @param protocolName The name of the protocol whose routing policy to create.
     * @param policyName   The name of the routing policy to create.
     * @param policyParam  The parameter to pass to the routing policy constructor.
     * @return The created routing policy.
     */
    public RoutingPolicy getRoutingPolicy(String protocolName, String policyName, String policyParam) {
        String cacheKey = protocolName + "." + policyName + "." + policyParam;
        RoutingPolicy ret = routingPolicyCache.get(cacheKey);
        if (ret != null) {
            return ret;
        }
        synchronized (this) {
            ret = routingPolicyCache.get(cacheKey);
            if (ret != null) {
                return ret;
            }
            Protocol protocol = getProtocol(protocolName);
            if (protocol == null) {
                log.log(Level.SEVERE, "Protocol '" + protocolName + "' not supported.");
                return null;
            }
            try {
                ret = protocol.createPolicy(policyName, policyParam);
            } catch (RuntimeException e) {
                log.log(Level.SEVERE, "Protocol '" + protocolName + "' threw an exception: " + e.getMessage(), e);
                return null;
            }
            if (ret == null) {
                log.log(Level.SEVERE, "Protocol '" + protocolName + "' failed to create routing policy '" + policyName +
                                        "' with parameter '" + policyParam + "'.");
                return null;
            }
            routingPolicyCache.put(cacheKey, ret);
        }
        return ret;
    }

    public final RoutingPolicy getRoutingPolicy(Utf8String protocolName, String policyName, String policyParam) {
        return getRoutingPolicy(protocolName.toString(), policyName, policyParam);
    }

    /**
     * Clears the internal cache of routing policies.
     */
    public synchronized void clearPolicyCache() {
        for (RoutingPolicy policy : routingPolicyCache.values()) {
            policy.destroy();
        }
        routingPolicyCache.clear();
    }
}