From 6788825d4f9f2a092af45bdf14447fa9a762151a Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Sun, 17 Mar 2024 12:34:31 +0100 Subject: Revert "Single searchcluster take 5" --- .../vespa/model/content/ContentSearchCluster.java | 150 +++++++++++++++------ 1 file changed, 110 insertions(+), 40 deletions(-) (limited to 'config-model/src/main/java/com/yahoo/vespa/model/content/ContentSearchCluster.java') diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/ContentSearchCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/content/ContentSearchCluster.java index 01708333ed5..4d6f454f7e0 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/content/ContentSearchCluster.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/content/ContentSearchCluster.java @@ -21,6 +21,7 @@ import com.yahoo.vespa.model.search.NodeSpec; import com.yahoo.vespa.model.search.SchemaDefinitionXMLHandler; import com.yahoo.vespa.model.search.SearchCluster; import com.yahoo.vespa.model.search.SearchNode; +import com.yahoo.vespa.model.search.StreamingSearchCluster; import com.yahoo.vespa.model.search.TransactionLogServer; import com.yahoo.vespa.model.search.Tuning; import org.w3c.dom.Element; @@ -30,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.LinkedHashMap; +import java.util.Optional; import java.util.Objects; import java.util.TreeMap; import java.util.function.Predicate; @@ -87,8 +89,7 @@ public class ContentSearchCluster extends TreeConfigProducer public Builder(Map documentDefinitions, Set globallyDistributedDocuments, - double fractionOfMemoryReserved, ResourceLimits resourceLimits) - { + double fractionOfMemoryReserved, ResourceLimits resourceLimits) { this.documentDefinitions = documentDefinitions; this.globallyDistributedDocuments = globallyDistributedDocuments; this.fractionOfMemoryReserved = fractionOfMemoryReserved; @@ -102,9 +103,12 @@ public class ContentSearchCluster extends TreeConfigProducer Boolean flushOnShutdownElem = clusterElem.childAsBoolean("engine.proton.flush-on-shutdown"); Boolean syncTransactionLog = clusterElem.childAsBoolean("engine.proton.sync-transactionlog"); - var search = new ContentSearchCluster(ancestor, clusterName, deployState.getProperties().featureFlags(), - documentDefinitions, globallyDistributedDocuments, - getFlushOnShutdown(flushOnShutdownElem), syncTransactionLog, + var search = new ContentSearchCluster(ancestor, clusterName, + deployState.getProperties().featureFlags(), + documentDefinitions, + globallyDistributedDocuments, + getFlushOnShutdown(flushOnShutdownElem), + syncTransactionLog, fractionOfMemoryReserved); ModelElement tuning = clusterElem.childByPath("engine.proton.tuning"); @@ -113,7 +117,8 @@ public class ContentSearchCluster extends TreeConfigProducer } search.setResourceLimits(resourceLimits); - buildSearchCluster(deployState, clusterElem, clusterName, search); + buildAllStreamingSearchClusters(deployState, clusterElem, clusterName, search); + buildIndexedSearchCluster(deployState, clusterElem, clusterName, search); return search; } @@ -125,18 +130,73 @@ public class ContentSearchCluster extends TreeConfigProducer return clusterElem.childAsDouble("engine.proton.query-timeout"); } - private void buildSearchCluster(DeployState deployState, ModelElement clusterElem, - String clusterName, ContentSearchCluster search) { + private static Schema findResponsibleSchema(DeployState deployState, String docTypeName) { + var schemas = deployState.getSchemas(); + for (var candidate : schemas) { + if (candidate.getName().equals(docTypeName)) { + return candidate; + } + } + return null; + } + + private void buildAllStreamingSearchClusters(DeployState deployState, ModelElement clusterElem, String clusterName, ContentSearchCluster search) { ModelElement docElem = clusterElem.child("documents"); - if (docElem == null) return; - Double visibilityDelay = clusterElem.childAsDouble("engine.proton.visibility-delay"); - if (visibilityDelay != null) { - search.setVisibilityDelay(visibilityDelay); + if (docElem == null) { + return; + } + + for (ModelElement docType : docElem.subElements("document")) { + String docTypeName = docType.stringAttribute("type"); + String mode = docType.stringAttribute("mode"); + var schema = findResponsibleSchema(deployState, docTypeName); + if ("streaming".equals(mode) && schema != null && !schema.isDocumentsOnly()) { + buildStreamingSearchCluster(deployState, clusterElem, clusterName, search, docType); + } } + } - IndexedSearchCluster isc = new IndexedSearchCluster(search, clusterName, 0, search, deployState.featureFlags()); - search.addSearchCluster(deployState, isc, getQueryTimeout(clusterElem), docElem.subElements("document")); + private void buildStreamingSearchCluster(DeployState deployState, ModelElement clusterElem, String clusterName, + ContentSearchCluster search, ModelElement docType) { + String docTypeName = docType.stringAttribute("type"); + StreamingSearchCluster cluster = new StreamingSearchCluster(search, + clusterName + "." + docTypeName, + 0, + docTypeName, + clusterName); + search.addSearchCluster(deployState, cluster, getQueryTimeout(clusterElem), List.of(docType)); + } + + private void buildIndexedSearchCluster(DeployState deployState, ModelElement clusterElem, + String clusterName, ContentSearchCluster search) { + List indexedDefs = getIndexedSchemas(clusterElem); + if (!indexedDefs.isEmpty()) { + IndexedSearchCluster isc = new IndexedSearchCluster(search, clusterName, 0, search, deployState.featureFlags()); + + Double visibilityDelay = clusterElem.childAsDouble("engine.proton.visibility-delay"); + if (visibilityDelay != null) { + search.setVisibilityDelay(visibilityDelay); + } + + search.addSearchCluster(deployState, isc, getQueryTimeout(clusterElem), indexedDefs); + } + } + + private List getIndexedSchemas(ModelElement clusterElem) { + List indexedDefs = new ArrayList<>(); + ModelElement docElem = clusterElem.child("documents"); + if (docElem == null) { + return indexedDefs; + } + + for (ModelElement docType : docElem.subElements("document")) { + String mode = docType.stringAttribute("mode"); + if ("index".equals(mode)) { + indexedDefs.add(docType); + } + } + return indexedDefs; } } @@ -187,12 +247,9 @@ public class ContentSearchCluster extends TreeConfigProducer cluster.setQueryTimeout(queryTimeout); } cluster.deriveFromSchemas(deployState); - if ( ! cluster.schemas().values().stream().allMatch(schemaInfo -> schemaInfo.getIndexMode() == SchemaInfo.IndexMode.STORE_ONLY)) { - addCluster(cluster); - } + addCluster(cluster); } - private void addSchemas(DeployState deployState, List searchDefs, SearchCluster sc) { for (ModelElement e : searchDefs) { SchemaDefinitionXMLHandler schemaDefinitionXMLHandler = new SchemaDefinitionXMLHandler(e); @@ -200,7 +257,6 @@ public class ContentSearchCluster extends TreeConfigProducer if (schema == null) throw new IllegalArgumentException("Schema '" + schemaDefinitionXMLHandler.getName() + "' referenced in " + this + " does not exist"); - if (schema.isDocumentsOnly()) continue; sc.add(new SchemaInfo(schema, e.stringAttribute("mode"), deployState.rankProfileRegistry(), null)); } @@ -226,20 +282,18 @@ public class ContentSearchCluster extends TreeConfigProducer * with indexing, null if it has both or none. */ public Boolean isStreaming() { - if (indexedCluster == null) return false; - boolean hasStreaming = indexedCluster.hasStreaming(); - if (indexedCluster.hasIndexed() == hasStreaming) return null; + boolean hasStreaming = false; + boolean hasIndexed = false; + for (var cluster : clusters.values()) { + if (cluster.hasStreaming()) + hasStreaming = true; + else + hasIndexed = true; + } + if (hasIndexed == hasStreaming) return null; return hasStreaming; } - public boolean hasStreaming() { - return (indexedCluster != null) && indexedCluster.hasStreaming(); - } - - public boolean hasIndexed() { - return (indexedCluster != null) && indexedCluster.hasIndexed(); - } - public List getSearchNodes() { return hasIndexedCluster() ? getIndexed().getSearchNodes() : nonIndexed; } @@ -302,6 +356,21 @@ public class ContentSearchCluster extends TreeConfigProducer this.redundancy = redundancy; } + private Optional findStreamingCluster(String docType) { + return getClusters().values().stream() + .filter(StreamingSearchCluster.class::isInstance) + .map(StreamingSearchCluster.class::cast) + .filter(ssc -> ssc.schemas().get(docType) != null) + .findFirst(); + } + + public List getStreamingClusters() { + return getClusters().values().stream() + .filter(StreamingSearchCluster.class::isInstance) + .map(StreamingSearchCluster.class::cast) + .toList(); + } + public List getDocumentTypesWithStreamingCluster() { return documentTypes(this::hasIndexingModeStreaming); } public List getDocumentTypesWithIndexedCluster() { return documentTypes(this::hasIndexingModeIndexed); } public List getDocumentTypesWithStoreOnly() { return documentTypes(this::hasIndexingModeStoreOnly); } @@ -313,13 +382,13 @@ public class ContentSearchCluster extends TreeConfigProducer } private boolean hasIndexingModeStreaming(NewDocumentType type) { - if (indexedCluster == null) return false; - return indexedCluster.schemas().get(type.getName()).getIndexMode() == SchemaInfo.IndexMode.STREAMING; + return findStreamingCluster(type.getFullName().getName()).isPresent(); } private boolean hasIndexingModeIndexed(NewDocumentType type) { - if (indexedCluster == null) return false; - return indexedCluster.schemas().get(type.getName()).getIndexMode() == SchemaInfo.IndexMode.INDEX; + return !hasIndexingModeStreaming(type) + && hasIndexedCluster() + && getIndexed().hasDocumentDB(type.getFullName().getName()); } private boolean hasIndexingModeStoreOnly(NewDocumentType type) { @@ -328,7 +397,7 @@ public class ContentSearchCluster extends TreeConfigProducer @Override public void getConfig(ProtonConfig.Builder builder) { - boolean hasAnyNonIndexedSchema = false; + boolean hasAnyNonIndexedCluster = false; for (NewDocumentType type : TopologicalDocumentTypeSorter.sort(documentDefinitions.values())) { ProtonConfig.Documentdb.Builder ddbB = new ProtonConfig.Documentdb.Builder(); String docTypeName = type.getFullName().getName(); @@ -340,13 +409,13 @@ public class ContentSearchCluster extends TreeConfigProducer ddbB.allocation.max_compact_buffers(defaultMaxCompactBuffers); if (hasIndexingModeStreaming(type)) { - hasAnyNonIndexedSchema = true; - indexedCluster.fillDocumentDBConfig(type.getFullName().getName(), ddbB); + hasAnyNonIndexedCluster = true; + findStreamingCluster(docTypeName).get().fillDocumentDBConfig(type.getFullName().getName(), ddbB); ddbB.mode(ProtonConfig.Documentdb.Mode.Enum.STREAMING); } else if (hasIndexingModeIndexed(type)) { - indexedCluster.fillDocumentDBConfig(type.getFullName().getName(), ddbB); + getIndexed().fillDocumentDBConfig(type.getFullName().getName(), ddbB); } else { - hasAnyNonIndexedSchema = true; + hasAnyNonIndexedCluster = true; ddbB.mode(ProtonConfig.Documentdb.Mode.Enum.STORE_ONLY); } if (globalDocType) { @@ -355,7 +424,7 @@ public class ContentSearchCluster extends TreeConfigProducer builder.documentdb(ddbB); } - if (hasAnyNonIndexedSchema) { + if (hasAnyNonIndexedCluster) { builder.feeding.concurrency(Math.min(1.0, defaultFeedConcurrency*2)); } else { builder.feeding.concurrency(defaultFeedConcurrency); @@ -409,6 +478,7 @@ public class ContentSearchCluster extends TreeConfigProducer getIndexed().getConfig(builder); } } + public Map getClusters() { return clusters; } public IndexedSearchCluster getIndexed() { return indexedCluster; } public boolean hasIndexedCluster() { return indexedCluster != null; } public IndexingDocproc getIndexingDocproc() { return indexingDocproc; } -- cgit v1.2.3