aboutsummaryrefslogtreecommitdiffstats
path: root/config-model/src/main/java/com/yahoo/vespa/model/content/ContentSearchCluster.java
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2024-03-17 12:35:09 +0100
committerGitHub <noreply@github.com>2024-03-17 12:35:09 +0100
commit8e2d974cc20b9b5922f93fc06d93c930bcf485fb (patch)
tree9701d8227774f4f957729b5185c9bdd51c99ac3b /config-model/src/main/java/com/yahoo/vespa/model/content/ContentSearchCluster.java
parent2e3832e4cf34f65ed24485f08b24a3370b97ee07 (diff)
parent6788825d4f9f2a092af45bdf14447fa9a762151a (diff)
Merge pull request #30666 from vespa-engine/revert-30658-revert-30652-revert-30644-revert-30643-revert-30642-revert-30640-revert-30620-revert-30616-revert-30615-balder/single-searchcluster
Revert "Single searchcluster take 5"
Diffstat (limited to 'config-model/src/main/java/com/yahoo/vespa/model/content/ContentSearchCluster.java')
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/content/ContentSearchCluster.java150
1 files changed, 110 insertions, 40 deletions
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/ContentSearchCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/content/ContentSearchCluster.java
index 01708333ed5..4d6f454f7e0 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/content/ContentSearchCluster.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/content/ContentSearchCluster.java
@@ -21,6 +21,7 @@ import com.yahoo.vespa.model.search.NodeSpec;
import com.yahoo.vespa.model.search.SchemaDefinitionXMLHandler;
import com.yahoo.vespa.model.search.SearchCluster;
import com.yahoo.vespa.model.search.SearchNode;
+import com.yahoo.vespa.model.search.StreamingSearchCluster;
import com.yahoo.vespa.model.search.TransactionLogServer;
import com.yahoo.vespa.model.search.Tuning;
import org.w3c.dom.Element;
@@ -30,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.LinkedHashMap;
+import java.util.Optional;
import java.util.Objects;
import java.util.TreeMap;
import java.util.function.Predicate;
@@ -87,8 +89,7 @@ public class ContentSearchCluster extends TreeConfigProducer<AnyConfigProducer>
public Builder(Map<String, NewDocumentType> documentDefinitions,
Set<NewDocumentType> globallyDistributedDocuments,
- double fractionOfMemoryReserved, ResourceLimits resourceLimits)
- {
+ double fractionOfMemoryReserved, ResourceLimits resourceLimits) {
this.documentDefinitions = documentDefinitions;
this.globallyDistributedDocuments = globallyDistributedDocuments;
this.fractionOfMemoryReserved = fractionOfMemoryReserved;
@@ -102,9 +103,12 @@ public class ContentSearchCluster extends TreeConfigProducer<AnyConfigProducer>
Boolean flushOnShutdownElem = clusterElem.childAsBoolean("engine.proton.flush-on-shutdown");
Boolean syncTransactionLog = clusterElem.childAsBoolean("engine.proton.sync-transactionlog");
- var search = new ContentSearchCluster(ancestor, clusterName, deployState.getProperties().featureFlags(),
- documentDefinitions, globallyDistributedDocuments,
- getFlushOnShutdown(flushOnShutdownElem), syncTransactionLog,
+ var search = new ContentSearchCluster(ancestor, clusterName,
+ deployState.getProperties().featureFlags(),
+ documentDefinitions,
+ globallyDistributedDocuments,
+ getFlushOnShutdown(flushOnShutdownElem),
+ syncTransactionLog,
fractionOfMemoryReserved);
ModelElement tuning = clusterElem.childByPath("engine.proton.tuning");
@@ -113,7 +117,8 @@ public class ContentSearchCluster extends TreeConfigProducer<AnyConfigProducer>
}
search.setResourceLimits(resourceLimits);
- buildSearchCluster(deployState, clusterElem, clusterName, search);
+ buildAllStreamingSearchClusters(deployState, clusterElem, clusterName, search);
+ buildIndexedSearchCluster(deployState, clusterElem, clusterName, search);
return search;
}
@@ -125,18 +130,73 @@ public class ContentSearchCluster extends TreeConfigProducer<AnyConfigProducer>
return clusterElem.childAsDouble("engine.proton.query-timeout");
}
- private void buildSearchCluster(DeployState deployState, ModelElement clusterElem,
- String clusterName, ContentSearchCluster search) {
+ private static Schema findResponsibleSchema(DeployState deployState, String docTypeName) {
+ var schemas = deployState.getSchemas();
+ for (var candidate : schemas) {
+ if (candidate.getName().equals(docTypeName)) {
+ return candidate;
+ }
+ }
+ return null;
+ }
+
+ private void buildAllStreamingSearchClusters(DeployState deployState, ModelElement clusterElem, String clusterName, ContentSearchCluster search) {
ModelElement docElem = clusterElem.child("documents");
- if (docElem == null) return;
- Double visibilityDelay = clusterElem.childAsDouble("engine.proton.visibility-delay");
- if (visibilityDelay != null) {
- search.setVisibilityDelay(visibilityDelay);
+ if (docElem == null) {
+ return;
+ }
+
+ for (ModelElement docType : docElem.subElements("document")) {
+ String docTypeName = docType.stringAttribute("type");
+ String mode = docType.stringAttribute("mode");
+ var schema = findResponsibleSchema(deployState, docTypeName);
+ if ("streaming".equals(mode) && schema != null && !schema.isDocumentsOnly()) {
+ buildStreamingSearchCluster(deployState, clusterElem, clusterName, search, docType);
+ }
}
+ }
- IndexedSearchCluster isc = new IndexedSearchCluster(search, clusterName, 0, search, deployState.featureFlags());
- search.addSearchCluster(deployState, isc, getQueryTimeout(clusterElem), docElem.subElements("document"));
+ private void buildStreamingSearchCluster(DeployState deployState, ModelElement clusterElem, String clusterName,
+ ContentSearchCluster search, ModelElement docType) {
+ String docTypeName = docType.stringAttribute("type");
+ StreamingSearchCluster cluster = new StreamingSearchCluster(search,
+ clusterName + "." + docTypeName,
+ 0,
+ docTypeName,
+ clusterName);
+ search.addSearchCluster(deployState, cluster, getQueryTimeout(clusterElem), List.of(docType));
+ }
+
+ private void buildIndexedSearchCluster(DeployState deployState, ModelElement clusterElem,
+ String clusterName, ContentSearchCluster search) {
+ List<ModelElement> indexedDefs = getIndexedSchemas(clusterElem);
+ if (!indexedDefs.isEmpty()) {
+ IndexedSearchCluster isc = new IndexedSearchCluster(search, clusterName, 0, search, deployState.featureFlags());
+
+ Double visibilityDelay = clusterElem.childAsDouble("engine.proton.visibility-delay");
+ if (visibilityDelay != null) {
+ search.setVisibilityDelay(visibilityDelay);
+ }
+
+ search.addSearchCluster(deployState, isc, getQueryTimeout(clusterElem), indexedDefs);
+ }
+ }
+
+ private List<ModelElement> getIndexedSchemas(ModelElement clusterElem) {
+ List<ModelElement> indexedDefs = new ArrayList<>();
+ ModelElement docElem = clusterElem.child("documents");
+ if (docElem == null) {
+ return indexedDefs;
+ }
+
+ for (ModelElement docType : docElem.subElements("document")) {
+ String mode = docType.stringAttribute("mode");
+ if ("index".equals(mode)) {
+ indexedDefs.add(docType);
+ }
+ }
+ return indexedDefs;
}
}
@@ -187,12 +247,9 @@ public class ContentSearchCluster extends TreeConfigProducer<AnyConfigProducer>
cluster.setQueryTimeout(queryTimeout);
}
cluster.deriveFromSchemas(deployState);
- if ( ! cluster.schemas().values().stream().allMatch(schemaInfo -> schemaInfo.getIndexMode() == SchemaInfo.IndexMode.STORE_ONLY)) {
- addCluster(cluster);
- }
+ addCluster(cluster);
}
-
private void addSchemas(DeployState deployState, List<ModelElement> searchDefs, SearchCluster sc) {
for (ModelElement e : searchDefs) {
SchemaDefinitionXMLHandler schemaDefinitionXMLHandler = new SchemaDefinitionXMLHandler(e);
@@ -200,7 +257,6 @@ public class ContentSearchCluster extends TreeConfigProducer<AnyConfigProducer>
if (schema == null)
throw new IllegalArgumentException("Schema '" + schemaDefinitionXMLHandler.getName() + "' referenced in " +
this + " does not exist");
- if (schema.isDocumentsOnly()) continue;
sc.add(new SchemaInfo(schema, e.stringAttribute("mode"), deployState.rankProfileRegistry(), null));
}
@@ -226,20 +282,18 @@ public class ContentSearchCluster extends TreeConfigProducer<AnyConfigProducer>
* with indexing, null if it has both or none.
*/
public Boolean isStreaming() {
- if (indexedCluster == null) return false;
- boolean hasStreaming = indexedCluster.hasStreaming();
- if (indexedCluster.hasIndexed() == hasStreaming) return null;
+ boolean hasStreaming = false;
+ boolean hasIndexed = false;
+ for (var cluster : clusters.values()) {
+ if (cluster.hasStreaming())
+ hasStreaming = true;
+ else
+ hasIndexed = true;
+ }
+ if (hasIndexed == hasStreaming) return null;
return hasStreaming;
}
- public boolean hasStreaming() {
- return (indexedCluster != null) && indexedCluster.hasStreaming();
- }
-
- public boolean hasIndexed() {
- return (indexedCluster != null) && indexedCluster.hasIndexed();
- }
-
public List<SearchNode> getSearchNodes() {
return hasIndexedCluster() ? getIndexed().getSearchNodes() : nonIndexed;
}
@@ -302,6 +356,21 @@ public class ContentSearchCluster extends TreeConfigProducer<AnyConfigProducer>
this.redundancy = redundancy;
}
+ private Optional<StreamingSearchCluster> findStreamingCluster(String docType) {
+ return getClusters().values().stream()
+ .filter(StreamingSearchCluster.class::isInstance)
+ .map(StreamingSearchCluster.class::cast)
+ .filter(ssc -> ssc.schemas().get(docType) != null)
+ .findFirst();
+ }
+
+ public List<StreamingSearchCluster> getStreamingClusters() {
+ return getClusters().values().stream()
+ .filter(StreamingSearchCluster.class::isInstance)
+ .map(StreamingSearchCluster.class::cast)
+ .toList();
+ }
+
public List<NewDocumentType> getDocumentTypesWithStreamingCluster() { return documentTypes(this::hasIndexingModeStreaming); }
public List<NewDocumentType> getDocumentTypesWithIndexedCluster() { return documentTypes(this::hasIndexingModeIndexed); }
public List<NewDocumentType> getDocumentTypesWithStoreOnly() { return documentTypes(this::hasIndexingModeStoreOnly); }
@@ -313,13 +382,13 @@ public class ContentSearchCluster extends TreeConfigProducer<AnyConfigProducer>
}
private boolean hasIndexingModeStreaming(NewDocumentType type) {
- if (indexedCluster == null) return false;
- return indexedCluster.schemas().get(type.getName()).getIndexMode() == SchemaInfo.IndexMode.STREAMING;
+ return findStreamingCluster(type.getFullName().getName()).isPresent();
}
private boolean hasIndexingModeIndexed(NewDocumentType type) {
- if (indexedCluster == null) return false;
- return indexedCluster.schemas().get(type.getName()).getIndexMode() == SchemaInfo.IndexMode.INDEX;
+ return !hasIndexingModeStreaming(type)
+ && hasIndexedCluster()
+ && getIndexed().hasDocumentDB(type.getFullName().getName());
}
private boolean hasIndexingModeStoreOnly(NewDocumentType type) {
@@ -328,7 +397,7 @@ public class ContentSearchCluster extends TreeConfigProducer<AnyConfigProducer>
@Override
public void getConfig(ProtonConfig.Builder builder) {
- boolean hasAnyNonIndexedSchema = false;
+ boolean hasAnyNonIndexedCluster = false;
for (NewDocumentType type : TopologicalDocumentTypeSorter.sort(documentDefinitions.values())) {
ProtonConfig.Documentdb.Builder ddbB = new ProtonConfig.Documentdb.Builder();
String docTypeName = type.getFullName().getName();
@@ -340,13 +409,13 @@ public class ContentSearchCluster extends TreeConfigProducer<AnyConfigProducer>
ddbB.allocation.max_compact_buffers(defaultMaxCompactBuffers);
if (hasIndexingModeStreaming(type)) {
- hasAnyNonIndexedSchema = true;
- indexedCluster.fillDocumentDBConfig(type.getFullName().getName(), ddbB);
+ hasAnyNonIndexedCluster = true;
+ findStreamingCluster(docTypeName).get().fillDocumentDBConfig(type.getFullName().getName(), ddbB);
ddbB.mode(ProtonConfig.Documentdb.Mode.Enum.STREAMING);
} else if (hasIndexingModeIndexed(type)) {
- indexedCluster.fillDocumentDBConfig(type.getFullName().getName(), ddbB);
+ getIndexed().fillDocumentDBConfig(type.getFullName().getName(), ddbB);
} else {
- hasAnyNonIndexedSchema = true;
+ hasAnyNonIndexedCluster = true;
ddbB.mode(ProtonConfig.Documentdb.Mode.Enum.STORE_ONLY);
}
if (globalDocType) {
@@ -355,7 +424,7 @@ public class ContentSearchCluster extends TreeConfigProducer<AnyConfigProducer>
builder.documentdb(ddbB);
}
- if (hasAnyNonIndexedSchema) {
+ if (hasAnyNonIndexedCluster) {
builder.feeding.concurrency(Math.min(1.0, defaultFeedConcurrency*2));
} else {
builder.feeding.concurrency(defaultFeedConcurrency);
@@ -409,6 +478,7 @@ public class ContentSearchCluster extends TreeConfigProducer<AnyConfigProducer>
getIndexed().getConfig(builder);
}
}
+ public Map<String, SearchCluster> getClusters() { return clusters; }
public IndexedSearchCluster getIndexed() { return indexedCluster; }
public boolean hasIndexedCluster() { return indexedCluster != null; }
public IndexingDocproc getIndexingDocproc() { return indexingDocproc; }