diff options
author | Harald Musum <musum@vespa.ai> | 2024-03-15 06:38:43 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-03-15 06:38:43 +0100 |
commit | ecc649e623765cd889435bd2165b1ebdc9b813f9 (patch) | |
tree | cea3c6db996f1dbf2948963d0fbaef82ededc91b /config-model/src/main/java/com/yahoo/vespa/model/content/ContentSearchCluster.java | |
parent | 81f67c1769f29de49aefbb1f3065f02be04676a2 (diff) |
Revert "Single searchcluster take 2"
Diffstat (limited to 'config-model/src/main/java/com/yahoo/vespa/model/content/ContentSearchCluster.java')
-rw-r--r-- | config-model/src/main/java/com/yahoo/vespa/model/content/ContentSearchCluster.java | 142 |
1 files changed, 109 insertions, 33 deletions
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/ContentSearchCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/content/ContentSearchCluster.java index 469901bb542..4d6f454f7e0 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/content/ContentSearchCluster.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/content/ContentSearchCluster.java @@ -21,6 +21,7 @@ import com.yahoo.vespa.model.search.NodeSpec; import com.yahoo.vespa.model.search.SchemaDefinitionXMLHandler; import com.yahoo.vespa.model.search.SearchCluster; import com.yahoo.vespa.model.search.SearchNode; +import com.yahoo.vespa.model.search.StreamingSearchCluster; import com.yahoo.vespa.model.search.TransactionLogServer; import com.yahoo.vespa.model.search.Tuning; import org.w3c.dom.Element; @@ -30,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.LinkedHashMap; +import java.util.Optional; import java.util.Objects; import java.util.TreeMap; import java.util.function.Predicate; @@ -102,12 +104,12 @@ public class ContentSearchCluster extends TreeConfigProducer<AnyConfigProducer> Boolean syncTransactionLog = clusterElem.childAsBoolean("engine.proton.sync-transactionlog"); var search = new ContentSearchCluster(ancestor, clusterName, - deployState.getProperties().featureFlags(), - documentDefinitions, - globallyDistributedDocuments, - getFlushOnShutdown(flushOnShutdownElem), - syncTransactionLog, - fractionOfMemoryReserved); + deployState.getProperties().featureFlags(), + documentDefinitions, + globallyDistributedDocuments, + getFlushOnShutdown(flushOnShutdownElem), + syncTransactionLog, + fractionOfMemoryReserved); ModelElement tuning = clusterElem.childByPath("engine.proton.tuning"); if (tuning != null) { @@ -115,7 +117,8 @@ public class ContentSearchCluster extends TreeConfigProducer<AnyConfigProducer> } search.setResourceLimits(resourceLimits); - buildSearchCluster(deployState, clusterElem, clusterName, search); + buildAllStreamingSearchClusters(deployState, clusterElem, clusterName, search); + buildIndexedSearchCluster(deployState, clusterElem, clusterName, search); return search; } @@ -127,18 +130,73 @@ public class ContentSearchCluster extends TreeConfigProducer<AnyConfigProducer> return clusterElem.childAsDouble("engine.proton.query-timeout"); } - private void buildSearchCluster(DeployState deployState, ModelElement clusterElem, - String clusterName, ContentSearchCluster search) { + private static Schema findResponsibleSchema(DeployState deployState, String docTypeName) { + var schemas = deployState.getSchemas(); + for (var candidate : schemas) { + if (candidate.getName().equals(docTypeName)) { + return candidate; + } + } + return null; + } + + private void buildAllStreamingSearchClusters(DeployState deployState, ModelElement clusterElem, String clusterName, ContentSearchCluster search) { ModelElement docElem = clusterElem.child("documents"); - if (docElem == null) return; - Double visibilityDelay = clusterElem.childAsDouble("engine.proton.visibility-delay"); - if (visibilityDelay != null) { - search.setVisibilityDelay(visibilityDelay); + if (docElem == null) { + return; } - IndexedSearchCluster isc = new IndexedSearchCluster(search, clusterName, 0, search, deployState.featureFlags()); - search.addSearchCluster(deployState, isc, getQueryTimeout(clusterElem), docElem.subElements("document")); + for (ModelElement docType : docElem.subElements("document")) { + String docTypeName = docType.stringAttribute("type"); + String mode = docType.stringAttribute("mode"); + var schema = findResponsibleSchema(deployState, docTypeName); + if ("streaming".equals(mode) && schema != null && !schema.isDocumentsOnly()) { + buildStreamingSearchCluster(deployState, clusterElem, clusterName, search, docType); + } + } + } + + private void buildStreamingSearchCluster(DeployState deployState, ModelElement clusterElem, String clusterName, + ContentSearchCluster search, ModelElement docType) { + String docTypeName = docType.stringAttribute("type"); + StreamingSearchCluster cluster = new StreamingSearchCluster(search, + clusterName + "." + docTypeName, + 0, + docTypeName, + clusterName); + search.addSearchCluster(deployState, cluster, getQueryTimeout(clusterElem), List.of(docType)); + } + + private void buildIndexedSearchCluster(DeployState deployState, ModelElement clusterElem, + String clusterName, ContentSearchCluster search) { + List<ModelElement> indexedDefs = getIndexedSchemas(clusterElem); + if (!indexedDefs.isEmpty()) { + IndexedSearchCluster isc = new IndexedSearchCluster(search, clusterName, 0, search, deployState.featureFlags()); + + Double visibilityDelay = clusterElem.childAsDouble("engine.proton.visibility-delay"); + if (visibilityDelay != null) { + search.setVisibilityDelay(visibilityDelay); + } + + search.addSearchCluster(deployState, isc, getQueryTimeout(clusterElem), indexedDefs); + } + } + + private List<ModelElement> getIndexedSchemas(ModelElement clusterElem) { + List<ModelElement> indexedDefs = new ArrayList<>(); + ModelElement docElem = clusterElem.child("documents"); + if (docElem == null) { + return indexedDefs; + } + + for (ModelElement docType : docElem.subElements("document")) { + String mode = docType.stringAttribute("mode"); + if ("index".equals(mode)) { + indexedDefs.add(docType); + } + } + return indexedDefs; } } @@ -189,12 +247,9 @@ public class ContentSearchCluster extends TreeConfigProducer<AnyConfigProducer> cluster.setQueryTimeout(queryTimeout); } cluster.deriveFromSchemas(deployState); - if ( ! cluster.schemas().values().stream().allMatch(schemaInfo -> schemaInfo.getIndexMode() == SchemaInfo.IndexMode.STORE_ONLY)) { - addCluster(cluster); - } + addCluster(cluster); } - private void addSchemas(DeployState deployState, List<ModelElement> searchDefs, SearchCluster sc) { for (ModelElement e : searchDefs) { SchemaDefinitionXMLHandler schemaDefinitionXMLHandler = new SchemaDefinitionXMLHandler(e); @@ -202,7 +257,6 @@ public class ContentSearchCluster extends TreeConfigProducer<AnyConfigProducer> if (schema == null) throw new IllegalArgumentException("Schema '" + schemaDefinitionXMLHandler.getName() + "' referenced in " + this + " does not exist"); - if (schema.isDocumentsOnly()) continue; sc.add(new SchemaInfo(schema, e.stringAttribute("mode"), deployState.rankProfileRegistry(), null)); } @@ -228,9 +282,15 @@ public class ContentSearchCluster extends TreeConfigProducer<AnyConfigProducer> * with indexing, null if it has both or none. */ public Boolean isStreaming() { - if (indexedCluster == null) return false; - boolean hasStreaming = indexedCluster.hasStreaming(); - if (indexedCluster.hasIndexed() == hasStreaming) return null; + boolean hasStreaming = false; + boolean hasIndexed = false; + for (var cluster : clusters.values()) { + if (cluster.hasStreaming()) + hasStreaming = true; + else + hasIndexed = true; + } + if (hasIndexed == hasStreaming) return null; return hasStreaming; } @@ -296,6 +356,21 @@ public class ContentSearchCluster extends TreeConfigProducer<AnyConfigProducer> this.redundancy = redundancy; } + private Optional<StreamingSearchCluster> findStreamingCluster(String docType) { + return getClusters().values().stream() + .filter(StreamingSearchCluster.class::isInstance) + .map(StreamingSearchCluster.class::cast) + .filter(ssc -> ssc.schemas().get(docType) != null) + .findFirst(); + } + + public List<StreamingSearchCluster> getStreamingClusters() { + return getClusters().values().stream() + .filter(StreamingSearchCluster.class::isInstance) + .map(StreamingSearchCluster.class::cast) + .toList(); + } + public List<NewDocumentType> getDocumentTypesWithStreamingCluster() { return documentTypes(this::hasIndexingModeStreaming); } public List<NewDocumentType> getDocumentTypesWithIndexedCluster() { return documentTypes(this::hasIndexingModeIndexed); } public List<NewDocumentType> getDocumentTypesWithStoreOnly() { return documentTypes(this::hasIndexingModeStoreOnly); } @@ -307,13 +382,13 @@ public class ContentSearchCluster extends TreeConfigProducer<AnyConfigProducer> } private boolean hasIndexingModeStreaming(NewDocumentType type) { - if (indexedCluster == null) return false; - return indexedCluster.schemas().get(type.getName()).getIndexMode() == SchemaInfo.IndexMode.STREAMING; + return findStreamingCluster(type.getFullName().getName()).isPresent(); } private boolean hasIndexingModeIndexed(NewDocumentType type) { - if (indexedCluster == null) return false; - return indexedCluster.schemas().get(type.getName()).getIndexMode() == SchemaInfo.IndexMode.INDEX; + return !hasIndexingModeStreaming(type) + && hasIndexedCluster() + && getIndexed().hasDocumentDB(type.getFullName().getName()); } private boolean hasIndexingModeStoreOnly(NewDocumentType type) { @@ -322,7 +397,7 @@ public class ContentSearchCluster extends TreeConfigProducer<AnyConfigProducer> @Override public void getConfig(ProtonConfig.Builder builder) { - boolean hasAnyNonIndexedSchema = false; + boolean hasAnyNonIndexedCluster = false; for (NewDocumentType type : TopologicalDocumentTypeSorter.sort(documentDefinitions.values())) { ProtonConfig.Documentdb.Builder ddbB = new ProtonConfig.Documentdb.Builder(); String docTypeName = type.getFullName().getName(); @@ -334,13 +409,13 @@ public class ContentSearchCluster extends TreeConfigProducer<AnyConfigProducer> ddbB.allocation.max_compact_buffers(defaultMaxCompactBuffers); if (hasIndexingModeStreaming(type)) { - hasAnyNonIndexedSchema = true; - indexedCluster.fillDocumentDBConfig(type.getFullName().getName(), ddbB); + hasAnyNonIndexedCluster = true; + findStreamingCluster(docTypeName).get().fillDocumentDBConfig(type.getFullName().getName(), ddbB); ddbB.mode(ProtonConfig.Documentdb.Mode.Enum.STREAMING); } else if (hasIndexingModeIndexed(type)) { - indexedCluster.fillDocumentDBConfig(type.getFullName().getName(), ddbB); + getIndexed().fillDocumentDBConfig(type.getFullName().getName(), ddbB); } else { - hasAnyNonIndexedSchema = true; + hasAnyNonIndexedCluster = true; ddbB.mode(ProtonConfig.Documentdb.Mode.Enum.STORE_ONLY); } if (globalDocType) { @@ -349,7 +424,7 @@ public class ContentSearchCluster extends TreeConfigProducer<AnyConfigProducer> builder.documentdb(ddbB); } - if (hasAnyNonIndexedSchema) { + if (hasAnyNonIndexedCluster) { builder.feeding.concurrency(Math.min(1.0, defaultFeedConcurrency*2)); } else { builder.feeding.concurrency(defaultFeedConcurrency); @@ -403,6 +478,7 @@ public class ContentSearchCluster extends TreeConfigProducer<AnyConfigProducer> getIndexed().getConfig(builder); } } + public Map<String, SearchCluster> getClusters() { return clusters; } public IndexedSearchCluster getIndexed() { return indexedCluster; } public boolean hasIndexedCluster() { return indexedCluster != null; } public IndexingDocproc getIndexingDocproc() { return indexingDocproc; } |