aboutsummaryrefslogtreecommitdiffstats
path: root/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentRouteSelectorPolicy.java
blob: fbc8c9deb8e3e428de3241e146570317eaf5b424 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.documentapi.messagebus.protocol;

import com.yahoo.config.subscription.ConfigSubscriber;
import com.yahoo.document.DocumentGet;
import com.yahoo.document.select.DocumentSelector;
import com.yahoo.document.select.Result;
import com.yahoo.document.select.parser.ParseException;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.routing.RoutingContext;

import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * This policy is responsible for selecting among the given recipient routes according to the configured document
 * selection properties. To facilitate this the "routing" plugin in the vespa model builds a mapping from the route
 * names to a document selector and a feed name of every search cluster. This can very well be extended to include
 * storage at a later time.
 *
 * @author Simon Thoresen Hult
 */
public class DocumentRouteSelectorPolicy
        implements DocumentProtocolRoutingPolicy, ConfigSubscriber.SingleSubscriber<DocumentrouteselectorpolicyConfig> {

    private static final Logger log = Logger.getLogger(DocumentRouteSelectorPolicy.class.getName());
    private Map<String, DocumentSelector> config;
    private String error = "Not configured.";
    private ConfigSubscriber subscriber;

    /** This policy is constructed with the proper config at its time of creation. */
    public DocumentRouteSelectorPolicy(DocumentProtocolPoliciesConfig config) {
        Map<String, DocumentSelector> selectors = new HashMap<>();
        config.cluster().forEach((name, cluster) -> {
            try {
                selectors.put(name, new DocumentSelector(cluster.selector()));
            }
            catch (ParseException e) {
                throw new IllegalArgumentException("Error parsing selector '" + cluster.selector() +
                                                   "' for route '" + name +"'", e);
            }
        });
        this.config = Map.copyOf(selectors);
        this.error = null;
    }

    /**
     * This policy is constructed with a configuration identifier that can be subscribed to for the document selector
     * config. If the string is either null or empty it will default to the proper one.
     *
     * @param configId The configuration identifier to subscribe to.
     */
    public DocumentRouteSelectorPolicy(String configId) {
        subscriber = new ConfigSubscriber();
        subscriber.subscribe(this, DocumentrouteselectorpolicyConfig.class, configId);
    }

    /**
     * This is a safety mechanism to allow the constructor to fail and signal that it can not be used.
     *
     * @return The error string, or null if no error.
     */
    public synchronized String getError() {
        return error;
    }

    /**
     * This method is called when configuration arrives from the config server. The received config object is traversed
     * and a local map is constructed and swapped with the current {@link #config} map.
     *
     * @param cfg The configuration object given by subscription.
     */
    @Override
    public void configure(DocumentrouteselectorpolicyConfig cfg) {
        String error = null;
        Map<String, DocumentSelector> config = new HashMap<>();
        for (int i = 0; i < cfg.route().size(); i++) {
            DocumentrouteselectorpolicyConfig.Route route = cfg.route(i);
            if (route.selector().isEmpty()) {
                continue;
            }
            DocumentSelector selector;
            try {
                selector = new DocumentSelector(route.selector());
                log.log(Level.CONFIG, "Selector for route '" + route.name() + "' is '" + selector + "'");
            } catch (com.yahoo.document.select.parser.ParseException e) {
                error = "Error parsing selector '" + route.selector() + "' for route '" + route.name() + ": " +
                        e.getMessage();
                break;
            }
            config.put(route.name(), selector);
        }
        synchronized (this) {
            this.config = config;
            this.error = error;
        }
    }

    @Override
    public void select(RoutingContext context) {
        // Require that recipients have been configured.
        if (context.getNumRecipients() == 0) {
            context.setError(DocumentProtocol.ERROR_POLICY_FAILURE,
                             "No recipients configured.");
            return;
        }

        // Invoke private select method for each candidate recipient.
        synchronized (this) {
            if (error != null) {
                context.setError(DocumentProtocol.ERROR_POLICY_FAILURE, error);
                return;
            }
            for (int i = 0; i < context.getNumRecipients(); ++i) {
                Route recipient = context.getRecipient(i);
                String routeName = recipient.toString();
                if (select(context, routeName)) {
                    Route route = context.getMessageBus().getRoutingTable(DocumentProtocol.NAME).getRoute(routeName);
                    context.addChild(route != null ? route : recipient);
                }
            }
        }
        context.setSelectOnRetry(false);

        // Notify that no children were selected, this is to differentiate this from the NO_RECIPIENTS_FOR_ROUTE error
        // that message bus will generate if there are no recipients and no reply.
        if (context.getNumChildren() == 0) {
            context.setReply(new DocumentIgnoredReply());
        }
    }

    /**
     * This method runs the selector associated with the given location on the content of the message. If the selector
     * validates the location, this method returns true.
     *
     * @param context   the routing context that contains the necessary data.
     * @param routeName the candidate route whose selector to run.
     * @return whether or not to send to the given recipient.
     */
    private boolean select(RoutingContext context, String routeName) {
        if (config == null) {
            return true;
        }
        DocumentSelector selector = config.get(routeName);
        if (selector == null) {
            return true;
        }

        // Select based on message content.
        Message msg = context.getMessage();
        switch (msg.getType()) {

        case DocumentProtocol.MESSAGE_PUTDOCUMENT:
            return selector.accepts(((PutDocumentMessage)msg).getDocumentPut()) == Result.TRUE;

        case DocumentProtocol.MESSAGE_UPDATEDOCUMENT:
            return selector.accepts(((UpdateDocumentMessage)msg).getDocumentUpdate()) != Result.FALSE;

        case DocumentProtocol.MESSAGE_REMOVEDOCUMENT: {
            RemoveDocumentMessage removeMsg = (RemoveDocumentMessage)msg;
            if (removeMsg.getDocumentId().hasDocType()) {
                return selector.accepts(removeMsg.getDocumentRemove()) != Result.FALSE;
            } else {
                return true;
            }
        }
        case DocumentProtocol.MESSAGE_GETDOCUMENT: {
            GetDocumentMessage getMsg = (GetDocumentMessage)msg;
            if (getMsg.getDocumentId().hasDocType()) {
                DocumentGet getOp = new DocumentGet(getMsg.getDocumentId());
                return selector.accepts(getOp) != Result.FALSE;
            } else {
                return true;
            }
        }

        default:
            return true;
        }
    }

    @Override
    public void merge(RoutingContext context) {
        DocumentProtocol.merge(context);
    }

    @Override
    public void destroy() {
        if (subscriber != null) {
            subscriber.close();
        }
    }

}