diff options
Diffstat (limited to 'docproc/src/main/java/com/yahoo/docproc/impl/DocprocExecutor.java')
-rw-r--r-- | docproc/src/main/java/com/yahoo/docproc/impl/DocprocExecutor.java | 193 |
1 files changed, 193 insertions, 0 deletions
diff --git a/docproc/src/main/java/com/yahoo/docproc/impl/DocprocExecutor.java b/docproc/src/main/java/com/yahoo/docproc/impl/DocprocExecutor.java new file mode 100644 index 00000000000..cfd43099cc8 --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/impl/DocprocExecutor.java @@ -0,0 +1,193 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.impl; + +import com.yahoo.docproc.Call; +import com.yahoo.docproc.CallStack; +import com.yahoo.docproc.DocumentProcessor; +import com.yahoo.docproc.Processing; +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.json.JsonWriter; +import com.yahoo.jdisc.Metric; +import com.yahoo.text.Utf8; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static java.util.stream.Collectors.counting; +import static java.util.stream.Collectors.groupingBy; + +/** + * An executor executed incoming processings on its CallStack + * + * @author Einar M R Rosenvinge + */ +public class DocprocExecutor { + + private final static String METRIC_NAME_DOCUMENTS_PROCESSED = "documents_processed"; + + private static final Logger log = Logger.getLogger(DocprocExecutor.class.getName()); + + private final String name; + private final String docCounterName; + private final Metric metric; + private final Function<String, Metric.Context> contexts; + private final CallStack callStack; + + /** + * Creates a new named DocprocExecutor with the given CallStack. + * + * @param name the name of this executor + * @param callStack the chain of document processors this executor shall execute on processings + */ + public DocprocExecutor(String name, CallStack callStack) { + this.name = name; + String chainDimension = name != null ? name.replaceAll("[^\\p{Alnum}]", "_") : name; + docCounterName = "chain_" + chainDimension + "_documents"; + this.metric = callStack.getMetric(); + this.callStack = callStack; + this.callStack.setName(name); + this.contexts = cachedContexts(chainDimension); + } + + /** + * Creates a new named DocprocExecutor, with the same instance variables as the given executor, + * but a new call stack. + * + * @param oldExecutor the executor to inherit the instance variables from, sans call stack. + * @param callStack the call stack to use. + */ + public DocprocExecutor(DocprocExecutor oldExecutor, CallStack callStack) { + this.name = oldExecutor.name; + this.docCounterName = oldExecutor.docCounterName; + this.metric = oldExecutor.metric; + this.contexts = oldExecutor.contexts; + this.callStack = callStack; + } + + public CallStack getCallStack() { + return callStack; + } + + public String getName() { + return name; + } + + private void incrementNumDocsProcessed(Processing processing) { + List<DocumentOperation> operations = ((ProcessingAccess)processing).getOnceOperationsToBeProcessed(); + if ( ! operations.isEmpty()) { + metric.add(docCounterName, operations.size(), null); + operations.stream() + .collect(groupingBy(operation -> operation.getId().getDocType(), counting())) + .forEach((type, count) -> metric.add(METRIC_NAME_DOCUMENTS_PROCESSED, count, contexts.apply(type))); + } + } + + /** + * Processes a given Processing through the CallStack of this executor. + * + * @param processing the Processing to process. The CallStack of the Processing will be set to a clone of the CallStack of this executor, iff. it is currently null. + * @return a Progress; if this is LATER, the Processing is not done and must be reprocessed later. + * @throws RuntimeException if a document processor throws an exception during processing. + * @see com.yahoo.docproc.Processing + */ + public DocumentProcessor.Progress process(Processing processing) { + processing.setServiceName(getName()); + if (processing.callStack() == null) { + ((ProcessingAccess)processing).setCallStack(new CallStack(getCallStack())); + } + + DocumentProcessor.Progress progress = DocumentProcessor.Progress.DONE; + //metrics stuff: + //TODO: Note that this is *wrong* in case of Progress.LATER, documents are then counted several times until the Processing is DONE or FAILED. + incrementNumDocsProcessed(processing); + do { + Call call = processing.callStack().pop(); + if (call == null) { + // No more processors - done + return progress; + } + + //might throw exception, which is OK: + progress = call.call(processing); + + if (log.isLoggable(Level.FINEST)) { + logProgress(processing, progress, call); + } + + if (DocumentProcessor.Progress.LATER.equals(progress)) { + processing.callStack().addNext(call); + return progress; + } + } while (DocumentProcessor.Progress.DONE.equals(progress)); + return progress; + } + + private void logProgress(Processing processing, DocumentProcessor.Progress progress, Call call) { + StringBuilder message = new StringBuilder(); + boolean first = true; + message.append(call.getDocumentProcessorId()).append(" of class ") + .append(call.getDocumentProcessor().getClass().getSimpleName()).append(" returned ").append(progress) + .append(" for the documents: ["); + for (DocumentOperation op : processing.getDocumentOperations()) { + if (first) { + first = false; + } else { + message.append(", "); + } + if (op instanceof DocumentPut) { + message.append(Utf8.toString(JsonWriter.toByteArray(((DocumentPut) op).getDocument()))); + } else { + message.append(op.toString()); + } + } + message.append("]"); + log.log(Level.FINEST, message.toString()); + } + + /** + * Processes a given Processing through the CallStack of this executor. Note that if a DocumentProcessor + * returns a LaterProgress for this processing, it will be re-processed (after waiting the specified delay given + * by the LaterProgress), until done or failed. + * + * @param processing the Processing to process. The CallStack of the Processing will be set to a clone of the CallStack of this executor, iff. it is currently null. + * @return a Progress; this is never a LaterProgress. + * @throws RuntimeException if a document processor throws an exception during processing, or this thread is interrupted while waiting. + * @see com.yahoo.docproc.Processing + * @see com.yahoo.docproc.DocumentProcessor.Progress + * @see com.yahoo.docproc.DocumentProcessor.LaterProgress + */ + public DocumentProcessor.Progress processUntilDone(Processing processing) { + DocumentProcessor.Progress progress; + while (true) { + progress = process(processing); + if (!(progress instanceof DocumentProcessor.LaterProgress)) { + break; + } + DocumentProcessor.LaterProgress later = (DocumentProcessor.LaterProgress) progress; + try { + Thread.sleep(later.getDelay()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + return progress; + } + + private Function<String, Metric.Context> cachedContexts(String chainDimension) { + Map<String, Metric.Context> contextCache = new ConcurrentHashMap<>(); + return documentType -> contextCache.computeIfAbsent(documentType, type -> { + Map<String, String> dimensions = new HashMap<>(2); + dimensions.put("chain", chainDimension); + dimensions.put("documenttype", type); + return metric.createContext(dimensions); + }); + } + +} |