// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.docproc;
import com.yahoo.collections.Pair;
import com.yahoo.component.chain.ChainedComponent;
import com.yahoo.docproc.impl.DocprocService;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.logging.Logger;
/**
*
A document processor is a component which performs some
* operation on a document or document update. Document processors are asynchronous,
* they may request some data and then return. The processing framework
* is responsible for calling processors again at unspecified times
* until they are done processing the document or document update.
*
* Document processor instances
* are chained together by the framework to realize a complete processing pipeline.
* The processing chain is represented by the processor instances themselves, see
* getNext/setNext. Document processors may optionally control the routing
* through the chain by setting the next processor on ongoing processings.
*
* A processing may contain one or multiple documents or document updates. Document processors
* may optionally handle collections of processors in some other way than just
* processing each one in order.
*
* A document processor must have an empty constructor. When instantiated
* from Vespa config (as opposed to being instantiated programmatically in a stand-alone
* Docproc system), the framework is responsible for configuring the processor using
* setConfig(). If a document processor wants to do some initial setup after configuration
* has been set, but before it has begun processing documents or document updates, it should
* override initialize().
*
* Document processors must be thread safe. To ensure this, make sure that
* access to any mutable, thread-unsafe state held in a field by the processor is
* synchronized.
*
* @author bratseth
*/
public abstract class DocumentProcessor extends ChainedComponent {
static Logger log = Logger.getLogger(DocprocService.class.getName());
/** Schema map for doctype-fieldnames */
private Map, String> fieldMap = new HashMap<>();
/** For a doc type, the actual field name mapping to do */
// TODO: How to flush this when reconfig of schemamapping?
private final Map> docMapCache = new HashMap<>();
private final boolean hasAnnotations;
public DocumentProcessor() {
hasAnnotations = getClass().getAnnotation(Accesses.class) != null;
}
final boolean hasAnnotations() {
return hasAnnotations;
}
/**
* Processes a processing, which can contain zero or more document bases. The implementing document processor
* is free to modify, replace or delete elements in the list inside processing.
*
* @param processing the processing to process
* @return the outcome of this processing
*/
public abstract Progress process(Processing processing);
/** Sets the schema map for field names */
public void setFieldMap(Map, String> fieldMap) {
this.fieldMap = fieldMap;
}
/** Schema map for field names (doctype,from)→to */
public Map, String> getFieldMap() {
return fieldMap;
}
public Map getDocMap(String docType) {
Map cached = docMapCache.get(docType);
if (cached!=null) {
return cached;
}
Map ret = new HashMap<>();
for (Entry, String> e : fieldMap.entrySet()) {
// Remember to include tuple if doctype is unset in mapping
if (docType.equals(e.getKey().getFirst()) || e.getKey().getFirst()==null || "".equals(e.getKey().getFirst())) {
ret.put(e.getKey().getSecond(), e.getValue());
}
}
docMapCache.put(docType, ret);
return ret;
}
@Override
public String toString() {
return "processor " + getId().stringValue();
}
/** An enumeration of possible results of calling a process method */
public static class Progress {
/** Returned by a processor when it is done with a processing */
public static final Progress DONE = new Progress("done");
/**
* Returned by a processor when it should be called again later
* for the same processing
*/
public static final Progress LATER = new LaterProgress();
/**
* Returned by a processor when a processing has failed
* and it should not be called again for this processing.
*/
public static final Progress FAILED = new Progress("failed");
/**
* Returned by a processor when processing has permanently failed,
* so that the document processing service should disable itself until
* reconfigured or restarted.
*/
public static final Progress PERMANENT_FAILURE = new Progress("permanent_failure");
private final String name;
private Optional reason = Optional.empty();
protected Progress(String name) {
this.name = name;
}
protected Progress(String name, String reason) {
this(name);
this.reason = Optional.of(reason);
}
public static Progress later(long delay) {
return new LaterProgress(delay);
}
public Progress withReason(String reason) {
return new Progress(this.name, reason);
}
@Override
public String toString() {
return name;
}
public Optional getReason() {
return reason;
}
@Override
public boolean equals(Object object) {
return object instanceof Progress && ((Progress) object).name.equals(this.name);
}
@Override
public int hashCode() {
return name.hashCode();
}
}
public static final class LaterProgress extends Progress {
private final long delay;
public static final long DEFAULT_LATER_DELAY = 20; //ms
private LaterProgress() {
this(DEFAULT_LATER_DELAY);
}
private LaterProgress(long delay) {
super("later");
this.delay = delay;
}
public long getDelay() {
return delay;
}
}
}