summaryrefslogtreecommitdiffstats
path: root/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ExternalSlobrokPolicy.java
blob: 39242bb6cabe5eba49e4c2f5a7dab83114e6fd8d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.documentapi.messagebus.protocol;

import com.yahoo.config.subscription.ConfigSourceSet;
import com.yahoo.config.subscription.ConfigSubscriber;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Transport;
import com.yahoo.jrt.slobrok.api.IMirror;
import com.yahoo.jrt.slobrok.api.Mirror;
import com.yahoo.jrt.slobrok.api.SlobrokList;
import com.yahoo.messagebus.routing.RoutingContext;
import com.yahoo.cloud.config.SlobroksConfig;

import java.util.List;
import java.util.Map;

/**
 * Abstract class for policies that allow you to specify which slobrok to use for the
 * routing.
 *
 * <p>The slobroks can be given either directly through the {@code slobroks} parameter
 * (a comma-separated list of connection specs), or indirectly through the {@code config}
 * parameter (a comma-separated list of config sources from which the slobrok list is
 * subscribed to). If neither is given, lookups fall back to the mirror supplied by the
 * routing context.</p>
 */
public abstract class ExternalSlobrokPolicy extends AsyncInitializationPolicy implements ConfigSubscriber.SingleSubscriber<SlobroksConfig> {
    String error;
    /** Supervisor backing the private mirror; only created when an external slobrok setup is requested. */
    private Supervisor orb = null;
    /** Private mirror towards the external slobroks; null until created in init() or configure(). */
    private Mirror mirror = null;
    private SlobrokList slobroks = null;
    /** True until the first lookup has completed; only the first lookup waits for the mirror to fill. */
    private boolean firstTry = true;
    private ConfigSubscriber subscriber;
    String[] configSources = null;
    private final static String slobrokConfigId = "admin/slobrok.0";


    /**
     * Reads the optional {@code config} and {@code slobroks} parameters and requests
     * asynchronous initialization if either of them is present.
     *
     * @param param the policy parameters, keyed by parameter name
     */
    ExternalSlobrokPolicy(Map<String, String> param) {
        super();

        String conf = param.get("config");
        if (conf != null) {
            configSources = conf.split(",");
        }

        String slbrk = param.get("slobroks");
        if (slbrk != null) {
            slobroks = new SlobrokList();
            slobroks.setup(slbrk.split(","));
        }

        if (slobroks != null || configSources != null) {
            needAsynchronousInitialization();
        }
    }

    /**
     * Creates the private mirror when an explicit slobrok list was given; otherwise, when
     * config sources were given, subscribes to the slobrok config so that the mirror is
     * created once the first config arrives (see {@link #configure(SlobroksConfig)}).
     */
    @Override
    public void init() {
        if (slobroks != null) {
            orb = new Supervisor(new Transport());
            mirror = new Mirror(orb, slobroks);
        }

        if (configSources != null) {
            // An explicit slobrok list takes precedence over config subscription.
            if (mirror == null) {
                orb = new Supervisor(new Transport());
                subscriber = subscribe(slobrokConfigId, new ConfigSourceSet(configSources));
            }
        }
    }

    /** Creates a subscriber on the given config sources and registers this policy for slobrok config. */
    private ConfigSubscriber subscribe(String configId, final ConfigSourceSet configSourceSet) {
        ConfigSubscriber subscriber = new ConfigSubscriber(configSourceSet);
        subscriber.subscribe(this, SlobroksConfig.class, configId);
        return subscriber;
    }

    /**
     * Returns the private mirror of this policy.
     *
     * @return the private mirror, or null if none has been created
     */
    public IMirror getMirror() {
        return mirror;
    }

    /**
     * Looks up the given pattern in the private mirror if one exists, otherwise in the
     * mirror of the routing context. On the first lookup only, an empty result is retried
     * every 50 ms, up to 100 times, to give the mirror time to be populated.
     *
     * @param context the routing context whose mirror is used when no private mirror exists
     * @param pattern the slobrok pattern to look up
     * @return the matching entries, possibly empty
     */
    public  List<Mirror.Entry> lookup(RoutingContext context, String pattern) {
        IMirror mirror1 = (mirror != null ? mirror : context.getMirror());

        List<Mirror.Entry> arr = mirror1.lookup(pattern);

        if ((arr.isEmpty()) && firstTry) {
            synchronized(this)  {
                try {
                    int count = 0;
                    while (arr.isEmpty() && count < 100) {
                        Thread.sleep(50);
                        arr = mirror1.lookup(pattern);
                        count++;
                    }
                } catch (InterruptedException e) {
                    // Stop waiting and return what we have, but keep the interrupt status
                    // so that the caller can observe that the thread was interrupted.
                    Thread.currentThread().interrupt();
                }

            }
        }

        firstTry = false;
        return arr;
    }

    /**
     * Receives a new slobrok config: updates the slobrok list with the connection specs
     * found in the config and creates the private mirror if it does not exist yet.
     *
     * @param config the new slobrok config
     */
    @Override
    public synchronized void configure(SlobroksConfig config) {
        int slobrokCount = config.slobrok().size();
        String[] slist = new String[slobrokCount];

        for (int i = 0; i < slobrokCount; i++) {
            slist[i] = config.slobrok(i).connectionspec();
        }
        if (slobroks == null) {
            slobroks = new SlobrokList();
        }
        slobroks.setup(slist);
        if (mirror == null) {
            mirror = new Mirror(orb, slobroks);
        }

    }

    /**
     * Releases the resources owned by this policy: the config subscription, the private
     * mirror and the transport of the private supervisor, whichever of them were created.
     */
    @Override
    public void destroy() {
        // Close the subscription first so that no new config arrives while shutting down.
        if (subscriber != null) {
            subscriber.close();
        }
        if (mirror != null) {
            mirror.shutdown();
        }
        if (orb != null) {
            orb.transport().shutdown().join();
        }
    }

}