summaryrefslogtreecommitdiffstats
path: root/config-model/src/main/java/com/yahoo/vespa/model/container/docproc/ContainerDocproc.java
blob: f2233680e9b0d08d7b0454ac55e13b1a7d5d1022 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.model.container.docproc;

import com.yahoo.collections.Pair;
import com.yahoo.config.docproc.DocprocConfig;
import com.yahoo.config.docproc.SchemamappingConfig;
import com.yahoo.container.jdisc.ContainerMbusConfig;
import com.yahoo.container.jdisc.config.SessionConfig;
import com.yahoo.docproc.jdisc.messagebus.MbusRequestContext;
import com.yahoo.vespa.model.container.ContainerCluster;
import com.yahoo.vespa.model.container.component.ContainerSubsystem;
import com.yahoo.vespa.model.container.component.SystemBindingPattern;

import java.util.HashMap;
import java.util.Map;

/**
 * @author einarmr
 * @author gjoranv
 */
public class ContainerDocproc extends ContainerSubsystem<DocprocChains>
        implements 
            ContainerMbusConfig.Producer,
            SchemamappingConfig.Producer,
            DocprocConfig.Producer
{
    public final Options options;

    // Whether or not to prefer sending to a local node.
    private boolean preferLocalNode = false;

    // The number of nodes to use per client.
    private int numNodesPerClient = 0;

    private Map<Pair<String, String>, String> fieldNameSchemaMap = new HashMap<>();

    public ContainerDocproc(ContainerCluster cluster, DocprocChains chains) {
        this(cluster, chains, new Options( null, null, null, null, null, null));
    }

    public ContainerDocproc(ContainerCluster cluster, DocprocChains chains, Options options) {
        this(cluster, chains, options, true);
    }

    private void addSource(
            final ContainerCluster cluster, final String name, final SessionConfig.Type.Enum type) {
        final MbusClient mbusClient = new MbusClient(name, type);
        mbusClient.addClientBindings(SystemBindingPattern.fromPattern("mbus://*/" + mbusClient.getSessionName()));
        cluster.addComponent(mbusClient);
    }

    public ContainerDocproc(ContainerCluster cluster, DocprocChains chains, Options options, boolean addSourceClientProvider) {
        super(chains);
        assert (options != null) : "Null Options for " + this + " under cluster " + cluster.getName();
        this.options = options;

        if (addSourceClientProvider) {
            addSource(cluster, "source", SessionConfig.Type.SOURCE);
            addSource(cluster, MbusRequestContext.internalNoThrottledSource, SessionConfig.Type.INTERNAL);
        }
    }

    public boolean isPreferLocalNode() {
        return preferLocalNode;
    }

    public int getNumNodesPerClient() {
        return numNodesPerClient;
    }

    @Override
    public void getConfig(ContainerMbusConfig.Builder builder) {
        builder.maxpendingcount(getMaxMessagesInQueue());
    }

    private int getMaxMessagesInQueue() {
        if (options.maxMessagesInQueue != null) {
            return options.maxMessagesInQueue;
        }

        //maxmessagesinqueue has not been set for this node. let's try to give a good value anyway:
        return 2048 * getChains().allChains().allComponents().size();
        //intentionally high, getMaxQueueMbSize() will probably kick in before this one!
    }

    private Integer getMaxQueueMbSize() {
        return options.maxQueueMbSize;
    }

    private Integer getMaxQueueTimeMs() {
        return options.maxQueueTimeMs;
    }

    @Override
    public void getConfig(DocprocConfig.Builder builder) {
        if (getMaxQueueTimeMs() != null) {
            builder.maxqueuetimems(getMaxQueueTimeMs());
        }
    }
    
    @Override
    public void getConfig(SchemamappingConfig.Builder builder) {
        Map<Pair<String, String>, String> allMappings = new HashMap<>();
        for (DocprocChain chain : getChains().allChains().allComponents()) {
            for (DocumentProcessor processor : chain.getInnerComponents()) {
                allMappings.putAll(fieldNameSchemaMap());
                allMappings.putAll(chain.fieldNameSchemaMap());
                allMappings.putAll(processor.fieldNameSchemaMap());
                for (Map.Entry<Pair<String,String>, String> e : allMappings.entrySet()) {
                    String doctype = e.getKey().getFirst();
                    String from = e.getKey().getSecond();
                    String to = e.getValue();
                    builder.fieldmapping(new SchemamappingConfig.Fieldmapping.Builder().
                            chain(chain.getId().stringValue()).
                            docproc(processor.getGlobalComponentId().stringValue()).
                            indocument(from).
                            inprocessor(to).
                            doctype(doctype!=null?doctype:""));
                }
                allMappings.clear();
            }
        }
    }
    
    /**
     * The field name schema map that applies to this whole chain
     * @return doctype,from → to
     */
    public Map<Pair<String,String>,String> fieldNameSchemaMap() {
        return fieldNameSchemaMap;
    }

    public static class Options {

        public final Integer maxMessagesInQueue;
        public final Integer maxQueueMbSize;
        public final Integer maxQueueTimeMs;

        public final Double maxConcurrentFactor;
        public final Double documentExpansionFactor;
        public final Integer containerCoreMemory;

        public Options(Integer maxMessagesInQueue, Integer maxQueueMbSize, Integer maxQueueTimeMs, Double maxConcurrentFactor, Double documentExpansionFactor, Integer containerCoreMemory) {
            this.maxMessagesInQueue = maxMessagesInQueue;
            this.maxQueueMbSize = maxQueueMbSize;
            this.maxQueueTimeMs = maxQueueTimeMs;
            this.maxConcurrentFactor = maxConcurrentFactor;
            this.documentExpansionFactor = documentExpansionFactor;
            this.containerCoreMemory = containerCoreMemory;
        }
    }

}