summaryrefslogtreecommitdiffstats
path: root/vespaclient-core/src/main/java/com/yahoo/feedapi/FeedContext.java
blob: 168f5cd767f60fc55afa852a9389dfd60baf5d19 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.feedapi;

import com.yahoo.cloud.config.ClusterListConfig;
import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.document.config.DocumentmanagerConfig;
import com.yahoo.jdisc.Metric;
import com.yahoo.vespa.config.content.LoadTypeConfig;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.clientmetrics.ClientMetrics;
import com.yahoo.vespaclient.ClusterList;
import com.yahoo.vespaclient.config.FeederConfig;

import javax.naming.OperationNotSupportedException;
import java.util.Map;
import java.util.TreeMap;

/**
 * Bundles the objects needed to feed documents: the session factory, the message property
 * processor, the document type manager, the cluster list, client metrics, and one
 * {@link SharedSender} per route.
 *
 * <p>A single shared instance is created lazily by
 * {@link #getInstance(FeederConfig, LoadTypeConfig, DocumentmanagerConfig, SlobroksConfig, ClusterListConfig, Metric)}.
 * Access to the sender map is guarded by the instance lock.</p>
 */
public class FeedContext {

    private final SessionFactory factory;
    private final MessagePropertyProcessor propertyProcessor;
    private final DocumentTypeManager docTypeManager;
    private final ClusterList clusterList;
    private final ClientMetrics metrics;
    private final Metric metric;

    /** Senders keyed by route. Read and replaced only while holding the lock on {@code this}. */
    private Map<String, SharedSender> senders = new TreeMap<>();

    /** Lock held by {@code getInstance} while it creates or reconfigures {@link #instance}. */
    public static final Object sync = new Object();

    /** The shared context; {@code null} until the first successful {@code getInstance} call. */
    public static FeedContext instance = null;

    /**
     * Creates a context from already constructed collaborators. A fresh {@link ClientMetrics}
     * is created for this context.
     *
     * @param propertyProcessor supplies feeder options and signals configuration changes
     * @param factory           passed on to every {@link SharedSender} this context creates
     * @param manager           the document type manager returned by {@link #getDocumentTypeManager()}
     * @param clusterList       the cluster list returned by {@link #getClusterList()}
     * @param metric            passed on to every {@link SharedSender} this context creates
     */
    public FeedContext(MessagePropertyProcessor propertyProcessor, SessionFactory factory, DocumentTypeManager manager, ClusterList clusterList, Metric metric) {
        this.propertyProcessor = propertyProcessor;
        this.factory = factory;
        docTypeManager = manager;
        this.clusterList = clusterList;
        metrics = new ClientMetrics();
        this.metric = metric;
    }

    /** Returns the client metrics owned by this context. */
    public ClientMetrics getMetrics() {
        return metrics;
    }

    /** Returns the cluster list given at construction. */
    public ClusterList getClusterList() {
        return clusterList;
    }

    /** Returns the session factory given at construction. */
    public SessionFactory getSessionFactory() {
        return factory;
    }

    /**
     * Calls {@code shutdown()} on every sender currently registered in this context.
     *
     * <p>The set of senders is copied while holding the instance lock, so this method cannot
     * iterate the map while {@link #getSharedSender(String)} is adding to it or swapping it
     * out. The senders are then shut down from that copy; senders registered after the copy
     * was taken are not affected by this call.</p>
     */
    public void shutdownSenders() {
        SharedSender[] snapshot;
        // The lock is reentrant, so this is also safe when invoked from getSharedSender.
        synchronized (this) {
            snapshot = senders.values().toArray(new SharedSender[0]);
        }
        for (SharedSender s : snapshot) {
            s.shutdown();
        }
    }

    /**
     * Returns the sender for the given route, creating and registering it if none exists yet.
     *
     * <p>If the property processor reports a configuration change, every existing sender is
     * first replaced by a new one built for the same route from the old sender, the old
     * senders are shut down, and the changed flag is cleared.</p>
     *
     * @param route the route to send to; when {@code null}, the route from the current feeder
     *              options is used
     * @return the sender registered for the resolved route
     */
    public synchronized SharedSender getSharedSender(String route) {
        if (propertyProcessor.configChanged()) {
            Map<String, SharedSender> newSenders = new TreeMap<>();

            // Build the replacements before shutting down the senders they are created from.
            for (Map.Entry<String, SharedSender> sender : senders.entrySet()) {
                newSenders.put(sender.getKey(), new SharedSender(sender.getKey(), factory, sender.getValue(), metric));
            }

            shutdownSenders();
            senders = newSenders;
            propertyProcessor.setConfigChanged(false);
        }

        if (route == null) {
            route = propertyProcessor.getFeederOptions().getRoute();
        }

        SharedSender sender = senders.get(route);

        if (sender == null) {
            // 'sender' is null here: there is no previous sender for this route to build from.
            sender = new SharedSender(route, factory, sender, metric);
            senders.put(route, sender);
            metrics.addRouteMetricSet(sender.getMetrics());
        }

        return sender;
    }

    /** Returns the property processor given at construction. */
    public MessagePropertyProcessor getPropertyProcessor() {
        return propertyProcessor;
    }

    /** Returns the document type manager given at construction. */
    public DocumentTypeManager getDocumentTypeManager() {
        return docTypeManager;
    }

    /**
     * Returns the shared context, creating it on the first call.
     *
     * <p>On the first call a {@link MessagePropertyProcessor} and a
     * {@link MessageBusSessionFactory} are created. If the system property
     * {@code vespa.local} equals {@code "true"}, the injected document manager, slobrok and
     * cluster list configs are used; otherwise {@code null} configs are passed to the session
     * factory and the cluster list is created from the config id {@code "client"}.</p>
     *
     * <p>On later calls only {@code feederConfig} and {@code loadTypeConfig} are used, to
     * reconfigure the existing property processor; the remaining arguments are ignored.</p>
     *
     * @param feederConfig          feeder configuration
     * @param loadTypeConfig        load type configuration
     * @param documentmanagerConfig document manager configuration (first call, local mode only)
     * @param slobroksConfig        slobrok configuration (first call, local mode only)
     * @param clusterListConfig     cluster list configuration (first call, local mode only)
     * @param metric                metric passed to the created context (first call only)
     * @return the shared context
     * @throws RuntimeException wrapping any exception thrown while creating or reconfiguring
     */
    public static FeedContext getInstance(FeederConfig feederConfig, 
                                          LoadTypeConfig loadTypeConfig, 
                                          DocumentmanagerConfig documentmanagerConfig, 
                                          SlobroksConfig slobroksConfig,
                                          ClusterListConfig clusterListConfig,
                                          Metric metric) {
        synchronized (sync) {
            try {
                if (instance == null) {
                    MessagePropertyProcessor proc = new MessagePropertyProcessor(feederConfig, loadTypeConfig);
                    
                    if (System.getProperty("vespa.local", "false").equals("true")) {
                        // Use injected configs when running from Application. This means we cannot reconfigure
                        MessageBusSessionFactory mbusFactory = new MessageBusSessionFactory(proc, documentmanagerConfig, slobroksConfig);
                        instance = new FeedContext(proc,
                                                   mbusFactory,
                                                   mbusFactory.getAccess().getDocumentTypeManager(),
                                                   new ClusterList(clusterListConfig), metric);
                    }
                    else {
                        // Don't send configs to messagebus to make it self-subscribe instead as this instance
                        // survives reconfig :-/
                        // This code will die soon ...
                        MessageBusSessionFactory mbusFactory = new MessageBusSessionFactory(proc, null, null);
                        instance = new FeedContext(proc,
                                                   mbusFactory,
                                                   mbusFactory.getAccess().getDocumentTypeManager(),
                                                   new ClusterList("client"), metric);
                    }
                } else {
                    // Already created: only the property processor is reconfigured.
                    instance.getPropertyProcessor().configure(feederConfig, loadTypeConfig);
                }

                return instance;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

}