summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--README.md2
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/content/ClusterControllerConfig.java14
-rw-r--r--config-model/src/main/python/ES_Vespa_parser.py4
-rw-r--r--config-model/src/test/java/com/yahoo/vespa/model/content/FleetControllerClusterTest.java22
-rw-r--r--configdefinitions/src/vespa/fleetcontroller.def16
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/modelfactory/PreparedModelsBuilder.java25
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java16
-rw-r--r--docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/Container.java1
-rw-r--r--docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/ContainerName.java1
-rw-r--r--docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/ContainerResources.java3
-rw-r--r--docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/ProcessResult.java1
-rw-r--r--docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/RegistryCredentials.java1
-rw-r--r--document/src/tests/repo/documenttyperepo_test.cpp10
-rw-r--r--document/src/vespa/document/fieldset/fieldsetrepo.cpp4
-rw-r--r--document/src/vespa/document/repo/documenttyperepo.cpp4
-rw-r--r--document/src/vespa/document/repo/documenttyperepo.h32
-rw-r--r--document/src/vespa/document/select/bodyfielddetector.cpp4
-rw-r--r--jdisc_http_service/pom.xml5
-rw-r--r--jdisc_http_service/src/main/java/com/yahoo/container/logging/ConnectionLogEntry.java76
-rw-r--r--jdisc_http_service/src/main/java/com/yahoo/container/logging/FileConnectionLog.java13
-rw-r--r--jdisc_http_service/src/main/java/com/yahoo/container/logging/FormatUtil.java46
-rw-r--r--jdisc_http_service/src/main/java/com/yahoo/container/logging/JSONFormatter.java25
-rw-r--r--jdisc_http_service/src/main/java/com/yahoo/container/logging/JsonConnectionLogWriter.java117
-rw-r--r--jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java655
-rw-r--r--jdisc_http_service/src/test/java/com/yahoo/container/logging/ConnectionLogEntryTest.java32
-rw-r--r--jdisc_http_service/src/test/java/com/yahoo/container/logging/JsonConnectionLogWriterTest.java36
-rw-r--r--jdisc_http_service/src/test/java/com/yahoo/container/logging/LogFileHandlerTestCase.java16
-rw-r--r--node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/ContainerOperations.java14
-rw-r--r--node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/ContainerOperationsImpl.java8
-rw-r--r--node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentContextImpl.java2
-rw-r--r--node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java7
-rw-r--r--node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/docker/ContainerOperationsImplTest.java2
-rw-r--r--node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImplTest.java22
-rw-r--r--searchcore/src/apps/tests/persistenceconformance_test.cpp7
-rw-r--r--searchcore/src/tests/proton/documentdb/fileconfigmanager/fileconfigmanager_test.cpp30
-rw-r--r--searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp49
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/clusterstatehandler.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.cpp27
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/ibucketmodifiedhandler.h15
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/ibucketstatecalculator.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/ibucketstatechangedhandler.h16
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/ibucketstatechangednotifier.h15
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/iclusterstatechangedhandler.h10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp18
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h11
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/thread_utils.h12
-rw-r--r--searchcorespi/src/vespa/searchcorespi/flush/closureflushtask.h41
-rw-r--r--searchcorespi/src/vespa/searchcorespi/flush/lambdaflushtask.h31
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp108
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h19
57 files changed, 855 insertions, 817 deletions
diff --git a/README.md b/README.md
index 19d87fbad98..a2b64f20fc3 100644
--- a/README.md
+++ b/README.md
@@ -7,7 +7,7 @@ Vespa is an engine for low-latency computation over large data sets.
It stores and indexes your data and executes distributed queries including evaluation of
machine-learned models over many data points in real time.
-Travis-CI build status: [![Build Status](https://travis-ci.org/vespa-engine/vespa.svg?branch=master)](https://travis-ci.org/vespa-engine/vespa)
+Screwdriver build status: [![Build Status](https://cd.screwdriver.cd/pipelines/6386/badge)](https://cd.screwdriver.cd/pipelines/6386)
## Table of contents
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/ClusterControllerConfig.java b/config-model/src/main/java/com/yahoo/vespa/model/content/ClusterControllerConfig.java
index 882e56b82a3..ad5b472387f 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/content/ClusterControllerConfig.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/content/ClusterControllerConfig.java
@@ -131,5 +131,19 @@ public class ClusterControllerConfig extends AbstractConfigProducer<ClusterContr
if (minNodeRatioPerGroup != null) {
builder.min_node_ratio_per_group(minNodeRatioPerGroup);
}
+
+ setDefaultClusterFeedBlockLimits(builder);
+ }
+
+ private static void setDefaultClusterFeedBlockLimits(FleetcontrollerConfig.Builder builder) {
+ // TODO: Override these based on resource-limits in services.xml (if they are specified).
+ // TODO: Choose other defaults when this is default enabled.
+ // Note: The resource categories must match the ones used in host info reporting
+ // between content nodes and cluster controller:
+ // storage/src/vespa/storage/persistence/filestorage/service_layer_host_info_reporter.cpp
+ builder.cluster_feed_block_limit.put("memory", 0.79);
+ builder.cluster_feed_block_limit.put("disk", 0.79);
+ builder.cluster_feed_block_limit.put("attribute-enum-store", 0.89);
+ builder.cluster_feed_block_limit.put("attribute-multi-value", 0.89);
}
}
diff --git a/config-model/src/main/python/ES_Vespa_parser.py b/config-model/src/main/python/ES_Vespa_parser.py
index c695aae9fcb..b3398fd0403 100644
--- a/config-model/src/main/python/ES_Vespa_parser.py
+++ b/config-model/src/main/python/ES_Vespa_parser.py
@@ -55,11 +55,11 @@ class ElasticSearchParser:
for line in unparsed_mapping_file:
data = json.loads(line)
index = list(data.keys())[0]
- mappings = data[index]["mappings"][type]["properties"]
+ mappings = data[index]["mappings"]["properties"]
# Checking if some fields could be no-index
try:
- _all_enabled = data[index]["mappings"][type]["_all"]["enabled"]
+ _all_enabled = data[index]["mappings"]["_all"]["enabled"]
if not _all_enabled:
self._all = False
print(" > Not all fields in the document type '" + type + "' are searchable. Edit " + self.path + "schemas/" + type + ".sd to control which fields are searchable")
diff --git a/config-model/src/test/java/com/yahoo/vespa/model/content/FleetControllerClusterTest.java b/config-model/src/test/java/com/yahoo/vespa/model/content/FleetControllerClusterTest.java
index 41fb4a3d35b..137f5351299 100644
--- a/config-model/src/test/java/com/yahoo/vespa/model/content/FleetControllerClusterTest.java
+++ b/config-model/src/test/java/com/yahoo/vespa/model/content/FleetControllerClusterTest.java
@@ -81,13 +81,27 @@ public class FleetControllerClusterTest {
@Test
public void min_node_ratio_per_group_is_implicitly_zero_when_omitted() {
- FleetcontrollerConfig.Builder builder = new FleetcontrollerConfig.Builder();
+ var config = getConfigForBasicCluster();
+ assertEquals(0.0, config.min_node_ratio_per_group(), 0.01);
+ }
+
+ @Test
+ public void default_cluster_feed_block_limits_are_set() {
+ var config = getConfigForBasicCluster();
+ var limits = config.cluster_feed_block_limit();
+ assertEquals(4, limits.size());
+ assertEquals(0.79, limits.get("memory"), 0.0001);
+ assertEquals(0.79, limits.get("disk"), 0.0001);
+ assertEquals(0.89, limits.get("attribute-enum-store"), 0.0001);
+ assertEquals(0.89, limits.get("attribute-multi-value"), 0.0001);
+ }
+
+ FleetcontrollerConfig getConfigForBasicCluster() {
+ var builder = new FleetcontrollerConfig.Builder();
parse("<cluster id=\"storage\">\n" +
" <documents/>\n" +
"</cluster>").
getConfig(builder);
-
- FleetcontrollerConfig config = new FleetcontrollerConfig(builder);
- assertEquals(0.0, config.min_node_ratio_per_group(), 0.01);
+ return new FleetcontrollerConfig(builder);
}
}
diff --git a/configdefinitions/src/vespa/fleetcontroller.def b/configdefinitions/src/vespa/fleetcontroller.def
index 96b43a15c5e..3c88639d09d 100644
--- a/configdefinitions/src/vespa/fleetcontroller.def
+++ b/configdefinitions/src/vespa/fleetcontroller.def
@@ -181,3 +181,19 @@ enable_two_phase_cluster_state_transitions bool default=false
## Deprecated - not used
determine_buckets_from_bucket_space_metric bool default=true
+
+# If enabled, the cluster controller observes reported (categorized) resource usage from content nodes (via host info),
+# and decides whether external feed should be blocked (or unblocked) in the entire cluster.
+#
+# Each resource category has a limit, which is specified in cluster_feed_block_limit.
+# If one resource category from one content node is above the configured limit, feed is blocked.
+# This information is pushed to all distributor nodes via a new cluster state bundle.
+# The distributor nodes handle the actual feed blocking based on this information.
+enable_cluster_feed_block bool default=false
+
+# Contains all resource categories (key) with its corresponding feed block limit (value)
+# used when the cluster controller decides whether external feed should be blocked (or unblocked) in the entire cluster.
+#
+# The keys used must match the similar keys in the host info JSON structure.
+# All limits are numbers between 0.0 and 1.0.
+cluster_feed_block_limit{} double
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/modelfactory/PreparedModelsBuilder.java b/configserver/src/main/java/com/yahoo/vespa/config/server/modelfactory/PreparedModelsBuilder.java
index 7606aacff15..f5cf7f0dc70 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/modelfactory/PreparedModelsBuilder.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/modelfactory/PreparedModelsBuilder.java
@@ -34,6 +34,8 @@ import com.yahoo.vespa.curator.Curator;
import java.io.File;
import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.logging.Level;
@@ -153,9 +155,26 @@ public class PreparedModelsBuilder extends ModelsBuilder<PreparedModelsBuilder.P
}
private void validateModelHosts(HostValidator<ApplicationId> hostValidator, ApplicationId applicationId, Model model) {
- hostValidator.verifyHosts(applicationId, model.getHosts().stream()
- .map(HostInfo::getHostname)
- .collect(Collectors.toList()));
+ // Will retry here, since hosts used might not be in sync on all config servers (we wait for 2/3 servers
+ // to respond to deployments and deletions).
+ Instant end = Instant.now().plus(Duration.ofSeconds(1));
+ IllegalArgumentException exception;
+ do {
+ try {
+ hostValidator.verifyHosts(applicationId, model.getHosts().stream()
+ .map(HostInfo::getHostname)
+ .collect(Collectors.toList()));
+ return;
+ } catch (IllegalArgumentException e) {
+ exception = e;
+ log.log(Level.INFO, "Verifying hosts failed, will retry: " + e.getMessage());
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException interruptedException) {/* ignore */}
+ }
+ } while (Instant.now().isBefore(end));
+
+ throw exception;
}
/** The result of preparing a single model version */
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
index 1851fb1ba23..ca28b04264d 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
@@ -188,7 +188,8 @@ public class SessionRepository {
public void addLocalSession(LocalSession session) {
long sessionId = session.getSessionId();
localSessionCache.put(sessionId, session);
- remoteSessionCache.putIfAbsent(sessionId, createRemoteSession(sessionId));
+ if (remoteSessionCache.get(sessionId) == null)
+ createRemoteSession(sessionId);
}
public LocalSession getLocalSession(long sessionId) {
@@ -315,10 +316,10 @@ public class SessionRepository {
public RemoteSession createRemoteSession(long sessionId) {
SessionZooKeeperClient sessionZKClient = createSessionZooKeeperClient(sessionId);
RemoteSession session = new RemoteSession(tenantName, sessionId, sessionZKClient);
- remoteSessionCache.put(sessionId, session);
- loadSessionIfActive(session);
- updateSessionStateWatcher(sessionId, session);
- return session;
+ RemoteSession newSession = loadSessionIfActive(session).orElse(session);
+ remoteSessionCache.put(sessionId, newSession);
+ updateSessionStateWatcher(sessionId, newSession);
+ return newSession;
}
public int deleteExpiredRemoteSessions(Clock clock, Duration expiryTime) {
@@ -423,15 +424,16 @@ public class SessionRepository {
}
}
- private void loadSessionIfActive(RemoteSession session) {
+ private Optional<RemoteSession> loadSessionIfActive(RemoteSession session) {
for (ApplicationId applicationId : applicationRepo.activeApplications()) {
if (applicationRepo.requireActiveSessionOf(applicationId) == session.getSessionId()) {
log.log(Level.FINE, () -> "Found active application for session " + session.getSessionId() + " , loading it");
applicationRepo.activateApplication(ensureApplicationLoaded(session), session.getSessionId());
log.log(Level.INFO, session.logPre() + "Application activated successfully: " + applicationId + " (generation " + session.getSessionId() + ")");
- return;
+ return Optional.ofNullable(remoteSessionCache.get(session.getSessionId()));
}
}
+ return Optional.empty();
}
void prepareRemoteSession(RemoteSession session) {
diff --git a/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/Container.java b/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/Container.java
index 1e861f230ce..277880dfe4e 100644
--- a/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/Container.java
+++ b/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/Container.java
@@ -8,6 +8,7 @@ import java.util.Objects;
/**
* @author stiankri
*/
+// TODO: Move this to node-admin when docker-api module can be removed
public class Container {
public final String hostname;
public final DockerImage image;
diff --git a/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/ContainerName.java b/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/ContainerName.java
index b29ceb69ce1..53bfc59652c 100644
--- a/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/ContainerName.java
+++ b/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/ContainerName.java
@@ -9,6 +9,7 @@ import java.util.regex.Pattern;
*
* @author bakksjo
*/
+// TODO: Move this to node-admin when docker-api module can be removed
public class ContainerName {
private static final Pattern LEGAL_CONTAINER_NAME_PATTERN = Pattern.compile("^[a-zA-Z0-9-]+$");
private final String name;
diff --git a/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/ContainerResources.java b/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/ContainerResources.java
index c7b9d342043..c61bc3ed919 100644
--- a/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/ContainerResources.java
+++ b/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/ContainerResources.java
@@ -6,6 +6,7 @@ import java.util.Objects;
/**
* @author valerijf
*/
+// TODO: Move this to node-admin when docker-api module can be removed
public class ContainerResources {
public static final ContainerResources UNLIMITED = ContainerResources.from(0, 0, 0);
@@ -28,7 +29,7 @@ public class ContainerResources {
/** The maximum amount, in bytes, of memory the container can use. */
private final long memoryBytes;
- ContainerResources(double cpus, int cpuShares, long memoryBytes) {
+ public ContainerResources(double cpus, int cpuShares, long memoryBytes) {
this.cpus = cpus;
this.cpuShares = cpuShares;
this.memoryBytes = memoryBytes;
diff --git a/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/ProcessResult.java b/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/ProcessResult.java
index dc49b806ee8..eb81b40434a 100644
--- a/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/ProcessResult.java
+++ b/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/ProcessResult.java
@@ -3,6 +3,7 @@ package com.yahoo.vespa.hosted.dockerapi;
import java.util.Objects;
+// TODO: Consider replacing usages of this with CommandResult when docker-api module can be removed
public class ProcessResult {
private final int exitStatus;
private final String output;
diff --git a/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/RegistryCredentials.java b/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/RegistryCredentials.java
index 39a000a633f..130519ecfd6 100644
--- a/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/RegistryCredentials.java
+++ b/docker-api/src/main/java/com/yahoo/vespa/hosted/dockerapi/RegistryCredentials.java
@@ -8,6 +8,7 @@ import java.util.Objects;
*
* @author mpolden
*/
+// TODO: Move this to node-admin when docker-api module can be removed
public class RegistryCredentials {
public static final RegistryCredentials none = new RegistryCredentials("", "", "");
diff --git a/document/src/tests/repo/documenttyperepo_test.cpp b/document/src/tests/repo/documenttyperepo_test.cpp
index 0bc80ebcd16..7d17a3cfa11 100644
--- a/document/src/tests/repo/documenttyperepo_test.cpp
+++ b/document/src/tests/repo/documenttyperepo_test.cpp
@@ -392,10 +392,6 @@ TEST("requireThatDocumentsCanUseOtherDocumentTypes") {
EXPECT_TRUE(dynamic_cast<const DocumentType *>(&type));
}
-void storeId(set<int> *s, const DocumentType &type) {
- s->insert(type.getId());
-}
-
TEST("requireThatDocumentTypesCanBeIterated") {
DocumenttypesConfigBuilderHelper builder;
builder.document(doc_type_id, type_name,
@@ -405,7 +401,8 @@ TEST("requireThatDocumentTypesCanBeIterated") {
DocumentTypeRepo repo(builder.config());
set<int> ids;
- repo.forEachDocumentType(*makeClosure(storeId, &ids));
+ repo.forEachDocumentType(*DocumentTypeRepo::makeLambda(
+ [&ids](const DocumentType &type) { ids.insert(type.getId()); }));
EXPECT_EQUAL(3u, ids.size());
ASSERT_TRUE(ids.count(DataType::T_DOCUMENT));
@@ -436,8 +433,7 @@ TEST("requireThatBuildFromConfigWorks") {
TEST("requireThatStructsCanBeRecursive") {
DocumenttypesConfigBuilderHelper builder;
builder.document(doc_type_id, type_name,
- Struct(header_name).setId(header_id).addField(field_name,
- header_id),
+ Struct(header_name).setId(header_id).addField(field_name, header_id),
Struct(body_name));
DocumentTypeRepo repo(builder.config());
diff --git a/document/src/vespa/document/fieldset/fieldsetrepo.cpp b/document/src/vespa/document/fieldset/fieldsetrepo.cpp
index bf9c9923572..f16387810ce 100644
--- a/document/src/vespa/document/fieldset/fieldsetrepo.cpp
+++ b/document/src/vespa/document/fieldset/fieldsetrepo.cpp
@@ -119,7 +119,9 @@ FieldSetRepo::FieldSetRepo(const DocumentTypeRepo& repo)
: _doumentTyperepo(repo),
_configuredFieldSets()
{
- repo.forEachDocumentType(*vespalib::makeClosure(this, &FieldSetRepo::configureDocumentType));
+ repo.forEachDocumentType(*DocumentTypeRepo::makeLambda([&](const DocumentType &type) {
+ configureDocumentType(type);
+ }));
}
FieldSetRepo::~FieldSetRepo() = default;
diff --git a/document/src/vespa/document/repo/documenttyperepo.cpp b/document/src/vespa/document/repo/documenttyperepo.cpp
index 578d2999038..50ab6aaa646 100644
--- a/document/src/vespa/document/repo/documenttyperepo.cpp
+++ b/document/src/vespa/document/repo/documenttyperepo.cpp
@@ -587,9 +587,9 @@ DocumentTypeRepo::getAnnotationType(const DocumentType &doc_type, int32_t id) co
}
void
-DocumentTypeRepo::forEachDocumentType(Closure1<const DocumentType &> &c) const {
+DocumentTypeRepo::forEachDocumentType(Handler & handler) const {
for (const auto & entry : *_doc_types) {
- c.call(*entry.second->doc_type);
+ handler.handle(*entry.second->doc_type);
}
}
diff --git a/document/src/vespa/document/repo/documenttyperepo.h b/document/src/vespa/document/repo/documenttyperepo.h
index fd17bd5640a..4e3a1b07619 100644
--- a/document/src/vespa/document/repo/documenttyperepo.h
+++ b/document/src/vespa/document/repo/documenttyperepo.h
@@ -19,11 +19,21 @@ struct DataTypeRepo;
class DocumentType;
class DocumentTypeRepo {
- std::unique_ptr<internal::DocumentTypeMap> _doc_types;
- const DocumentType * _default;
-
public:
using DocumenttypesConfig = const internal::InternalDocumenttypesType;
+ struct Handler {
+ virtual ~Handler() = default;
+ virtual void handle(const DocumentType & type) = 0;
+ };
+
+
+ template <class FunctionType>
+ static std::unique_ptr<Handler>
+ makeLambda(FunctionType &&function)
+ {
+ return std::make_unique<LambdaHandler<std::decay_t<FunctionType>>>
+ (std::forward<FunctionType>(function));
+ }
// This one should only be used for testing. If you do not have any config.
explicit DocumentTypeRepo(const DocumentType & docType);
@@ -39,8 +49,22 @@ public:
const DataType *getDataType(const DocumentType &doc_type, int32_t id) const;
const DataType *getDataType(const DocumentType &doc_type, vespalib::stringref name) const;
const AnnotationType *getAnnotationType(const DocumentType &doc_type, int32_t id) const;
- void forEachDocumentType(vespalib::Closure1<const DocumentType &> &c) const;
+ void forEachDocumentType(Handler & handler) const;
const DocumentType *getDefaultDocType() const { return _default; }
+private:
+ template <class FunctionType>
+ class LambdaHandler : public Handler {
+ FunctionType _func;
+ public:
+ LambdaHandler(FunctionType &&func) : _func(std::move(func)) {}
+ LambdaHandler(const LambdaHandler &) = delete;
+ LambdaHandler & operator = (const LambdaHandler &) = delete;
+ ~LambdaHandler() override = default;
+ void handle(const DocumentType & type) override { _func(type); }
+ };
+
+ std::unique_ptr<internal::DocumentTypeMap> _doc_types;
+ const DocumentType * _default;
};
} // namespace document
diff --git a/document/src/vespa/document/select/bodyfielddetector.cpp b/document/src/vespa/document/select/bodyfielddetector.cpp
index 3d32813621d..d1961810c7a 100644
--- a/document/src/vespa/document/select/bodyfielddetector.cpp
+++ b/document/src/vespa/document/select/bodyfielddetector.cpp
@@ -28,7 +28,9 @@ BodyFieldDetector::detectFieldType(const FieldValueNode *expr, const DocumentTyp
void
BodyFieldDetector::visitFieldValueNode(const FieldValueNode& expr)
{
- _repo.forEachDocumentType(*makeClosure(this, &BodyFieldDetector::detectFieldType, &expr));
+ _repo.forEachDocumentType(*DocumentTypeRepo::makeLambda([&](const DocumentType &type) {
+ detectFieldType(&expr, type);
+ }));
}
diff --git a/jdisc_http_service/pom.xml b/jdisc_http_service/pom.xml
index 68a0f0636d3..2baba974b03 100644
--- a/jdisc_http_service/pom.xml
+++ b/jdisc_http_service/pom.xml
@@ -151,6 +151,11 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/jdisc_http_service/src/main/java/com/yahoo/container/logging/ConnectionLogEntry.java b/jdisc_http_service/src/main/java/com/yahoo/container/logging/ConnectionLogEntry.java
index 15a38f6eeae..a6b800a9279 100644
--- a/jdisc_http_service/src/main/java/com/yahoo/container/logging/ConnectionLogEntry.java
+++ b/jdisc_http_service/src/main/java/com/yahoo/container/logging/ConnectionLogEntry.java
@@ -2,13 +2,6 @@
package com.yahoo.container.logging;
-import com.yahoo.slime.Cursor;
-import com.yahoo.slime.Slime;
-import com.yahoo.slime.SlimeUtils;
-import com.yahoo.slime.Type;
-import com.yahoo.yolean.Exceptions;
-
-import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
@@ -69,75 +62,6 @@ public class ConnectionLogEntry {
this.sslHandshakeFailureType = builder.sslHandshakeFailureType;
}
- public String toJson() {
- Slime slime = new Slime();
- Cursor cursor = slime.setObject();
- cursor.setString("id", id.toString());
- setTimestamp(cursor, timestamp, "timestamp");
-
- setDouble(cursor, durationSeconds, "duration");
- setString(cursor, peerAddress, "peerAddress");
- setInteger(cursor, peerPort, "peerPort");
- setString(cursor, localAddress, "localAddress");
- setInteger(cursor, localPort, "localPort");
- setString(cursor, remoteAddress, "remoteAddress");
- setInteger(cursor, remotePort, "remotePort");
- setLong(cursor, httpBytesReceived, "httpBytesReceived");
- setLong(cursor, httpBytesSent, "httpBytesSent");
- setLong(cursor, requests, "requests");
- setLong(cursor, responses, "responses");
- setString(cursor, sslProtocol, "ssl", "protocol");
- setString(cursor, sslSessionId, "ssl", "sessionId");
- setString(cursor, sslCipherSuite, "ssl", "cipherSuite");
- setString(cursor, sslPeerSubject, "ssl", "peerSubject");
- setTimestamp(cursor, sslPeerNotBefore, "ssl", "peerNotBefore");
- setTimestamp(cursor, sslPeerNotAfter, "ssl", "peerNotAfter");
- setString(cursor, sslSniServerName, "ssl", "sniServerName");
- setString(cursor, sslHandshakeFailureException, "ssl", "handshake-failure", "exception");
- setString(cursor, sslHandshakeFailureMessage, "ssl", "handshake-failure", "message");
- setString(cursor, sslHandshakeFailureType, "ssl", "handshake-failure", "type");
- return new String(Exceptions.uncheck(() -> SlimeUtils.toJsonBytes(slime)), StandardCharsets.UTF_8);
- }
-
- private void setString(Cursor cursor, String value, String... keys) {
- if(value != null) {
- subCursor(cursor, keys).setString(keys[keys.length - 1], value);
- }
- }
-
- private void setLong(Cursor cursor, Long value, String... keys) {
- if (value != null) {
- subCursor(cursor, keys).setLong(keys[keys.length - 1], value);
- }
- }
-
- private void setInteger(Cursor cursor, Integer value, String... keys) {
- if (value != null) {
- subCursor(cursor, keys).setLong(keys[keys.length - 1], value);
- }
- }
-
- private void setTimestamp(Cursor cursor, Instant timestamp, String... keys) {
- if (timestamp != null) {
- subCursor(cursor, keys).setString(keys[keys.length - 1], timestamp.toString());
- }
- }
-
- private void setDouble(Cursor cursor, Double value, String... keys) {
- if (value != null) {
- subCursor(cursor, keys).setDouble(keys[keys.length - 1], value);
- }
- }
-
- private static Cursor subCursor(Cursor cursor, String... keys) {
- Cursor subCursor = cursor;
- for (int i = 0; i < keys.length - 1; ++i) {
- Cursor field = subCursor.field(keys[i]);
- subCursor = field.type() != Type.NIX ? field : subCursor.setObject(keys[i]);
- }
- return subCursor;
- }
-
public static Builder builder(UUID id, Instant timestamp) {
return new Builder(id, timestamp);
}
diff --git a/jdisc_http_service/src/main/java/com/yahoo/container/logging/FileConnectionLog.java b/jdisc_http_service/src/main/java/com/yahoo/container/logging/FileConnectionLog.java
index 62e53a5a514..968ba74b4f2 100644
--- a/jdisc_http_service/src/main/java/com/yahoo/container/logging/FileConnectionLog.java
+++ b/jdisc_http_service/src/main/java/com/yahoo/container/logging/FileConnectionLog.java
@@ -5,23 +5,19 @@ package com.yahoo.container.logging;
import com.google.inject.Inject;
import com.yahoo.component.AbstractComponent;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.logging.Level;
import java.util.logging.Logger;
/**
* @author mortent
*/
-public class FileConnectionLog extends AbstractComponent implements ConnectionLog, LogWriter<ConnectionLogEntry> {
+public class FileConnectionLog extends AbstractComponent implements ConnectionLog {
private static final Logger logger = Logger.getLogger(FileConnectionLog.class.getName());
private final ConnectionLogHandler logHandler;
@Inject
public FileConnectionLog(ConnectionLogConfig config) {
- logHandler = new ConnectionLogHandler(config.cluster(), this);
+ logHandler = new ConnectionLogHandler(config.cluster(), new JsonConnectionLogWriter());
}
@Override
@@ -34,9 +30,4 @@ public class FileConnectionLog extends AbstractComponent implements ConnectionLo
logHandler.shutdown();
}
- @Override
- // TODO serialize directly to outputstream
- public void write(ConnectionLogEntry entry, OutputStream outputStream) throws IOException {
- outputStream.write(entry.toJson().getBytes(StandardCharsets.UTF_8));
- }
} \ No newline at end of file
diff --git a/jdisc_http_service/src/main/java/com/yahoo/container/logging/FormatUtil.java b/jdisc_http_service/src/main/java/com/yahoo/container/logging/FormatUtil.java
new file mode 100644
index 00000000000..ee780ad2a83
--- /dev/null
+++ b/jdisc_http_service/src/main/java/com/yahoo/container/logging/FormatUtil.java
@@ -0,0 +1,46 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.container.logging;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+
+/**
+ * @author bjorncs
+ */
+class FormatUtil {
+
+ private FormatUtil() {}
+
+ static void writeSecondsField(JsonGenerator generator, String fieldName, Instant instant) throws IOException {
+ writeSecondsField(generator, fieldName, instant.toEpochMilli());
+ }
+
+ static void writeSecondsField(JsonGenerator generator, String fieldName, Duration duration) throws IOException {
+ writeSecondsField(generator, fieldName, duration.toMillis());
+ }
+
+ static void writeSecondsField(JsonGenerator generator, String fieldName, double seconds) throws IOException {
+ writeSecondsField(generator, fieldName, (long)(seconds * 1000));
+ }
+
+ static void writeSecondsField(JsonGenerator generator, String fieldName, long milliseconds) throws IOException {
+ generator.writeFieldName(fieldName);
+ generator.writeRawValue(toSecondsString(milliseconds));
+ }
+
+ /** @return a string with number of seconds with 3 decimals */
+ static String toSecondsString(long milliseconds) {
+ StringBuilder builder = new StringBuilder().append(milliseconds / 1000L).append('.');
+ long decimals = milliseconds % 1000;
+ if (decimals < 100) {
+ builder.append('0');
+ if (decimals < 10) {
+ builder.append('0');
+ }
+ }
+ return builder.append(decimals).toString();
+ }
+}
diff --git a/jdisc_http_service/src/main/java/com/yahoo/container/logging/JSONFormatter.java b/jdisc_http_service/src/main/java/com/yahoo/container/logging/JSONFormatter.java
index c6d177684ac..441e139bc67 100644
--- a/jdisc_http_service/src/main/java/com/yahoo/container/logging/JSONFormatter.java
+++ b/jdisc_http_service/src/main/java/com/yahoo/container/logging/JSONFormatter.java
@@ -15,6 +15,8 @@ import java.util.Objects;
import java.util.logging.Level;
import java.util.logging.Logger;
+import static com.yahoo.container.logging.FormatUtil.writeSecondsField;
+
/**
* Formatting of an {@link AccessLogEntry} in the Vespa JSON access log format.
*
@@ -45,8 +47,8 @@ public class JSONFormatter implements LogWriter<RequestLogEntry> {
String peerAddress = entry.peerAddress().get();
generator.writeStringField("ip", peerAddress);
long time = entry.timestamp().get().toEpochMilli();
- writeSeconds(generator, "time", time);
- writeSeconds(generator, "duration", entry.duration().get().toMillis());
+ FormatUtil.writeSecondsField(generator, "time", time);
+ FormatUtil.writeSecondsField(generator, "duration", entry.duration().get());
generator.writeNumberField("responsesize", entry.contentSize().orElse(0));
generator.writeNumberField("code", entry.statusCode().orElse(0));
generator.writeStringField("method", entry.httpMethod().orElse(""));
@@ -185,23 +187,4 @@ public class JSONFormatter implements LogWriter<RequestLogEntry> {
if (rawPath == null) return null;
return rawQuery != null ? rawPath + "?" + rawQuery : rawPath;
}
-
- private static void writeSeconds(JsonGenerator generator, String fieldName, long milliseconds) throws IOException {
- generator.writeFieldName(fieldName);
- generator.writeRawValue(toSecondsString(milliseconds));
- }
-
- /** @return a string with number of seconds with 3 decimals */
- private static String toSecondsString(long milliseconds) {
- StringBuilder builder = new StringBuilder().append(milliseconds / 1000L).append('.');
- long decimals = milliseconds % 1000;
- if (decimals < 100) {
- builder.append('0');
- if (decimals < 10) {
- builder.append('0');
- }
- }
- return builder.append(decimals).toString();
- }
-
}
diff --git a/jdisc_http_service/src/main/java/com/yahoo/container/logging/JsonConnectionLogWriter.java b/jdisc_http_service/src/main/java/com/yahoo/container/logging/JsonConnectionLogWriter.java
new file mode 100644
index 00000000000..394f87c07cc
--- /dev/null
+++ b/jdisc_http_service/src/main/java/com/yahoo/container/logging/JsonConnectionLogWriter.java
@@ -0,0 +1,117 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.container.logging;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * @author bjorncs
+ */
+class JsonConnectionLogWriter implements LogWriter<ConnectionLogEntry> {
+
+ private final JsonFactory jsonFactory = new JsonFactory(new ObjectMapper());
+
+ @Override
+ public void write(ConnectionLogEntry record, OutputStream outputStream) throws IOException {
+ try (JsonGenerator generator = createJsonGenerator(outputStream)) {
+ generator.writeStartObject();
+ generator.writeStringField("id", record.id());
+ generator.writeStringField("timestamp", record.timestamp().toString());
+
+ writeOptionalSeconds(generator, "duration", unwrap(record.durationSeconds()));
+ writeOptionalString(generator, "peerAddress", unwrap(record.peerAddress()));
+ writeOptionalInteger(generator, "peerPort", unwrap(record.peerPort()));
+ writeOptionalString(generator, "localAddress", unwrap(record.localAddress()));
+ writeOptionalInteger(generator, "localPort", unwrap(record.localPort()));
+ writeOptionalString(generator, "remoteAddress", unwrap(record.remoteAddress()));
+ writeOptionalInteger(generator, "remotePort", unwrap(record.remotePort()));
+ writeOptionalLong(generator, "httpBytesReceived", unwrap(record.httpBytesReceived()));
+ writeOptionalLong(generator, "httpBytesSent", unwrap(record.httpBytesSent()));
+ writeOptionalLong(generator, "requests", unwrap(record.requests()));
+ writeOptionalLong(generator, "responses", unwrap(record.responses()));
+
+ String sslProtocol = unwrap(record.sslProtocol());
+ String sslSessionId = unwrap(record.sslSessionId());
+ String sslCipherSuite = unwrap(record.sslCipherSuite());
+ String sslPeerSubject = unwrap(record.sslPeerSubject());
+ Instant sslPeerNotBefore = unwrap(record.sslPeerNotBefore());
+ Instant sslPeerNotAfter = unwrap(record.sslPeerNotAfter());
+ String sslSniServerName = unwrap(record.sslSniServerName());
+ String sslHandshakeFailureException = unwrap(record.sslHandshakeFailureException());
+ String sslHandshakeFailureMessage = unwrap(record.sslHandshakeFailureMessage());
+ String sslHandshakeFailureType = unwrap(record.sslHandshakeFailureType());
+
+ if (isAnyValuePresent(
+ sslProtocol, sslSessionId, sslCipherSuite, sslPeerSubject, sslPeerNotBefore, sslPeerNotAfter,
+ sslSniServerName, sslHandshakeFailureException, sslHandshakeFailureMessage, sslHandshakeFailureType)) {
+ generator.writeObjectFieldStart("ssl");
+
+ writeOptionalString(generator, "protocol", sslProtocol);
+ writeOptionalString(generator, "sessionId", sslSessionId);
+ writeOptionalString(generator, "cipherSuite", sslCipherSuite);
+ writeOptionalString(generator, "peerSubject", sslPeerSubject);
+ writeOptionalTimestamp(generator, "peerNotBefore", sslPeerNotBefore);
+ writeOptionalTimestamp(generator, "peerNotAfter", sslPeerNotAfter);
+ writeOptionalString(generator, "sniServerName", sslSniServerName);
+
+ if (isAnyValuePresent(sslHandshakeFailureException, sslHandshakeFailureMessage, sslHandshakeFailureType)) {
+ generator.writeObjectFieldStart("handshake-failure");
+ writeOptionalString(generator, "exception", sslHandshakeFailureException);
+ writeOptionalString(generator, "message", sslHandshakeFailureMessage);
+ writeOptionalString(generator, "type", sslHandshakeFailureType);
+ generator.writeEndObject();
+ }
+
+ generator.writeEndObject();
+ }
+ }
+ }
+
+ private void writeOptionalString(JsonGenerator generator, String name, String value) throws IOException {
+ if (value != null) {
+ generator.writeStringField(name, value);
+ }
+ }
+
+ private void writeOptionalInteger(JsonGenerator generator, String name, Integer value) throws IOException {
+ if (value != null) {
+ generator.writeNumberField(name, value);
+ }
+ }
+
+ private void writeOptionalLong(JsonGenerator generator, String name, Long value) throws IOException {
+ if (value != null) {
+ generator.writeNumberField(name, value);
+ }
+ }
+
+ private void writeOptionalTimestamp(JsonGenerator generator, String name, Instant value) throws IOException {
+ if (value != null) {
+ generator.writeStringField(name, value.toString());
+ }
+ }
+
+ private void writeOptionalSeconds(JsonGenerator generator, String name, Double value) throws IOException {
+ if (value != null) {
+ FormatUtil.writeSecondsField(generator, name, value);
+ }
+ }
+
+ private static boolean isAnyValuePresent(Object... values) { return Arrays.stream(values).anyMatch(Objects::nonNull); }
+ private static <T> T unwrap(Optional<T> maybeValue) { return maybeValue.orElse(null); }
+
+ private JsonGenerator createJsonGenerator(OutputStream outputStream) throws IOException {
+ return jsonFactory.createGenerator(outputStream, JsonEncoding.UTF8)
+ .configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false)
+ .configure(JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM, false);
+ }
+}
diff --git a/jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java b/jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java
index d0f31a6b866..bfb51d21c6c 100644
--- a/jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java
+++ b/jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java
@@ -5,7 +5,6 @@ import com.yahoo.compress.ZstdOuputStream;
import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.io.NativeIO;
import com.yahoo.log.LogFileDb;
-import com.yahoo.protect.Process;
import com.yahoo.system.ProcessExecuter;
import com.yahoo.yolean.Exceptions;
@@ -19,13 +18,11 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.GZIPOutputStream;
@@ -36,15 +33,74 @@ import java.util.zip.GZIPOutputStream;
* @author Bob Travis
* @author bjorncs
*/
-class LogFileHandler <LOGTYPE> {
+class LogFileHandler <LOGTYPE> {
- enum Compression {NONE, GZIP, ZSTD}
+ enum Compression { NONE, GZIP, ZSTD }
private final static Logger logger = Logger.getLogger(LogFileHandler.class.getName());
- private final ArrayBlockingQueue<Operation<LOGTYPE>> logQueue = new ArrayBlockingQueue<>(10000);
- final LogThread<LOGTYPE> logThread;
- @FunctionalInterface private interface Pollable<T> { Operation<T> poll() throws InterruptedException; }
+ private final Compression compression;
+ private final long[] rotationTimes;
+ private final String filePattern; // default to current directory, ms time stamp
+ private final String symlinkName;
+ private final ArrayBlockingQueue<LOGTYPE> logQueue = new ArrayBlockingQueue<>(100000);
+ private final AtomicBoolean rotate = new AtomicBoolean(false);
+ private final ExecutorService executor = Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("logfilehandler.compression"));
+ private final NativeIO nativeIO = new NativeIO();
+ private final LogThread<LOGTYPE> logThread;
+
+ private volatile FileOutputStream currentOutputStream = null;
+ private volatile long nextRotationTime = 0;
+ private volatile String fileName;
+ private volatile long lastDropPosition = 0;
+
+ private final LogWriter<LOGTYPE> logWriter;
+
+ static private class LogThread<LOGTYPE> extends Thread {
+ final LogFileHandler<LOGTYPE> logFileHandler;
+ long lastFlush = 0;
+ LogThread(LogFileHandler<LOGTYPE> logFile) {
+ super("Logger");
+ setDaemon(true);
+ logFileHandler = logFile;
+ }
+ @Override
+ public void run() {
+ try {
+ storeLogRecords();
+ } catch (InterruptedException e) {
+ } catch (Exception e) {
+ com.yahoo.protect.Process.logAndDie("Failed storing log records", e);
+ }
+
+ logFileHandler.flush();
+ }
+
+ private void storeLogRecords() throws InterruptedException {
+ while (!isInterrupted()) {
+ LOGTYPE r = logFileHandler.logQueue.poll(100, TimeUnit.MILLISECONDS);
+ if(logFileHandler.rotate.get()) {
+ logFileHandler.internalRotateNow();
+ lastFlush = System.nanoTime();
+ logFileHandler.rotate.set(false);
+ }
+ if (r != null) {
+ logFileHandler.internalPublish(r);
+ flushIfOld(3, TimeUnit.SECONDS);
+ } else {
+ flushIfOld(100, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
+ private void flushIfOld(long age, TimeUnit unit) {
+ long now = System.nanoTime();
+ if (TimeUnit.NANOSECONDS.toMillis(now - lastFlush) > unit.toMillis(age)) {
+ logFileHandler.flush();
+ lastFlush = now;
+ }
+ }
+ }
LogFileHandler(Compression compression, String filePattern, String rotationTimes, String symlinkName, LogWriter<LOGTYPE> logWriter) {
this(compression, filePattern, calcTimesMinutes(rotationTimes), symlinkName, logWriter);
@@ -54,433 +110,320 @@ class LogFileHandler <LOGTYPE> {
Compression compression,
String filePattern,
long[] rotationTimes,
- String symlinkName,
- LogWriter<LOGTYPE> logWriter) {
- this.logThread = new LogThread<LOGTYPE>(logWriter, filePattern, compression, rotationTimes, symlinkName, this::poll);
+ String symlinkName, LogWriter<LOGTYPE> logWriter) {
+ this.compression = compression;
+ this.filePattern = filePattern;
+ this.rotationTimes = rotationTimes;
+ this.symlinkName = (symlinkName != null && !symlinkName.isBlank()) ? symlinkName : null;
+ this.logWriter = logWriter;
+ this.logThread = new LogThread<>(this);
this.logThread.start();
}
- private Operation<LOGTYPE> poll() throws InterruptedException {
- return logQueue.poll(100, TimeUnit.MILLISECONDS);
- }
-
/**
* Sends logrecord to file, first rotating file if needed.
*
* @param r logrecord to publish
*/
public void publish(LOGTYPE r) {
- addOperation(new Operation<>(r));
- }
-
- public void flush() {
- addOperationAndWait(new Operation<>(Operation.Type.flush));
- }
-
- /**
- * Force file rotation now, independent of schedule.
- */
- void rotateNow() {
- addOperationAndWait(new Operation<>(Operation.Type.rotate));
- }
-
- public void close() {
- addOperationAndWait(new Operation<>(Operation.Type.close));
+ try {
+ logQueue.put(r);
+ } catch (InterruptedException e) {
+ }
}
- private void addOperation(Operation<LOGTYPE> op) {
+ public synchronized void flush() {
try {
- logQueue.put(op);
- } catch (InterruptedException e) {
+ FileOutputStream currentOut = this.currentOutputStream;
+ if (currentOut != null) {
+ if (compression == Compression.GZIP) {
+ long newPos = currentOut.getChannel().position();
+ if (newPos > lastDropPosition + 102400) {
+ nativeIO.dropPartialFileFromCache(currentOut.getFD(), lastDropPosition, newPos, true);
+ lastDropPosition = newPos;
+ }
+ } else {
+ currentOut.flush();
+ }
+ }
+ } catch (IOException e) {
+ logger.warning("Failed dropping from cache : " + Exceptions.toMessageString(e));
}
}
- private void addOperationAndWait(Operation<LOGTYPE> op) {
+ public void close() {
try {
- logQueue.put(op);
- op.countDownLatch.await();
- } catch (InterruptedException e) {
+ flush();
+ FileOutputStream currentOut = this.currentOutputStream;
+ if (currentOut != null) currentOut.close();
+ } catch (Exception e) {
+ logger.log(Level.WARNING, "Got error while closing log file", e);
}
}
- /**
- * Flushes all queued messages, interrupts the log thread in this and
- * waits for it to end before returning
- */
- void shutdown() {
- logThread.interrupt();
+ private void internalPublish(LOGTYPE r) {
+ // first check to see if new file needed.
+ // if so, use this.internalRotateNow() to do it
+
+ long now = System.currentTimeMillis();
+ if (nextRotationTime <= 0) {
+ nextRotationTime = getNextRotationTime(now); // lazy initialization
+ }
+ if (now > nextRotationTime || currentOutputStream == null) {
+ internalRotateNow();
+ }
try {
- logThread.executor.shutdownNow();
- logThread.executor.awaitTermination(600, TimeUnit.SECONDS);
- logThread.join();
- } catch (InterruptedException e) {
+ FileOutputStream out = this.currentOutputStream;
+ logWriter.write(r, out);
+ out.write('\n');
+ } catch (IOException e) {
+ logger.warning("Failed writing log record: " + Exceptions.toMessageString(e));
}
}
/**
- * Calculate rotation times array, given times in minutes, as "0 60 ..."
+ * Find next rotation after specified time.
+ *
+ * @param now the specified time; if zero, current time is used.
+ * @return the next rotation time
*/
- private static long[] calcTimesMinutes(String times) {
- ArrayList<Long> list = new ArrayList<>(50);
- int i = 0;
- boolean etc = false;
-
- while (i < times.length()) {
- if (times.charAt(i) == ' ') {
- i++;
- continue;
- } // skip spaces
- int j = i; // start of string
- i = times.indexOf(' ', i);
- if (i == -1) i = times.length();
- if (times.charAt(j) == '.' && times.substring(j, i).equals("...")) { // ...
- etc = true;
+ long getNextRotationTime (long now) {
+ if (now <= 0) {
+ now = System.currentTimeMillis();
+ }
+ long nowTod = timeOfDayMillis(now);
+ long next = 0;
+ for (long rotationTime : rotationTimes) {
+ if (nowTod < rotationTime) {
+ next = rotationTime-nowTod + now;
break;
}
- list.add(Long.valueOf(times.substring(j, i)));
}
-
- int size = list.size();
- long[] longtimes = new long[size];
- for (i = 0; i < size; i++) {
- longtimes[i] = list.get(i) // pick up value in minutes past midnight
- * 60000; // and multiply to get millis
+ if (next == 0) { // didn't find one -- use 1st time 'tomorrow'
+ next = rotationTimes[0]+lengthOfDayMillis-nowTod + now;
}
- if (etc) { // fill out rest of day, same as final interval
- long endOfDay = 24 * 60 * 60 * 1000;
- long lasttime = longtimes[size - 1];
- long interval = lasttime - longtimes[size - 2];
- long moreneeded = (endOfDay - lasttime) / interval;
- if (moreneeded > 0) {
- int newsize = size + (int) moreneeded;
- long[] temp = new long[newsize];
- for (i = 0; i < size; i++) {
- temp[i] = longtimes[i];
- }
- while (size < newsize) {
- lasttime += interval;
- temp[size++] = lasttime;
- }
- longtimes = temp;
+ return next;
+ }
+
+ void waitDrained() {
+ while(! logQueue.isEmpty()) {
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
}
}
+ flush();
+ }
- return longtimes;
+ private void checkAndCreateDir(String pathname) {
+ int lastSlash = pathname.lastIndexOf("/");
+ if (lastSlash > -1) {
+ String pathExcludingFilename = pathname.substring(0, lastSlash);
+ File filepath = new File(pathExcludingFilename);
+ if (!filepath.exists()) {
+ filepath.mkdirs();
+ }
+ }
}
/**
- * Only for unit testing. Do not use.
+ * Force file rotation now, independent of schedule.
*/
- String getFileName() {
- return logThread.fileName;
+ void rotateNow () {
+ rotate.set(true);
}
- /**
- * Handle logging and file operations
- */
- static class LogThread<LOGTYPE> extends Thread {
- private final Pollable<LOGTYPE> operationProvider;
- long lastFlush = 0;
- private FileOutputStream currentOutputStream = null;
- private long nextRotationTime = 0;
- private final String filePattern; // default to current directory, ms time stamp
- private volatile String fileName;
- private long lastDropPosition = 0;
- private final LogWriter<LOGTYPE> logWriter;
- private final Compression compression;
- private final long[] rotationTimes;
- private final String symlinkName;
- private final ExecutorService executor = Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("logfilehandler.compression"));
- private final NativeIO nativeIO = new NativeIO();
-
-
- LogThread(LogWriter<LOGTYPE> logWriter,
- String filePattern,
- Compression compression,
- long[] rotationTimes,
- String symlinkName,
- Pollable<LOGTYPE> operationProvider) {
- super("Logger");
- setDaemon(true);
- this.logWriter = logWriter;
- this.filePattern = filePattern;
- this.compression = compression;
- this.rotationTimes = rotationTimes;
- this.symlinkName = (symlinkName != null && !symlinkName.isBlank()) ? symlinkName : null;
- this.operationProvider = operationProvider;
- }
+ // Throw InterruptedException upwards rather than relying on isInterrupted to stop the thread as
+ // isInterrupted() returns false after interruption in p.waitFor
+ private void internalRotateNow() {
+ // figure out new file name, then
+ // use super.setOutputStream to switch to a new file
- @Override
- public void run() {
- try {
- handleLogOperations();
- } catch (InterruptedException e) {
- } catch (Exception e) {
- Process.logAndDie("Failed storing log records", e);
- }
+ String oldFileName = fileName;
+ long now = System.currentTimeMillis();
+ fileName = LogFormatter.insertDate(filePattern, now);
+ flush();
- internalFlush();
+ try {
+ checkAndCreateDir(fileName);
+ FileOutputStream os = new FileOutputStream(fileName, true); // append mode, for safety
+ currentOutputStream = os;
+ lastDropPosition = 0;
+ LogFileDb.nowLoggingTo(fileName);
}
-
- private void handleLogOperations() throws InterruptedException {
- while (!isInterrupted()) {
- Operation<LOGTYPE> r = operationProvider.poll();
- if (r != null) {
- if (r.type == Operation.Type.flush) {
- internalFlush();
- } else if (r.type == Operation.Type.close) {
- internalClose();
- } else if (r.type == Operation.Type.rotate) {
- internalRotateNow();
- lastFlush = System.nanoTime();
- } else if (r.type == Operation.Type.log) {
- internalPublish(r.log.get());
- flushIfOld(3, TimeUnit.SECONDS);
- }
- r.countDownLatch.countDown();
- } else {
- flushIfOld(100, TimeUnit.MILLISECONDS);
- }
- }
+ catch (IOException e) {
+ throw new RuntimeException("Couldn't open log file '" + fileName + "'", e);
}
- private void flushIfOld(long age, TimeUnit unit) {
- long now = System.nanoTime();
- if (TimeUnit.NANOSECONDS.toMillis(now - lastFlush) > unit.toMillis(age)) {
- internalFlush();
- lastFlush = now;
- }
- }
+ createSymlinkToCurrentFile();
- private synchronized void internalFlush() {
- try {
- FileOutputStream currentOut = this.currentOutputStream;
- if (currentOut != null) {
- if (compression == Compression.GZIP) {
- long newPos = currentOut.getChannel().position();
- if (newPos > lastDropPosition + 102400) {
- nativeIO.dropPartialFileFromCache(currentOut.getFD(), lastDropPosition, newPos, true);
- lastDropPosition = newPos;
- }
- } else {
- currentOut.flush();
- }
+ nextRotationTime = 0; //figure it out later (lazy evaluation)
+ if ((oldFileName != null)) {
+ File oldFile = new File(oldFileName);
+ if (oldFile.exists()) {
+ if (compression != Compression.NONE) {
+ executor.execute(() -> runCompression(oldFile, compression));
+ } else {
+ nativeIO.dropFileFromCache(oldFile);
}
- } catch (IOException e) {
- logger.warning("Failed dropping from cache : " + Exceptions.toMessageString(e));
- }
- }
-
- private void internalClose() {
- try {
- internalFlush();
- FileOutputStream currentOut = this.currentOutputStream;
- if (currentOut != null) currentOut.close();
- } catch (Exception e) {
- logger.log(Level.WARNING, "Got error while closing log file", e);
- }
- }
-
- private void internalPublish(LOGTYPE r) {
- // first check to see if new file needed.
- // if so, use this.internalRotateNow() to do it
-
- long now = System.currentTimeMillis();
- if (nextRotationTime <= 0) {
- nextRotationTime = getNextRotationTime(now); // lazy initialization
- }
- if (now > nextRotationTime || currentOutputStream == null) {
- internalRotateNow();
- }
- try {
- FileOutputStream out = this.currentOutputStream;
- logWriter.write(r, out);
- out.write('\n');
- } catch (IOException e) {
- logger.warning("Failed writing log record: " + Exceptions.toMessageString(e));
}
}
+ }
- /**
- * Find next rotation after specified time.
- *
- * @param now the specified time; if zero, current time is used.
- * @return the next rotation time
- */
- long getNextRotationTime(long now) {
- if (now <= 0) {
- now = System.currentTimeMillis();
- }
- long nowTod = timeOfDayMillis(now);
- long next = 0;
- for (long rotationTime : rotationTimes) {
- if (nowTod < rotationTime) {
- next = rotationTime - nowTod + now;
- break;
- }
- }
- if (next == 0) { // didn't find one -- use 1st time 'tomorrow'
- next = rotationTimes[0] + lengthOfDayMillis - nowTod + now;
- }
- return next;
+ private static void runCompression(File oldFile, Compression compression) {
+ switch (compression) {
+ case ZSTD:
+ runCompressionZstd(oldFile.toPath());
+ break;
+ case GZIP:
+ runCompressionGzip(oldFile);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown compression " + compression);
}
+ }
- private void checkAndCreateDir(String pathname) {
- int lastSlash = pathname.lastIndexOf("/");
- if (lastSlash > -1) {
- String pathExcludingFilename = pathname.substring(0, lastSlash);
- File filepath = new File(pathExcludingFilename);
- if (!filepath.exists()) {
- filepath.mkdirs();
+ private static void runCompressionZstd(Path oldFile) {
+ try {
+ Path compressedFile = Paths.get(oldFile.toString() + ".zst");
+ Files.createFile(compressedFile);
+ int bufferSize = 0x400000; // 4M
+ byte[] buffer = new byte[bufferSize];
+ try (ZstdOuputStream out = new ZstdOuputStream(Files.newOutputStream(compressedFile), bufferSize);
+ InputStream in = Files.newInputStream(oldFile)) {
+ int read;
+ while ((read = in.read(buffer)) >= 0) {
+ out.write(buffer, 0, read);
}
+ out.flush();
}
+ Files.delete(oldFile);
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "Failed to compress log file with zstd: " + oldFile, e);
}
+ }
+ private static void runCompressionGzip(File oldFile) {
+ File gzippedFile = new File(oldFile.getPath() + ".gz");
+ try (GZIPOutputStream compressor = new GZIPOutputStream(new FileOutputStream(gzippedFile), 0x100000);
+ FileInputStream inputStream = new FileInputStream(oldFile))
+ {
+ byte [] buffer = new byte[0x400000]; // 4M buffer
- // Throw InterruptedException upwards rather than relying on isInterrupted to stop the thread as
- // isInterrupted() returns false after interruption in p.waitFor
- private void internalRotateNow() {
- // figure out new file name, then
- // use super.setOutputStream to switch to a new file
-
- String oldFileName = fileName;
- long now = System.currentTimeMillis();
- fileName = LogFormatter.insertDate(filePattern, now);
- internalFlush();
-
- try {
- checkAndCreateDir(fileName);
- FileOutputStream os = new FileOutputStream(fileName, true); // append mode, for safety
- currentOutputStream = os;
- lastDropPosition = 0;
- LogFileDb.nowLoggingTo(fileName);
- } catch (IOException e) {
- throw new RuntimeException("Couldn't open log file '" + fileName + "'", e);
+ long totalBytesRead = 0;
+ NativeIO nativeIO = new NativeIO();
+ for (int read = inputStream.read(buffer); read > 0; read = inputStream.read(buffer)) {
+ compressor.write(buffer, 0, read);
+ nativeIO.dropPartialFileFromCache(inputStream.getFD(), totalBytesRead, read, false);
+ totalBytesRead += read;
}
+ compressor.finish();
+ compressor.flush();
- createSymlinkToCurrentFile();
+ oldFile.delete();
+ nativeIO.dropFileFromCache(gzippedFile);
+ } catch (IOException e) {
+ logger.warning("Got '" + e + "' while compressing '" + oldFile.getPath() + "'.");
+ }
+ }
- nextRotationTime = 0; //figure it out later (lazy evaluation)
- if ((oldFileName != null)) {
- File oldFile = new File(oldFileName);
- if (oldFile.exists()) {
- if (compression != Compression.NONE) {
- executor.execute(() -> runCompression(oldFile, compression));
- } else {
- nativeIO.dropFileFromCache(oldFile);
- }
- }
+ /** Name files by date - create a symlink with a constant name to the newest file */
+ private void createSymlinkToCurrentFile() {
+ if (symlinkName == null) return;
+ File f = new File(fileName);
+ File f2 = new File(f.getParent(), symlinkName);
+ String [] cmd = new String[]{"/bin/ln", "-sf", f.getName(), f2.getPath()};
+ try {
+ int retval = new ProcessExecuter().exec(cmd).getFirst();
+ // Detonator pattern: Think of all the fun we can have if ln isn't what we
+ // think it is, if it doesn't return, etc, etc
+ if (retval != 0) {
+ logger.warning("Command '" + Arrays.toString(cmd) + "' + failed with exitcode=" + retval);
}
+ } catch (IOException e) {
+ logger.warning("Got '" + e + "' while doing'" + Arrays.toString(cmd) + "'.");
}
+ }
+ /**
+ * Calculate rotation times array, given times in minutes, as "0 60 ..."
+ *
+ */
+ private static long[] calcTimesMinutes(String times) {
+ ArrayList<Long> list = new ArrayList<>(50);
+ int i = 0;
+ boolean etc = false;
- private static void runCompression(File oldFile, Compression compression) {
- switch (compression) {
- case ZSTD:
- runCompressionZstd(oldFile.toPath());
- break;
- case GZIP:
- runCompressionGzip(oldFile);
- break;
- default:
- throw new IllegalArgumentException("Unknown compression " + compression);
+ while (i < times.length()) {
+ if (times.charAt(i) == ' ') { i++; continue; } // skip spaces
+ int j = i; // start of string
+ i = times.indexOf(' ', i);
+ if (i == -1) i = times.length();
+ if (times.charAt(j) == '.' && times.substring(j,i).equals("...")) { // ...
+ etc = true;
+ break;
}
+ list.add(Long.valueOf(times.substring(j,i)));
}
- private static void runCompressionZstd(Path oldFile) {
- try {
- Path compressedFile = Paths.get(oldFile.toString() + ".zst");
- Files.createFile(compressedFile);
- int bufferSize = 0x400000; // 4M
- byte[] buffer = new byte[bufferSize];
- try (ZstdOuputStream out = new ZstdOuputStream(Files.newOutputStream(compressedFile), bufferSize);
- InputStream in = Files.newInputStream(oldFile)) {
- int read;
- while ((read = in.read(buffer)) >= 0) {
- out.write(buffer, 0, read);
- }
- out.flush();
- }
- Files.delete(oldFile);
- } catch (IOException e) {
- logger.log(Level.WARNING, "Failed to compress log file with zstd: " + oldFile, e);
- }
+ int size = list.size();
+ long[] longtimes = new long[size];
+ for (i = 0; i<size; i++) {
+ longtimes[i] = list.get(i) // pick up value in minutes past midnight
+ * 60000; // and multiply to get millis
}
- private static void runCompressionGzip(File oldFile) {
- File gzippedFile = new File(oldFile.getPath() + ".gz");
- NativeIO nativeIO = new NativeIO();
- try (GZIPOutputStream compressor = new GZIPOutputStream(new FileOutputStream(gzippedFile), 0x100000);
- FileInputStream inputStream = new FileInputStream(oldFile)) {
- byte[] buffer = new byte[0x400000]; // 4M buffer
-
- long totalBytesRead = 0;
- for (int read = inputStream.read(buffer); read > 0; read = inputStream.read(buffer)) {
- compressor.write(buffer, 0, read);
- nativeIO.dropPartialFileFromCache(inputStream.getFD(), totalBytesRead, read, false);
- totalBytesRead += read;
+ if (etc) { // fill out rest of day, same as final interval
+ long endOfDay = 24*60*60*1000;
+ long lasttime = longtimes[size-1];
+ long interval = lasttime - longtimes[size-2];
+ long moreneeded = (endOfDay - lasttime)/interval;
+ if (moreneeded > 0) {
+ int newsize = size + (int)moreneeded;
+ long[] temp = new long[newsize];
+ for (i=0; i<size; i++) {
+ temp[i] = longtimes[i];
}
- compressor.finish();
- compressor.flush();
-
- } catch (IOException e) {
- logger.warning("Got '" + e + "' while compressing '" + oldFile.getPath() + "'.");
- }
- oldFile.delete();
- nativeIO.dropFileFromCache(gzippedFile);
- }
-
- /**
- * Name files by date - create a symlink with a constant name to the newest file
- */
- private void createSymlinkToCurrentFile() {
- if (symlinkName == null) return;
- File f = new File(fileName);
- File f2 = new File(f.getParent(), symlinkName);
- String[] cmd = new String[]{"/bin/ln", "-sf", f.getName(), f2.getPath()};
- try {
- int retval = new ProcessExecuter().exec(cmd).getFirst();
- // Detonator pattern: Think of all the fun we can have if ln isn't what we
- // think it is, if it doesn't return, etc, etc
- if (retval != 0) {
- logger.warning("Command '" + Arrays.toString(cmd) + "' + failed with exitcode=" + retval);
+ while (size < newsize) {
+ lasttime += interval;
+ temp[size++] = lasttime;
}
- } catch (IOException e) {
- logger.warning("Got '" + e + "' while doing'" + Arrays.toString(cmd) + "'.");
+ longtimes = temp;
}
}
- private static final long lengthOfDayMillis = 24 * 60 * 60 * 1000;
- private static long timeOfDayMillis(long time) {
- return time % lengthOfDayMillis;
- }
-
+ return longtimes;
}
- private static class Operation<LOGTYPE> {
- enum Type {log, flush, close, rotate}
-
- ;
+ // Support staff :-)
+ private static final long lengthOfDayMillis = 24*60*60*1000; // ? is this close enough ?
- final Type type;
-
- final Optional<LOGTYPE> log;
- final CountDownLatch countDownLatch = new CountDownLatch(1);
+ private static long timeOfDayMillis ( long time ) {
+ return time % lengthOfDayMillis;
+ }
- Operation(Type type) {
- this(type, Optional.empty());
+ /**
+ * Flushes all queued messages, interrupts the log thread in this and
+ * waits for it to end before returning
+ */
+ void shutdown() {
+ logThread.interrupt();
+ try {
+ logThread.join();
+ executor.shutdown();
+ executor.awaitTermination(600, TimeUnit.SECONDS);
}
-
- Operation(LOGTYPE log) {
- this(Type.log, Optional.of(log));
+ catch (InterruptedException e) {
}
+ }
- private Operation(Type type, Optional<LOGTYPE> log) {
- this.type = type;
- this.log = log;
- }
+ /**
+ * Only for unit testing. Do not use.
+ */
+ String getFileName() {
+ return fileName;
}
-}
+}
diff --git a/jdisc_http_service/src/test/java/com/yahoo/container/logging/ConnectionLogEntryTest.java b/jdisc_http_service/src/test/java/com/yahoo/container/logging/ConnectionLogEntryTest.java
deleted file mode 100644
index fbf9bd1dc23..00000000000
--- a/jdisc_http_service/src/test/java/com/yahoo/container/logging/ConnectionLogEntryTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-package com.yahoo.container.logging;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.time.Instant;
-import java.util.UUID;
-
-/**
- * @author mortent
- */
-public class ConnectionLogEntryTest {
-
- @Test
- public void test_serialization () throws IOException {
- var id = UUID.randomUUID();
- var instant = Instant.parse("2021-01-13T12:12:12Z");
- ConnectionLogEntry entry = ConnectionLogEntry.builder(id, instant)
- .withPeerPort(1234)
- .build();
-
- String expected = "{" +
- "\"id\":\""+id.toString()+"\"," +
- "\"timestamp\":\"2021-01-13T12:12:12Z\"," +
- "\"peerPort\":1234" +
- "}";
- Assert.assertEquals(expected, entry.toJson());
- }
-}
diff --git a/jdisc_http_service/src/test/java/com/yahoo/container/logging/JsonConnectionLogWriterTest.java b/jdisc_http_service/src/test/java/com/yahoo/container/logging/JsonConnectionLogWriterTest.java
new file mode 100644
index 00000000000..b8978fe489c
--- /dev/null
+++ b/jdisc_http_service/src/test/java/com/yahoo/container/logging/JsonConnectionLogWriterTest.java
@@ -0,0 +1,36 @@
+package com.yahoo.container.logging;// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+import com.yahoo.test.json.JsonTestHelper;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.UUID;
+
+/**
+ * @author bjorncs
+ */
+class JsonConnectionLogWriterTest {
+
+ @Test
+ void test_serialization() throws IOException {
+ var id = UUID.randomUUID();
+ var instant = Instant.parse("2021-01-13T12:12:12Z");
+ ConnectionLogEntry entry = ConnectionLogEntry.builder(id, instant)
+ .withPeerPort(1234)
+ .build();
+ String expectedJson = "{" +
+ "\"id\":\""+id.toString()+"\"," +
+ "\"timestamp\":\"2021-01-13T12:12:12Z\"," +
+ "\"peerPort\":1234" +
+ "}";
+
+ JsonConnectionLogWriter writer = new JsonConnectionLogWriter();
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ writer.write(entry, out);
+ String actualJson = out.toString(StandardCharsets.UTF_8);
+ JsonTestHelper.assertJsonEquals(actualJson, expectedJson);
+ }
+} \ No newline at end of file
diff --git a/jdisc_http_service/src/test/java/com/yahoo/container/logging/LogFileHandlerTestCase.java b/jdisc_http_service/src/test/java/com/yahoo/container/logging/LogFileHandlerTestCase.java
index cd3c174a12e..f76312af61e 100644
--- a/jdisc_http_service/src/test/java/com/yahoo/container/logging/LogFileHandlerTestCase.java
+++ b/jdisc_http_service/src/test/java/com/yahoo/container/logging/LogFileHandlerTestCase.java
@@ -42,14 +42,20 @@ public class LogFileHandlerTestCase {
String pattern = root.getAbsolutePath() + "/logfilehandlertest.%Y%m%d%H%M%S";
long[] rTimes = {1000, 2000, 10000};
+ Formatter formatter = new Formatter() {
+ public String format(LogRecord r) {
+ DateFormat df = new SimpleDateFormat("yyyy.MM.dd:HH:mm:ss.SSS");
+ String timeStamp = df.format(new Date(r.getMillis()));
+ return ("["+timeStamp+"]" + " " + formatMessage(r) + "\n");
+ }
+ };
LogFileHandler<String> h = new LogFileHandler<>(Compression.NONE, pattern, rTimes, null, new StringLogWriter());
long now = System.currentTimeMillis();
long millisPerDay = 60*60*24*1000;
long tomorrowDays = (now / millisPerDay) +1;
long tomorrowMillis = tomorrowDays * millisPerDay;
-
- assertThat(tomorrowMillis+1000).isEqualTo(h.logThread.getNextRotationTime(tomorrowMillis));
- assertThat(tomorrowMillis+10000).isEqualTo(h.logThread.getNextRotationTime(tomorrowMillis+3000));
+ assertThat(tomorrowMillis+1000).isEqualTo(h.getNextRotationTime(tomorrowMillis));
+ assertThat(tomorrowMillis+10000).isEqualTo(h.getNextRotationTime(tomorrowMillis+3000));
String message = "test";
h.publish(message);
h.publish( "another test");
@@ -121,7 +127,7 @@ public class LogFileHandlerTestCase {
String longMessage = formatter.format(new LogRecord(Level.INFO, "string which is way longer than the word test"));
handler.publish(longMessage);
- handler.flush();
+ handler.waitDrained();
assertThat(Files.size(Paths.get(firstFile))).isEqualTo(31);
final long expectedSecondFileLength = 72;
long secondFileLength;
@@ -166,7 +172,7 @@ public class LogFileHandlerTestCase {
for (int i = 0; i < logEntries; i++) {
h.publish("test");
}
- h.flush();
+ h.waitDrained();
String f1 = h.getFileName();
assertThat(f1).startsWith(root.getAbsolutePath() + "/logfilehandlertest.");
File uncompressed = new File(f1);
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/ContainerOperations.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/ContainerOperations.java
index c8fd4c077ce..255c04e0072 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/ContainerOperations.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/ContainerOperations.java
@@ -8,6 +8,7 @@ import com.yahoo.vespa.hosted.dockerapi.ContainerResources;
import com.yahoo.vespa.hosted.dockerapi.ContainerStats;
import com.yahoo.vespa.hosted.dockerapi.ProcessResult;
import com.yahoo.vespa.hosted.dockerapi.RegistryCredentials;
+import com.yahoo.vespa.hosted.node.admin.component.TaskContext;
import com.yahoo.vespa.hosted.node.admin.nodeagent.ContainerData;
import com.yahoo.vespa.hosted.node.admin.nodeagent.NodeAgentContext;
import com.yahoo.vespa.hosted.node.admin.task.util.process.CommandResult;
@@ -20,6 +21,7 @@ import java.util.Set;
/**
* @author hakonhall
*/
+// TODO(mpolden): Clean up this interface when ContainerOperationsImpl, DockerEngine and friends can be removed
public interface ContainerOperations {
void createContainer(NodeAgentContext context, ContainerData containerData, ContainerResources containerResources);
@@ -32,7 +34,7 @@ public interface ContainerOperations {
Optional<Container> getContainer(NodeAgentContext context);
- boolean pullImageAsyncIfNeeded(DockerImage dockerImage, RegistryCredentials registryCredentials);
+ boolean pullImageAsyncIfNeeded(TaskContext context, DockerImage dockerImage, RegistryCredentials registryCredentials);
ProcessResult executeCommandInContainerAsRoot(NodeAgentContext context, String... command);
@@ -59,10 +61,14 @@ public interface ContainerOperations {
Optional<ContainerStats> getContainerStats(NodeAgentContext context);
- boolean noManagedContainersRunning();
+ boolean noManagedContainersRunning(TaskContext context);
- /** Stops and removes all managed containers except the ones given in {@code containerNames} */
- boolean retainManagedContainers(Set<ContainerName> containerNames);
+ /**
+ * Stops and removes all managed containers except the ones given in {@code containerNames}.
+ *
+ * @return true if any containers were removed
+ */
+ boolean retainManagedContainers(TaskContext context, Set<ContainerName> containerNames);
/** Deletes the local images that are currently not in use by any container and not recently used. */
boolean deleteUnusedContainerImages(List<DockerImage> excludes, Duration minImageAgeToDelete);
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/ContainerOperationsImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/ContainerOperationsImpl.java
index 40e1622fa0d..abbee452313 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/ContainerOperationsImpl.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/docker/ContainerOperationsImpl.java
@@ -13,6 +13,7 @@ import com.yahoo.vespa.hosted.dockerapi.ContainerResources;
import com.yahoo.vespa.hosted.dockerapi.ContainerStats;
import com.yahoo.vespa.hosted.dockerapi.ProcessResult;
import com.yahoo.vespa.hosted.dockerapi.RegistryCredentials;
+import com.yahoo.vespa.hosted.node.admin.component.TaskContext;
import com.yahoo.vespa.hosted.node.admin.nodeadmin.ConvergenceException;
import com.yahoo.vespa.hosted.node.admin.nodeagent.ContainerData;
import com.yahoo.vespa.hosted.node.admin.nodeagent.NodeAgentContext;
@@ -40,6 +41,7 @@ import java.util.stream.Stream;
*
* @author Haakon Dybdahl
*/
+// TODO: Remove when Podman becomes the only implementation in use
public class ContainerOperationsImpl implements ContainerOperations {
private static final Logger logger = Logger.getLogger(ContainerOperationsImpl.class.getName());
@@ -211,7 +213,7 @@ public class ContainerOperationsImpl implements ContainerOperations {
}
@Override
- public boolean pullImageAsyncIfNeeded(DockerImage dockerImage, RegistryCredentials registryCredentials) {
+ public boolean pullImageAsyncIfNeeded(TaskContext context, DockerImage dockerImage, RegistryCredentials registryCredentials) {
return containerEngine.pullImageAsyncIfNeeded(dockerImage, registryCredentials);
}
@@ -321,12 +323,12 @@ public class ContainerOperationsImpl implements ContainerOperations {
}
@Override
- public boolean noManagedContainersRunning() {
+ public boolean noManagedContainersRunning(TaskContext context) {
return containerEngine.noManagedContainersRunning(MANAGER_NAME);
}
@Override
- public boolean retainManagedContainers(Set<ContainerName> containerNames) {
+ public boolean retainManagedContainers(TaskContext context, Set<ContainerName> containerNames) {
return containerEngine.listManagedContainers(MANAGER_NAME).stream()
.filter(containerName -> ! containerNames.contains(containerName))
.peek(containerName -> {
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentContextImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentContextImpl.java
index f93cd005fae..5ea0a5d12c3 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentContextImpl.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentContextImpl.java
@@ -244,7 +244,7 @@ public class NodeAgentContextImpl implements NodeAgentContext {
return this;
}
- public Builder dockerNetworking(ContainerNetworkMode containerNetworkMode) {
+ public Builder networkMode(ContainerNetworkMode containerNetworkMode) {
this.containerNetworkMode = containerNetworkMode;
return this;
}
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java
index 9fa21e5a676..dc0b6dc9d85 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java
@@ -393,12 +393,13 @@ public class NodeAgentImpl implements NodeAgent {
return zone.getEnvironment() == Environment.dev || zone.getSystemName().isCd();
}
- private boolean downloadImageIfNeeded(NodeSpec node, Optional<Container> container) {
+ private boolean downloadImageIfNeeded(NodeAgentContext context, Optional<Container> container) {
+ NodeSpec node = context.node();
if (node.wantedDockerImage().equals(container.map(c -> c.image))) return false;
RegistryCredentials credentials = registryCredentialsProvider.get();
return node.wantedDockerImage()
- .map(image -> containerOperations.pullImageAsyncIfNeeded(image, credentials))
+ .map(image -> containerOperations.pullImageAsyncIfNeeded(context, image, credentials))
.orElse(false);
}
@@ -454,7 +455,7 @@ public class NodeAgentImpl implements NodeAgent {
storageMaintainer.cleanDiskIfFull(context);
storageMaintainer.handleCoreDumpsForContainer(context, container);
- if (downloadImageIfNeeded(node, container)) {
+ if (downloadImageIfNeeded(context, container)) {
context.log(logger, "Waiting for image to download " + context.node().wantedDockerImage().get().asString());
return;
}
diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/docker/ContainerOperationsImplTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/docker/ContainerOperationsImplTest.java
index 5f870664970..240fb492aff 100644
--- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/docker/ContainerOperationsImplTest.java
+++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/docker/ContainerOperationsImplTest.java
@@ -130,7 +130,7 @@ public class ContainerOperationsImplTest {
public void retainContainersTest() {
when(containerEngine.listManagedContainers(ContainerOperationsImpl.MANAGER_NAME))
.thenReturn(List.of(new ContainerName("cnt1"), new ContainerName("cnt2"), new ContainerName("cnt3")));
- containerOperations.retainManagedContainers(Set.of(new ContainerName("cnt2"), new ContainerName("cnt4")));
+ containerOperations.retainManagedContainers(any(), Set.of(new ContainerName("cnt2"), new ContainerName("cnt4")));
verify(containerEngine).stopContainer(eq(new ContainerName("cnt1")));
verify(containerEngine).deleteContainer(eq(new ContainerName("cnt1")));
diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImplTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImplTest.java
index eea775f9a63..fcd5e8cc187 100644
--- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImplTest.java
+++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImplTest.java
@@ -85,7 +85,7 @@ public class NodeAgentImplTest {
verify(containerOperations, never()).removeContainer(eq(context), any());
verify(orchestrator, never()).suspend(any(String.class));
- verify(containerOperations, never()).pullImageAsyncIfNeeded(any(), any());
+ verify(containerOperations, never()).pullImageAsyncIfNeeded(any(), any(), any());
final InOrder inOrder = inOrder(containerOperations, orchestrator, nodeRepository);
// TODO: Verify this isn't run unless 1st time
@@ -155,7 +155,7 @@ public class NodeAgentImplTest {
NodeAgentImpl nodeAgent = makeNodeAgent(null, false);
when(nodeRepository.getOptionalNode(hostName)).thenReturn(Optional.of(node));
- when(containerOperations.pullImageAsyncIfNeeded(eq(dockerImage), any())).thenReturn(false);
+ when(containerOperations.pullImageAsyncIfNeeded(any(), eq(dockerImage), any())).thenReturn(false);
nodeAgent.doConverge(context);
@@ -164,7 +164,7 @@ public class NodeAgentImplTest {
verify(orchestrator, never()).suspend(any(String.class));
final InOrder inOrder = inOrder(containerOperations, orchestrator, nodeRepository, aclMaintainer, healthChecker);
- inOrder.verify(containerOperations, times(1)).pullImageAsyncIfNeeded(eq(dockerImage), any());
+ inOrder.verify(containerOperations, times(1)).pullImageAsyncIfNeeded(any(), eq(dockerImage), any());
inOrder.verify(containerOperations, times(1)).createContainer(eq(context), any(), any());
inOrder.verify(containerOperations, times(1)).startContainer(eq(context));
inOrder.verify(aclMaintainer, times(1)).converge(eq(context));
@@ -187,7 +187,7 @@ public class NodeAgentImplTest {
NodeAgentImpl nodeAgent = makeNodeAgent(dockerImage, true);
when(nodeRepository.getOptionalNode(hostName)).thenReturn(Optional.of(node));
- when(containerOperations.pullImageAsyncIfNeeded(any(), any())).thenReturn(true);
+ when(containerOperations.pullImageAsyncIfNeeded(any(), any(), any())).thenReturn(true);
nodeAgent.doConverge(context);
@@ -196,7 +196,7 @@ public class NodeAgentImplTest {
verify(containerOperations, never()).removeContainer(eq(context), any());
final InOrder inOrder = inOrder(containerOperations);
- inOrder.verify(containerOperations, times(1)).pullImageAsyncIfNeeded(eq(newDockerImage), any());
+ inOrder.verify(containerOperations, times(1)).pullImageAsyncIfNeeded(any(), eq(newDockerImage), any());
}
@Test
@@ -209,7 +209,7 @@ public class NodeAgentImplTest {
NodeAgentContext firstContext = createContext(specBuilder.build());
NodeAgentImpl nodeAgent = makeNodeAgent(dockerImage, true);
- when(containerOperations.pullImageAsyncIfNeeded(any(), any())).thenReturn(true);
+ when(containerOperations.pullImageAsyncIfNeeded(any(), any(), any())).thenReturn(true);
InOrder inOrder = inOrder(orchestrator, containerOperations);
@@ -256,7 +256,7 @@ public class NodeAgentImplTest {
NodeAgentContext firstContext = createContext(specBuilder.build());
NodeAgentImpl nodeAgent = makeNodeAgent(dockerImage, true);
- when(containerOperations.pullImageAsyncIfNeeded(any(), any())).thenReturn(true);
+ when(containerOperations.pullImageAsyncIfNeeded(any(), any(), any())).thenReturn(true);
nodeAgent.doConverge(firstContext);
NodeAgentContext secondContext = createContext(specBuilder.memoryGb(20).build());
@@ -316,7 +316,7 @@ public class NodeAgentImplTest {
NodeAgentImpl nodeAgent = makeNodeAgent(dockerImage, true);
when(nodeRepository.getOptionalNode(hostName)).thenReturn(Optional.of(node));
- when(containerOperations.pullImageAsyncIfNeeded(eq(dockerImage), any())).thenReturn(false);
+ when(containerOperations.pullImageAsyncIfNeeded(any(), eq(dockerImage), any())).thenReturn(false);
doThrow(new ConvergenceException("Connection refused")).doNothing()
.when(healthChecker).verifyHealth(eq(context));
@@ -543,7 +543,7 @@ public class NodeAgentImplTest {
NodeAgentContext context = createContext(node);
NodeAgentImpl nodeAgent = spy(makeNodeAgent(null, false));
- when(containerOperations.pullImageAsyncIfNeeded(eq(dockerImage), any())).thenReturn(false);
+ when(containerOperations.pullImageAsyncIfNeeded(any(), eq(dockerImage), any())).thenReturn(false);
doThrow(new DockerException("Failed to set up network")).doNothing().when(containerOperations).startContainer(eq(context));
try {
@@ -580,7 +580,7 @@ public class NodeAgentImplTest {
NodeAgentImpl nodeAgent = makeNodeAgent(null, false);
when(nodeRepository.getOptionalNode(hostName)).thenReturn(Optional.of(node));
- when(containerOperations.pullImageAsyncIfNeeded(eq(dockerImage), any())).thenReturn(false);
+ when(containerOperations.pullImageAsyncIfNeeded(any(), eq(dockerImage), any())).thenReturn(false);
nodeAgent.doConverge(context);
@@ -588,7 +588,7 @@ public class NodeAgentImplTest {
verify(orchestrator, never()).suspend(any(String.class));
final InOrder inOrder = inOrder(containerOperations, orchestrator, nodeRepository, aclMaintainer);
- inOrder.verify(containerOperations, times(1)).pullImageAsyncIfNeeded(eq(dockerImage), any());
+ inOrder.verify(containerOperations, times(1)).pullImageAsyncIfNeeded(any(), eq(dockerImage), any());
inOrder.verify(containerOperations, times(1)).createContainer(eq(context), any(), any());
inOrder.verify(containerOperations, times(1)).startContainer(eq(context));
inOrder.verify(aclMaintainer, times(1)).converge(eq(context));
diff --git a/searchcore/src/apps/tests/persistenceconformance_test.cpp b/searchcore/src/apps/tests/persistenceconformance_test.cpp
index 6b4061081ea..4715ff80d03 100644
--- a/searchcore/src/apps/tests/persistenceconformance_test.cpp
+++ b/searchcore/src/apps/tests/persistenceconformance_test.cpp
@@ -109,12 +109,13 @@ public:
DocumenttypesConfigSP getTypeCfg() const { return _typeCfg; }
DocTypeVector getDocTypes() const {
DocTypeVector types;
- _repo->forEachDocumentType(*makeClosure(storeDocType, &types));
+ _repo->forEachDocumentType(*DocumentTypeRepo::makeLambda([&types](const DocumentType &type) {
+ types.push_back(DocTypeName(type.getName()));
+ }));
return types;
}
DocumentDBConfig::SP create(const DocTypeName &docTypeName) const {
- const DocumentType *docType =
- _repo->getDocumentType(docTypeName.getName());
+ const DocumentType *docType = _repo->getDocumentType(docTypeName.getName());
if (docType == nullptr) {
return DocumentDBConfig::SP();
}
diff --git a/searchcore/src/tests/proton/documentdb/fileconfigmanager/fileconfigmanager_test.cpp b/searchcore/src/tests/proton/documentdb/fileconfigmanager/fileconfigmanager_test.cpp
index 2352fda65a0..e6bcbf18495 100644
--- a/searchcore/src/tests/proton/documentdb/fileconfigmanager/fileconfigmanager_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/fileconfigmanager/fileconfigmanager_test.cpp
@@ -43,12 +43,12 @@ makeBaseConfigSnapshot()
DBCM dbcm(spec, "test");
DocumenttypesConfigSP dtcfg(config::ConfigGetter<DocumenttypesConfig>::getConfig("", spec).release());
- BootstrapConfig::SP b(new BootstrapConfig(1, dtcfg,
- std::shared_ptr<const DocumentTypeRepo>(new DocumentTypeRepo(*dtcfg)),
- std::make_shared<ProtonConfig>(),
- std::make_shared<FiledistributorrpcConfig>(),
- std::make_shared<BucketspacesConfig>(),
- std::make_shared<TuneFileDocumentDB>(), HwInfo()));
+ auto b = std::make_shared<BootstrapConfig>(1, dtcfg,
+ std::make_shared<DocumentTypeRepo>(*dtcfg),
+ std::make_shared<ProtonConfig>(),
+ std::make_shared<FiledistributorrpcConfig>(),
+ std::make_shared<BucketspacesConfig>(),
+ std::make_shared<TuneFileDocumentDB>(), HwInfo());
dbcm.forwardConfig(b);
dbcm.nextGeneration(0ms);
DocumentDBConfig::SP snap = dbcm.getConfig();
@@ -71,8 +71,6 @@ makeEmptyConfigSnapshot()
return test::DocumentDBConfigBuilder(0, std::make_shared<Schema>(), "client", "test").build();
}
-void incInt(int *i, const DocumentType&) { ++*i; }
-
void
assertEqualSnapshot(const DocumentDBConfig &exp, const DocumentDBConfig &act)
{
@@ -91,10 +89,12 @@ assertEqualSnapshot(const DocumentDBConfig &exp, const DocumentDBConfig &act)
int expTypeCount = 0;
int actTypeCount = 0;
- exp.getDocumentTypeRepoSP()->forEachDocumentType(
- *vespalib::makeClosure(incInt, &expTypeCount));
- act.getDocumentTypeRepoSP()->forEachDocumentType(
- *vespalib::makeClosure(incInt, &actTypeCount));
+ exp.getDocumentTypeRepoSP()->forEachDocumentType(*DocumentTypeRepo::makeLambda([&expTypeCount](const DocumentType &) {
+ expTypeCount++;
+ }));
+ act.getDocumentTypeRepoSP()->forEachDocumentType(*DocumentTypeRepo::makeLambda([&actTypeCount](const DocumentType &) {
+ actTypeCount++;
+ }));
EXPECT_EQUAL(expTypeCount, actTypeCount);
EXPECT_TRUE(*exp.getSchemaSP() == *act.getSchemaSP());
EXPECT_EQUAL(expTypeCount, actTypeCount);
@@ -164,8 +164,7 @@ TEST_F("requireThatConfigCanBeLoadedWithoutExtraConfigsDataFile", DocumentDBConf
}
-TEST_F("requireThatVisibilityDelayIsPropagated",
- DocumentDBConfig::SP(makeBaseConfigSnapshot()))
+TEST_F("requireThatVisibilityDelayIsPropagated", DocumentDBConfig::SP(makeBaseConfigSnapshot()))
{
saveBaseConfigSnapshot(*f, 80);
DocumentDBConfig::SP esnap(makeEmptyConfigSnapshot());
@@ -177,8 +176,7 @@ TEST_F("requireThatVisibilityDelayIsPropagated",
protonConfigBuilder.documentdb.push_back(ddb);
protonConfigBuilder.maxvisibilitydelay = 100.0;
FileConfigManager cm("out", myId, "dummy");
- using ProtonConfigSP = BootstrapConfig::ProtonConfigSP;
- cm.setProtonConfig(ProtonConfigSP(new ProtonConfig(protonConfigBuilder)));
+ cm.setProtonConfig(std::make_shared<ProtonConfig>(protonConfigBuilder));
cm.loadConfig(*esnap, 70, esnap);
}
EXPECT_EQUAL(61s, esnap->getMaintenanceConfigSP()->getVisibilityDelay());
diff --git a/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp b/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp
index 23267e0628b..4bf8f36caa3 100644
--- a/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp
+++ b/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp
@@ -113,8 +113,7 @@ public:
if (!EXPECT_EQUAL(expRemoveCompleteCount, _removeCompleteCount)) {
return false;
}
- if (!EXPECT_EQUAL(expRemoveBatchCompleteCount,
- _removeBatchCompleteCount)) {
+ if (!EXPECT_EQUAL(expRemoveBatchCompleteCount, _removeBatchCompleteCount)) {
return false;
}
if (!EXPECT_EQUAL(expRemoveCompleteLids, _removeCompleteLids)) {
@@ -152,9 +151,7 @@ public:
test::runInMaster(_writeService, func);
}
- void
- cycledLids(const std::vector<uint32_t> &lids)
- {
+ void cycledLids(const std::vector<uint32_t> &lids) {
if (lids.size() == 1) {
_store.removeComplete(lids[0]);
} else {
@@ -162,33 +159,23 @@ public:
}
}
- void
- performCycleLids(const std::vector<uint32_t> &lids)
- {
- _writeService.master().execute(
- makeLambdaTask([this, lids]() { cycledLids(lids);}));
+ void performCycleLids(const std::vector<uint32_t> &lids) {
+ _writeService.master().execute(makeLambdaTask([this, lids]() { cycledLids(lids);}));
}
- void
- cycleLids(const std::vector<uint32_t> &lids)
- {
+ void cycleLids(const std::vector<uint32_t> &lids) {
if (lids.empty())
return;
- _writeService.index().execute(
- makeLambdaTask([this, lids]() { performCycleLids(lids);}));
+ _writeService.index().execute(makeLambdaTask([this, lids]() { performCycleLids(lids);}));
}
- bool
- delayReuse(uint32_t lid)
- {
+ bool delayReuse(uint32_t lid) {
bool res = false;
runInMaster([&] () { res = _lidReuseDelayer->delayReuse(lid); } );
return res;
}
- bool
- delayReuse(const std::vector<uint32_t> &lids)
- {
+ bool delayReuse(const std::vector<uint32_t> &lids) {
bool res = false;
runInMaster([&] () { res = _lidReuseDelayer->delayReuse(lids); });
return res;
@@ -198,26 +185,10 @@ public:
runInMaster([&] () { cycleLids(_lidReuseDelayer->getReuseLids()); });
}
- void
- sync()
- {
- _writeService.sync();
- }
+ void sync() { _writeService.sync(); }
- void
- scheduleDelayReuseLid(uint32_t lid)
- {
- runInMaster([&] () { cycleLids({ lid }); });
- }
-
- void
- scheduleDelayReuseLids(const std::vector<uint32_t> &lids)
- {
- runInMaster([&] () { cycleLids(lids); });
- }
};
-
TEST_F("require that nothing happens before free list is active", Fixture)
{
EXPECT_FALSE(f.delayReuse(4));
@@ -226,7 +197,6 @@ TEST_F("require that nothing happens before free list is active", Fixture)
EXPECT_TRUE(assertThreadObserver(2, 0, 0, f._writeService));
}
-
TEST_F("require that reuse can be batched", Fixture)
{
f._store._freeListActive = true;
@@ -243,7 +213,6 @@ TEST_F("require that reuse can be batched", Fixture)
EXPECT_TRUE(assertThreadObserver(6, 1, 0, f._writeService));
}
-
TEST_F("require that single element array is optimized", Fixture)
{
f._store._freeListActive = true;
diff --git a/searchcore/src/vespa/searchcore/proton/server/clusterstatehandler.cpp b/searchcore/src/vespa/searchcore/proton/server/clusterstatehandler.cpp
index f730d286b34..577bd32ca1f 100644
--- a/searchcore/src/vespa/searchcore/proton/server/clusterstatehandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/clusterstatehandler.cpp
@@ -69,8 +69,7 @@ ClusterStateHandler::performSetClusterState(const ClusterState *calc, IGenericRe
}
void
-ClusterStateHandler::performGetModifiedBuckets(
- IBucketIdListResultHandler *resultHandler)
+ClusterStateHandler::performGetModifiedBuckets(IBucketIdListResultHandler *resultHandler)
{
storage::spi::BucketIdListResult::List modifiedBuckets;
modifiedBuckets.resize(_modifiedBuckets.size());
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
index e10f7937dd0..6951125f408 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
@@ -75,8 +75,7 @@ DocumentBucketMover::setupForBucket(const BucketId &bucket,
}
-namespace
-{
+namespace {
class MoveKey
{
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index 7fb16e851fa..9df81e5ceec 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -33,7 +33,6 @@
#include <vespa/searchlib/engine/docsumreply.h>
#include <vespa/searchlib/engine/searchreply.h>
#include <vespa/vespalib/util/destructor_callbacks.h>
-#include <vespa/vespalib/util/closuretask.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/metrics/updatehook.h>
@@ -217,10 +216,6 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir,
// Forward changes of cluster state to bucket handler
_clusterStateHandler.addClusterStateChangedHandler(&_bucketHandler);
- _lidSpaceCompactionHandlers.push_back(std::make_unique<LidSpaceCompactionHandler>(_maintenanceController.getReadySubDB(), _docTypeName.getName()));
- _lidSpaceCompactionHandlers.push_back(std::make_unique<LidSpaceCompactionHandler>(_maintenanceController.getRemSubDB(), _docTypeName.getName()));
- _lidSpaceCompactionHandlers.push_back(std::make_unique<LidSpaceCompactionHandler>(_maintenanceController.getNotReadySubDB(), _docTypeName.getName()));
-
_writeFilter.setConfig(loaded_config->getMaintenanceConfigSP()->getAttributeUsageFilterConfig());
}
@@ -416,6 +411,7 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum
LOG(error, "Applying config to closed document db");
return;
}
+
ConfigComparisonResult cmpres;
Schema::SP oldSchema;
int64_t generation = configSnapshot->getGeneration();
@@ -436,6 +432,7 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum
cmpres.importedFieldsChanged = true;
}
const ReconfigParams params(cmpres);
+
// Save config via config manager if replay is done.
bool equalReplayConfig =
*DocumentDBConfig::makeReplayConfig(configSnapshot) ==
@@ -457,6 +454,10 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum
_feedView.get()->forceCommit(elidedConfigSave ? serialNum : serialNum - 1, std::make_shared<vespalib::KeepAlive<FeedHandler::CommitResult>>(std::move(commit_result)));
_writeService.sync();
}
+ if (params.shouldMaintenanceControllerChange()) {
+ _maintenanceController.killJobs();
+ }
+
if (_state.getState() >= DDBState::State::APPLY_LIVE_CONFIG) {
_writeServiceConfig.update(configSnapshot->get_threading_service_config());
}
@@ -483,7 +484,7 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum
_state.clearDelayedConfig();
}
setActiveConfig(configSnapshot, generation);
- if (params.shouldMaintenanceControllerChange()) {
+ if (params.shouldMaintenanceControllerChange() || _maintenanceController.getPaused()) {
forwardMaintenanceConfig();
}
_writeFilter.setConfig(configSnapshot->getMaintenanceConfigSP()->getAttributeUsageFilterConfig());
@@ -699,9 +700,7 @@ DocumentDB::getAllowPrune() const
void
DocumentDB::start()
{
- LOG(debug,
- "DocumentDB(%s): Database starting.",
- _docTypeName.toString().c_str());
+ LOG(debug, "DocumentDB(%s): Database starting.", _docTypeName.toString().c_str());
internalInit();
}
@@ -934,6 +933,10 @@ DocumentDB::injectMaintenanceJobs(const DocumentDBMaintenanceConfig &config, std
{
// Called by executor thread
_maintenanceController.killJobs();
+ _lidSpaceCompactionHandlers.clear();
+ _lidSpaceCompactionHandlers.push_back(std::make_unique<LidSpaceCompactionHandler>(_maintenanceController.getReadySubDB(), _docTypeName.getName()));
+ _lidSpaceCompactionHandlers.push_back(std::make_unique<LidSpaceCompactionHandler>(_maintenanceController.getRemSubDB(), _docTypeName.getName()));
+ _lidSpaceCompactionHandlers.push_back(std::make_unique<LidSpaceCompactionHandler>(_maintenanceController.getNotReadySubDB(), _docTypeName.getName()));
MaintenanceJobsInjector::injectJobs(_maintenanceController,
config,
_bucketExecutor,
@@ -997,13 +1000,11 @@ DocumentDB::forwardMaintenanceConfig()
// Called by executor thread
DocumentDBConfig::SP activeConfig = getActiveConfig();
assert(activeConfig);
- DocumentDBMaintenanceConfig::SP
- maintenanceConfig(activeConfig->getMaintenanceConfigSP());
+ auto maintenanceConfig(activeConfig->getMaintenanceConfigSP());
const auto &attributes_config = activeConfig->getAttributesConfig();
auto attribute_config_inspector = std::make_unique<AttributeConfigInspector>(attributes_config);
if (!_state.getClosed()) {
- if (_maintenanceController.getStarted() &&
- !_maintenanceController.getStopping()) {
+ if (_maintenanceController.getPaused()) {
injectMaintenanceJobs(*maintenanceConfig, std::move(attribute_config_inspector));
}
_maintenanceController.newConfig(maintenanceConfig);
diff --git a/searchcore/src/vespa/searchcore/proton/server/ibucketmodifiedhandler.h b/searchcore/src/vespa/searchcore/proton/server/ibucketmodifiedhandler.h
index 96f70d32dc5..652c303283f 100644
--- a/searchcore/src/vespa/searchcore/proton/server/ibucketmodifiedhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/ibucketmodifiedhandler.h
@@ -2,16 +2,9 @@
#pragma once
-namespace document
-{
-
-class BucketId;
-
-}
-
-namespace proton
-{
+namespace document { class BucketId;}
+namespace proton {
class IBucketModifiedHandler
{
@@ -20,6 +13,4 @@ public:
virtual ~IBucketModifiedHandler() {}
};
-
-} // namespace proton
-
+}
diff --git a/searchcore/src/vespa/searchcore/proton/server/ibucketstatecalculator.h b/searchcore/src/vespa/searchcore/proton/server/ibucketstatecalculator.h
index 10c0b194aac..15211c8ceb7 100644
--- a/searchcore/src/vespa/searchcore/proton/server/ibucketstatecalculator.h
+++ b/searchcore/src/vespa/searchcore/proton/server/ibucketstatecalculator.h
@@ -16,7 +16,7 @@ struct IBucketStateCalculator
virtual bool nodeUp() const = 0;
virtual bool nodeInitializing() const = 0;
virtual bool nodeRetired() const = 0;
- virtual ~IBucketStateCalculator() {}
+ virtual ~IBucketStateCalculator() = default;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/ibucketstatechangedhandler.h b/searchcore/src/vespa/searchcore/proton/server/ibucketstatechangedhandler.h
index 1ed0359042e..f97d1b697cb 100644
--- a/searchcore/src/vespa/searchcore/proton/server/ibucketstatechangedhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/ibucketstatechangedhandler.h
@@ -4,16 +4,9 @@
#include <vespa/persistence/spi/bucketinfo.h>
-namespace document
-{
-
-class BucketId;
-
-}
-
-namespace proton
-{
+namespace document { class BucketId; }
+namespace proton {
/**
* Interface used to notify when bucket state has changed.
@@ -24,8 +17,7 @@ public:
virtual void notifyBucketStateChanged(const document::BucketId &bucketId,
storage::spi::BucketInfo::ActiveState newState) = 0;
- virtual ~IBucketStateChangedHandler() {}
+ virtual ~IBucketStateChangedHandler() = default;
};
-
-} // namespace proton
+}
diff --git a/searchcore/src/vespa/searchcore/proton/server/ibucketstatechangednotifier.h b/searchcore/src/vespa/searchcore/proton/server/ibucketstatechangednotifier.h
index b21c184d431..0f1f03370d0 100644
--- a/searchcore/src/vespa/searchcore/proton/server/ibucketstatechangednotifier.h
+++ b/searchcore/src/vespa/searchcore/proton/server/ibucketstatechangednotifier.h
@@ -2,8 +2,7 @@
#pragma once
-namespace proton
-{
+namespace proton {
class IBucketStateChangedHandler;
@@ -13,14 +12,10 @@ class IBucketStateChangedHandler;
class IBucketStateChangedNotifier
{
public:
- virtual void
- addBucketStateChangedHandler(IBucketStateChangedHandler *handler) = 0;
-
- virtual void
- removeBucketStateChangedHandler(IBucketStateChangedHandler *handler) = 0;
+ virtual void addBucketStateChangedHandler(IBucketStateChangedHandler *handler) = 0;
+ virtual void removeBucketStateChangedHandler(IBucketStateChangedHandler *handler) = 0;
- virtual ~IBucketStateChangedNotifier() {}
+ virtual ~IBucketStateChangedNotifier() = default;
};
-
-} // namespace proton
+}
diff --git a/searchcore/src/vespa/searchcore/proton/server/iclusterstatechangedhandler.h b/searchcore/src/vespa/searchcore/proton/server/iclusterstatechangedhandler.h
index f50e591d0e8..16c98054a11 100644
--- a/searchcore/src/vespa/searchcore/proton/server/iclusterstatechangedhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/iclusterstatechangedhandler.h
@@ -4,8 +4,7 @@
#include "ibucketstatecalculator.h"
-namespace proton
-{
+namespace proton {
/**
* Interface used to notify when cluster state has changed.
@@ -13,10 +12,9 @@ namespace proton
class IClusterStateChangedHandler
{
public:
- virtual ~IClusterStateChangedHandler() { }
+ virtual ~IClusterStateChangedHandler() = default;
- virtual void
- notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) = 0;
+ virtual void notifyClusterStateChanged(const std::shared_ptr<IBucketStateCalculator> &newCalc) = 0;
};
-} // namespace proton
+}
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
index aa04cc89f52..c05c25990a6 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
@@ -117,14 +117,14 @@ MaintenanceJobsInjector::injectJobs(MaintenanceController &controller,
IAttributeManagerSP notReadyAttributeManager,
std::unique_ptr<const AttributeConfigInspector> attribute_config_inspector,
std::shared_ptr<TransientMemoryUsageProvider> transient_memory_usage_provider,
- AttributeUsageFilter &attributeUsageFilter) {
+ AttributeUsageFilter &attributeUsageFilter)
+{
controller.registerJobInMasterThread(std::make_unique<HeartBeatJob>(hbHandler, config.getHeartBeatConfig()));
controller.registerJobInDefaultPool(std::make_unique<PruneSessionCacheJob>(scPruner, config.getSessionCachePruneInterval()));
const MaintenanceDocumentSubDB &mRemSubDB(controller.getRemSubDB());
auto pruneRDjob = std::make_unique<PruneRemovedDocumentsJob>(config.getPruneRemovedDocumentsConfig(), *mRemSubDB.meta_store(),
- mRemSubDB.sub_db_id(), docTypeName, prdHandler, fbHandler);
- controller.registerJobInMasterThread(
- trackJob(jobTrackers.getRemovedDocumentsPrune(), std::move(pruneRDjob)));
+ mRemSubDB.sub_db_id(), docTypeName, prdHandler, fbHandler);
+ controller.registerJobInMasterThread(trackJob(jobTrackers.getRemovedDocumentsPrune(), std::move(pruneRDjob)));
if (!config.getLidSpaceCompactionConfig().isDisabled()) {
injectLidSpaceCompactionJobs(controller, config, bucketExecutor, lscHandlers, opStorer, fbHandler,
jobTrackers.getLidSpaceCompact(), diskMemUsageNotifier,
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h
index e35f9e1aa48..8ec479084d2 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h
@@ -46,10 +46,8 @@ struct MaintenanceJobsInjector
IPruneRemovedDocumentsHandler &prdHandler,
IDocumentMoveHandler &moveHandler,
IBucketModifiedHandler &bucketModifiedHandler,
- IClusterStateChangedNotifier &
- clusterStateChangedNotifier,
- IBucketStateChangedNotifier &
- bucketStateChangedNotifier,
+ IClusterStateChangedNotifier & clusterStateChangedNotifier,
+ IBucketStateChangedNotifier & bucketStateChangedNotifier,
const std::shared_ptr<IBucketStateCalculator> &calc,
IDiskMemUsageNotifier &diskMemUsageNotifier,
DocumentDBJobTrackers &jobTrackers,
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
index 9d148a43964..5cedcaf1c3d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
@@ -25,7 +25,7 @@ private:
MaintenanceJobRunner *_job;
public:
JobWrapperTask(MaintenanceJobRunner *job) : _job(job) {}
- virtual void run() override { _job->run(); }
+ void run() override { _job->run(); }
};
}
@@ -42,8 +42,7 @@ MaintenanceController::MaintenanceController(IThreadService &masterThread,
_periodicTimer(),
_config(),
_frozenBuckets(masterThread),
- _started(false),
- _stopping(false),
+ _state(State::INITIALIZING),
_docTypeName(docTypeName),
_jobs(),
_jobsLock()
@@ -83,6 +82,9 @@ MaintenanceController::registerJob(Executor & executor, IMaintenanceJob::UP job)
void
MaintenanceController::killJobs()
{
+ if (_state == State::STARTED) {
+ _state = State::PAUSED;
+ }
// Called by master write thread
assert(_masterThread.isCurrentThread());
LOG(debug, "killJobs(): threadId=%zu", (size_t)FastOS_Thread::GetCurrentThreadId());
@@ -116,7 +118,7 @@ void
MaintenanceController::stop()
{
assert(!_masterThread.isCurrentThread());
- _masterThread.execute(makeLambdaTask([this]() { _stopping = true; killJobs(); }));
+ _masterThread.execute(makeLambdaTask([this]() { _state = State::STOPPING; killJobs(); }));
_masterThread.sync(); // Wait for killJobs()
_masterThread.sync(); // Wait for already scheduled maintenance jobs and performHoldJobs
}
@@ -134,9 +136,9 @@ void
MaintenanceController::start(const DocumentDBMaintenanceConfig::SP &config)
{
// Called by master write thread
- assert(!_started);
+ assert(_state == State::INITIALIZING);
_config = config;
- _started = true;
+ _state = State::STARTED;
restart();
}
@@ -145,7 +147,7 @@ void
MaintenanceController::restart()
{
// Called by master write thread
- if (!_started || _stopping || !_readySubDB.valid()) {
+ if (!getStarted() || getStopping() || !_readySubDB.valid()) {
return;
}
_periodicTimer = std::make_unique<vespalib::ScheduledExecutor>();
@@ -207,7 +209,7 @@ MaintenanceController::syncSubDBs(const MaintenanceDocumentSubDB &readySubDB,
_readySubDB = readySubDB;
_remSubDB = remSubDB;
_notReadySubDB = notReadySubDB;
- if (!oldValid && _started) {
+ if (!oldValid && getStarted()) {
restart();
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
index bbaeb176a17..3c8510e6c66 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
@@ -34,6 +34,7 @@ public:
using DocumentDBMaintenanceConfigSP = std::shared_ptr<DocumentDBMaintenanceConfig>;
using JobList = std::vector<std::shared_ptr<MaintenanceJobRunner>>;
using UP = std::unique_ptr<MaintenanceController>;
+ enum class State {INITIALIZING, STARTED, PAUSED, STOPPING};
MaintenanceController(IThreadService &masterThread, vespalib::SyncableThreadExecutor & defaultExecutor, const DocTypeName &docTypeName);
@@ -63,8 +64,9 @@ public:
operator const IFrozenBucketHandler &() const { return _frozenBuckets; }
operator IFrozenBucketHandler &() { return _frozenBuckets; }
- bool getStarted() const { return _started; }
- bool getStopping() const { return _stopping; }
+ bool getStarted() const { return _state >= State::STARTED; }
+ bool getStopping() const { return _state == State::STOPPING; }
+ bool getPaused() const { return _state == State::PAUSED; }
const MaintenanceDocumentSubDB & getReadySubDB() const { return _readySubDB; }
const MaintenanceDocumentSubDB & getRemSubDB() const { return _remSubDB; }
@@ -82,8 +84,7 @@ private:
std::unique_ptr<vespalib::ScheduledExecutor> _periodicTimer;
DocumentDBMaintenanceConfigSP _config;
FrozenBuckets _frozenBuckets;
- bool _started;
- bool _stopping;
+ State _state;
const DocTypeName &_docTypeName;
JobList _jobs;
mutable Mutex _jobsLock;
@@ -95,6 +96,4 @@ private:
void registerJob(vespalib::Executor & executor, IMaintenanceJob::UP job);
};
-
} // namespace proton
-
diff --git a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp
index 43be4e9accd..dfb84af5da5 100644
--- a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp
@@ -23,7 +23,7 @@ PruneRemovedDocumentsJob(const Config &config,
IPruneRemovedDocumentsHandler &handler,
IFrozenBucketHandler &frozenHandler)
: BlockableMaintenanceJob("prune_removed_documents." + docTypeName,
- config.getDelay(), config.getInterval()),
+ config.getDelay(), config.getInterval()),
_metaStore(metaStore),
_subDbId(subDbId),
_cfgAgeLimit(config.getAge()),
diff --git a/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp b/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp
index 020111fbf58..38478b4ff20 100644
--- a/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp
@@ -16,7 +16,6 @@
#include <vespa/searchlib/fef/indexproperties.h>
#include <vespa/searchlib/fef/properties.h>
#include <vespa/eval/eval/fast_value.h>
-#include <vespa/vespalib/util/closuretask.h>
using vespa::config::search::RankProfilesConfig;
using proton::matching::MatchingStats;
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp
index 7dbf54cfd6c..95051341e8c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp
@@ -23,7 +23,6 @@
#include <vespa/searchlib/docstore/document_store_visitor_progress.h>
#include <vespa/searchlib/util/fileheadertk.h>
#include <vespa/vespalib/io/fileutil.h>
-#include <vespa/vespalib/util/closuretask.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/log/log.h>
diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp
index 070cfd8085e..9c928f49ca8 100644
--- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp
@@ -4,7 +4,6 @@
#include "transactionlogmanager.h"
#include <vespa/searchlib/transactionlog/translogclient.h>
#include <vespa/searchcore/proton/common/eventlogger.h>
-#include <vespa/vespalib/util/closuretask.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/log/log.h>
diff --git a/searchcore/src/vespa/searchcore/proton/test/thread_utils.h b/searchcore/src/vespa/searchcore/proton/test/thread_utils.h
index 340d3b08441..43033d35185 100644
--- a/searchcore/src/vespa/searchcore/proton/test/thread_utils.h
+++ b/searchcore/src/vespa/searchcore/proton/test/thread_utils.h
@@ -2,17 +2,10 @@
#pragma once
#include <vespa/searchcorespi/index/ithreadingservice.h>
-#include <vespa/vespalib/util/closuretask.h>
+#include <vespa/vespalib/util/lambdatask.h>
namespace proton::test {
-template <typename FunctionType>
-void
-runFunction(FunctionType *func)
-{
- (*func)();
-}
-
/**
* Run the given function in the master thread and wait until done.
*/
@@ -20,8 +13,7 @@ template <typename FunctionType>
void
runInMaster(searchcorespi::index::IThreadingService &writeService, FunctionType func)
{
- writeService.master().execute(vespalib::makeTask
- (vespalib::makeClosure(&runFunction<FunctionType>, &func)));
+ writeService.master().execute(vespalib::makeLambdaTask(std::move(func)));
writeService.sync();
}
diff --git a/searchcorespi/src/vespa/searchcorespi/flush/closureflushtask.h b/searchcorespi/src/vespa/searchcorespi/flush/closureflushtask.h
deleted file mode 100644
index 8d35a07806e..00000000000
--- a/searchcorespi/src/vespa/searchcorespi/flush/closureflushtask.h
+++ /dev/null
@@ -1,41 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include "flushtask.h"
-#include <vespa/vespalib/util/closure.h>
-
-namespace searchcorespi {
-
-class ClosureFlushTask : public FlushTask
-{
- std::unique_ptr<vespalib::Closure> _closure;
- search::SerialNum _flushSerial;
-
-public:
- ClosureFlushTask(std::unique_ptr<vespalib::Closure> closure,
- search::SerialNum flushSerial)
- : _closure(std::move(closure)),
- _flushSerial(flushSerial)
- {
- }
-
- search::SerialNum getFlushSerial() const override {
- return _flushSerial;
- }
-
- void run() override {
- _closure->call();
- }
-};
-
-/**
- * Wraps a Closure as a FlushTask
- **/
-static inline FlushTask::UP
-makeFlushTask(std::unique_ptr<vespalib::Closure> closure,
- search::SerialNum flushSerial)
-{
- return FlushTask::UP(new ClosureFlushTask(std::move(closure), flushSerial));
-}
-
-} // namespace searchcorespi
diff --git a/searchcorespi/src/vespa/searchcorespi/flush/lambdaflushtask.h b/searchcorespi/src/vespa/searchcorespi/flush/lambdaflushtask.h
new file mode 100644
index 00000000000..fc860a4292f
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/flush/lambdaflushtask.h
@@ -0,0 +1,31 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "flushtask.h"
+
+namespace searchcorespi {
+
+template <class FunctionType>
+class LambdaFlushTask : public FlushTask {
+ FunctionType _func;
+ search::SerialNum _flushSerial;
+
+public:
+ LambdaFlushTask(FunctionType &&func, search::SerialNum flushSerial)
+ : _func(std::move(func)),
+ _flushSerial(flushSerial)
+ {}
+ ~LambdaFlushTask() override = default;
+ search::SerialNum getFlushSerial() const override { return _flushSerial; }
+ void run() override { _func(); }
+};
+
+template <class FunctionType>
+std::unique_ptr<FlushTask>
+makeLambdaFlushTask(FunctionType &&function, search::SerialNum flushSerial)
+{
+ return std::make_unique<LambdaFlushTask<std::decay_t<FunctionType>>>
+ (std::forward<FunctionType>(function), flushSerial);
+}
+
+}
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp
index 1fe23dd16ae..348dbc97f25 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp
@@ -9,7 +9,7 @@
#include "indexreadutilities.h"
#include "indexwriteutilities.h"
#include <vespa/fastos/file.h>
-#include <vespa/searchcorespi/flush/closureflushtask.h>
+#include <vespa/searchcorespi/flush/lambdaflushtask.h>
#include <vespa/searchlib/common/i_flush_token.h>
#include <vespa/searchlib/index/schemautil.h>
#include <vespa/searchlib/util/dirtraverse.h>
@@ -20,6 +20,7 @@
#include <vespa/vespalib/util/closuretask.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/lambdatask.h>
+#include <vespa/vespalib/util/destructor_callbacks.h>
#include <vespa/vespalib/util/time.h>
#include <sstream>
@@ -36,6 +37,7 @@ using search::queryeval::ISourceSelector;
using search::queryeval::Source;
using search::SerialNum;
using vespalib::makeLambdaTask;
+using vespalib::makeLambdaCallback;
using std::ostringstream;
using vespalib::makeClosure;
using vespalib::makeTask;
@@ -43,6 +45,7 @@ using vespalib::string;
using vespalib::Closure0;
using vespalib::Executor;
using vespalib::Runnable;
+using vespalib::IDestructorCallback;
namespace searchcorespi::index {
@@ -62,7 +65,7 @@ public:
_closure(std::move(closure))
{ }
- virtual void run() override {
+ void run() override {
_result = _reconfigurer.reconfigure(std::move(_closure));
}
};
@@ -76,7 +79,7 @@ public:
_reconfigurer(reconfigurer),
_closure(std::move(closure))
{ }
- virtual void run() override {
+ void run() override {
_reconfigurer.reconfigure(std::move(_closure));
}
};
@@ -84,18 +87,18 @@ public:
SerialNum noSerialNumHigh = std::numeric_limits<SerialNum>::max();
-class DiskIndexWithDestructorClosure : public IDiskIndex {
+class DiskIndexWithDestructorCallback : public IDiskIndex {
private:
- vespalib::AutoClosureCaller _caller;
- IDiskIndex::SP _index;
+ std::shared_ptr<IDestructorCallback> _callback;
+ IDiskIndex::SP _index;
public:
- DiskIndexWithDestructorClosure(const IDiskIndex::SP &index,
- vespalib::Closure::UP closure)
- : _caller(std::move(closure)),
- _index(index)
+ DiskIndexWithDestructorCallback(IDiskIndex::SP index,
+ std::shared_ptr<IDestructorCallback> callback) noexcept
+ : _callback(std::move(callback)),
+ _index(std::move(index))
{ }
- ~DiskIndexWithDestructorClosure();
+ ~DiskIndexWithDestructorCallback() override;
const IDiskIndex &getWrapped() const { return *_index; }
/**
@@ -140,15 +143,22 @@ public:
};
-DiskIndexWithDestructorClosure::~DiskIndexWithDestructorClosure() {}
+DiskIndexWithDestructorCallback::~DiskIndexWithDestructorCallback() = default;
} // namespace
-IndexMaintainer::FusionArgs::~FusionArgs() {
-}
+IndexMaintainer::FusionArgs::FusionArgs()
+ : _new_fusion_id(0u),
+ _changeGens(),
+ _schema(),
+ _prunedSchema(),
+ _old_source_list()
+{ }
-IndexMaintainer::SetSchemaArgs::~SetSchemaArgs() {
-}
+IndexMaintainer::FusionArgs::~FusionArgs() = default;
+
+IndexMaintainer::SetSchemaArgs::SetSchemaArgs() = default;
+IndexMaintainer::SetSchemaArgs::~SetSchemaArgs() = default;
uint32_t
IndexMaintainer::getNewAbsoluteId()
@@ -176,9 +186,8 @@ IndexMaintainer::reopenDiskIndexes(ISearchableIndexCollection &coll)
uint32_t count = coll.getSourceCount();
for (uint32_t i = 0; i < count; ++i) {
IndexSearchable &is = coll.getSearchable(i);
- const DiskIndexWithDestructorClosure *const d =
- dynamic_cast<const DiskIndexWithDestructorClosure *>(&is);
- if (d == NULL) {
+ const auto *const d = dynamic_cast<const DiskIndexWithDestructorCallback *>(&is);
+ if (d == nullptr) {
continue; // not a disk index
}
const string indexDir = d->getIndexDir();
@@ -215,11 +224,10 @@ IndexMaintainer::updateIndexSchemas(IIndexCollection &coll,
uint32_t count = coll.getSourceCount();
for (uint32_t i = 0; i < count; ++i) {
IndexSearchable &is = coll.getSearchable(i);
- const DiskIndexWithDestructorClosure *const d =
- dynamic_cast<const DiskIndexWithDestructorClosure *>(&is);
- if (d == NULL) {
+ const auto *const d = dynamic_cast<const DiskIndexWithDestructorCallback *>(&is);
+ if (d == nullptr) {
IMemoryIndex *const m = dynamic_cast<IMemoryIndex *>(&is);
- if (m != NULL) {
+ if (m != nullptr) {
m->pruneRemovedFields(schema);
}
continue;
@@ -245,10 +253,10 @@ IndexMaintainer::updateActiveFusionPrunedSchema(const Schema &schema)
return; // No active fusion
if (!activeFusionPrunedSchema) {
Schema::UP newSchema = Schema::intersect(*activeFusionSchema, schema);
- newActiveFusionPrunedSchema.reset(newSchema.release());
+ newActiveFusionPrunedSchema = std::move(newSchema);
} else {
Schema::UP newSchema = Schema::intersect(*activeFusionPrunedSchema, schema);
- newActiveFusionPrunedSchema.reset(newSchema.release());
+ newActiveFusionPrunedSchema = std::move(newSchema);
}
{
LockGuard slock(_state_lock);
@@ -279,9 +287,9 @@ IndexMaintainer::loadDiskIndex(const string &indexDir)
}
vespalib::Timer timer;
_active_indexes->setActive(indexDir);
- IDiskIndex::SP retval(new DiskIndexWithDestructorClosure
- (_operations.loadDiskIndex(indexDir),
- makeClosure(this, &IndexMaintainer::deactivateDiskIndexes, indexDir)));
+ auto retval = std::make_shared<DiskIndexWithDestructorCallback>(
+ _operations.loadDiskIndex(indexDir),
+ makeLambdaCallback([this, indexDir]() { deactivateDiskIndexes(indexDir); }));
if (LOG_WOULD_LOG(event)) {
EventLogger::diskIndexLoadComplete(indexDir, vespalib::count_ms(timer.elapsed()));
}
@@ -298,11 +306,10 @@ IndexMaintainer::reloadDiskIndex(const IDiskIndex &oldIndex)
}
vespalib::Timer timer;
_active_indexes->setActive(indexDir);
- const IDiskIndex &wrappedDiskIndex =
- (dynamic_cast<const DiskIndexWithDestructorClosure &>(oldIndex)).getWrapped();
- IDiskIndex::SP retval(new DiskIndexWithDestructorClosure
- (_operations.reloadDiskIndex(wrappedDiskIndex),
- makeClosure(this, &IndexMaintainer::deactivateDiskIndexes, indexDir)));
+ const IDiskIndex &wrappedDiskIndex = (dynamic_cast<const DiskIndexWithDestructorCallback &>(oldIndex)).getWrapped();
+ auto retval = std::make_shared<DiskIndexWithDestructorCallback>(
+ _operations.reloadDiskIndex(wrappedDiskIndex),
+ makeLambdaCallback([this, indexDir]() { deactivateDiskIndexes(indexDir); }));
if (LOG_WOULD_LOG(event)) {
EventLogger::diskIndexLoadComplete(indexDir, vespalib::count_ms(timer.elapsed()));
}
@@ -425,7 +432,7 @@ ISearchableIndexCollection::UP
IndexMaintainer::createNewSourceCollection(const LockGuard &newSearchLock)
{
ISearchableIndexCollection::SP currentLeaf(getLeaf(newSearchLock, _source_list));
- return ISearchableIndexCollection::UP(new IndexCollection(_selector, *currentLeaf));
+ return std::make_unique<IndexCollection>(_selector, *currentLeaf);
}
IndexMaintainer::FlushArgs::FlushArgs()
@@ -434,7 +441,7 @@ IndexMaintainer::FlushArgs::FlushArgs()
old_source_list(),
save_info(),
flush_serial_num(),
- stats(NULL),
+ stats(nullptr),
_skippedEmptyLast(false),
_extraIndexes(),
_changeGens(),
@@ -508,7 +515,7 @@ IndexMaintainer::doFlush(FlushArgs args)
}
assert(!flushIds.empty());
- if (args.stats != NULL) {
+ if (args.stats != nullptr) {
updateFlushStats(args);
}
@@ -665,7 +672,7 @@ IndexMaintainer::doneFusion(FusionArgs *args, IDiskIndex::SP *new_index)
LockGuard lock(_index_update_lock);
// make new source selector with shifted values.
- _selector.reset(getSourceSelector().cloneAndSubtract(ost.str(), id_diff).release());
+ _selector = getSourceSelector().cloneAndSubtract(ost.str(), id_diff);
_source_selector_changes = 0;
_current_index_id -= id_diff;
_last_fusion_id = args->_new_fusion_id;
@@ -720,7 +727,7 @@ IndexMaintainer::warmupDone(ISearchableIndexCollection::SP current)
LockGuard lock(_new_search_lock);
if (current == _source_list) {
auto makeSure = makeClosure(this, &IndexMaintainer::makeSureAllRemainingWarmupIsDone, current);
- Executor::Task::UP task(new ReconfigRunnableTask(_ctx.getReconfigurer(), std::move(makeSure)));
+ auto task = std::make_unique<ReconfigRunnableTask>(_ctx.getReconfigurer(), std::move(makeSure));
_ctx.getThreadingService().master().execute(std::move(task));
} else {
LOG(warning, "There has arrived a new IndexCollection while replacing the active index. "
@@ -838,7 +845,7 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config,
IIndexMaintainerOperations &operations)
: _base_dir(config.getBaseDir()),
_warmupConfig(config.getWarmup()),
- _active_indexes(new ActiveDiskIndexes()),
+ _active_indexes(std::make_shared<ActiveDiskIndexes>()),
_layout(config.getBaseDir()),
_schema(config.getSchema()),
_activeFusionSchema(),
@@ -885,10 +892,10 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config,
_lastFlushTime = search::FileKit::getModificationTime(latest_index_dir);
_current_serial_num = _flush_serial_num;
const string selector = IndexDiskLayout::getSelectorFileName(latest_index_dir);
- _selector.reset(FixedSourceSelector::load(selector, _next_id - 1).release());
+ _selector = FixedSourceSelector::load(selector, _next_id - 1);
} else {
_flush_serial_num = 0;
- _selector.reset(new FixedSourceSelector(0, "sourceselector", 1));
+ _selector = std::make_shared<FixedSourceSelector>(0, "sourceselector", 1);
}
uint32_t baseId(_selector->getBaseId());
if (_last_fusion_id != baseId) {
@@ -896,20 +903,22 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config,
uint32_t id_diff = _last_fusion_id - baseId;
ostringstream ost;
ost << "sourceselector_fusion(" << _last_fusion_id << ")";
- _selector.reset(getSourceSelector().cloneAndSubtract(ost.str(), id_diff).release());
+ _selector = getSourceSelector().cloneAndSubtract(ost.str(), id_diff);
assert(_last_fusion_id == _selector->getBaseId());
}
_current_index_id = getNewAbsoluteId() - _last_fusion_id;
assert(_current_index_id < ISourceSelector::SOURCE_LIMIT);
_selector->setDefaultSource(_current_index_id);
- ISearchableIndexCollection::UP sourceList(loadDiskIndexes(spec, ISearchableIndexCollection::UP(new IndexCollection(_selector))));
+ auto sourceList = loadDiskIndexes(spec, std::make_unique<IndexCollection>(_selector));
_current_index = operations.createMemoryIndex(_schema, *sourceList, _current_serial_num);
LOG(debug, "Index manager created with flushed serial num %" PRIu64, _flush_serial_num);
sourceList->append(_current_index_id, _current_index);
sourceList->setCurrentIndex(_current_index_id);
_source_list = std::move(sourceList);
_fusion_spec = spec;
- _ctx.getThreadingService().master().execute(makeLambdaTask([this,&config]() {pruneRemovedFields(_schema, config.getSerialNum()); }));
+ _ctx.getThreadingService().master().execute(makeLambdaTask([this,&config]() {
+ pruneRemovedFields(_schema, config.getSerialNum());
+ }));
_ctx.getThreadingService().master().sync();
}
@@ -951,7 +960,7 @@ IndexMaintainer::initFlush(SerialNum serialNum, searchcorespi::FlushStats * stat
return FlushTask::UP();
}
SerialNum realSerialNum = args.flush_serial_num;
- return makeFlushTask(makeClosure(this, &IndexMaintainer::doFlush, std::move(args)), realSerialNum);
+ return makeLambdaFlushTask([this, myargs=std::move(args)]() mutable { doFlush(std::move(myargs)); }, realSerialNum);
}
FusionSpec
@@ -1014,7 +1023,7 @@ IndexMaintainer::runFusion(const FusionSpec &fusion_spec, std::shared_ptr<search
{
LockGuard slock(_state_lock);
LockGuard ilock(_index_update_lock);
- _activeFusionSchema.reset(new Schema(_schema));
+ _activeFusionSchema = std::make_shared<Schema>(_schema);
_activeFusionPrunedSchema.reset();
args._schema = _schema;
}
@@ -1234,10 +1243,9 @@ IndexMaintainer::getFlushTargets(void)
{
// Called by flush engine scheduler thread
IFlushTarget::List ret;
- IFlushTarget::SP indexFlush(new IndexFlushTarget(*this));
- IFlushTarget::SP indexFusion(new IndexFusionTarget(*this));
- ret.push_back(indexFlush);
- ret.push_back(indexFusion);
+ ret.reserve(2);
+ ret.push_back(std::make_shared<IndexFlushTarget>(*this));
+ ret.push_back(std::make_shared<IndexFusionTarget>(*this));
return ret;
}
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h
index d9d2479833f..35c0c0fbd2f 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h
@@ -224,17 +224,10 @@ class IndexMaintainer : public IIndexManager,
Schema::SP _prunedSchema;
ISearchableIndexCollection::SP _old_source_list; // Delays destruction
- FusionArgs()
- : _new_fusion_id(0u),
- _changeGens(),
- _schema(),
- _prunedSchema(),
- _old_source_list()
- { }
+ FusionArgs();
~FusionArgs();
};
- IFlushTarget::SP getFusionTarget();
void scheduleFusion(const FlushIds &flushIds);
bool canRunFusion(const FusionSpec &spec) const;
bool doneFusion(FusionArgs *args, IDiskIndex::SP *new_index);
@@ -246,12 +239,7 @@ class IndexMaintainer : public IIndexManager,
IMemoryIndex::SP _oldIndex;
ISearchableIndexCollection::SP _oldSourceList; // Delays destruction
- SetSchemaArgs(void)
- : _newSchema(),
- _oldSchema(),
- _oldIndex(),
- _oldSourceList()
- { }
+ SetSchemaArgs();
~SetSchemaArgs();
};
@@ -268,7 +256,7 @@ class IndexMaintainer : public IIndexManager,
* result.
*/
bool reconfigure(vespalib::Closure0<bool>::UP closure);
- virtual void warmupDone(ISearchableIndexCollection::SP current) override;
+ void warmupDone(ISearchableIndexCollection::SP current) override;
bool makeSureAllRemainingWarmupIsDone(ISearchableIndexCollection::SP keepAlive);
void scheduleCommit();
void commit();
@@ -371,4 +359,3 @@ public:
};
}
-