// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.prelude.fastsearch;
import java.util.*;
import java.util.logging.Logger;
import com.yahoo.fs4.BasicPacket;
import com.yahoo.fs4.DocsumPacket;
import com.yahoo.fs4.DocumentInfo;
import com.yahoo.fs4.Packet;
import com.yahoo.fs4.QueryResultPacket;
import com.yahoo.document.GlobalId;
import com.yahoo.document.DocumentId;
/**
* A wrapper for cache entries to make it possible to check whether the
* hits are truly correct.
*
* @author Steinar Knutsen
* @author Mathias Lidal
*/
public class PacketWrapper implements Cloneable {

    private static final Logger log = Logger.getLogger(PacketWrapper.class.getName());

    /** Byte size of the cache key, counted into this wrapper's total size. */
    final int keySize;

    // associated result packets, sorted in regard to offset
    private ArrayList<BasicPacket> resultPackets = new ArrayList<>(3); // length = "some small number"

    /** Docsum packets, keyed by (global id, part id, summary class). */
    LinkedHashMap<DocsumPacketKey, BasicPacket> packets;

    /** Orders query result packets by ascending offset. */
    private static class ResultPacketComparator<T extends BasicPacket> implements Comparator<T> {
        @Override
        public int compare(T o1, T o2) {
            QueryResultPacket r1 = (QueryResultPacket) o1;
            QueryResultPacket r2 = (QueryResultPacket) o2;
            // Integer.compare avoids overflow that plain subtraction could cause
            return Integer.compare(r1.getOffset(), r2.getOffset());
        }
    }

    private static final ResultPacketComparator<BasicPacket> resultPacketComparator = new ResultPacketComparator<>();

    /**
     * Creates a wrapper holding one result packet and the docsum packets
     * belonging to it.
     *
     * @param key the cache key, must not be null
     * @param packetKeys one key per docsum packet in {@code bpackets}
     * @param bpackets the result packet at index 0, followed by the docsum packets
     */
    public PacketWrapper(CacheKey key, DocsumPacketKey[] packetKeys, BasicPacket[] bpackets) {
        // Should not support key == null
        this.keySize = key.byteSize();
        resultPackets.add(bpackets[0]);
        this.packets = new LinkedHashMap<>();
        Packet[] ppackets = new Packet[packetKeys.length];
        for (int i = 0; i < packetKeys.length; i++) {
            ppackets[i] = (Packet) bpackets[i + 1];
        }
        addDocsums(packetKeys, ppackets);
    }

    /**
     * Only used by PacketCacheTestCase, should not be used otherwise
     */
    public PacketWrapper(CacheKey key, BasicPacket[] packets) {
        // Should support key == null as this is for testing
        if (key == null) {
            keySize = 0;
        } else {
            this.keySize = key.byteSize();
        }
        resultPackets.add(packets[0]);
        this.packets = new LinkedHashMap<>();
        for (int i = 0; i < packets.length - 1; i++) {
            // synthesize keys for the test packets; summary class is irrelevant here
            this.packets.put(new DocsumPacketKey(new GlobalId(new DocumentId("doc:test:" + i).getGlobalId()), i, null), packets[i + 1]);
        }
    }

    /** Returns the result packet with the lowest offset, or null if none are stored. */
    public QueryResultPacket getFirstResultPacket() {
        if (resultPackets.isEmpty()) {
            return null;
        }
        return (QueryResultPacket) resultPackets.get(0);
    }

    /**
     * Returns the document info objects in the window [offset, offset + hits),
     * pieced together from the stored result packets.
     *
     * @return list of documents, null if not all are available
     */
    public List<DocumentInfo> getDocuments(int offset, int hits) {
        // speculatively allocate list for the hits
        List<DocumentInfo> docs = new ArrayList<>(hits);
        int currentOffset = 0;
        QueryResultPacket r = getFirstResultPacket();
        if (offset >= r.getTotalDocumentCount()) {
            // shortcut especially for results with 0 hits
            // >= both necessary for end of result sets and
            // offset == 0 && totalDocumentCount == 0
            return docs;
        }
        for (BasicPacket packet : resultPackets) {
            QueryResultPacket result = (QueryResultPacket) packet;
            if (result.getOffset() > offset + currentOffset) {
                // we haven't got all the requested document info objects
                return null;
            }
            if (result.getOffset() + result.getDocumentCount()
                    <= currentOffset + offset) {
                // no new hits available
                continue;
            }
            List<DocumentInfo> documents = result.getDocuments();
            int packetOffset = (offset + currentOffset) - result.getOffset();
            int afterLastDoc = Math.min(documents.size(), packetOffset + hits);
            for (Iterator<DocumentInfo> j = documents.subList(packetOffset, afterLastDoc).iterator();
                 docs.size() < hits && j.hasNext();
                 ++currentOffset) {
                docs.add(j.next());
            }
            if (hits == docs.size()
                    || offset + docs.size() >= result.getTotalDocumentCount()) {
                // We have the hits we need, or there are no more hits available
                return docs;
            }
        }
        return null;
    }

    /**
     * Adds a result packet, keeping the internal list sorted by offset.
     * A packet with the same offset as an existing one replaces it only if
     * it contains more documents.
     */
    public void addResultPacket(QueryResultPacket resultPacket) {
        // This function only keeps the internal list sorted according
        // to offset
        if (resultPacket.getDocumentCount() == 0) {
            return; // do not add a packet which does not contain new info
        }
        int insertionPoint = Collections.binarySearch(resultPackets,
                                                      resultPacket,
                                                      resultPacketComparator);
        if (insertionPoint < 0) {
            // new offset
            insertionPoint = ~insertionPoint; // (insertionPoint + 1) * -1;
            resultPackets.add(insertionPoint, resultPacket);
            cleanResultPackets();
        } else {
            // there exists a packet with same offset
            QueryResultPacket r = (QueryResultPacket) resultPackets.get(insertionPoint);
            if (resultPacket.getDocumentCount() > r.getDocumentCount()) {
                resultPackets.set(insertionPoint, resultPacket);
                cleanResultPackets();
            }
        }
    }

    /**
     * Removes result packets which are fully covered by their neighbours,
     * leaving a minimal set of packets spanning the same documents.
     */
    private void cleanResultPackets() {
        if (resultPackets.size() == 1) {
            return;
        }
        // we know the list is sorted with regard to offset
        // First ensure the list grows in regards to lastOffset as well.
        // Could have done this addResultPacket, but this makes the code
        // simpler.
        QueryResultPacket previous = (QueryResultPacket) resultPackets.get(0);
        for (int i = 1; i < resultPackets.size(); ++i) {
            QueryResultPacket r = (QueryResultPacket) resultPackets.get(i);
            if (r.getOffset() + r.getDocumentCount()
                    <= previous.getOffset() + previous.getDocumentCount()) {
                // r's document window is contained in previous' window
                resultPackets.remove(i--);
            } else {
                previous = r;
            }
        }
        // Then drop any middle packet whose window is bridged by its neighbours
        int marker = 0;
        while (marker < (resultPackets.size() - 2)) {
            QueryResultPacket r0 = (QueryResultPacket) resultPackets.get(marker);
            QueryResultPacket r1 = (QueryResultPacket) resultPackets.get(marker + 1);
            QueryResultPacket r2 = (QueryResultPacket) resultPackets.get(marker + 2);
            int nextOffset = r0.getOffset() + r0.getDocumentCount();
            if (r1.getOffset() < nextOffset
                    && r2.getOffset() <= nextOffset) {
                resultPackets.remove(marker + 1);
            }
            ++marker;
        }
    }

    /** Only for testing. */
    public List<BasicPacket> getResultPackets() {
        return resultPackets;
    }

    /**
     * Adds docsum packets, reading them from {@code bpackets} starting at
     * {@code offset}.
     */
    public void addDocsums(DocsumPacketKey[] packetKeys, BasicPacket[] bpackets,
                           int offset) {
        Packet[] ppackets = new Packet[packetKeys.length];
        for (int i = 0; i < packetKeys.length; i++) {
            ppackets[i] = (Packet) bpackets[i + offset];
        }
        addDocsums(packetKeys, ppackets);
    }

    /**
     * Adds docsum packets keyed by the corresponding packet keys. Packets
     * which are not DocsumPackets, whose global id does not match their key,
     * or which carry no data are logged and skipped.
     */
    public void addDocsums(DocsumPacketKey[] packetKeys, Packet[] packets) {
        if (packetKeys == null || packets == null) {
            log.warning(
                    "addDocsums called with "
                    + (packetKeys == null ? "packetKeys == null " : "")
                    + (packets == null ? "packets == null" : ""));
            return;
        }
        for (int i = 0; i < packetKeys.length && i < packets.length; i++) {
            if (packetKeys[i] == null) {
                log.warning(
                        "addDocsums called, but packetsKeys[" + i + "] is null");
            } else if (packets[i] instanceof DocsumPacket) {
                DocsumPacket dp = (DocsumPacket) packets[i];
                // only cache docsums whose identity matches the key and which have payload
                if (packetKeys[i].getGlobalId().equals(dp.getGlobalId())
                        && dp.getData().length > 0)
                {
                    this.packets.put(packetKeys[i], packets[i]);
                    log.fine("addDocsums " + i + " globalId: " + dp.getGlobalId());
                } else {
                    log.warning("not caching bad Docsum for globalId " + packetKeys[i].getGlobalId() + ": " + dp);
                }
            } else {
                log.warning(
                        "addDocsums called, but packets[" + i
                        + "] is not a DocsumPacket instance");
            }
        }
    }

    /** Returns the number of docsum packets stored. */
    public int getNumPackets() {
        return packets.size();
    }

    BasicPacket getPacket(GlobalId globalId, int partid, String summaryClass) {
        return getPacket(
                new DocsumPacketKey(globalId, partid, summaryClass));
    }

    BasicPacket getPacket(DocsumPacketKey packetKey) {
        return packets.get(packetKey);
    }

    long getTimestamp() {
        return getFirstResultPacket().getTimestamp();
    }

    public void setTimestamp(long timestamp) {
        getFirstResultPacket().setTimestamp(timestamp);
    }

    /**
     * Returns the approximate total byte size of the wrapped packets plus
     * the cache key. Negative packet lengths are logged and counted as 10240.
     */
    public int getPacketsSize() {
        int size = 0;
        for (BasicPacket packet : resultPackets) {
            QueryResultPacket r = (QueryResultPacket) packet;
            int l = r.getLength();
            if (l < 0) {
                log.warning("resultpacket length " + l);
                l = 10240; // fall back to a generous estimate
            }
            size += l;
        }
        for (BasicPacket packet : packets.values()) {
            int l = packet.getLength();
            if (l < 0) {
                log.warning("BasicPacket length " + l);
                l = 10240; // fall back to a generous estimate
            }
            size += l;
        }
        size += keySize;
        return size;
    }

    /**
     * Straightforward shallow copy.
     */
    @SuppressWarnings("unchecked")
    public Object clone() {
        try {
            PacketWrapper other = (PacketWrapper) super.clone();
            other.resultPackets = (ArrayList<BasicPacket>) resultPackets.clone();
            if (packets != null) {
                other.packets = (LinkedHashMap<DocsumPacketKey, BasicPacket>) packets.clone();
            }
            return other;
        } catch (CloneNotSupportedException e) {
            throw new RuntimeException("A non-cloneable superclass has been inserted.",
                                       e);
        }
    }

}