aboutsummaryrefslogtreecommitdiffstats
path: root/config-model/src/main/java/com/yahoo/vespa/model/container/docproc/ContainerDocproc.java
blob: 6dda17ecdd8417fa1143ca1b2acf95d7cc12d9d7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.model.container.docproc;

import com.yahoo.collections.Pair;
import com.yahoo.config.docproc.DocprocConfig;
import com.yahoo.config.docproc.SchemamappingConfig;
import com.yahoo.container.jdisc.ContainerMbusConfig;
import com.yahoo.container.jdisc.config.SessionConfig;
import com.yahoo.docproc.jdisc.messagebus.MbusRequestContext;
import com.yahoo.vespa.model.container.ContainerCluster;
import com.yahoo.vespa.model.container.component.ContainerSubsystem;
import com.yahoo.vespa.model.container.component.SystemBindingPattern;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Document processing subsystem of a container cluster.
 *
 * <p>Holds the cluster's {@link DocprocChains} and produces the message bus, docproc and
 * schema mapping config for them. On construction it optionally registers message bus client
 * components for the "source" and internal sessions, and asks the cluster to add the search
 * and docproc bundles.
 *
 * @author Einar M R Rosenvinge
 * @author gjoranv
 */
public class ContainerDocproc extends ContainerSubsystem<DocprocChains> implements
        ContainerMbusConfig.Producer,
        SchemamappingConfig.Producer,
        DocprocConfig.Producer {

    /** Max pending messages allotted per docproc chain when no explicit limit is configured. */
    private static final int DEFAULT_MAX_MESSAGES_PER_CHAIN = 2048;

    public final Options options;

    /** Field name mappings applied to every chain and processor here: (doctype, from) -> to. */
    private final Map<Pair<String, String>, String> fieldNameSchemaMap = new HashMap<>();

    public ContainerDocproc(ContainerCluster<?> cluster, DocprocChains chains) {
        this(cluster, chains, Options.empty());
    }

    public ContainerDocproc(ContainerCluster<?> cluster, DocprocChains chains, Options options) {
        this(cluster, chains, options, true);
    }

    /**
     * Creates the docproc subsystem and registers its required components with the cluster.
     *
     * @param cluster                 the container cluster this subsystem belongs to
     * @param chains                  the docproc chains of this subsystem
     * @param options                 tuning options; use {@link Options#empty()} for defaults
     * @param addSourceClientProvider whether to add the "source" and internal message bus clients
     * @throws NullPointerException if {@code options} is null
     */
    public ContainerDocproc(ContainerCluster<?> cluster, DocprocChains chains, Options options, boolean addSourceClientProvider) {
        super(chains);
        // An explicit check rather than an assert: asserts are disabled by default, which would
        // let a null Options through until config production fails with a bare NPE.
        this.options = Objects.requireNonNull(options,
                () -> "Null Options for " + this + " under cluster " + cluster.getName());

        if (addSourceClientProvider) {
            addSource(cluster, "source", SessionConfig.Type.SOURCE);
            addSource(cluster, MbusRequestContext.internalNoThrottledSource, SessionConfig.Type.INTERNAL);
        }
        cluster.addSearchAndDocprocBundles();
    }

    /**
     * Adds a message bus client component with the given name and session type to the cluster,
     * with a client binding on {@code mbus://*} followed by the client's session name.
     */
    private static void addSource(ContainerCluster<?> cluster, String name, SessionConfig.Type.Enum type) {
        MbusClient mbusClient = new MbusClient(name, type);
        mbusClient.addClientBindings(SystemBindingPattern.fromPattern("mbus://*/" + mbusClient.getSessionName()));
        cluster.addComponent(mbusClient);
    }

    @Override
    public void getConfig(ContainerMbusConfig.Builder builder) {
        builder.maxpendingcount(getMaxMessagesInQueue());
    }

    /**
     * Returns the explicitly configured max number of queued messages, or, when not configured,
     * {@link #DEFAULT_MAX_MESSAGES_PER_CHAIN} times the number of docproc chains. The default is
     * intentionally generous: it is not meant to be the limit that normally kicks in.
     */
    private int getMaxMessagesInQueue() {
        if (options.maxMessagesInQueue != null) {
            return options.maxMessagesInQueue;
        }
        return DEFAULT_MAX_MESSAGES_PER_CHAIN * getChains().allChains().allComponents().size();
    }

    /** Returns the configured max queue time in milliseconds, or null if not set. */
    private Integer getMaxQueueTimeMs() {
        return options.maxQueueTimeMs;
    }

    /** Sets {@code maxqueuetimems} only when configured, leaving the config default otherwise. */
    @Override
    public void getConfig(DocprocConfig.Builder builder) {
        Integer maxQueueTimeMs = getMaxQueueTimeMs();
        if (maxQueueTimeMs != null) {
            builder.maxqueuetimems(maxQueueTimeMs);
        }
    }

    /**
     * Emits one field mapping per (chain, processor, mapping). The effective mappings for a
     * processor are this subsystem's map, then the chain's, then the processor's own. Later maps
     * win on equal (doctype, from) keys. A null doctype is emitted as the empty string.
     */
    @Override
    public void getConfig(SchemamappingConfig.Builder builder) {
        // Reused across processors (cleared each time) to keep the existing iteration order.
        Map<Pair<String, String>, String> allMappings = new HashMap<>();
        for (DocprocChain chain : getChains().allChains().allComponents()) {
            String chainId = chain.getId().stringValue();
            for (DocumentProcessor processor : chain.getInnerComponents()) {
                String processorId = processor.getGlobalComponentId().stringValue();
                allMappings.clear();
                allMappings.putAll(fieldNameSchemaMap());
                allMappings.putAll(chain.fieldNameSchemaMap());
                allMappings.putAll(processor.fieldNameSchemaMap());
                for (Map.Entry<Pair<String, String>, String> mapping : allMappings.entrySet()) {
                    String doctype = mapping.getKey().getFirst();
                    builder.fieldmapping(new SchemamappingConfig.Fieldmapping.Builder()
                            .chain(chainId)
                            .docproc(processorId)
                            .indocument(mapping.getKey().getSecond())
                            .inprocessor(mapping.getValue())
                            .doctype(doctype != null ? doctype : ""));
                }
            }
        }
    }

    /**
     * Returns the field name schema map that applies to all chains and processors of this
     * docproc subsystem. This is the live, mutable map; entries added to it are included in
     * the schema mapping config.
     *
     * @return (doctype, from) → to
     */
    public Map<Pair<String,String>,String> fieldNameSchemaMap() {
        return fieldNameSchemaMap;
    }

    /** Optional tuning values for the docproc subsystem; a null field means "not set". */
    public static class Options {

        /** Max pending message count for the container message bus; null gives a per-chain default. */
        public final Integer maxMessagesInQueue;
        /** Max queue time in milliseconds for the docproc config; null leaves the config default. */
        public final Integer maxQueueTimeMs;

        // The following are not read by ContainerDocproc itself.
        public final Double maxConcurrentFactor;
        public final Double documentExpansionFactor;
        public final Integer containerCoreMemory;

        public Options(Integer maxMessagesInQueue, Integer maxQueueTimeMs, Double maxConcurrentFactor, Double documentExpansionFactor, Integer containerCoreMemory) {
            this.maxMessagesInQueue = maxMessagesInQueue;
            this.maxQueueTimeMs = maxQueueTimeMs;
            this.maxConcurrentFactor = maxConcurrentFactor;
            this.documentExpansionFactor = documentExpansionFactor;
            this.containerCoreMemory = containerCoreMemory;
        }

        /** Returns options with every value unset. */
        static Options empty() { return new Options(null, null, null, null, null); }

    }

}