diff options
35 files changed, 682 insertions, 721 deletions
diff --git a/document/src/tests/repo/documenttyperepo_test.cpp b/document/src/tests/repo/documenttyperepo_test.cpp index 0bc80ebcd16..7d17a3cfa11 100644 --- a/document/src/tests/repo/documenttyperepo_test.cpp +++ b/document/src/tests/repo/documenttyperepo_test.cpp @@ -392,10 +392,6 @@ TEST("requireThatDocumentsCanUseOtherDocumentTypes") { EXPECT_TRUE(dynamic_cast<const DocumentType *>(&type)); } -void storeId(set<int> *s, const DocumentType &type) { - s->insert(type.getId()); -} - TEST("requireThatDocumentTypesCanBeIterated") { DocumenttypesConfigBuilderHelper builder; builder.document(doc_type_id, type_name, @@ -405,7 +401,8 @@ TEST("requireThatDocumentTypesCanBeIterated") { DocumentTypeRepo repo(builder.config()); set<int> ids; - repo.forEachDocumentType(*makeClosure(storeId, &ids)); + repo.forEachDocumentType(*DocumentTypeRepo::makeLambda( + [&ids](const DocumentType &type) { ids.insert(type.getId()); })); EXPECT_EQUAL(3u, ids.size()); ASSERT_TRUE(ids.count(DataType::T_DOCUMENT)); @@ -436,8 +433,7 @@ TEST("requireThatBuildFromConfigWorks") { TEST("requireThatStructsCanBeRecursive") { DocumenttypesConfigBuilderHelper builder; builder.document(doc_type_id, type_name, - Struct(header_name).setId(header_id).addField(field_name, - header_id), + Struct(header_name).setId(header_id).addField(field_name, header_id), Struct(body_name)); DocumentTypeRepo repo(builder.config()); diff --git a/document/src/vespa/document/fieldset/fieldsetrepo.cpp b/document/src/vespa/document/fieldset/fieldsetrepo.cpp index bf9c9923572..f16387810ce 100644 --- a/document/src/vespa/document/fieldset/fieldsetrepo.cpp +++ b/document/src/vespa/document/fieldset/fieldsetrepo.cpp @@ -119,7 +119,9 @@ FieldSetRepo::FieldSetRepo(const DocumentTypeRepo& repo) : _doumentTyperepo(repo), _configuredFieldSets() { - repo.forEachDocumentType(*vespalib::makeClosure(this, &FieldSetRepo::configureDocumentType)); + repo.forEachDocumentType(*DocumentTypeRepo::makeLambda([&](const DocumentType &type) { + configureDocumentType(type); + })); } FieldSetRepo::~FieldSetRepo() = default; diff --git a/document/src/vespa/document/repo/documenttyperepo.cpp b/document/src/vespa/document/repo/documenttyperepo.cpp index 578d2999038..50ab6aaa646 100644 --- a/document/src/vespa/document/repo/documenttyperepo.cpp +++ b/document/src/vespa/document/repo/documenttyperepo.cpp @@ -587,9 +587,9 @@ DocumentTypeRepo::getAnnotationType(const DocumentType &doc_type, int32_t id) co } void -DocumentTypeRepo::forEachDocumentType(Closure1<const DocumentType &> &c) const { +DocumentTypeRepo::forEachDocumentType(Handler & handler) const { for (const auto & entry : *_doc_types) { - c.call(*entry.second->doc_type); + handler.handle(*entry.second->doc_type); } } diff --git a/document/src/vespa/document/repo/documenttyperepo.h b/document/src/vespa/document/repo/documenttyperepo.h index fd17bd5640a..4e3a1b07619 100644 --- a/document/src/vespa/document/repo/documenttyperepo.h +++ b/document/src/vespa/document/repo/documenttyperepo.h @@ -19,11 +19,21 @@ struct DataTypeRepo; class DocumentType; class DocumentTypeRepo { - std::unique_ptr<internal::DocumentTypeMap> _doc_types; - const DocumentType * _default; - public: using DocumenttypesConfig = const internal::InternalDocumenttypesType; + struct Handler { + virtual ~Handler() = default; + virtual void handle(const DocumentType & type) = 0; + }; + + + template <class FunctionType> + static std::unique_ptr<Handler> + makeLambda(FunctionType &&function) + { + return std::make_unique<LambdaHandler<std::decay_t<FunctionType>>> + (std::forward<FunctionType>(function)); + } // This one should only be used for testing. If you do not have any config. explicit DocumentTypeRepo(const DocumentType & docType); @@ -39,8 +49,22 @@ public: const DataType *getDataType(const DocumentType &doc_type, int32_t id) const; const DataType *getDataType(const DocumentType &doc_type, vespalib::stringref name) const; const AnnotationType *getAnnotationType(const DocumentType &doc_type, int32_t id) const; - void forEachDocumentType(vespalib::Closure1<const DocumentType &> &c) const; + void forEachDocumentType(Handler & handler) const; const DocumentType *getDefaultDocType() const { return _default; } +private: + template <class FunctionType> + class LambdaHandler : public Handler { + FunctionType _func; + public: + LambdaHandler(FunctionType &&func) : _func(std::move(func)) {} + LambdaHandler(const LambdaHandler &) = delete; + LambdaHandler & operator = (const LambdaHandler &) = delete; + ~LambdaHandler() override = default; + void handle(const DocumentType & type) override { _func(type); } + }; + + std::unique_ptr<internal::DocumentTypeMap> _doc_types; + const DocumentType * _default; }; } // namespace document diff --git a/document/src/vespa/document/select/bodyfielddetector.cpp b/document/src/vespa/document/select/bodyfielddetector.cpp index 3d32813621d..d1961810c7a 100644 --- a/document/src/vespa/document/select/bodyfielddetector.cpp +++ b/document/src/vespa/document/select/bodyfielddetector.cpp @@ -28,7 +28,9 @@ BodyFieldDetector::detectFieldType(const FieldValueNode *expr, const DocumentTyp void BodyFieldDetector::visitFieldValueNode(const FieldValueNode& expr) { - _repo.forEachDocumentType(*makeClosure(this, &BodyFieldDetector::detectFieldType, &expr)); + _repo.forEachDocumentType(*DocumentTypeRepo::makeLambda([&](const DocumentType &type) { + detectFieldType(&expr, type); + })); } diff --git a/jdisc_http_service/pom.xml b/jdisc_http_service/pom.xml index 68a0f0636d3..2baba974b03 100644 --- a/jdisc_http_service/pom.xml +++ b/jdisc_http_service/pom.xml @@ -151,6 +151,11 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> <plugins> diff --git a/jdisc_http_service/src/main/java/com/yahoo/container/logging/ConnectionLogEntry.java b/jdisc_http_service/src/main/java/com/yahoo/container/logging/ConnectionLogEntry.java index 15a38f6eeae..a6b800a9279 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/container/logging/ConnectionLogEntry.java +++ b/jdisc_http_service/src/main/java/com/yahoo/container/logging/ConnectionLogEntry.java @@ -2,13 +2,6 @@ package com.yahoo.container.logging; -import com.yahoo.slime.Cursor; -import com.yahoo.slime.Slime; -import com.yahoo.slime.SlimeUtils; -import com.yahoo.slime.Type; -import com.yahoo.yolean.Exceptions; - -import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.Optional; import java.util.UUID; @@ -69,75 +62,6 @@ public class ConnectionLogEntry { this.sslHandshakeFailureType = builder.sslHandshakeFailureType; } - public String toJson() { - Slime slime = new Slime(); - Cursor cursor = slime.setObject(); - cursor.setString("id", id.toString()); - setTimestamp(cursor, timestamp, "timestamp"); - - setDouble(cursor, durationSeconds, "duration"); - setString(cursor, peerAddress, "peerAddress"); - setInteger(cursor, peerPort, "peerPort"); - setString(cursor, localAddress, "localAddress"); - setInteger(cursor, localPort, "localPort"); - setString(cursor, remoteAddress, "remoteAddress"); - setInteger(cursor, remotePort, "remotePort"); - setLong(cursor, httpBytesReceived, "httpBytesReceived"); - setLong(cursor, httpBytesSent, "httpBytesSent"); - setLong(cursor, requests, "requests"); - setLong(cursor, responses, "responses"); - setString(cursor, sslProtocol, "ssl", "protocol"); - setString(cursor, sslSessionId, "ssl", "sessionId"); - setString(cursor, sslCipherSuite, "ssl", "cipherSuite"); - setString(cursor, sslPeerSubject, "ssl", "peerSubject"); - setTimestamp(cursor, sslPeerNotBefore, "ssl", "peerNotBefore"); - setTimestamp(cursor, sslPeerNotAfter, "ssl", "peerNotAfter"); - setString(cursor, sslSniServerName, "ssl", "sniServerName"); - setString(cursor, sslHandshakeFailureException, "ssl", "handshake-failure", "exception"); - setString(cursor, sslHandshakeFailureMessage, "ssl", "handshake-failure", "message"); - setString(cursor, sslHandshakeFailureType, "ssl", "handshake-failure", "type"); - return new String(Exceptions.uncheck(() -> SlimeUtils.toJsonBytes(slime)), StandardCharsets.UTF_8); - } - - private void setString(Cursor cursor, String value, String... keys) { - if(value != null) { - subCursor(cursor, keys).setString(keys[keys.length - 1], value); - } - } - - private void setLong(Cursor cursor, Long value, String... keys) { - if (value != null) { - subCursor(cursor, keys).setLong(keys[keys.length - 1], value); - } - } - - private void setInteger(Cursor cursor, Integer value, String... keys) { - if (value != null) { - subCursor(cursor, keys).setLong(keys[keys.length - 1], value); - } - } - - private void setTimestamp(Cursor cursor, Instant timestamp, String... keys) { - if (timestamp != null) { - subCursor(cursor, keys).setString(keys[keys.length - 1], timestamp.toString()); - } - } - - private void setDouble(Cursor cursor, Double value, String... keys) { - if (value != null) { - subCursor(cursor, keys).setDouble(keys[keys.length - 1], value); - } - } - - private static Cursor subCursor(Cursor cursor, String... keys) { - Cursor subCursor = cursor; - for (int i = 0; i < keys.length - 1; ++i) { - Cursor field = subCursor.field(keys[i]); - subCursor = field.type() != Type.NIX ? field : subCursor.setObject(keys[i]); - } - return subCursor; - } - public static Builder builder(UUID id, Instant timestamp) { return new Builder(id, timestamp); } diff --git a/jdisc_http_service/src/main/java/com/yahoo/container/logging/FileConnectionLog.java b/jdisc_http_service/src/main/java/com/yahoo/container/logging/FileConnectionLog.java index 62e53a5a514..968ba74b4f2 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/container/logging/FileConnectionLog.java +++ b/jdisc_http_service/src/main/java/com/yahoo/container/logging/FileConnectionLog.java @@ -5,23 +5,19 @@ package com.yahoo.container.logging; import com.google.inject.Inject; import com.yahoo.component.AbstractComponent; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; -import java.util.logging.Level; import java.util.logging.Logger; /** * @author mortent */ -public class FileConnectionLog extends AbstractComponent implements ConnectionLog, LogWriter<ConnectionLogEntry> { +public class FileConnectionLog extends AbstractComponent implements ConnectionLog { private static final Logger logger = Logger.getLogger(FileConnectionLog.class.getName()); private final ConnectionLogHandler logHandler; @Inject public FileConnectionLog(ConnectionLogConfig config) { - logHandler = new ConnectionLogHandler(config.cluster(), this); + logHandler = new ConnectionLogHandler(config.cluster(), new JsonConnectionLogWriter()); } @Override @@ -34,9 +30,4 @@ public class FileConnectionLog extends AbstractComponent implements ConnectionLo logHandler.shutdown(); } - @Override - // TODO serialize directly to outputstream - public void write(ConnectionLogEntry entry, OutputStream outputStream) throws IOException { - outputStream.write(entry.toJson().getBytes(StandardCharsets.UTF_8)); - } }
\ No newline at end of file diff --git a/jdisc_http_service/src/main/java/com/yahoo/container/logging/FormatUtil.java b/jdisc_http_service/src/main/java/com/yahoo/container/logging/FormatUtil.java new file mode 100644 index 00000000000..ee780ad2a83 --- /dev/null +++ b/jdisc_http_service/src/main/java/com/yahoo/container/logging/FormatUtil.java @@ -0,0 +1,46 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.container.logging; + +import com.fasterxml.jackson.core.JsonGenerator; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; + +/** + * @author bjorncs + */ +class FormatUtil { + + private FormatUtil() {} + + static void writeSecondsField(JsonGenerator generator, String fieldName, Instant instant) throws IOException { + writeSecondsField(generator, fieldName, instant.toEpochMilli()); + } + + static void writeSecondsField(JsonGenerator generator, String fieldName, Duration duration) throws IOException { + writeSecondsField(generator, fieldName, duration.toMillis()); + } + + static void writeSecondsField(JsonGenerator generator, String fieldName, double seconds) throws IOException { + writeSecondsField(generator, fieldName, (long)(seconds * 1000)); + } + + static void writeSecondsField(JsonGenerator generator, String fieldName, long milliseconds) throws IOException { + generator.writeFieldName(fieldName); + generator.writeRawValue(toSecondsString(milliseconds)); + } + + /** @return a string with number of seconds with 3 decimals */ + static String toSecondsString(long milliseconds) { + StringBuilder builder = new StringBuilder().append(milliseconds / 1000L).append('.'); + long decimals = milliseconds % 1000; + if (decimals < 100) { + builder.append('0'); + if (decimals < 10) { + builder.append('0'); + } + } + return builder.append(decimals).toString(); + } +} diff --git a/jdisc_http_service/src/main/java/com/yahoo/container/logging/JSONFormatter.java b/jdisc_http_service/src/main/java/com/yahoo/container/logging/JSONFormatter.java index c6d177684ac..441e139bc67 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/container/logging/JSONFormatter.java +++ b/jdisc_http_service/src/main/java/com/yahoo/container/logging/JSONFormatter.java @@ -15,6 +15,8 @@ import java.util.Objects; import java.util.logging.Level; import java.util.logging.Logger; +import static com.yahoo.container.logging.FormatUtil.writeSecondsField; + /** * Formatting of an {@link AccessLogEntry} in the Vespa JSON access log format. * @@ -45,8 +47,8 @@ public class JSONFormatter implements LogWriter<RequestLogEntry> { String peerAddress = entry.peerAddress().get(); generator.writeStringField("ip", peerAddress); long time = entry.timestamp().get().toEpochMilli(); - writeSeconds(generator, "time", time); - writeSeconds(generator, "duration", entry.duration().get().toMillis()); + FormatUtil.writeSecondsField(generator, "time", time); + FormatUtil.writeSecondsField(generator, "duration", entry.duration().get()); generator.writeNumberField("responsesize", entry.contentSize().orElse(0)); generator.writeNumberField("code", entry.statusCode().orElse(0)); generator.writeStringField("method", entry.httpMethod().orElse("")); @@ -185,23 +187,4 @@ public class JSONFormatter implements LogWriter<RequestLogEntry> { if (rawPath == null) return null; return rawQuery != null ? rawPath + "?" + rawQuery : rawPath; } - - private static void writeSeconds(JsonGenerator generator, String fieldName, long milliseconds) throws IOException { - generator.writeFieldName(fieldName); - generator.writeRawValue(toSecondsString(milliseconds)); - } - - /** @return a string with number of seconds with 3 decimals */ - private static String toSecondsString(long milliseconds) { - StringBuilder builder = new StringBuilder().append(milliseconds / 1000L).append('.'); - long decimals = milliseconds % 1000; - if (decimals < 100) { - builder.append('0'); - if (decimals < 10) { - builder.append('0'); - } - } - return builder.append(decimals).toString(); - } - } diff --git a/jdisc_http_service/src/main/java/com/yahoo/container/logging/JsonConnectionLogWriter.java b/jdisc_http_service/src/main/java/com/yahoo/container/logging/JsonConnectionLogWriter.java new file mode 100644 index 00000000000..394f87c07cc --- /dev/null +++ b/jdisc_http_service/src/main/java/com/yahoo/container/logging/JsonConnectionLogWriter.java @@ -0,0 +1,117 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.container.logging; + +import com.fasterxml.jackson.core.JsonEncoding; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.io.OutputStream; +import java.time.Instant; +import java.util.Arrays; +import java.util.Objects; +import java.util.Optional; + +/** + * @author bjorncs + */ +class JsonConnectionLogWriter implements LogWriter<ConnectionLogEntry> { + + private final JsonFactory jsonFactory = new JsonFactory(new ObjectMapper()); + + @Override + public void write(ConnectionLogEntry record, OutputStream outputStream) throws IOException { + try (JsonGenerator generator = createJsonGenerator(outputStream)) { + generator.writeStartObject(); + generator.writeStringField("id", record.id()); + generator.writeStringField("timestamp", record.timestamp().toString()); + + writeOptionalSeconds(generator, "duration", unwrap(record.durationSeconds())); + writeOptionalString(generator, "peerAddress", unwrap(record.peerAddress())); + writeOptionalInteger(generator, "peerPort", unwrap(record.peerPort())); + writeOptionalString(generator, "localAddress", unwrap(record.localAddress())); + writeOptionalInteger(generator, "localPort", unwrap(record.localPort())); + writeOptionalString(generator, "remoteAddress", unwrap(record.remoteAddress())); + writeOptionalInteger(generator, "remotePort", unwrap(record.remotePort())); + writeOptionalLong(generator, "httpBytesReceived", unwrap(record.httpBytesReceived())); + writeOptionalLong(generator, "httpBytesSent", unwrap(record.httpBytesSent())); + writeOptionalLong(generator, "requests", unwrap(record.requests())); + writeOptionalLong(generator, "responses", unwrap(record.responses())); + + String sslProtocol = unwrap(record.sslProtocol()); + String sslSessionId = unwrap(record.sslSessionId()); + String sslCipherSuite = unwrap(record.sslCipherSuite()); + String sslPeerSubject = unwrap(record.sslPeerSubject()); + Instant sslPeerNotBefore = unwrap(record.sslPeerNotBefore()); + Instant sslPeerNotAfter = unwrap(record.sslPeerNotAfter()); + String sslSniServerName = unwrap(record.sslSniServerName()); + String sslHandshakeFailureException = unwrap(record.sslHandshakeFailureException()); + String sslHandshakeFailureMessage = unwrap(record.sslHandshakeFailureMessage()); + String sslHandshakeFailureType = unwrap(record.sslHandshakeFailureType()); + + if (isAnyValuePresent( + sslProtocol, sslSessionId, sslCipherSuite, sslPeerSubject, sslPeerNotBefore, sslPeerNotAfter, + sslSniServerName, sslHandshakeFailureException, sslHandshakeFailureMessage, sslHandshakeFailureType)) { + generator.writeObjectFieldStart("ssl"); + + writeOptionalString(generator, "protocol", sslProtocol); + writeOptionalString(generator, "sessionId", sslSessionId); + writeOptionalString(generator, "cipherSuite", sslCipherSuite); + writeOptionalString(generator, "peerSubject", sslPeerSubject); + writeOptionalTimestamp(generator, "peerNotBefore", sslPeerNotBefore); + writeOptionalTimestamp(generator, "peerNotAfter", sslPeerNotAfter); + writeOptionalString(generator, "sniServerName", sslSniServerName); + + if (isAnyValuePresent(sslHandshakeFailureException, sslHandshakeFailureMessage, sslHandshakeFailureType)) { + generator.writeObjectFieldStart("handshake-failure"); + writeOptionalString(generator, "exception", sslHandshakeFailureException); + writeOptionalString(generator, "message", sslHandshakeFailureMessage); + writeOptionalString(generator, "type", sslHandshakeFailureType); + generator.writeEndObject(); + } + + generator.writeEndObject(); + } + } + } + + private void writeOptionalString(JsonGenerator generator, String name, String value) throws IOException { + if (value != null) { + generator.writeStringField(name, value); + } + } + + private void writeOptionalInteger(JsonGenerator generator, String name, Integer value) throws IOException { + if (value != null) { + generator.writeNumberField(name, value); + } + } + + private void writeOptionalLong(JsonGenerator generator, String name, Long value) throws IOException { + if (value != null) { + generator.writeNumberField(name, value); + } + } + + private void writeOptionalTimestamp(JsonGenerator generator, String name, Instant value) throws IOException { + if (value != null) { + generator.writeStringField(name, value.toString()); + } + } + + private void writeOptionalSeconds(JsonGenerator generator, String name, Double value) throws IOException { + if (value != null) { + FormatUtil.writeSecondsField(generator, name, value); + } + } + + private static boolean isAnyValuePresent(Object... values) { return Arrays.stream(values).anyMatch(Objects::nonNull); } + private static <T> T unwrap(Optional<T> maybeValue) { return maybeValue.orElse(null); } + + private JsonGenerator createJsonGenerator(OutputStream outputStream) throws IOException { + return jsonFactory.createGenerator(outputStream, JsonEncoding.UTF8) + .configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false) + .configure(JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM, false); + } +} diff --git a/jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java b/jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java index d0f31a6b866..bfb51d21c6c 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java +++ b/jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java @@ -5,7 +5,6 @@ import com.yahoo.compress.ZstdOuputStream; import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.io.NativeIO; import com.yahoo.log.LogFileDb; -import com.yahoo.protect.Process; import com.yahoo.system.ProcessExecuter; import com.yahoo.yolean.Exceptions; @@ -19,13 +18,11 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; -import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; import java.util.zip.GZIPOutputStream; @@ -36,15 +33,74 @@ import java.util.zip.GZIPOutputStream; * @author Bob Travis * @author bjorncs */ -class LogFileHandler <LOGTYPE> { +class LogFileHandler <LOGTYPE> { - enum Compression {NONE, GZIP, ZSTD} + enum Compression { NONE, GZIP, ZSTD } private final static Logger logger = Logger.getLogger(LogFileHandler.class.getName()); - private final ArrayBlockingQueue<Operation<LOGTYPE>> logQueue = new ArrayBlockingQueue<>(10000); - final LogThread<LOGTYPE> logThread; - @FunctionalInterface private interface Pollable<T> { Operation<T> poll() throws InterruptedException; } + private final Compression compression; + private final long[] rotationTimes; + private final String filePattern; // default to current directory, ms time stamp + private final String symlinkName; + private final ArrayBlockingQueue<LOGTYPE> logQueue = new ArrayBlockingQueue<>(100000); + private final AtomicBoolean rotate = new AtomicBoolean(false); + private final ExecutorService executor = Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("logfilehandler.compression")); + private final NativeIO nativeIO = new NativeIO(); + private final LogThread<LOGTYPE> logThread; + + private volatile FileOutputStream currentOutputStream = null; + private volatile long nextRotationTime = 0; + private volatile String fileName; + private volatile long lastDropPosition = 0; + + private final LogWriter<LOGTYPE> logWriter; + + static private class LogThread<LOGTYPE> extends Thread { + final LogFileHandler<LOGTYPE> logFileHandler; + long lastFlush = 0; + LogThread(LogFileHandler<LOGTYPE> logFile) { + super("Logger"); + setDaemon(true); + logFileHandler = logFile; + } + @Override + public void run() { + try { + storeLogRecords(); + } catch (InterruptedException e) { + } catch (Exception e) { + com.yahoo.protect.Process.logAndDie("Failed storing log records", e); + } + + logFileHandler.flush(); + } + + private void storeLogRecords() throws InterruptedException { + while (!isInterrupted()) { + LOGTYPE r = logFileHandler.logQueue.poll(100, TimeUnit.MILLISECONDS); + if(logFileHandler.rotate.get()) { + logFileHandler.internalRotateNow(); + lastFlush = System.nanoTime(); + logFileHandler.rotate.set(false); + } + if (r != null) { + logFileHandler.internalPublish(r); + flushIfOld(3, TimeUnit.SECONDS); + } else { + flushIfOld(100, TimeUnit.MILLISECONDS); + } + } + } + + private void flushIfOld(long age, TimeUnit unit) { + long now = System.nanoTime(); + if (TimeUnit.NANOSECONDS.toMillis(now - lastFlush) > unit.toMillis(age)) { + logFileHandler.flush(); + lastFlush = now; + } + } + } LogFileHandler(Compression compression, String filePattern, String rotationTimes, String symlinkName, LogWriter<LOGTYPE> logWriter) { this(compression, filePattern, calcTimesMinutes(rotationTimes), symlinkName, logWriter); @@ -54,433 +110,320 @@ class LogFileHandler <LOGTYPE> { Compression compression, String filePattern, long[] rotationTimes, - String symlinkName, - LogWriter<LOGTYPE> logWriter) { - this.logThread = new LogThread<LOGTYPE>(logWriter, filePattern, compression, rotationTimes, symlinkName, this::poll); + String symlinkName, LogWriter<LOGTYPE> logWriter) { + this.compression = compression; + this.filePattern = filePattern; + this.rotationTimes = rotationTimes; + this.symlinkName = (symlinkName != null && !symlinkName.isBlank()) ? symlinkName : null; + this.logWriter = logWriter; + this.logThread = new LogThread<>(this); this.logThread.start(); } - private Operation<LOGTYPE> poll() throws InterruptedException { - return logQueue.poll(100, TimeUnit.MILLISECONDS); - } - /** * Sends logrecord to file, first rotating file if needed. * * @param r logrecord to publish */ public void publish(LOGTYPE r) { - addOperation(new Operation<>(r)); - } - - public void flush() { - addOperationAndWait(new Operation<>(Operation.Type.flush)); - } - - /** - * Force file rotation now, independent of schedule. - */ - void rotateNow() { - addOperationAndWait(new Operation<>(Operation.Type.rotate)); - } - - public void close() { - addOperationAndWait(new Operation<>(Operation.Type.close)); + try { + logQueue.put(r); + } catch (InterruptedException e) { + } } - private void addOperation(Operation<LOGTYPE> op) { + public synchronized void flush() { try { - logQueue.put(op); - } catch (InterruptedException e) { + FileOutputStream currentOut = this.currentOutputStream; + if (currentOut != null) { + if (compression == Compression.GZIP) { + long newPos = currentOut.getChannel().position(); + if (newPos > lastDropPosition + 102400) { + nativeIO.dropPartialFileFromCache(currentOut.getFD(), lastDropPosition, newPos, true); + lastDropPosition = newPos; + } + } else { + currentOut.flush(); + } + } + } catch (IOException e) { + logger.warning("Failed dropping from cache : " + Exceptions.toMessageString(e)); } } - private void addOperationAndWait(Operation<LOGTYPE> op) { + public void close() { try { - logQueue.put(op); - op.countDownLatch.await(); - } catch (InterruptedException e) { + flush(); + FileOutputStream currentOut = this.currentOutputStream; + if (currentOut != null) currentOut.close(); + } catch (Exception e) { + logger.log(Level.WARNING, "Got error while closing log file", e); } } - /** - * Flushes all queued messages, interrupts the log thread in this and - * waits for it to end before returning - */ - void shutdown() { - logThread.interrupt(); + private void internalPublish(LOGTYPE r) { + // first check to see if new file needed. + // if so, use this.internalRotateNow() to do it + + long now = System.currentTimeMillis(); + if (nextRotationTime <= 0) { + nextRotationTime = getNextRotationTime(now); // lazy initialization + } + if (now > nextRotationTime || currentOutputStream == null) { + internalRotateNow(); + } try { - logThread.executor.shutdownNow(); - logThread.executor.awaitTermination(600, TimeUnit.SECONDS); - logThread.join(); - } catch (InterruptedException e) { + FileOutputStream out = this.currentOutputStream; + logWriter.write(r, out); + out.write('\n'); + } catch (IOException e) { + logger.warning("Failed writing log record: " + Exceptions.toMessageString(e)); } } /** - * Calculate rotation times array, given times in minutes, as "0 60 ..." + * Find next rotation after specified time. + * + * @param now the specified time; if zero, current time is used. + * @return the next rotation time */ - private static long[] calcTimesMinutes(String times) { - ArrayList<Long> list = new ArrayList<>(50); - int i = 0; - boolean etc = false; - - while (i < times.length()) { - if (times.charAt(i) == ' ') { - i++; - continue; - } // skip spaces - int j = i; // start of string - i = times.indexOf(' ', i); - if (i == -1) i = times.length(); - if (times.charAt(j) == '.' && times.substring(j, i).equals("...")) { // ... - etc = true; + long getNextRotationTime (long now) { + if (now <= 0) { + now = System.currentTimeMillis(); + } + long nowTod = timeOfDayMillis(now); + long next = 0; + for (long rotationTime : rotationTimes) { + if (nowTod < rotationTime) { + next = rotationTime-nowTod + now; break; } - list.add(Long.valueOf(times.substring(j, i))); } - - int size = list.size(); - long[] longtimes = new long[size]; - for (i = 0; i < size; i++) { - longtimes[i] = list.get(i) // pick up value in minutes past midnight - * 60000; // and multiply to get millis + if (next == 0) { // didn't find one -- use 1st time 'tomorrow' + next = rotationTimes[0]+lengthOfDayMillis-nowTod + now; } - if (etc) { // fill out rest of day, same as final interval - long endOfDay = 24 * 60 * 60 * 1000; - long lasttime = longtimes[size - 1]; - long interval = lasttime - longtimes[size - 2]; - long moreneeded = (endOfDay - lasttime) / interval; - if (moreneeded > 0) { - int newsize = size + (int) moreneeded; - long[] temp = new long[newsize]; - for (i = 0; i < size; i++) { - temp[i] = longtimes[i]; - } - while (size < newsize) { - lasttime += interval; - temp[size++] = lasttime; - } - longtimes = temp; + return next; + } + + void waitDrained() { + while(! logQueue.isEmpty()) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { } } + flush(); + } - return longtimes; + private void checkAndCreateDir(String pathname) { + int lastSlash = pathname.lastIndexOf("/"); + if (lastSlash > -1) { + String pathExcludingFilename = pathname.substring(0, lastSlash); + File filepath = new File(pathExcludingFilename); + if (!filepath.exists()) { + filepath.mkdirs(); + } + } } /** - * Only for unit testing. Do not use. + * Force file rotation now, independent of schedule. */ - String getFileName() { - return logThread.fileName; + void rotateNow () { + rotate.set(true); } - /** - * Handle logging and file operations - */ - static class LogThread<LOGTYPE> extends Thread { - private final Pollable<LOGTYPE> operationProvider; - long lastFlush = 0; - private FileOutputStream currentOutputStream = null; - private long nextRotationTime = 0; - private final String filePattern; // default to current directory, ms time stamp - private volatile String fileName; - private long lastDropPosition = 0; - private final LogWriter<LOGTYPE> logWriter; - private final Compression compression; - private final long[] rotationTimes; - private final String symlinkName; - private final ExecutorService executor = Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("logfilehandler.compression")); - private final NativeIO nativeIO = new NativeIO(); - - - LogThread(LogWriter<LOGTYPE> logWriter, - String filePattern, - Compression compression, - long[] rotationTimes, - String symlinkName, - Pollable<LOGTYPE> operationProvider) { - super("Logger"); - setDaemon(true); - this.logWriter = logWriter; - this.filePattern = filePattern; - this.compression = compression; - this.rotationTimes = rotationTimes; - this.symlinkName = (symlinkName != null && !symlinkName.isBlank()) ? symlinkName : null; - this.operationProvider = operationProvider; - } + // Throw InterruptedException upwards rather than relying on isInterrupted to stop the thread as + // isInterrupted() returns false after interruption in p.waitFor + private void internalRotateNow() { + // figure out new file name, then + // use super.setOutputStream to switch to a new file - @Override - public void run() { - try { - handleLogOperations(); - } catch (InterruptedException e) { - } catch (Exception e) { - Process.logAndDie("Failed storing log records", e); - } + String oldFileName = fileName; + long now = System.currentTimeMillis(); + fileName = LogFormatter.insertDate(filePattern, now); + flush(); - internalFlush(); + try { + checkAndCreateDir(fileName); + FileOutputStream os = new FileOutputStream(fileName, true); // append mode, for safety + currentOutputStream = os; + lastDropPosition = 0; + LogFileDb.nowLoggingTo(fileName); } - - private void handleLogOperations() throws InterruptedException { - while (!isInterrupted()) { - Operation<LOGTYPE> r = operationProvider.poll(); - if (r != null) { - if (r.type == Operation.Type.flush) { - internalFlush(); - } else if (r.type == Operation.Type.close) { - internalClose(); - } else if (r.type == Operation.Type.rotate) { - internalRotateNow(); - lastFlush = System.nanoTime(); - } else if (r.type == Operation.Type.log) { - internalPublish(r.log.get()); - flushIfOld(3, TimeUnit.SECONDS); - } - r.countDownLatch.countDown(); - } else { - flushIfOld(100, TimeUnit.MILLISECONDS); - } - } + catch (IOException e) { + throw new RuntimeException("Couldn't open log file '" + fileName + "'", e); } - private void flushIfOld(long age, TimeUnit unit) { - long now = System.nanoTime(); - if (TimeUnit.NANOSECONDS.toMillis(now - lastFlush) > unit.toMillis(age)) { - internalFlush(); - lastFlush = now; - } - } + createSymlinkToCurrentFile(); - private synchronized void internalFlush() { - try { - FileOutputStream currentOut = this.currentOutputStream; - if (currentOut != null) { - if (compression == Compression.GZIP) { - long newPos = currentOut.getChannel().position(); - if (newPos > lastDropPosition + 102400) { - nativeIO.dropPartialFileFromCache(currentOut.getFD(), lastDropPosition, newPos, true); - lastDropPosition = newPos; - } - } else { - currentOut.flush(); - } + nextRotationTime = 0; //figure it out later (lazy evaluation) + if ((oldFileName != null)) { + File oldFile = new File(oldFileName); + if (oldFile.exists()) { + if (compression != Compression.NONE) { + executor.execute(() -> runCompression(oldFile, compression)); + } else { + nativeIO.dropFileFromCache(oldFile); } - } catch (IOException e) { - logger.warning("Failed dropping from cache : " + Exceptions.toMessageString(e)); - } - } - - private void internalClose() { - try { - internalFlush(); - FileOutputStream currentOut = this.currentOutputStream; - if (currentOut != null) currentOut.close(); - } catch (Exception e) { - logger.log(Level.WARNING, "Got error while closing log file", e); - } - } - - private void internalPublish(LOGTYPE r) { - // first check to see if new file needed. - // if so, use this.internalRotateNow() to do it - - long now = System.currentTimeMillis(); - if (nextRotationTime <= 0) { - nextRotationTime = getNextRotationTime(now); // lazy initialization - } - if (now > nextRotationTime || currentOutputStream == null) { - internalRotateNow(); - } - try { - FileOutputStream out = this.currentOutputStream; - logWriter.write(r, out); - out.write('\n'); - } catch (IOException e) { - logger.warning("Failed writing log record: " + Exceptions.toMessageString(e)); } } + } - /** - * Find next rotation after specified time. - * - * @param now the specified time; if zero, current time is used. - * @return the next rotation time - */ - long getNextRotationTime(long now) { - if (now <= 0) { - now = System.currentTimeMillis(); - } - long nowTod = timeOfDayMillis(now); - long next = 0; - for (long rotationTime : rotationTimes) { - if (nowTod < rotationTime) { - next = rotationTime - nowTod + now; - break; - } - } - if (next == 0) { // didn't find one -- use 1st time 'tomorrow' - next = rotationTimes[0] + lengthOfDayMillis - nowTod + now; - } - return next; + private static void runCompression(File oldFile, Compression compression) { + switch (compression) { + case ZSTD: + runCompressionZstd(oldFile.toPath()); + break; + case GZIP: + runCompressionGzip(oldFile); + break; + default: + throw new IllegalArgumentException("Unknown compression " + compression); } + } - private void checkAndCreateDir(String pathname) { - int lastSlash = pathname.lastIndexOf("/"); - if (lastSlash > -1) { - String pathExcludingFilename = pathname.substring(0, lastSlash); - File filepath = new File(pathExcludingFilename); - if (!filepath.exists()) { - filepath.mkdirs(); + private static void runCompressionZstd(Path oldFile) { + try { + Path compressedFile = Paths.get(oldFile.toString() + ".zst"); + Files.createFile(compressedFile); + int bufferSize = 0x400000; // 4M + byte[] buffer = new byte[bufferSize]; + try (ZstdOuputStream out = new ZstdOuputStream(Files.newOutputStream(compressedFile), bufferSize); + InputStream in = Files.newInputStream(oldFile)) { + int read; + while ((read = in.read(buffer)) >= 0) { + out.write(buffer, 0, read); } + out.flush(); } + Files.delete(oldFile); + } catch (IOException e) { + logger.log(Level.WARNING, "Failed to compress log file with zstd: " + oldFile, e); } + } + private static void runCompressionGzip(File oldFile) { + File gzippedFile = new File(oldFile.getPath() + ".gz"); + try (GZIPOutputStream compressor = new GZIPOutputStream(new FileOutputStream(gzippedFile), 0x100000); + FileInputStream inputStream = new FileInputStream(oldFile)) + { + byte [] buffer = new byte[0x400000]; // 4M buffer - // Throw InterruptedException upwards rather than relying on isInterrupted to stop the thread as - // isInterrupted() returns false after interruption in p.waitFor - private void internalRotateNow() { - // figure out new file name, then - // use super.setOutputStream to switch to a new file - - String oldFileName = fileName; - long now = System.currentTimeMillis(); - fileName = LogFormatter.insertDate(filePattern, now); - internalFlush(); - - try { - checkAndCreateDir(fileName); - FileOutputStream os = new FileOutputStream(fileName, true); // append mode, for safety - currentOutputStream = os; - lastDropPosition = 0; - LogFileDb.nowLoggingTo(fileName); - } catch (IOException e) { - throw new RuntimeException("Couldn't open log file '" + fileName + "'", e); + long totalBytesRead = 0; + NativeIO nativeIO = new NativeIO(); + for (int read = inputStream.read(buffer); read > 0; read = inputStream.read(buffer)) { + compressor.write(buffer, 0, read); + nativeIO.dropPartialFileFromCache(inputStream.getFD(), totalBytesRead, read, false); + totalBytesRead += read; } + compressor.finish(); + compressor.flush(); - createSymlinkToCurrentFile(); + oldFile.delete(); + nativeIO.dropFileFromCache(gzippedFile); + } catch (IOException e) { + logger.warning("Got '" + e + "' while compressing '" + oldFile.getPath() + "'."); + } + } - nextRotationTime = 0; //figure it out later (lazy evaluation) - if ((oldFileName != null)) { - File oldFile = new File(oldFileName); - if (oldFile.exists()) { - if (compression != Compression.NONE) { - executor.execute(() -> runCompression(oldFile, compression)); - } else { - nativeIO.dropFileFromCache(oldFile); - } - } + /** Name files by date - create a symlink with a constant name to the newest file */ + private void createSymlinkToCurrentFile() { + if (symlinkName == null) return; + File f = new File(fileName); + File f2 = new File(f.getParent(), symlinkName); + String [] cmd = new String[]{"/bin/ln", "-sf", f.getName(), f2.getPath()}; + try { + int retval = new ProcessExecuter().exec(cmd).getFirst(); + // Detonator pattern: Think of all the fun we can have if ln isn't what we + // think it is, if it doesn't return, etc, etc + if (retval != 0) { + logger.warning("Command '" + Arrays.toString(cmd) + "' + failed with exitcode=" + retval); } + } catch (IOException e) { + logger.warning("Got '" + e + "' while doing'" + Arrays.toString(cmd) + "'."); } + } + /** + * Calculate rotation times array, given times in minutes, as "0 60 ..." + * + */ + private static long[] calcTimesMinutes(String times) { + ArrayList<Long> list = new ArrayList<>(50); + int i = 0; + boolean etc = false; - private static void runCompression(File oldFile, Compression compression) { - switch (compression) { - case ZSTD: - runCompressionZstd(oldFile.toPath()); - break; - case GZIP: - runCompressionGzip(oldFile); - break; - default: - throw new IllegalArgumentException("Unknown compression " + compression); + while (i < times.length()) { + if (times.charAt(i) == ' ') { i++; continue; } // skip spaces + int j = i; // start of string + i = times.indexOf(' ', i); + if (i == -1) i = times.length(); + if (times.charAt(j) == '.' && times.substring(j,i).equals("...")) { // ... + etc = true; + break; } + list.add(Long.valueOf(times.substring(j,i))); } - private static void runCompressionZstd(Path oldFile) { - try { - Path compressedFile = Paths.get(oldFile.toString() + ".zst"); - Files.createFile(compressedFile); - int bufferSize = 0x400000; // 4M - byte[] buffer = new byte[bufferSize]; - try (ZstdOuputStream out = new ZstdOuputStream(Files.newOutputStream(compressedFile), bufferSize); - InputStream in = Files.newInputStream(oldFile)) { - int read; - while ((read = in.read(buffer)) >= 0) { - out.write(buffer, 0, read); - } - out.flush(); - } - Files.delete(oldFile); - } catch (IOException e) { - logger.log(Level.WARNING, "Failed to compress log file with zstd: " + oldFile, e); - } + int size = list.size(); + long[] longtimes = new long[size]; + for (i = 0; i<size; i++) { + longtimes[i] = list.get(i) // pick up value in minutes past midnight + * 60000; // and multiply to get millis } - private static void runCompressionGzip(File oldFile) { - File gzippedFile = new File(oldFile.getPath() + ".gz"); - NativeIO nativeIO = new NativeIO(); - try (GZIPOutputStream compressor = new GZIPOutputStream(new FileOutputStream(gzippedFile), 0x100000); - FileInputStream inputStream = new FileInputStream(oldFile)) { - byte[] buffer = new byte[0x400000]; // 4M buffer - - long totalBytesRead = 0; - for (int read = inputStream.read(buffer); read > 0; read = inputStream.read(buffer)) { - compressor.write(buffer, 0, read); - nativeIO.dropPartialFileFromCache(inputStream.getFD(), totalBytesRead, read, false); - totalBytesRead += read; + if (etc) { // fill out rest of day, same as final interval + long endOfDay = 24*60*60*1000; + long lasttime = longtimes[size-1]; + long interval = lasttime - longtimes[size-2]; + long moreneeded = (endOfDay - lasttime)/interval; + if (moreneeded > 0) { + int newsize = size + (int)moreneeded; + long[] temp = new long[newsize]; + for (i=0; i<size; i++) { + temp[i] = longtimes[i]; } - compressor.finish(); - compressor.flush(); - - } catch (IOException e) { - logger.warning("Got '" + e + "' while compressing '" + oldFile.getPath() + "'."); - } - oldFile.delete(); - nativeIO.dropFileFromCache(gzippedFile); - } - - /** - * Name files by date - create a symlink with a constant name to the newest file - */ - private void createSymlinkToCurrentFile() { - if (symlinkName == null) return; - File f = new File(fileName); - File f2 = new File(f.getParent(), symlinkName); - String[] cmd = new String[]{"/bin/ln", "-sf", f.getName(), f2.getPath()}; - try { - int retval = new ProcessExecuter().exec(cmd).getFirst(); - // Detonator pattern: Think of all the fun we can have if ln isn't what we - // think it is, if it doesn't return, etc, etc - if (retval != 0) { - logger.warning("Command '" + Arrays.toString(cmd) + "' + failed with exitcode=" + retval); + while (size < newsize) { + lasttime += interval; + temp[size++] = lasttime; } - } catch (IOException e) { - logger.warning("Got '" + e + "' while doing'" + Arrays.toString(cmd) + "'."); + longtimes = temp; } } - private static final long lengthOfDayMillis = 24 * 60 * 60 * 1000; - private static long timeOfDayMillis(long time) { - return time % lengthOfDayMillis; - } - + return longtimes; } - private static class Operation<LOGTYPE> { - enum Type {log, flush, close, rotate} - - ; + // Support staff :-) + private static final long lengthOfDayMillis = 24*60*60*1000; // ? is this close enough ? - final Type type; - - final Optional<LOGTYPE> log; - final CountDownLatch countDownLatch = new CountDownLatch(1); + private static long timeOfDayMillis ( long time ) { + return time % lengthOfDayMillis; + } - Operation(Type type) { - this(type, Optional.empty()); + /** + * Flushes all queued messages, interrupts the log thread in this and + * waits for it to end before returning + */ + void shutdown() { + logThread.interrupt(); + try { + logThread.join(); + executor.shutdown(); + executor.awaitTermination(600, TimeUnit.SECONDS); } - - Operation(LOGTYPE log) { - this(Type.log, Optional.of(log)); + catch (InterruptedException e) { } + } - private Operation(Type type, Optional<LOGTYPE> log) { - this.type = type; - this.log = log; - } + /** + * Only for unit testing. Do not use. + */ + String getFileName() { + return fileName; } -} +} diff --git a/jdisc_http_service/src/test/java/com/yahoo/container/logging/ConnectionLogEntryTest.java b/jdisc_http_service/src/test/java/com/yahoo/container/logging/ConnectionLogEntryTest.java deleted file mode 100644 index fbf9bd1dc23..00000000000 --- a/jdisc_http_service/src/test/java/com/yahoo/container/logging/ConnectionLogEntryTest.java +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -package com.yahoo.container.logging; - -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.time.Instant; -import java.util.UUID; - -/** - * @author mortent - */ -public class ConnectionLogEntryTest { - - @Test - public void test_serialization () throws IOException { - var id = UUID.randomUUID(); - var instant = Instant.parse("2021-01-13T12:12:12Z"); - ConnectionLogEntry entry = ConnectionLogEntry.builder(id, instant) - .withPeerPort(1234) - .build(); - - String expected = "{" + - "\"id\":\""+id.toString()+"\"," + - "\"timestamp\":\"2021-01-13T12:12:12Z\"," + - "\"peerPort\":1234" + - "}"; - Assert.assertEquals(expected, entry.toJson()); - } -} diff --git a/jdisc_http_service/src/test/java/com/yahoo/container/logging/JsonConnectionLogWriterTest.java b/jdisc_http_service/src/test/java/com/yahoo/container/logging/JsonConnectionLogWriterTest.java new file mode 100644 index 00000000000..b8978fe489c --- /dev/null +++ b/jdisc_http_service/src/test/java/com/yahoo/container/logging/JsonConnectionLogWriterTest.java @@ -0,0 +1,36 @@ +package com.yahoo.container.logging;// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +import com.yahoo.test.json.JsonTestHelper; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.UUID; + +/** + * @author bjorncs + */ +class JsonConnectionLogWriterTest { + + @Test + void test_serialization() throws IOException { + var id = UUID.randomUUID(); + var instant = Instant.parse("2021-01-13T12:12:12Z"); + ConnectionLogEntry entry = ConnectionLogEntry.builder(id, instant) + .withPeerPort(1234) + .build(); + String expectedJson = "{" + + "\"id\":\""+id.toString()+"\"," + + "\"timestamp\":\"2021-01-13T12:12:12Z\"," + + "\"peerPort\":1234" + + "}"; + + JsonConnectionLogWriter writer = new JsonConnectionLogWriter(); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + writer.write(entry, out); + String actualJson = out.toString(StandardCharsets.UTF_8); + JsonTestHelper.assertJsonEquals(actualJson, expectedJson); + } +}
\ No newline at end of file diff --git a/jdisc_http_service/src/test/java/com/yahoo/container/logging/LogFileHandlerTestCase.java b/jdisc_http_service/src/test/java/com/yahoo/container/logging/LogFileHandlerTestCase.java index cd3c174a12e..f76312af61e 100644 --- a/jdisc_http_service/src/test/java/com/yahoo/container/logging/LogFileHandlerTestCase.java +++ b/jdisc_http_service/src/test/java/com/yahoo/container/logging/LogFileHandlerTestCase.java @@ -42,14 +42,20 @@ public class LogFileHandlerTestCase { String pattern = root.getAbsolutePath() + "/logfilehandlertest.%Y%m%d%H%M%S"; long[] rTimes = {1000, 2000, 10000}; + Formatter formatter = new Formatter() { + public String format(LogRecord r) { + DateFormat df = new SimpleDateFormat("yyyy.MM.dd:HH:mm:ss.SSS"); + String timeStamp = df.format(new Date(r.getMillis())); + return ("["+timeStamp+"]" + " " + formatMessage(r) + "\n"); + } + }; LogFileHandler<String> h = new LogFileHandler<>(Compression.NONE, pattern, rTimes, null, new StringLogWriter()); long now = System.currentTimeMillis(); long millisPerDay = 60*60*24*1000; long tomorrowDays = (now / millisPerDay) +1; long tomorrowMillis = tomorrowDays * millisPerDay; - - assertThat(tomorrowMillis+1000).isEqualTo(h.logThread.getNextRotationTime(tomorrowMillis)); - assertThat(tomorrowMillis+10000).isEqualTo(h.logThread.getNextRotationTime(tomorrowMillis+3000)); + assertThat(tomorrowMillis+1000).isEqualTo(h.getNextRotationTime(tomorrowMillis)); + assertThat(tomorrowMillis+10000).isEqualTo(h.getNextRotationTime(tomorrowMillis+3000)); String message = "test"; h.publish(message); h.publish( "another test"); @@ -121,7 +127,7 @@ public class LogFileHandlerTestCase { String longMessage = formatter.format(new LogRecord(Level.INFO, "string which is way longer than the word test")); handler.publish(longMessage); - handler.flush(); + handler.waitDrained(); assertThat(Files.size(Paths.get(firstFile))).isEqualTo(31); final long expectedSecondFileLength = 72; long secondFileLength; @@ -166,7 +172,7 @@ public class LogFileHandlerTestCase { for (int i = 0; i < logEntries; i++) { h.publish("test"); } - h.flush(); + h.waitDrained(); String f1 = h.getFileName(); assertThat(f1).startsWith(root.getAbsolutePath() + "/logfilehandlertest."); File uncompressed = new File(f1); diff --git a/searchcore/src/apps/tests/persistenceconformance_test.cpp b/searchcore/src/apps/tests/persistenceconformance_test.cpp index 6b4061081ea..4715ff80d03 100644 --- a/searchcore/src/apps/tests/persistenceconformance_test.cpp +++ b/searchcore/src/apps/tests/persistenceconformance_test.cpp @@ -109,12 +109,13 @@ public: DocumenttypesConfigSP getTypeCfg() const { return _typeCfg; } DocTypeVector getDocTypes() const { DocTypeVector types; - _repo->forEachDocumentType(*makeClosure(storeDocType, &types)); + _repo->forEachDocumentType(*DocumentTypeRepo::makeLambda([&types](const DocumentType &type) { + types.push_back(DocTypeName(type.getName())); + })); return types; } DocumentDBConfig::SP create(const DocTypeName &docTypeName) const { - const DocumentType *docType = - _repo->getDocumentType(docTypeName.getName()); + const DocumentType *docType = _repo->getDocumentType(docTypeName.getName()); if (docType == nullptr) { return DocumentDBConfig::SP(); } diff --git a/searchcore/src/tests/proton/documentdb/fileconfigmanager/fileconfigmanager_test.cpp b/searchcore/src/tests/proton/documentdb/fileconfigmanager/fileconfigmanager_test.cpp index 2352fda65a0..e6bcbf18495 100644 --- a/searchcore/src/tests/proton/documentdb/fileconfigmanager/fileconfigmanager_test.cpp +++ b/searchcore/src/tests/proton/documentdb/fileconfigmanager/fileconfigmanager_test.cpp @@ -43,12 +43,12 @@ makeBaseConfigSnapshot() DBCM dbcm(spec, "test"); DocumenttypesConfigSP dtcfg(config::ConfigGetter<DocumenttypesConfig>::getConfig("", spec).release()); - BootstrapConfig::SP b(new BootstrapConfig(1, dtcfg, - std::shared_ptr<const DocumentTypeRepo>(new DocumentTypeRepo(*dtcfg)), - std::make_shared<ProtonConfig>(), - std::make_shared<FiledistributorrpcConfig>(), - std::make_shared<BucketspacesConfig>(), - std::make_shared<TuneFileDocumentDB>(), HwInfo())); + auto b = std::make_shared<BootstrapConfig>(1, dtcfg, + std::make_shared<DocumentTypeRepo>(*dtcfg), + std::make_shared<ProtonConfig>(), + std::make_shared<FiledistributorrpcConfig>(), + std::make_shared<BucketspacesConfig>(), + std::make_shared<TuneFileDocumentDB>(), HwInfo()); dbcm.forwardConfig(b); dbcm.nextGeneration(0ms); DocumentDBConfig::SP snap = dbcm.getConfig(); @@ -71,8 +71,6 @@ makeEmptyConfigSnapshot() return test::DocumentDBConfigBuilder(0, std::make_shared<Schema>(), "client", "test").build(); } -void incInt(int *i, const DocumentType&) { ++*i; } - void assertEqualSnapshot(const DocumentDBConfig &exp, const DocumentDBConfig &act) { @@ -91,10 +89,12 @@ assertEqualSnapshot(const DocumentDBConfig &exp, const DocumentDBConfig &act) int expTypeCount = 0; int actTypeCount = 0; - exp.getDocumentTypeRepoSP()->forEachDocumentType( - *vespalib::makeClosure(incInt, &expTypeCount)); - act.getDocumentTypeRepoSP()->forEachDocumentType( - *vespalib::makeClosure(incInt, &actTypeCount)); + exp.getDocumentTypeRepoSP()->forEachDocumentType(*DocumentTypeRepo::makeLambda([&expTypeCount](const DocumentType &) { + expTypeCount++; + })); + act.getDocumentTypeRepoSP()->forEachDocumentType(*DocumentTypeRepo::makeLambda([&actTypeCount](const DocumentType &) { + actTypeCount++; + })); EXPECT_EQUAL(expTypeCount, actTypeCount); EXPECT_TRUE(*exp.getSchemaSP() == *act.getSchemaSP()); EXPECT_EQUAL(expTypeCount, actTypeCount); @@ -164,8 +164,7 @@ TEST_F("requireThatConfigCanBeLoadedWithoutExtraConfigsDataFile", DocumentDBConf } -TEST_F("requireThatVisibilityDelayIsPropagated", - DocumentDBConfig::SP(makeBaseConfigSnapshot())) +TEST_F("requireThatVisibilityDelayIsPropagated", DocumentDBConfig::SP(makeBaseConfigSnapshot())) { saveBaseConfigSnapshot(*f, 80); DocumentDBConfig::SP esnap(makeEmptyConfigSnapshot()); @@ -177,8 +176,7 @@ TEST_F("requireThatVisibilityDelayIsPropagated", protonConfigBuilder.documentdb.push_back(ddb); protonConfigBuilder.maxvisibilitydelay = 100.0; FileConfigManager cm("out", myId, "dummy"); - using ProtonConfigSP = BootstrapConfig::ProtonConfigSP; - cm.setProtonConfig(ProtonConfigSP(new ProtonConfig(protonConfigBuilder))); + cm.setProtonConfig(std::make_shared<ProtonConfig>(protonConfigBuilder)); cm.loadConfig(*esnap, 70, esnap); } EXPECT_EQUAL(61s, esnap->getMaintenanceConfigSP()->getVisibilityDelay()); diff --git a/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp b/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp index 23267e0628b..4bf8f36caa3 100644 --- a/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp +++ b/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp @@ -113,8 +113,7 @@ public: if (!EXPECT_EQUAL(expRemoveCompleteCount, _removeCompleteCount)) { return false; } - if (!EXPECT_EQUAL(expRemoveBatchCompleteCount, - _removeBatchCompleteCount)) { + if (!EXPECT_EQUAL(expRemoveBatchCompleteCount, _removeBatchCompleteCount)) { return false; } if (!EXPECT_EQUAL(expRemoveCompleteLids, _removeCompleteLids)) { @@ -152,9 +151,7 @@ public: test::runInMaster(_writeService, func); } - void - cycledLids(const std::vector<uint32_t> &lids) - { + void cycledLids(const std::vector<uint32_t> &lids) { if (lids.size() == 1) { _store.removeComplete(lids[0]); } else { @@ -162,33 +159,23 @@ public: } } - void - performCycleLids(const std::vector<uint32_t> &lids) - { - _writeService.master().execute( - makeLambdaTask([this, lids]() { cycledLids(lids);})); + void performCycleLids(const std::vector<uint32_t> &lids) { + _writeService.master().execute(makeLambdaTask([this, lids]() { cycledLids(lids);})); } - void - cycleLids(const std::vector<uint32_t> &lids) - { + void cycleLids(const std::vector<uint32_t> &lids) { if (lids.empty()) return; - _writeService.index().execute( - makeLambdaTask([this, lids]() { performCycleLids(lids);})); + _writeService.index().execute(makeLambdaTask([this, lids]() { performCycleLids(lids);})); } - bool - delayReuse(uint32_t lid) - { + bool delayReuse(uint32_t lid) { bool res = false; runInMaster([&] () { res = _lidReuseDelayer->delayReuse(lid); } ); return res; } - bool - delayReuse(const std::vector<uint32_t> &lids) - { + bool delayReuse(const std::vector<uint32_t> &lids) { bool res = false; runInMaster([&] () { res = _lidReuseDelayer->delayReuse(lids); }); return res; @@ -198,26 +185,10 @@ public: runInMaster([&] () { cycleLids(_lidReuseDelayer->getReuseLids()); }); } - void - sync() - { - _writeService.sync(); - } + void sync() { _writeService.sync(); } - void - scheduleDelayReuseLid(uint32_t lid) - { - runInMaster([&] () { cycleLids({ lid }); }); - } - - void - scheduleDelayReuseLids(const std::vector<uint32_t> &lids) - { - runInMaster([&] () { cycleLids(lids); }); - } }; - TEST_F("require that nothing happens before free list is active", Fixture) { EXPECT_FALSE(f.delayReuse(4)); @@ -226,7 +197,6 @@ TEST_F("require that nothing happens before free list is active", Fixture) EXPECT_TRUE(assertThreadObserver(2, 0, 0, f._writeService)); } - TEST_F("require that reuse can be batched", Fixture) { f._store._freeListActive = true; @@ -243,7 +213,6 @@ TEST_F("require that reuse can be batched", Fixture) EXPECT_TRUE(assertThreadObserver(6, 1, 0, f._writeService)); } - TEST_F("require that single element array is optimized", Fixture) { f._store._freeListActive = true; diff --git a/searchcore/src/vespa/searchcore/proton/server/clusterstatehandler.cpp b/searchcore/src/vespa/searchcore/proton/server/clusterstatehandler.cpp index f730d286b34..577bd32ca1f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/clusterstatehandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/clusterstatehandler.cpp @@ -69,8 +69,7 @@ ClusterStateHandler::performSetClusterState(const ClusterState *calc, IGenericRe } void -ClusterStateHandler::performGetModifiedBuckets( - IBucketIdListResultHandler *resultHandler) +ClusterStateHandler::performGetModifiedBuckets(IBucketIdListResultHandler *resultHandler) { storage::spi::BucketIdListResult::List modifiedBuckets; modifiedBuckets.resize(_modifiedBuckets.size()); diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp index e10f7937dd0..6951125f408 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp @@ -75,8 +75,7 @@ DocumentBucketMover::setupForBucket(const BucketId &bucket, } -namespace -{ +namespace { class MoveKey { diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index 7fb16e851fa..17843a5cd22 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -33,7 +33,6 @@ #include <vespa/searchlib/engine/docsumreply.h> #include <vespa/searchlib/engine/searchreply.h> #include <vespa/vespalib/util/destructor_callbacks.h> -#include <vespa/vespalib/util/closuretask.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/metrics/updatehook.h> @@ -217,10 +216,6 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, // Forward changes of cluster state to bucket handler _clusterStateHandler.addClusterStateChangedHandler(&_bucketHandler); - _lidSpaceCompactionHandlers.push_back(std::make_unique<LidSpaceCompactionHandler>(_maintenanceController.getReadySubDB(), _docTypeName.getName())); - _lidSpaceCompactionHandlers.push_back(std::make_unique<LidSpaceCompactionHandler>(_maintenanceController.getRemSubDB(), _docTypeName.getName())); - _lidSpaceCompactionHandlers.push_back(std::make_unique<LidSpaceCompactionHandler>(_maintenanceController.getNotReadySubDB(), _docTypeName.getName())); - _writeFilter.setConfig(loaded_config->getMaintenanceConfigSP()->getAttributeUsageFilterConfig()); } @@ -416,6 +411,7 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum LOG(error, "Applying config to closed document db"); return; } + _maintenanceController.killJobs(); ConfigComparisonResult cmpres; Schema::SP oldSchema; int64_t generation = configSnapshot->getGeneration(); @@ -483,9 +479,7 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum _state.clearDelayedConfig(); } setActiveConfig(configSnapshot, generation); - if (params.shouldMaintenanceControllerChange()) { - forwardMaintenanceConfig(); - } + forwardMaintenanceConfig(); _writeFilter.setConfig(configSnapshot->getMaintenanceConfigSP()->getAttributeUsageFilterConfig()); if (_subDBs.getReprocessingRunner().empty()) { _subDBs.pruneRemovedFields(serialNum); @@ -699,9 +693,7 @@ DocumentDB::getAllowPrune() const void DocumentDB::start() { - LOG(debug, - "DocumentDB(%s): Database starting.", - _docTypeName.toString().c_str()); + LOG(debug, "DocumentDB(%s): Database starting.", _docTypeName.toString().c_str()); internalInit(); } @@ -934,6 +926,10 @@ DocumentDB::injectMaintenanceJobs(const DocumentDBMaintenanceConfig &config, std { // Called by executor thread _maintenanceController.killJobs(); + _lidSpaceCompactionHandlers.clear(); + _lidSpaceCompactionHandlers.push_back(std::make_unique<LidSpaceCompactionHandler>(_maintenanceController.getReadySubDB(), _docTypeName.getName())); + _lidSpaceCompactionHandlers.push_back(std::make_unique<LidSpaceCompactionHandler>(_maintenanceController.getRemSubDB(), _docTypeName.getName())); + _lidSpaceCompactionHandlers.push_back(std::make_unique<LidSpaceCompactionHandler>(_maintenanceController.getNotReadySubDB(), _docTypeName.getName())); MaintenanceJobsInjector::injectJobs(_maintenanceController, config, _bucketExecutor, @@ -997,13 +993,11 @@ DocumentDB::forwardMaintenanceConfig() // Called by executor thread DocumentDBConfig::SP activeConfig = getActiveConfig(); assert(activeConfig); - DocumentDBMaintenanceConfig::SP - maintenanceConfig(activeConfig->getMaintenanceConfigSP()); + auto maintenanceConfig(activeConfig->getMaintenanceConfigSP()); const auto &attributes_config = activeConfig->getAttributesConfig(); auto attribute_config_inspector = std::make_unique<AttributeConfigInspector>(attributes_config); if (!_state.getClosed()) { - if (_maintenanceController.getStarted() && - !_maintenanceController.getStopping()) { + if (_maintenanceController.getStarted() && !_maintenanceController.getStopping()) { injectMaintenanceJobs(*maintenanceConfig, std::move(attribute_config_inspector)); } _maintenanceController.newConfig(maintenanceConfig); diff --git a/searchcore/src/vespa/searchcore/proton/server/ibucketmodifiedhandler.h b/searchcore/src/vespa/searchcore/proton/server/ibucketmodifiedhandler.h index 96f70d32dc5..652c303283f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/ibucketmodifiedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/ibucketmodifiedhandler.h @@ -2,16 +2,9 @@ #pragma once -namespace document -{ - -class BucketId; - -} - -namespace proton -{ +namespace document { class BucketId;} +namespace proton { class IBucketModifiedHandler { @@ -20,6 +13,4 @@ public: virtual ~IBucketModifiedHandler() {} }; - -} // namespace proton - +} diff --git a/searchcore/src/vespa/searchcore/proton/server/ibucketstatecalculator.h b/searchcore/src/vespa/searchcore/proton/server/ibucketstatecalculator.h index 10c0b194aac..15211c8ceb7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/ibucketstatecalculator.h +++ b/searchcore/src/vespa/searchcore/proton/server/ibucketstatecalculator.h @@ -16,7 +16,7 @@ struct IBucketStateCalculator virtual bool nodeUp() const = 0; virtual bool nodeInitializing() const = 0; virtual bool nodeRetired() const = 0; - virtual ~IBucketStateCalculator() {} + virtual ~IBucketStateCalculator() = default; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/ibucketstatechangedhandler.h b/searchcore/src/vespa/searchcore/proton/server/ibucketstatechangedhandler.h index 1ed0359042e..f97d1b697cb 100644 --- a/searchcore/src/vespa/searchcore/proton/server/ibucketstatechangedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/ibucketstatechangedhandler.h @@ -4,16 +4,9 @@ #include <vespa/persistence/spi/bucketinfo.h> -namespace document -{ - -class BucketId; - -} - -namespace proton -{ +namespace document { class BucketId; } +namespace proton { /** * Interface used to notify when bucket state has changed. @@ -24,8 +17,7 @@ public: virtual void notifyBucketStateChanged(const document::BucketId &bucketId, storage::spi::BucketInfo::ActiveState newState) = 0; - virtual ~IBucketStateChangedHandler() {} + virtual ~IBucketStateChangedHandler() = default; }; - -} // namespace proton +} diff --git a/searchcore/src/vespa/searchcore/proton/server/ibucketstatechangednotifier.h b/searchcore/src/vespa/searchcore/proton/server/ibucketstatechangednotifier.h index b21c184d431..0f1f03370d0 100644 --- a/searchcore/src/vespa/searchcore/proton/server/ibucketstatechangednotifier.h +++ b/searchcore/src/vespa/searchcore/proton/server/ibucketstatechangednotifier.h @@ -2,8 +2,7 @@ #pragma once -namespace proton -{ +namespace proton { class IBucketStateChangedHandler; @@ -13,14 +12,10 @@ class IBucketStateChangedHandler; class IBucketStateChangedNotifier { public: - virtual void - addBucketStateChangedHandler(IBucketStateChangedHandler *handler) = 0; - - virtual void - removeBucketStateChangedHandler(IBucketStateChangedHandler *handler) = 0; + virtual void addBucketStateChangedHandler(IBucketStateChangedHandler *handler) = 0; + virtual void removeBucketStateChangedHandler(IBucketStateChangedHandler *handler) = 0; - virtual ~IBucketStateChangedNotifier() {} + virtual ~IBucketStateChangedNotifier() = default; }; - -} // namespace proton +} diff --git a/searchcore/src/vespa/searchcore/proton/server/iclusterstatechangedhandler.h b/searchcore/src/vespa/searchcore/proton/server/iclusterstatechangedhandler.h index f50e591d0e8..16c98054a11 100644 --- a/searchcore/src/vespa/searchcore/proton/server/iclusterstatechangedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/iclusterstatechangedhandler.h @@ -4,8 +4,7 @@ #include "ibucketstatecalculator.h" -namespace proton -{ +namespace proton { /** * Interface used to notify when cluster state has changed. @@ -13,10 +12,9 @@ namespace proton class IClusterStateChangedHandler { public: - virtual ~IClusterStateChangedHandler() { } + virtual ~IClusterStateChangedHandler() = default; - virtual void - notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) = 0; + virtual void notifyClusterStateChanged(const std::shared_ptr<IBucketStateCalculator> &newCalc) = 0; }; -} // namespace proton +} diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp index aa04cc89f52..c05c25990a6 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp @@ -117,14 +117,14 @@ MaintenanceJobsInjector::injectJobs(MaintenanceController &controller, IAttributeManagerSP notReadyAttributeManager, std::unique_ptr<const AttributeConfigInspector> attribute_config_inspector, std::shared_ptr<TransientMemoryUsageProvider> transient_memory_usage_provider, - AttributeUsageFilter &attributeUsageFilter) { + AttributeUsageFilter &attributeUsageFilter) +{ controller.registerJobInMasterThread(std::make_unique<HeartBeatJob>(hbHandler, config.getHeartBeatConfig())); controller.registerJobInDefaultPool(std::make_unique<PruneSessionCacheJob>(scPruner, config.getSessionCachePruneInterval())); const MaintenanceDocumentSubDB &mRemSubDB(controller.getRemSubDB()); auto pruneRDjob = std::make_unique<PruneRemovedDocumentsJob>(config.getPruneRemovedDocumentsConfig(), *mRemSubDB.meta_store(), - mRemSubDB.sub_db_id(), docTypeName, prdHandler, fbHandler); - controller.registerJobInMasterThread( - trackJob(jobTrackers.getRemovedDocumentsPrune(), std::move(pruneRDjob))); + mRemSubDB.sub_db_id(), docTypeName, prdHandler, fbHandler); + controller.registerJobInMasterThread(trackJob(jobTrackers.getRemovedDocumentsPrune(), std::move(pruneRDjob))); if (!config.getLidSpaceCompactionConfig().isDisabled()) { injectLidSpaceCompactionJobs(controller, config, bucketExecutor, lscHandlers, opStorer, fbHandler, jobTrackers.getLidSpaceCompact(), diskMemUsageNotifier, diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h index e35f9e1aa48..8ec479084d2 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h +++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h @@ -46,10 +46,8 @@ struct MaintenanceJobsInjector IPruneRemovedDocumentsHandler &prdHandler, IDocumentMoveHandler &moveHandler, IBucketModifiedHandler &bucketModifiedHandler, - IClusterStateChangedNotifier & - clusterStateChangedNotifier, - IBucketStateChangedNotifier & - bucketStateChangedNotifier, + IClusterStateChangedNotifier & clusterStateChangedNotifier, + IBucketStateChangedNotifier & bucketStateChangedNotifier, const std::shared_ptr<IBucketStateCalculator> &calc, IDiskMemUsageNotifier &diskMemUsageNotifier, DocumentDBJobTrackers &jobTrackers, diff --git a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp index 43be4e9accd..dfb84af5da5 100644 --- a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp @@ -23,7 +23,7 @@ PruneRemovedDocumentsJob(const Config &config, IPruneRemovedDocumentsHandler &handler, IFrozenBucketHandler &frozenHandler) : BlockableMaintenanceJob("prune_removed_documents." + docTypeName, - config.getDelay(), config.getInterval()), + config.getDelay(), config.getInterval()), _metaStore(metaStore), _subDbId(subDbId), _cfgAgeLimit(config.getAge()), diff --git a/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp b/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp index 020111fbf58..38478b4ff20 100644 --- a/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp @@ -16,7 +16,6 @@ #include <vespa/searchlib/fef/indexproperties.h> #include <vespa/searchlib/fef/properties.h> #include <vespa/eval/eval/fast_value.h> -#include <vespa/vespalib/util/closuretask.h> using vespa::config::search::RankProfilesConfig; using proton::matching::MatchingStats; diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp index 7dbf54cfd6c..95051341e8c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp @@ -23,7 +23,6 @@ #include <vespa/searchlib/docstore/document_store_visitor_progress.h> #include <vespa/searchlib/util/fileheadertk.h> #include <vespa/vespalib/io/fileutil.h> -#include <vespa/vespalib/util/closuretask.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/log/log.h> diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp index 070cfd8085e..9c928f49ca8 100644 --- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp @@ -4,7 +4,6 @@ #include "transactionlogmanager.h" #include <vespa/searchlib/transactionlog/translogclient.h> #include <vespa/searchcore/proton/common/eventlogger.h> -#include <vespa/vespalib/util/closuretask.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/log/log.h> diff --git a/searchcore/src/vespa/searchcore/proton/test/thread_utils.h b/searchcore/src/vespa/searchcore/proton/test/thread_utils.h index 340d3b08441..43033d35185 100644 --- a/searchcore/src/vespa/searchcore/proton/test/thread_utils.h +++ b/searchcore/src/vespa/searchcore/proton/test/thread_utils.h @@ -2,17 +2,10 @@ #pragma once #include <vespa/searchcorespi/index/ithreadingservice.h> -#include <vespa/vespalib/util/closuretask.h> +#include <vespa/vespalib/util/lambdatask.h> namespace proton::test { -template <typename FunctionType> -void -runFunction(FunctionType *func) -{ - (*func)(); -} - /** * Run the given function in the master thread and wait until done. */ @@ -20,8 +13,7 @@ template <typename FunctionType> void runInMaster(searchcorespi::index::IThreadingService &writeService, FunctionType func) { - writeService.master().execute(vespalib::makeTask - (vespalib::makeClosure(&runFunction<FunctionType>, &func))); + writeService.master().execute(vespalib::makeLambdaTask(std::move(func))); writeService.sync(); } diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp index 1fe23dd16ae..b8bc4832202 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp @@ -20,6 +20,7 @@ #include <vespa/vespalib/util/closuretask.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/lambdatask.h> +#include <vespa/vespalib/util/destructor_callbacks.h> #include <vespa/vespalib/util/time.h> #include <sstream> @@ -36,6 +37,7 @@ using search::queryeval::ISourceSelector; using search::queryeval::Source; using search::SerialNum; using vespalib::makeLambdaTask; +using vespalib::makeLambdaCallback; using std::ostringstream; using vespalib::makeClosure; using vespalib::makeTask; @@ -43,6 +45,7 @@ using vespalib::string; using vespalib::Closure0; using vespalib::Executor; using vespalib::Runnable; +using vespalib::IDestructorCallback; namespace searchcorespi::index { @@ -62,7 +65,7 @@ public: _closure(std::move(closure)) { } - virtual void run() override { + void run() override { _result = _reconfigurer.reconfigure(std::move(_closure)); } }; @@ -76,7 +79,7 @@ public: _reconfigurer(reconfigurer), _closure(std::move(closure)) { } - virtual void run() override { + void run() override { _reconfigurer.reconfigure(std::move(_closure)); } }; @@ -84,18 +87,18 @@ public: SerialNum noSerialNumHigh = std::numeric_limits<SerialNum>::max(); -class DiskIndexWithDestructorClosure : public IDiskIndex { +class DiskIndexWithDestructorCallback : public IDiskIndex { private: - vespalib::AutoClosureCaller _caller; - IDiskIndex::SP _index; + std::shared_ptr<IDestructorCallback> _callback; + IDiskIndex::SP _index; public: - DiskIndexWithDestructorClosure(const IDiskIndex::SP &index, - vespalib::Closure::UP closure) - : _caller(std::move(closure)), - _index(index) + DiskIndexWithDestructorCallback(IDiskIndex::SP index, + std::shared_ptr<IDestructorCallback> callback) noexcept + : _callback(std::move(callback)), + _index(std::move(index)) { } - ~DiskIndexWithDestructorClosure(); + ~DiskIndexWithDestructorCallback() override; const IDiskIndex &getWrapped() const { return *_index; } /** @@ -140,15 +143,22 @@ public: }; -DiskIndexWithDestructorClosure::~DiskIndexWithDestructorClosure() {} +DiskIndexWithDestructorCallback::~DiskIndexWithDestructorCallback() = default; } // namespace -IndexMaintainer::FusionArgs::~FusionArgs() { -} +IndexMaintainer::FusionArgs::FusionArgs() + : _new_fusion_id(0u), + _changeGens(), + _schema(), + _prunedSchema(), + _old_source_list() +{ } -IndexMaintainer::SetSchemaArgs::~SetSchemaArgs() { -} +IndexMaintainer::FusionArgs::~FusionArgs() = default; + +IndexMaintainer::SetSchemaArgs::SetSchemaArgs() = default; +IndexMaintainer::SetSchemaArgs::~SetSchemaArgs() = default; uint32_t IndexMaintainer::getNewAbsoluteId() @@ -176,9 +186,8 @@ IndexMaintainer::reopenDiskIndexes(ISearchableIndexCollection &coll) uint32_t count = coll.getSourceCount(); for (uint32_t i = 0; i < count; ++i) { IndexSearchable &is = coll.getSearchable(i); - const DiskIndexWithDestructorClosure *const d = - dynamic_cast<const DiskIndexWithDestructorClosure *>(&is); - if (d == NULL) { + const auto *const d = dynamic_cast<const DiskIndexWithDestructorCallback *>(&is); + if (d == nullptr) { continue; // not a disk index } const string indexDir = d->getIndexDir(); @@ -215,11 +224,10 @@ IndexMaintainer::updateIndexSchemas(IIndexCollection &coll, uint32_t count = coll.getSourceCount(); for (uint32_t i = 0; i < count; ++i) { IndexSearchable &is = coll.getSearchable(i); - const DiskIndexWithDestructorClosure *const d = - dynamic_cast<const DiskIndexWithDestructorClosure *>(&is); - if (d == NULL) { + const auto *const d = dynamic_cast<const DiskIndexWithDestructorCallback *>(&is); + if (d == nullptr) { IMemoryIndex *const m = dynamic_cast<IMemoryIndex *>(&is); - if (m != NULL) { + if (m != nullptr) { m->pruneRemovedFields(schema); } continue; @@ -245,10 +253,10 @@ IndexMaintainer::updateActiveFusionPrunedSchema(const Schema &schema) return; // No active fusion if (!activeFusionPrunedSchema) { Schema::UP newSchema = Schema::intersect(*activeFusionSchema, schema); - newActiveFusionPrunedSchema.reset(newSchema.release()); + newActiveFusionPrunedSchema = std::move(newSchema); } else { Schema::UP newSchema = Schema::intersect(*activeFusionPrunedSchema, schema); - newActiveFusionPrunedSchema.reset(newSchema.release()); + newActiveFusionPrunedSchema = std::move(newSchema); } { LockGuard slock(_state_lock); @@ -279,9 +287,9 @@ IndexMaintainer::loadDiskIndex(const string &indexDir) } vespalib::Timer timer; _active_indexes->setActive(indexDir); - IDiskIndex::SP retval(new DiskIndexWithDestructorClosure - (_operations.loadDiskIndex(indexDir), - makeClosure(this, &IndexMaintainer::deactivateDiskIndexes, indexDir))); + auto retval = std::make_shared<DiskIndexWithDestructorCallback>( + _operations.loadDiskIndex(indexDir), + makeLambdaCallback([this, indexDir]() { deactivateDiskIndexes(indexDir); })); if (LOG_WOULD_LOG(event)) { EventLogger::diskIndexLoadComplete(indexDir, vespalib::count_ms(timer.elapsed())); } @@ -298,11 +306,10 @@ IndexMaintainer::reloadDiskIndex(const IDiskIndex &oldIndex) } vespalib::Timer timer; _active_indexes->setActive(indexDir); - const IDiskIndex &wrappedDiskIndex = - (dynamic_cast<const DiskIndexWithDestructorClosure &>(oldIndex)).getWrapped(); - IDiskIndex::SP retval(new DiskIndexWithDestructorClosure - (_operations.reloadDiskIndex(wrappedDiskIndex), - makeClosure(this, &IndexMaintainer::deactivateDiskIndexes, indexDir))); + const IDiskIndex &wrappedDiskIndex = (dynamic_cast<const DiskIndexWithDestructorCallback &>(oldIndex)).getWrapped(); + auto retval = std::make_shared<DiskIndexWithDestructorCallback>( + _operations.reloadDiskIndex(wrappedDiskIndex), + makeLambdaCallback([this, indexDir]() { deactivateDiskIndexes(indexDir); })); if (LOG_WOULD_LOG(event)) { EventLogger::diskIndexLoadComplete(indexDir, vespalib::count_ms(timer.elapsed())); } @@ -425,7 +432,7 @@ ISearchableIndexCollection::UP IndexMaintainer::createNewSourceCollection(const LockGuard &newSearchLock) { ISearchableIndexCollection::SP currentLeaf(getLeaf(newSearchLock, _source_list)); - return ISearchableIndexCollection::UP(new IndexCollection(_selector, *currentLeaf)); + return std::make_unique<IndexCollection>(_selector, *currentLeaf); } IndexMaintainer::FlushArgs::FlushArgs() @@ -434,7 +441,7 @@ IndexMaintainer::FlushArgs::FlushArgs() old_source_list(), save_info(), flush_serial_num(), - stats(NULL), + stats(nullptr), _skippedEmptyLast(false), _extraIndexes(), _changeGens(), @@ -508,7 +515,7 @@ IndexMaintainer::doFlush(FlushArgs args) } assert(!flushIds.empty()); - if (args.stats != NULL) { + if (args.stats != nullptr) { updateFlushStats(args); } @@ -665,7 +672,7 @@ IndexMaintainer::doneFusion(FusionArgs *args, IDiskIndex::SP *new_index) LockGuard lock(_index_update_lock); // make new source selector with shifted values. - _selector.reset(getSourceSelector().cloneAndSubtract(ost.str(), id_diff).release()); + _selector = getSourceSelector().cloneAndSubtract(ost.str(), id_diff); _source_selector_changes = 0; _current_index_id -= id_diff; _last_fusion_id = args->_new_fusion_id; @@ -720,7 +727,7 @@ IndexMaintainer::warmupDone(ISearchableIndexCollection::SP current) LockGuard lock(_new_search_lock); if (current == _source_list) { auto makeSure = makeClosure(this, &IndexMaintainer::makeSureAllRemainingWarmupIsDone, current); - Executor::Task::UP task(new ReconfigRunnableTask(_ctx.getReconfigurer(), std::move(makeSure))); + auto task = std::make_unique<ReconfigRunnableTask>(_ctx.getReconfigurer(), std::move(makeSure)); _ctx.getThreadingService().master().execute(std::move(task)); } else { LOG(warning, "There has arrived a new IndexCollection while replacing the active index. " @@ -838,7 +845,7 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config, IIndexMaintainerOperations &operations) : _base_dir(config.getBaseDir()), _warmupConfig(config.getWarmup()), - _active_indexes(new ActiveDiskIndexes()), + _active_indexes(std::make_shared<ActiveDiskIndexes>()), _layout(config.getBaseDir()), _schema(config.getSchema()), _activeFusionSchema(), @@ -885,10 +892,10 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config, _lastFlushTime = search::FileKit::getModificationTime(latest_index_dir); _current_serial_num = _flush_serial_num; const string selector = IndexDiskLayout::getSelectorFileName(latest_index_dir); - _selector.reset(FixedSourceSelector::load(selector, _next_id - 1).release()); + _selector = FixedSourceSelector::load(selector, _next_id - 1); } else { _flush_serial_num = 0; - _selector.reset(new FixedSourceSelector(0, "sourceselector", 1)); + _selector = std::make_shared<FixedSourceSelector>(0, "sourceselector", 1); } uint32_t baseId(_selector->getBaseId()); if (_last_fusion_id != baseId) { @@ -896,20 +903,22 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config, uint32_t id_diff = _last_fusion_id - baseId; ostringstream ost; ost << "sourceselector_fusion(" << _last_fusion_id << ")"; - _selector.reset(getSourceSelector().cloneAndSubtract(ost.str(), id_diff).release()); + _selector = getSourceSelector().cloneAndSubtract(ost.str(), id_diff); assert(_last_fusion_id == _selector->getBaseId()); } _current_index_id = getNewAbsoluteId() - _last_fusion_id; assert(_current_index_id < ISourceSelector::SOURCE_LIMIT); _selector->setDefaultSource(_current_index_id); - ISearchableIndexCollection::UP sourceList(loadDiskIndexes(spec, ISearchableIndexCollection::UP(new IndexCollection(_selector)))); + auto sourceList = loadDiskIndexes(spec, std::make_unique<IndexCollection>(_selector)); _current_index = operations.createMemoryIndex(_schema, *sourceList, _current_serial_num); LOG(debug, "Index manager created with flushed serial num %" PRIu64, _flush_serial_num); sourceList->append(_current_index_id, _current_index); sourceList->setCurrentIndex(_current_index_id); _source_list = std::move(sourceList); _fusion_spec = spec; - _ctx.getThreadingService().master().execute(makeLambdaTask([this,&config]() {pruneRemovedFields(_schema, config.getSerialNum()); })); + _ctx.getThreadingService().master().execute(makeLambdaTask([this,&config]() { + pruneRemovedFields(_schema, config.getSerialNum()); + })); _ctx.getThreadingService().master().sync(); } @@ -1014,7 +1023,7 @@ IndexMaintainer::runFusion(const FusionSpec &fusion_spec, std::shared_ptr<search { LockGuard slock(_state_lock); LockGuard ilock(_index_update_lock); - _activeFusionSchema.reset(new Schema(_schema)); + _activeFusionSchema = std::make_shared<Schema>(_schema); _activeFusionPrunedSchema.reset(); args._schema = _schema; } @@ -1234,10 +1243,9 @@ IndexMaintainer::getFlushTargets(void) { // Called by flush engine scheduler thread IFlushTarget::List ret; - IFlushTarget::SP indexFlush(new IndexFlushTarget(*this)); - IFlushTarget::SP indexFusion(new IndexFusionTarget(*this)); - ret.push_back(indexFlush); - ret.push_back(indexFusion); + ret.reserve(2); + ret.push_back(std::make_shared<IndexFlushTarget>(*this)); + ret.push_back(std::make_shared<IndexFusionTarget>(*this)); return ret; } diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h index d9d2479833f..35c0c0fbd2f 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h @@ -224,17 +224,10 @@ class IndexMaintainer : public IIndexManager, Schema::SP _prunedSchema; ISearchableIndexCollection::SP _old_source_list; // Delays destruction - FusionArgs() - : _new_fusion_id(0u), - _changeGens(), - _schema(), - _prunedSchema(), - _old_source_list() - { } + FusionArgs(); ~FusionArgs(); }; - IFlushTarget::SP getFusionTarget(); void scheduleFusion(const FlushIds &flushIds); bool canRunFusion(const FusionSpec &spec) const; bool doneFusion(FusionArgs *args, IDiskIndex::SP *new_index); @@ -246,12 +239,7 @@ class IndexMaintainer : public IIndexManager, IMemoryIndex::SP _oldIndex; ISearchableIndexCollection::SP _oldSourceList; // Delays destruction - SetSchemaArgs(void) - : _newSchema(), - _oldSchema(), - _oldIndex(), - _oldSourceList() - { } + SetSchemaArgs(); ~SetSchemaArgs(); }; @@ -268,7 +256,7 @@ class IndexMaintainer : public IIndexManager, * result. */ bool reconfigure(vespalib::Closure0<bool>::UP closure); - virtual void warmupDone(ISearchableIndexCollection::SP current) override; + void warmupDone(ISearchableIndexCollection::SP current) override; bool makeSureAllRemainingWarmupIsDone(ISearchableIndexCollection::SP keepAlive); void scheduleCommit(); void commit(); @@ -371,4 +359,3 @@ public: }; } - |