diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /vdslib/src/main |
Publish
Diffstat (limited to 'vdslib/src/main')
28 files changed, 3419 insertions, 0 deletions
diff --git a/vdslib/src/main/java/com/yahoo/vdslib/BinaryDocumentList.java b/vdslib/src/main/java/com/yahoo/vdslib/BinaryDocumentList.java new file mode 100644 index 00000000000..d2ccf2b2805 --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/BinaryDocumentList.java @@ -0,0 +1,55 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vdslib; + +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.vespa.objects.Serializer; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +class BinaryDocumentList extends DocumentList { + + private DocumentTypeManager docMan; + private byte[] buffer; + private int docCount; + + /** + * Create a new documentlist, using the given buffer. + * + * @param buffer buffer containing documents + */ + BinaryDocumentList(DocumentTypeManager docMan, byte[] buffer) { + this.docMan = docMan; + ByteBuffer buf = ByteBuffer.wrap(buffer); + buf.order(ByteOrder.LITTLE_ENDIAN); + docCount = buf.getInt(); + this.buffer = buffer; + + } + + @Override + public Entry get(int index) throws ArrayIndexOutOfBoundsException { + if (index < docCount) { + return Entry.create(docMan, buffer, index); + } else { + throw new ArrayIndexOutOfBoundsException(index + " >= " + docCount); + } + } + + @Override + public int size() { return docCount; } + + @Override + public int getApproxByteSize() { + return buffer.length; + } + + @Override + public void serialize(Serializer buf) { + buf.put(null, buffer); + } + +} diff --git a/vdslib/src/main/java/com/yahoo/vdslib/BinaryEntry.java b/vdslib/src/main/java/com/yahoo/vdslib/BinaryEntry.java new file mode 100644 index 00000000000..4047a3ed4f0 --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/BinaryEntry.java @@ -0,0 +1,82 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vdslib; + +import com.yahoo.document.*; +import com.yahoo.document.serialization.DocumentDeserializer; +import com.yahoo.document.serialization.DocumentDeserializerFactory; +import com.yahoo.io.GrowableByteBuffer; + +/** + * An entry in serialized form. + * + * @author <a href="mailto:thomasg@yahoo-inc.com">Thomas Gundersen</a>, <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +class BinaryEntry extends Entry { + private MetaEntry metaEntry; + private byte[] buffer; + private DocumentTypeManager docMan; + + /** + * Creates an entry from serialized form. + * @param docMan The documentmanager to use when deserializing. + * @param buffer the buffer to read the entry from + * @param entryIndex the index of the entry in the buffer + */ + BinaryEntry(DocumentTypeManager docMan, byte[] buffer, int entryIndex) { + this.buffer = buffer; + metaEntry = new MetaEntry(buffer, 4 + entryIndex * MetaEntry.SIZE); + this.docMan = docMan; + } + + @Override + public boolean valid() { return buffer != null; } + + @Override + public boolean isRemoveEntry() { return (metaEntry.flags & MetaEntry.REMOVE_ENTRY) != 0; } + + @Override + public boolean isBodyStripped() { return (metaEntry.flags & MetaEntry.BODY_STRIPPED) != 0; } + + @Override + public boolean isUpdateEntry() { return (metaEntry.flags & MetaEntry.UPDATE_ENTRY) != 0; } + + @Override + public long getTimestamp() { return metaEntry.timestamp; } + + @Override + public DocumentOperation getDocumentOperation() { + DocumentDeserializer buf = DocumentDeserializerFactory.create42( + docMan, + GrowableByteBuffer.wrap(buffer, metaEntry.headerPos, metaEntry.headerLen), + (metaEntry.bodyLen > 0) ? GrowableByteBuffer.wrap(buffer, metaEntry.bodyPos, metaEntry.bodyLen) : null + ); + + DocumentOperation op; + + if ((metaEntry.flags & MetaEntry.UPDATE_ENTRY) != 0) { + op = new DocumentUpdate(buf); + } else if ((metaEntry.flags & MetaEntry.REMOVE_ENTRY) != 0) { + op = new DocumentRemove(new Document(buf).getId()); + } else { + op = new DocumentPut(new Document(buf)); + ((DocumentPut) op).getDocument().setLastModified(getTimestamp()); + + } + return op; + } + + @Override + public DocumentOperation getHeader() { + DocumentDeserializer buf = DocumentDeserializerFactory.create42(docMan, GrowableByteBuffer.wrap(buffer, metaEntry.headerPos, metaEntry.headerLen)); + if ((metaEntry.flags & MetaEntry.UPDATE_ENTRY) != 0) { + return new DocumentUpdate(buf); + } else if ((metaEntry.flags & MetaEntry.REMOVE_ENTRY) != 0) { + return new DocumentRemove(new Document(buf).getId()); + } else { + DocumentPut op = new DocumentPut(new Document(buf)); + op.getDocument().setLastModified(getTimestamp()); + return op; + } + } + +} diff --git a/vdslib/src/main/java/com/yahoo/vdslib/BucketDistribution.java b/vdslib/src/main/java/com/yahoo/vdslib/BucketDistribution.java new file mode 100644 index 00000000000..0e3e80b2542 --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/BucketDistribution.java @@ -0,0 +1,205 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vdslib;
+
+import com.yahoo.document.BucketId;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class BucketDistribution {
+
+ // A logger object to enable proper logging.
+ private static Logger log = Logger.getLogger(BucketDistribution.class.getName());
+
+ // A map from bucket id to column index.
+ private int[] bucketToColumn;
+
+ // The number of columns to distribute to.
+ private int numColumns;
+
+ // The number of bits to use for bucket identification.
+ private int numBucketBits;
+
+ /**
+ * Constructs a new bucket distribution object with a given number of columns and buckets.
+ *
+ * @param numColumns The number of columns to distribute to.
+ * @param numBucketBits The number of bits to use for bucket id.
+ */
+ public BucketDistribution(int numColumns, int numBucketBits) {
+ this.numBucketBits = numBucketBits;
+ bucketToColumn = new int[getNumBuckets()];
+ reset();
+ setNumColumns(numColumns);
+ }
+
+ /**
+ * Constructs a new bucket distribution object as a copy of another.
+ *
+ * @param other The distribution object to copy.
+ */
+ public BucketDistribution(BucketDistribution other) {
+ bucketToColumn = other.bucketToColumn.clone();
+ numColumns = other.numColumns;
+ numBucketBits = other.numBucketBits;
+ }
+
+ /**
+ * Returns the number of buckets that the given number of bucket bits will allow.
+ *
+ * @param numBucketBits The number of bits to use for bucket id.
+ * @return The number of buckets allowed.
+ */
+ private static int getNumBuckets(int numBucketBits) {
+ return 1 << numBucketBits;
+ }
+
+ /**
+ * This method returns a list that contains the distributions of the given number of buckets over the given number
+ * of columns.
+ *
+ * @param numColumns The number of columns to distribute to.
+ * @param numBucketBits The number of bits to use for bucket id.
+ * @return The bucket distribution.
+ */
+ private static List<Integer> getBucketCount(int numColumns, int numBucketBits) {
+ List<Integer> ret = new ArrayList<Integer>(numColumns);
+ int cnt = getNumBuckets(numBucketBits) / numColumns;
+ int rst = getNumBuckets(numBucketBits) % numColumns;
+ for (int i = 0; i < numColumns; ++i) {
+ ret.add(cnt + (i < rst ? 1 : 0));
+ }
+ return ret;
+ }
+
+ /**
+ * This method returns a list similar to {@link com.yahoo.vdslib.BucketDistribution#getBucketCount(int, int)}, except that the returned list
+ * contains the number of buckets that will have to be migrated from each column if an additional column was added.
+ *
+ * @param numColumns The original number of columns.
+ * @param numBucketBits The number of bits to use for bucket id.
+ * @return The number of buckets to migrate, one value per column.
+ */
+ private static List<Integer> getBucketMigrateCount(int numColumns, int numBucketBits) {
+ List<Integer> ret = getBucketCount(numColumns++, numBucketBits);
+ int cnt = getNumBuckets(numBucketBits) / numColumns;
+ int rst = getNumBuckets(numBucketBits) % numColumns;
+ for (int i = 0; i < numColumns - 1; ++i) {
+ ret.set(i, ret.get(i) - (cnt + (i < rst ? 1 : 0)));
+ }
+ return ret;
+ }
+
+ /**
+ * Sets the number of columns to distribute to to 1, and resets the content of the internal bucket-to-column map so
+ * that it all buckets point to that single column.
+ */
+ public void reset() {
+ for (int i = 0; i < bucketToColumn.length; ++i) {
+ bucketToColumn[i] = 0;
+ }
+ numColumns = 1;
+ }
+
+ /**
+ * Adds a single column to this bucket distribution object. This will modify the internal bucket-to-column map so
+ * that it takes into account the new column.
+ */
+ private void addColumn() {
+ int newColumns = numColumns + 1;
+ List<Integer> migrate = getBucketMigrateCount(numColumns, numBucketBits);
+ int numBuckets = getNumBuckets(numBucketBits);
+ for (int i = 0; i < numBuckets; ++i) {
+ int old = bucketToColumn[i];
+ if (migrate.get(old) > 0) {
+ bucketToColumn[i] = numColumns; // move this bucket to the new column
+ migrate.set(old, migrate.get(old) - 1);
+ }
+ }
+ numColumns = newColumns;
+ }
+
+ /**
+ * Sets the number of columns to use for this document distribution object. This will reset and setup this object
+ * from scratch. The original number of buckets is maintained.
+ *
+ * @param numColumns The new number of columns to distribute to.
+ */
+ public synchronized void setNumColumns(int numColumns) {
+ if (numColumns < this.numColumns) {
+ reset();
+ }
+ if (numColumns == this.numColumns) {
+ return;
+ }
+ for (int i = numColumns - this.numColumns; --i >= 0; ) {
+ addColumn();
+ }
+ }
+
+ /**
+ * Returns the number of columns to distribute to.
+ *
+ * @return The number of columns.
+ */
+ public int getNumColumns() {
+ return numColumns;
+ }
+
+ /**
+ * Sets the number of buckets to use for this document distribution object. This will reset and setup this object
+ * from scratch. The original number of columns is maintained.
+ *
+ * @param numBucketBits The new number of bits to use for bucket id.
+ */
+ public synchronized void setNumBucketBits(int numBucketBits) {
+ if (numBucketBits == this.numBucketBits) {
+ return;
+ }
+ this.numBucketBits = numBucketBits;
+ bucketToColumn = new int[getNumBuckets(numBucketBits)];
+ int numColumns = this.numColumns;
+ reset();
+ setNumColumns(numColumns);
+ }
+
+ /**
+ * Returns the number of bits used for bucket identifiers.
+ *
+ * @return The number of bits.
+ */
+ public int getNumBucketBits() {
+ return numBucketBits;
+ }
+
+ /**
+ * Returns the number of buckets available using the configured number of bucket bits.
+ *
+ * @return The number of buckets.
+ */
+ public int getNumBuckets() {
+ return getNumBuckets(numBucketBits);
+ }
+
+ /**
+ * This method maps the given bucket id to its corresponding column.
+ *
+ * @param bucketId The bucket whose column to lookup.
+ * @return The column to distribute the bucket to.
+ */
+ public int getColumn(BucketId bucketId) {
+ int ret = (int)(bucketId.getId() & (getNumBuckets(numBucketBits) - 1));
+ if (ret >= bucketToColumn.length) {
+ log.log(Level.SEVERE,
+ "The bucket distribution map is not in sync with the number of bucket bits. " +
+ "This should never happen! Distribution is broken!!");
+ return 0;
+ }
+ return bucketToColumn[ret];
+ }
+}
diff --git a/vdslib/src/main/java/com/yahoo/vdslib/DocumentList.java b/vdslib/src/main/java/com/yahoo/vdslib/DocumentList.java new file mode 100644 index 00000000000..cbe5de78b76 --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/DocumentList.java @@ -0,0 +1,108 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vdslib; + +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.vespa.objects.Serializer; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public abstract class DocumentList { + + protected DocumentList() { } + + /** + * Creates a DocumentList from serialized form. + * + * @param docMan Documentmanager to use when deserializing + * @param buffer the buffer to read from + * @return a DocumentList instance + */ + public static DocumentList create(DocumentTypeManager docMan, byte[] buffer) { + return new BinaryDocumentList(docMan, buffer); + } + + /** + * Creates a DocumentList from a list of entries. + * @param entries the entries to create a DocumentList from + * @return a DocumentList instance + * @see com.yahoo.vdslib.Entry + */ + public static DocumentList create(List<Entry> entries) { + return new DynamicDocumentList(entries); + } + + /** + * Creates a DocumentList containing a single entry. + * + * @param entry the entry to create a DocumentList from + * @return a DocumentList instance + * @see com.yahoo.vdslib.Entry + */ + public static DocumentList create(Entry entry) { + return new DynamicDocumentList(entry); + } + + /** + * Retrieves the specified Entry from the list. + * + * @param index the index of the Entry to return (0-based) + * @return the entry at the specified position + * @throws ArrayIndexOutOfBoundsException if index is < 0 or > size() + * @throws com.yahoo.document.serialization.DeserializationException if the DocumentList is stored in binary form internally and deserialization fails + */ + public abstract Entry get(int index) throws ArrayIndexOutOfBoundsException; + + /** + * Returns the size of the list. + * + * @return the size of the list + */ + public abstract int size(); + + /** + * Returns the byte size of the list. The value returned is exact if the list is stored in + * binary form internally, otherwise it is approximate. + * + * @return the byte size of the list + */ + public abstract int getApproxByteSize(); + + /** + * Serialize the list into the given buffer. + * + * @param buf the buffer to serialize into + */ + public abstract void serialize(Serializer buf); + + /** + * Test if a contains b + * + * @param list DocumentList contained + * @return true if a contains b + */ + public boolean containsAll(DocumentList list) { + if( this.size() < list.size()) { + return false; + } + + Map<DocumentId, Integer> indexes = new HashMap<DocumentId, Integer>(); + for (int i=0; i<this.size(); ++i) { + indexes.put(this.get(i).getDocumentOperation().getId(), i); + } + for (int i=0; i<list.size(); ++i) { + Integer index = indexes.get(list.get(i).getDocumentOperation().getId()); + if (index == null || + list.get(i).getTimestamp() != this.get(index).getTimestamp() || + list.get(i).kind() != this.get(index).kind()) + { + return false; + } + } + return true; + } + +} + diff --git a/vdslib/src/main/java/com/yahoo/vdslib/DocumentSummary.java b/vdslib/src/main/java/com/yahoo/vdslib/DocumentSummary.java new file mode 100644 index 00000000000..27ca82b46dd --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/DocumentSummary.java @@ -0,0 +1,70 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vdslib; +import com.yahoo.vespa.objects.BufferSerializer; +import com.yahoo.vespa.objects.Deserializer; + +import java.nio.ByteOrder; +import java.io.UnsupportedEncodingException; +import java.lang.String; + + +public class DocumentSummary { + public static class Summary implements Comparable<Summary> { + private String docId; + private byte [] summary; + private Summary(String docId) { + this.docId = docId; + } + public Summary(String docId, byte [] summary) { + this(docId); + this.summary = summary; + } + final public String getDocId() { return docId; } + final public byte [] getSummary() { return summary; } + final public void setSummary(byte [] summary) { this.summary = summary; } + public int compareTo(Summary s) { + return getDocId().compareTo(s.getDocId()); + } + } + + private Summary [] summaries; + + public DocumentSummary(Deserializer buf) { + BufferSerializer bser = (BufferSerializer) buf; // This is a trick. This should be done in a different way. + bser.order(ByteOrder.BIG_ENDIAN); + int vacant4byteOldSeqId = buf.getInt(null); + int numSummaries = buf.getInt(null); + summaries = new Summary[numSummaries]; + if (numSummaries > 0) { + int summaryBufferSize = buf.getInt(null); + + byte[] cArr = bser.getBuf().array(); + int start = bser.getBuf().arrayOffset() + bser.position(); + bser.position(bser.position() + summaryBufferSize); + for(int i=0; i < numSummaries; i++) { + int summarySize = buf.getInt(null); + int end = start; + while (cArr[end++] != 0); + try { + byte [] sb = new byte [summarySize]; + System.arraycopy(cArr, end, sb, 0, summarySize); + summaries[i] = new Summary(new String(cArr, start, end-start-1, "utf-8"), sb); + start = end + summarySize; + } catch (UnsupportedEncodingException e) { + throw new RuntimeException("UTF-8 apparently not supported"); + } + } + } + } + /** + * Constructs a new message from a byte buffer. + * + * @param buffer A byte buffer that contains a serialized message. + */ + public DocumentSummary(byte[] buffer) { + this(BufferSerializer.wrap(buffer)); + } + + final public int getSummaryCount() { return summaries.length; } + final public Summary getSummary(int hitNo) { return summaries[hitNo]; } +} diff --git a/vdslib/src/main/java/com/yahoo/vdslib/DynamicDocumentList.java b/vdslib/src/main/java/com/yahoo/vdslib/DynamicDocumentList.java new file mode 100644 index 00000000000..dbc139352b0 --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/DynamicDocumentList.java @@ -0,0 +1,161 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vdslib; + +import com.yahoo.compress.CompressionType; +import com.yahoo.document.*; +import com.yahoo.document.serialization.DocumentSerializer; +import com.yahoo.document.serialization.DocumentSerializerFactory; +import com.yahoo.vespa.objects.Serializer; + +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * A list of document operations. + * + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class DynamicDocumentList extends DocumentList { + private List<Entry> entries; + + DynamicDocumentList(List<Entry> entries) { + //the entries themselves are of course still modifiable, this is just an internal safeguard: + this.entries = Collections.unmodifiableList(entries); + } + + DynamicDocumentList(Entry entry) { + List<Entry> list = new ArrayList<>(1); + list.add(entry); + BucketIdFactory factory = new BucketIdFactory(); + //the entry itself is of course still modifiable, this is just an internal safeguard: + this.entries = Collections.unmodifiableList(list); + } + + @Override + public Entry get(int index) throws ArrayIndexOutOfBoundsException { + return entries.get(index); + } + + @Override + public int size() { + return entries.size(); + } + + @Override + public int getApproxByteSize() { + int size = 4; + for (Entry entry : entries) { + if (entry.getDocumentOperation() instanceof DocumentPut) { + Document doc = ((DocumentPut)entry.getDocumentOperation()).getDocument(); + size += MetaEntry.SIZE + doc.getSerializedSize(); + } else if (entry.getDocumentOperation() instanceof DocumentUpdate) { + //TODO: Implement getSerializedSize() for DocumentUpdate!!! + size += MetaEntry.SIZE + 1024; + } else if (entry.getDocumentOperation() instanceof DocumentRemove) { + //TODO: Implement getSerializedSize() for DocumentRemove!!! + size += MetaEntry.SIZE + 64; + } + } + return size; + } + + @Override + public void serialize(Serializer buf) { + if (buf instanceof DocumentSerializer) { + serializeInternal((DocumentSerializer) buf); + } else { + DocumentSerializer serializer = DocumentSerializerFactory.create42(); + serializeInternal(serializer); + serializer.getBuf().getByteBuffer().flip(); + buf.put(null, serializer.getBuf().getByteBuffer()); + } + } + private void serializeInternal(DocumentSerializer buf) { + ByteOrder originalOrder = buf.getBuf().order(); + buf.getBuf().order(ByteOrder.LITTLE_ENDIAN); + //save the position before the size + int posAtBeginning = buf.getBuf().position(); + + //write the number of entries + buf.putInt(null, entries.size()); + + //create a list of metaentries, one for each entry + List<MetaEntry> metaEntries = new ArrayList<MetaEntry>(entries.size()); + + //jump past the meta block, we will serialize this afterwards when we know sizes and positions + byte[] bogusEntry = new byte[entries.size() * MetaEntry.SIZE]; + buf.put(null, bogusEntry); + + for (Entry entry : entries) { + MetaEntry metaEntry = new MetaEntry(); + metaEntries.add(metaEntry); + + // is this a remove? in that case, set this flag + if (entry.isRemoveEntry()) metaEntry.flags |= MetaEntry.REMOVE_ENTRY; + // is the body stripped? in that case, set this flag + if (entry.isBodyStripped()) metaEntry.flags |= MetaEntry.BODY_STRIPPED; + // is this an update? in that case, set this flag + if (entry.getDocumentOperation() instanceof DocumentUpdate) metaEntry.flags |= MetaEntry.UPDATE_ENTRY; + // is this a document? in that case, try to set the timestamp + if (entry.getDocumentOperation() instanceof DocumentPut) { + Document doc = ((DocumentPut)entry.getDocumentOperation()).getDocument(); + Long lastModified = doc.getLastModified(); + if (lastModified != null) { + metaEntry.timestamp = lastModified; + } + + if (doc.getDataType().getHeaderType().getCompressionConfig() != null + && doc.getDataType().getHeaderType().getCompressionConfig().type != CompressionType.NONE) { + metaEntry.flags |= MetaEntry.COMPRESSED; + } + if (doc.getDataType().getBodyType().getCompressionConfig() != null + && doc.getDataType().getBodyType().getCompressionConfig().type != CompressionType.NONE) { + metaEntry.flags |= MetaEntry.COMPRESSED; + } + } + + metaEntry.headerPos = buf.getBuf().position() - posAtBeginning; + + buf.getBuf().order(ByteOrder.BIG_ENDIAN); + if (entry.getDocumentOperation() instanceof DocumentPut) { + Document doc = ((DocumentPut)entry.getDocumentOperation()).getDocument(); + //serialize document and save length: + doc.serializeHeader(buf); + } else if (entry.getDocumentOperation() instanceof DocumentUpdate) { + DocumentUpdate docUp = (DocumentUpdate) entry.getDocumentOperation(); + docUp.serialize(buf); + } else if (entry.getDocumentOperation() instanceof DocumentRemove) { + new Document(DataType.DOCUMENT, entry.getDocumentOperation().getId()).serialize(buf); + } else { + throw new IllegalArgumentException("Can not handle class " + entry.getDocumentOperation().getClass().getName()); + } + + metaEntry.headerLen = buf.getBuf().position() - metaEntry.headerPos - posAtBeginning; + + if (entry.getDocumentOperation() instanceof DocumentPut) { + metaEntry.bodyPos = buf.getBuf().position() - posAtBeginning; + Document doc = ((DocumentPut)entry.getDocumentOperation()).getDocument(); + doc.serializeBody(buf); + metaEntry.bodyLen = buf.getBuf().position() - metaEntry.bodyPos - posAtBeginning; + } else { + metaEntry.bodyPos = 0; + metaEntry.bodyLen = 0; + } + buf.getBuf().order(ByteOrder.LITTLE_ENDIAN); + + } + //save position after payload: + int posAfterEntries = buf.getBuf().position(); + //go to beginning (after length) to serialize metaentries: + buf.getBuf().position(posAtBeginning + 4); + //serialize metaentries + for (MetaEntry metaEntry : metaEntries) { + metaEntry.serialize(buf.getBuf()); + } + //set position to after payload: + buf.getBuf().position(posAfterEntries); + buf.getBuf().order(originalOrder); + } +} diff --git a/vdslib/src/main/java/com/yahoo/vdslib/DynamicEntry.java b/vdslib/src/main/java/com/yahoo/vdslib/DynamicEntry.java new file mode 100644 index 00000000000..2aac8e711fc --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/DynamicEntry.java @@ -0,0 +1,75 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vdslib; + +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentRemove; +import com.yahoo.document.DocumentUpdate; + +/** + * Represents an in-memory entry. + * + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +class DynamicEntry extends Entry { + private DocumentOperation op; + private boolean bodyStripped; + + DynamicEntry(DocumentOperation op, boolean bodyStripped) { + this.op = op; + this.bodyStripped = bodyStripped; + } + + DynamicEntry(DocumentUpdate op) { + this.op = op; + this.bodyStripped = false; + } + + DynamicEntry(DocumentRemove op) { + this.op = op; + this.bodyStripped = false; + } + + @Override + public boolean valid() { + return true; + } + + @Override + public boolean isRemoveEntry() { + return op instanceof DocumentRemove; + } + + @Override + public boolean isBodyStripped() { + return bodyStripped; + } + + @Override + public boolean isUpdateEntry() { + return op instanceof DocumentUpdate; + } + + @Override + public long getTimestamp() { + if (op instanceof DocumentPut) { + DocumentPut put = (DocumentPut) op; + final Long lastModified = put.getDocument().getLastModified(); + if (lastModified != null) { + return lastModified; + } + } + return 0L; + } + + @Override + public DocumentOperation getDocumentOperation() { + return op; + } + + @Override + public DocumentOperation getHeader() { + return op; + //TODO: Only return header fields of Document here...? + } +} diff --git a/vdslib/src/main/java/com/yahoo/vdslib/Entry.java b/vdslib/src/main/java/com/yahoo/vdslib/Entry.java new file mode 100644 index 00000000000..8674db00419 --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/Entry.java @@ -0,0 +1,159 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vdslib; + +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentRemove; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.document.DocumentUpdate; + +/** + * Represents a document operation in a DocumentList, which can currently be + * PUT, REMOVE and UPDATE. + * + * @author <a href="mailto:thomasg@yahoo-inc.com">Thomas Gundersen</a>, <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public abstract class Entry { + + protected Entry() { } + + /** + * Creates a new entry from serialized form. + * + * @param docMan Documentmanager to use when deserializing + * @param buffer the buffer to read the entry from + * @param entryIndex the index of the entry in the buffer + * @return an Entry reading from the buffer + */ + public static Entry create(DocumentTypeManager docMan, byte[] buffer, int entryIndex) { + return new BinaryEntry(docMan, buffer, entryIndex); + } + + /** + * Creates a new entry from a document operation. + * + * @param op the document in the entry + * @param bodyStripped true if the document contains only header fields + * @return an Entry for this document + */ + public static Entry create(DocumentOperation op, boolean bodyStripped) { + return new DynamicEntry(op, bodyStripped); + } + + /** + * Creates a new entry from a document operation. + * + * @param op the document in the entry + * @return an Entry for this document + */ + public static Entry create(DocumentOperation op) { + return create(op, false); + } + /** + * Creates a new entry from a document remove operation. + * + * @param doc the document in the entry + * @return an Entry for this document + */ + public static Entry create(DocumentRemove doc) { + return new DynamicEntry(doc); + } + + /** + * Creates a new entry from a document update operation. + * + * @param doc the document update in the entry + * @return an Entry for this document update + */ + public static Entry create(DocumentUpdate doc) { + return new DynamicEntry(doc); + } + + /** + * Entries in iterators gotten from DocumentList::end() are invalid. + * @return true if valid + */ + public abstract boolean valid(); + + /** + * Returns true if the Document represented by this entry has been removed from persistent storage. + * + * @return true if the Document has been removed + */ + public abstract boolean isRemoveEntry(); + + /** + * Returns true if the Document represented by this entry has gotten its body fields stripped + * away (note: the body fields might still be stored in persistent storage). + * + * @return true if the Document only has header fields + */ + public abstract boolean isBodyStripped(); + + /** + * Returns true if this entry represents a document update operation. + * + * @return true if this is a document update operation + */ + public abstract boolean isUpdateEntry(); + + + public int kind(){ + if (isRemoveEntry()) { + return 0; //REMOVE + } + if (isUpdateEntry()) { + return 2; //UPDATE + } + return 1; // PUT + } + + /** + * Returns the timestamp (last modified) of this entry, from persistent storage. + * @return the last modified timestamp of this entry + */ + public abstract long getTimestamp(); + + /** + * Returns the DocumentPut or DocumentUpdate operation in this entry. + * + * @return the DocumentOperation in this entry. + */ + public abstract DocumentOperation getDocumentOperation(); + + /** + * Returns the Document header (if this is a DocumentPut or a DocumentRemove operation), otherwise + * a DocumentUpdate operation. + * + * @return a DocumentPut operation containing a Document with only the header fields present + * @throws RuntimeException if deserialization fails, or if this is a DocumentUpdate operation + */ + public abstract DocumentOperation getHeader(); + + @Override + public boolean equals(Object obj) { + if ( this == obj ) { + return true; + } + if (!(obj instanceof Entry)) { + return false; + } + Entry entry = (Entry) obj; + return this.getDocumentOperation().getId().equals(entry.getDocumentOperation().getId()) && + this.getTimestamp() == entry.getTimestamp() && + this.kind() == entry.kind() && + this.isBodyStripped() == entry.isBodyStripped() && + this.valid() == entry.valid(); + } + + @Override + public int hashCode() { + int res = 31; + res = 31 * res + getDocumentOperation().getId().hashCode(); + res = (int) (31 * res + getTimestamp()); + res = 31 * res + kind()*31; + res = 31 * res + (isBodyStripped() ? 17 : 249); + res = 31 * res + (valid() ? 333 : 31); + + return res; + } +} diff --git a/vdslib/src/main/java/com/yahoo/vdslib/MetaEntry.java b/vdslib/src/main/java/com/yahoo/vdslib/MetaEntry.java new file mode 100644 index 00000000000..2e5022bba53 --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/MetaEntry.java @@ -0,0 +1,52 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vdslib; + +import com.yahoo.io.GrowableByteBuffer; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +public class MetaEntry { + public static int REMOVE_ENTRY = 1; + public static int BODY_STRIPPED = 2; + public static int BODY_IN_HEADER = 4; + public static int UPDATE_ENTRY = 8; + public static int COMPRESSED = 16; + + public static int SIZE = 32; + + public long timestamp = 0; + public int headerPos = 0; + public int headerLen = 0; + public int bodyPos = 0; + public int bodyLen = 0; + public byte flags = 0; + + public MetaEntry() { + } + + public MetaEntry(byte[] buffer, int position) { + ByteBuffer buf = ByteBuffer.wrap(buffer, position, SIZE); + buf.order(ByteOrder.LITTLE_ENDIAN); + + timestamp = buf.getLong(); + headerPos = buf.getInt(); + headerLen = buf.getInt(); + bodyPos = buf.getInt(); + bodyLen = buf.getInt(); + flags = buf.get(); + } + + public void serialize(GrowableByteBuffer buf) { + ByteOrder originalOrder = buf.order(); + buf.order(ByteOrder.LITTLE_ENDIAN); + buf.putLong(timestamp); // 8 + buf.putInt(headerPos); // 12 + buf.putInt(headerLen); // 16 + buf.putInt(bodyPos); // 20 + buf.putInt(bodyLen); // 24 + buf.putInt(flags); // 28 (written as little-endian int, this is on purpose) + buf.putInt(0); // 32 + buf.order(originalOrder); + } +} diff --git a/vdslib/src/main/java/com/yahoo/vdslib/SearchResult.java b/vdslib/src/main/java/com/yahoo/vdslib/SearchResult.java new file mode 100644 index 00000000000..e09fa6f460a --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/SearchResult.java @@ -0,0 +1,119 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vdslib; +import com.yahoo.vespa.objects.BufferSerializer; +import com.yahoo.vespa.objects.Deserializer; + +import java.io.UnsupportedEncodingException; +import java.nio.ByteOrder; +import java.util.Map; +import java.util.TreeMap; + +public class SearchResult { + public static class Hit implements Comparable<Hit> { + private String docId; + private double rank; + public Hit(Hit h) { + docId = h.docId; + rank = h.rank; + } + public Hit(String docId, double rank) { + this.rank = rank; + this.docId = docId; + } + final public String getDocId() { return docId; } + final public double getRank() { return rank; } + final public void setRank(double rank) { this.rank = rank; } + public int compareTo(Hit h) { + return (h.rank < rank) ? -1 : (h.rank > rank) ? 1 : 0; // Sort order: descending + } + } + public static class HitWithSortBlob extends Hit { + private byte [] sortBlob; + public HitWithSortBlob(Hit h, byte [] sb) { + super(h); + sortBlob = sb; + } + final public byte [] getSortBlob() { return sortBlob; } + public int compareTo(Hit h) { + HitWithSortBlob b = (HitWithSortBlob) h; + int m = java.lang.Math.min(sortBlob.length, b.sortBlob.length); + for (int i = 0; i < m; i++) { + if (sortBlob[i] != b.sortBlob[i]) { + return (((int)sortBlob[i]) & 0xff) < (((int)b.sortBlob[i]) & 0xff) ? -1 : 1; + } + } + return sortBlob.length - b.sortBlob.length; + } + } + private int totalHits; + private Hit[] hits; + private TreeMap<Integer, byte []> aggregatorList; + private TreeMap<Integer, byte []> groupingList; + + public SearchResult(Deserializer buf) { + BufferSerializer bser = (BufferSerializer) buf; // TODO: dirty cast. must do this differently + bser.order(ByteOrder.BIG_ENDIAN); + this.totalHits = buf.getInt(null); + int numHits = buf.getInt(null); + hits = new Hit[numHits]; + if (numHits != 0) { + int docIdBufferLength = buf.getInt(null); + byte[] cArr = bser.getBuf().array(); + int start = bser.getBuf().arrayOffset() + bser.position(); + for(int i=0; i < numHits; i++) { + int end = start; + while (cArr[end++] != 0); + try { + hits[i] = new Hit(new String(cArr, start, end-start-1, "utf-8"), 0); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException("UTF-8 apparently not supported"); + } + start = end; + } + bser.position(start - bser.getBuf().arrayOffset()); + for(int i=0; i < numHits; i++) { + hits[i].setRank(buf.getDouble(null)); + } + } + + int numSortBlobs = buf.getInt(null); + int [] size = new int [numSortBlobs]; + for (int i = 0; i < numSortBlobs; i++) { + size[i] = buf.getInt(null); + } + for (int i = 0; i < numSortBlobs; i++) { + hits[i] = new HitWithSortBlob(hits[i], buf.getBytes(null, size[i])); + } + + int numAggregators = buf.getInt(null); + aggregatorList = new TreeMap<Integer, byte []>(); + for (int i = 0; i < numAggregators; i++) { + int aggrId = buf.getInt(null); + int aggrLength = buf.getInt(null); + aggregatorList.put(aggrId, buf.getBytes(null, aggrLength)); + } + + int numGroupings = buf.getInt(null); + groupingList = new TreeMap<Integer, byte []>(); + for (int i = 0; i < numGroupings; i++) { + int aggrId = buf.getInt(null); + int aggrLength = buf.getInt(null); + groupingList.put(aggrId, buf.getBytes(null, aggrLength)); + } + + } + /** + * Constructs a new message from a byte buffer. + * + * @param buffer A byte buffer that contains a serialized message. + */ + public SearchResult(byte[] buffer) { + this(BufferSerializer.wrap(buffer)); + } + + final public int getHitCount() { return hits.length; } + final public int getTotalHitCount() { return (totalHits != 0) ? totalHits : getHitCount(); } + final public Hit getHit(int hitNo) { return hits[hitNo]; } + final public Map<Integer, byte []> getAggregatorList() { return aggregatorList; } + final public Map<Integer, byte []> getGroupingList() { return groupingList; } +} diff --git a/vdslib/src/main/java/com/yahoo/vdslib/VisitorOrdering.java b/vdslib/src/main/java/com/yahoo/vdslib/VisitorOrdering.java new file mode 100644 index 00000000000..cd543caa7f1 --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/VisitorOrdering.java @@ -0,0 +1,40 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vdslib; + +public class VisitorOrdering { + public static int ASCENDING = 0; + public static int DESCENDING = 1; + + public int order; + public long orderingStart; + public short widthBits; + public short divisionBits; + + public VisitorOrdering() { + this(ASCENDING, (long)0, (short)0, (short)0); + } + + public VisitorOrdering(int order) { + this(order, (long)0, (short)0, (short)0); + } + + public VisitorOrdering(int order, long orderingStart, short widthBits, short divisionBits) { + this.order = order; + this.orderingStart = orderingStart; + this.widthBits = widthBits; + this.divisionBits = divisionBits; + } + + public int getOrder() { return order; } + public long getOrderingStart() { return orderingStart; } + public short getWidthBits() { return widthBits; } + public short getDivisionBits() { return divisionBits; } + + public String toString() { + String out = (order == ASCENDING ? "+" : "-") + + "," + widthBits + + "," + divisionBits + + "," + orderingStart; + return out; + } +} diff --git a/vdslib/src/main/java/com/yahoo/vdslib/VisitorStatistics.java b/vdslib/src/main/java/com/yahoo/vdslib/VisitorStatistics.java new file mode 100644 index 00000000000..906eb0db58d --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/VisitorStatistics.java @@ -0,0 +1,57 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vdslib; + +public class VisitorStatistics { + int bucketsVisited = 0; + long documentsVisited = 0; + long bytesVisited = 0; + long documentsReturned = 0; + long bytesReturned = 0; + long secondPassDocumentsReturned = 0; + long secondPassBytesReturned = 0; + + public void add(VisitorStatistics other) { + bucketsVisited += other.bucketsVisited; + documentsVisited += other.documentsVisited; + bytesVisited += other.bytesVisited; + documentsReturned += other.documentsReturned; + bytesReturned += other.bytesReturned; + secondPassDocumentsReturned += other.secondPassDocumentsReturned; + secondPassBytesReturned += other.secondPassBytesReturned; + } + + public int getBucketsVisited() { return bucketsVisited; } + public void setBucketsVisited(int bucketsVisited) { this.bucketsVisited = bucketsVisited; } + + public long getDocumentsVisited() { return documentsVisited; } + public void setDocumentsVisited(long documentsVisited) { this.documentsVisited = documentsVisited; } + + public long getBytesVisited() { return bytesVisited; } + public void setBytesVisited(long bytesVisited) { this.bytesVisited = bytesVisited; } + + public long getDocumentsReturned() { return documentsReturned; } + public void setDocumentsReturned(long documentsReturned) { this.documentsReturned = documentsReturned; } + + public long getBytesReturned() { return bytesReturned; } + public void setBytesReturned(long bytesReturned) { this.bytesReturned = bytesReturned; } + + public long getSecondPassDocumentsReturned() { return secondPassDocumentsReturned; } + public void setSecondPassDocumentsReturned(long secondPassDocumentsReturned) { this.secondPassDocumentsReturned = secondPassDocumentsReturned; } + + public long getSecondPassBytesReturned() { return secondPassBytesReturned; } + public void setSecondPassBytesReturned(long secondPassBytesReturned) { this.secondPassBytesReturned = secondPassBytesReturned; } + + public String toString() { + String out = + "Buckets visited: " + bucketsVisited + "\n" + + "Documents visited: " + documentsVisited + "\n" + + "Bytes visited: " + bytesVisited + "\n" + + "Documents returned: " + documentsReturned + "\n" + + "Bytes returned: " + bytesReturned + "\n" + + "Documents returned (2nd pass): " + secondPassDocumentsReturned + "\n" + + "Bytes returned (2nd pass): " + secondPassBytesReturned + "\n"; + + return out; + } + +} diff --git a/vdslib/src/main/java/com/yahoo/vdslib/distribution/ConfiguredNode.java b/vdslib/src/main/java/com/yahoo/vdslib/distribution/ConfiguredNode.java new file mode 100644 index 00000000000..dd0ad29f16b --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/distribution/ConfiguredNode.java @@ -0,0 +1,42 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vdslib.distribution; + +/** + * A node configured to exist, with its configured node specific information. + * This is immutable. The identity and natural order of a node is its index. + * + * @author bratseth + */ +public class ConfiguredNode implements Comparable<ConfiguredNode> { + + private final int index; + + private final boolean retired; + + public ConfiguredNode(int index, boolean retired) { + this.index = index; + this.retired = retired; + } + + /** Return the index (distribution key) of this node */ + public int index() { return index; } + + /** Returns whether the node is configured to be retired */ + public boolean retired() { return retired; } + + @Override + public int hashCode() { return index; } + + @Override + public boolean equals(Object other) { + if (other == this) return true; + if ( ! (other instanceof ConfiguredNode)) return false; + return ((ConfiguredNode)other).index == this.index; + } + + @Override + public int compareTo(ConfiguredNode other) { + return Integer.compare(this.index, other.index); + } + +} diff --git a/vdslib/src/main/java/com/yahoo/vdslib/distribution/Distribution.java b/vdslib/src/main/java/com/yahoo/vdslib/distribution/Distribution.java new file mode 100644 index 00000000000..95b998b1ab5 --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/distribution/Distribution.java @@ -0,0 +1,536 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vdslib.distribution; + +import com.yahoo.collections.BobHash; +import com.yahoo.config.subscription.ConfigSubscriber; +import com.yahoo.vespa.config.content.StorDistributionConfig; +import com.yahoo.config.subscription.ConfigSourceSet; +import com.yahoo.vdslib.state.*; +import com.yahoo.document.BucketId; + +import java.util.*; +import java.text.ParseException; + +public class Distribution { + + private int[] distributionBitMasks = new int[65]; + private Group nodeGraph; + private int redundancy; + private boolean distributorAutoOwnershipTransferOnWholeGroupDown = false; + private ConfigSubscriber configSub; + + public Group getRootGroup() { + return nodeGraph; + } + + public int getRedundancy() { + return redundancy; + } + + private ConfigSubscriber.SingleSubscriber<StorDistributionConfig> configSubscriber = new ConfigSubscriber.SingleSubscriber<StorDistributionConfig>() { + private int[] getGroupPath(String path) { + if (path.equals("invalid")) { return new int[0]; } + StringTokenizer st = new StringTokenizer(path, "."); + int[] p = new int[st.countTokens()]; + for (int i=0; i<p.length; ++i) { + p[i] = Integer.valueOf(st.nextToken()); + } + return p; + } + + @Override + public void configure(StorDistributionConfig config) { + try{ + Group root = null; + for (int i=0; i<config.group().size(); ++i) { + StorDistributionConfig.Group cg = config.group().get(i); + int[] path = new int[0]; + if (root != null) { + path = getGroupPath(cg.index()); + } + boolean isLeafGroup = (cg.nodes().size() > 0); + Group group; + int index = (path.length == 0 ? 0 : path[path.length - 1]); + if (isLeafGroup) { + group = new Group(index, cg.name()); + List<ConfiguredNode> nodes = new ArrayList<>(); + for (StorDistributionConfig.Group.Nodes node : cg.nodes()) { + nodes.add(new ConfiguredNode(node.index(), node.retired())); + } + group.setNodes(nodes); + } else { + group = new Group(index, cg.name(), new Group.Distribution(cg.partitions(), config.redundancy())); + } + group.setCapacity(cg.capacity()); + if (path.length == 0) { + root = group; + } else { + assert(root != null); + Group parent = root; + for (int j=0; j<path.length - 1; ++j) { + parent = parent.getSubgroups().get(path[j]); + } + parent.addSubGroup(group); + } + } + if (root == null) { + throw new IllegalStateException("Got config that did not " + + "specify even a root group. Need a root group at" + + "\nminimum:\n" + config.toString()); + } + root.calculateDistributionHashValues(); + Distribution.this.nodeGraph = root; + Distribution.this.redundancy = config.redundancy(); + //Distribution.this.diskDistribution = config.disk_distribution(); + distributorAutoOwnershipTransferOnWholeGroupDown = config.distributor_auto_ownership_transfer_on_whole_group_down(); + } catch (ParseException e) { + throw (IllegalStateException) new IllegalStateException("Failed to parse config").initCause(e); + } + } + }; + + public Distribution(String configId) { + this(configId, null); + } + public Distribution(String configId, ConfigSourceSet configSources) { + int mask = 0; + for (int i=0; i<=64; ++i) { + distributionBitMasks[i] = mask; + mask = (mask << 1) | 1; + } + if (configSources==null) { + configSub = new ConfigSubscriber(); + } else { + configSub = new ConfigSubscriber(configSources); + } + configSub.subscribe(configSubscriber, StorDistributionConfig.class, configId); + } + + public Distribution(StorDistributionConfig config) { + int mask = 0; + for (int i=0; i<=64; ++i) { + distributionBitMasks[i] = mask; + mask = (mask << 1) | 1; + } + configSubscriber.configure(config); + } + + public void close() { + if (configSub!=null) configSub.close(); + } + + private int getGroupSeed(BucketId bucket, ClusterState state, Group group) { + int seed = ((int) bucket.getRawId()) & distributionBitMasks[state.getDistributionBitCount()]; + seed ^= group.getDistributionHash(); + return seed; + } + + private int getDistributorSeed(BucketId bucket, ClusterState state) { + return ((int) bucket.getRawId()) & distributionBitMasks[state.getDistributionBitCount()]; + } + + private int getStorageSeed(BucketId bucket, ClusterState state) { + int seed = ((int) bucket.getRawId()) & distributionBitMasks[state.getDistributionBitCount()]; + + if (bucket.getUsedBits() > 33) { + int usedBits = bucket.getUsedBits() - 1; + seed ^= (distributionBitMasks[usedBits - 32] + & (bucket.getRawId() >> 32)) << 6; + } + return seed; + } + + private class ScoredGroup implements Comparable<ScoredGroup> { + Group group; + double score; + + ScoredGroup(Group g, double score) { this.group = g; this.score = score; } + + @Override + public int compareTo(ScoredGroup o) { + // Sorts by highest first. + return new Double(o.score).compareTo(score); + } + } + private class ScoredNode { + int index; + int reliability; + double score; + + ScoredNode(int index, int reliability, double score) { this.index = index; this.reliability = reliability; this.score = score; } + } + private static boolean allDistributorsDown(Group g, ClusterState clusterState) { + if (g.isLeafGroup()) { + for (ConfiguredNode node : g.getNodes()) { + NodeState ns = clusterState.getNodeState(new Node(NodeType.DISTRIBUTOR, node.index())); + if (ns.getState().oneOf("ui")) return false; + } + } else { + for (Group childGroup : g.getSubgroups().values()) { + if (!allDistributorsDown(childGroup, clusterState)) return false; + } + } + return true; + } + private Group getIdealDistributorGroup(BucketId bucket, ClusterState clusterState, Group parent, int redundancy) { + if (parent.isLeafGroup()) return parent; + int[] redundancyArray = parent.getDistribution().getRedundancyArray(redundancy); + TreeSet<ScoredGroup> results = new TreeSet<>(); + int seed = getGroupSeed(bucket, clusterState, parent); + RandomGen random = new RandomGen(seed); + int currentIndex = 0; + for(Group g : parent.getSubgroups().values()) { + while (g.getIndex() < currentIndex++) random.nextDouble(); + double score = random.nextDouble(); + if (Math.abs(g.getCapacity() - 1.0) > 0.0000001) { + score = Math.pow(score, 1.0 / g.getCapacity()); + } + results.add(new ScoredGroup(g, score)); + } + if (distributorAutoOwnershipTransferOnWholeGroupDown) { + while (!results.isEmpty() && allDistributorsDown(results.first().group, clusterState)) { + results.remove(results.first()); + } + } + if (results.isEmpty()) return null; + return getIdealDistributorGroup(bucket, clusterState, results.first().group, redundancyArray[0]); + } + private class ResultGroup implements Comparable<ResultGroup> { + Group group; + int redundancy; + + ResultGroup(Group group, int redundancy) { + this.group = group; + this.redundancy = redundancy; + } + + @Override + public int compareTo(ResultGroup o) { + return group.compareTo(o.group); + } + } + public void getIdealGroups(BucketId bucketId, ClusterState clusterState, Group parent, + int redundancy, List<ResultGroup> results) { + if (parent.isLeafGroup()) { + results.add(new ResultGroup(parent, redundancy)); + return; + } + + int[] redundancyArray = parent.getDistribution().getRedundancyArray(redundancy); + + List<ScoredGroup> tmpResults = new ArrayList<>(); + for (int i = 0; i < redundancyArray.length; ++i) { + tmpResults.add(new ScoredGroup(null, 0.0)); + } + + int seed = getGroupSeed(bucketId, clusterState, parent); + + RandomGen random = new RandomGen(seed); + + int currentIndex = 0; + Map<Integer, Group> subGroups = parent.getSubgroups(); + + for (Map.Entry<Integer, Group> group : subGroups.entrySet()) { + while (group.getKey() < currentIndex++) { + random.nextDouble(); + } + + double score = random.nextDouble(); + + if (group.getValue().getCapacity() != 1) { + score = Math.pow(score, 1.0 / group.getValue().getCapacity()); + } + + if (score > tmpResults.get(tmpResults.size() - 1).score) { + tmpResults.add(new ScoredGroup(group.getValue(), score)); + Collections.sort(tmpResults); + tmpResults.remove(tmpResults.size() - 1); + } + } + + for (int i = 0; i < tmpResults.size(); ++i) { + Group group = tmpResults.get(i).group; + + if (group != null) { + getIdealGroups(bucketId, clusterState, group, redundancyArray[i], results); + } + } + } + + private int getDiskSeed(BucketId bucket, int nodeIndex) { + // Assumes MODULO_BID for now. + + long currentid = bucket.withoutCountBits(); + byte[] ordered = new byte[8]; + ordered[0] = (byte)(currentid >> (0*8)); + ordered[1] = (byte)(currentid >> (1*8)); + ordered[2] = (byte)(currentid >> (2*8)); + ordered[3] = (byte)(currentid >> (3*8)); + ordered[4] = (byte)(currentid >> (4*8)); + ordered[5] = (byte)(currentid >> (5*8)); + ordered[6] = (byte)(currentid >> (6*8)); + ordered[7] = (byte)(currentid >> (7*8)); + int initval = (1664525 * nodeIndex + 0xdeadbeef); + return BobHash.hash(ordered, initval); + } + /** + * This function should only depend on disk distribution and node index. It is + * assumed that any other change, for instance in hierarchical grouping, does + * not change disk index on disk. + */ + int getIdealDisk(NodeState nodeState, int nodeIndex, BucketId bucket) { + // Catch special cases in a single if statement + if (nodeState.getDiskCount() < 2) { + if (nodeState.getDiskCount() == 1) { + return 0; + } + throw new IllegalArgumentException( + "Cannot pick ideal disk without knowing disk count."); + } + + RandomGen randomizer = new RandomGen(getDiskSeed(bucket, nodeIndex)); + + double maxScore = 0.0; + int idealDisk = 0xffff; + for (int i=0, n=nodeState.getDiskCount(); i<n; ++i) { + double score = randomizer.nextDouble(); + DiskState diskState = (nodeState.getDiskState(i)); + if (diskState.getCapacity() != 1.0) { + score = Math.pow(score, + 1.0 / diskState.getCapacity()); + } + if (score > maxScore) { + maxScore = score; + idealDisk = i; + } + } + return idealDisk; + } + + public List<Integer> getIdealStorageNodes(ClusterState clusterState, BucketId bucket, + String upStates) throws TooFewBucketBitsInUseException { + List<Integer> resultNodes = new ArrayList<>(); + + // If bucket is split less than distribution bit, we cannot distribute + // it. Different nodes own various parts of the bucket. + if (bucket.getUsedBits() < clusterState.getDistributionBitCount()) { + String msg = "Cannot get ideal state for bucket " + bucket + " using " + + bucket.getUsedBits() + " bits when cluster uses " + + clusterState.getDistributionBitCount() + " distribution bits."; + throw new TooFewBucketBitsInUseException(msg); + } + + // Find what hierarchical groups we should have copies in + List<ResultGroup> groupDistribution = new ArrayList<>(); + + getIdealGroups(bucket, clusterState, nodeGraph, redundancy, groupDistribution); + + int seed = getStorageSeed(bucket, clusterState); + + RandomGen random = new RandomGen(seed); + int randomIndex = 0; + for (ResultGroup group : groupDistribution) { + int redundancy = group.redundancy; + Collection<ConfiguredNode> nodes = group.group.getNodes(); + + // Create temporary place to hold results. Use double linked list + // for cheap access to back(). Stuff in redundancy fake entries to + // avoid needing to check size during iteration. + LinkedList<ScoredNode> tmpResults = new LinkedList<>(); + for (int i = 0; i < redundancy; ++i) { + tmpResults.add(new ScoredNode(0, 0, 0.0)); + } + + for (ConfiguredNode configuredNode : nodes) { + NodeState nodeState = clusterState.getNodeState(new Node(NodeType.STORAGE, configuredNode.index())); + if (!nodeState.getState().oneOf(upStates)) { + continue; + } + + if (nodeState.isAnyDiskDown()) { + int idealDiskIndex = getIdealDisk(nodeState, configuredNode.index(), bucket); + if (nodeState.getDiskState(idealDiskIndex).getState() != State.UP) { + continue; + } + } + + // Get the score from the random number generator. Make sure we + // pick correct random number. Optimize for the case where we + // pick in rising order. + if (configuredNode.index() != randomIndex) { + if (configuredNode.index() < randomIndex) { + random.setSeed(seed); + randomIndex = 0; + } + + for (int k = randomIndex; k < configuredNode.index(); ++k) { + random.nextDouble(); + } + + randomIndex = configuredNode.index(); + } + + double score = random.nextDouble(); + ++randomIndex; + if (nodeState.getCapacity() != 1.0) { + score = Math.pow(score, 1.0 / nodeState.getCapacity()); + } + if (score > tmpResults.getLast().score) { + for (int i = 0; i < tmpResults.size(); ++i) { + if (score > tmpResults.get(i).score) { + tmpResults.add(i, new ScoredNode(configuredNode.index(), nodeState.getReliability(), score)); + break; + } + } + tmpResults.removeLast(); + } + } + + for (ScoredNode node : tmpResults) { + resultNodes.add(node.index); + } + } + + return resultNodes; + } + + public static class TooFewBucketBitsInUseException extends Exception { + public TooFewBucketBitsInUseException(String message) { + super(message); + } + } + public static class NoDistributorsAvailableException extends Exception { + public NoDistributorsAvailableException(String message) { + super(message); + } + } + public int getIdealDistributorNode(ClusterState state, BucketId bucket, String upStates) throws TooFewBucketBitsInUseException, NoDistributorsAvailableException { + if (bucket.getUsedBits() < state.getDistributionBitCount()) { + throw new TooFewBucketBitsInUseException("Cannot get ideal state for bucket " + bucket + " using " + bucket.getUsedBits() + + " bits when cluster uses " + state.getDistributionBitCount() + " distribution bits."); + } + + Group idealGroup = getIdealDistributorGroup(bucket, state, nodeGraph, redundancy); + int seed = getDistributorSeed(bucket, state); + RandomGen random = new RandomGen(seed); + int randomIndex = 0; + List<ConfiguredNode> configuredNodes = idealGroup.getNodes(); + ScoredNode node = new ScoredNode(0, 0, 0); + for (ConfiguredNode configuredNode : configuredNodes) { + NodeState nodeState = state.getNodeState(new Node(NodeType.DISTRIBUTOR, configuredNode.index())); + if (!nodeState.getState().oneOf(upStates)) continue; + if (configuredNode.index() != randomIndex) { + if (configuredNode.index() < randomIndex) { + random.setSeed(seed); + randomIndex = 0; + } + for (int k=randomIndex; k < configuredNode.index(); ++k) { + random.nextDouble(); + } + randomIndex = configuredNode.index(); + } + double score = random.nextDouble(); + ++randomIndex; + if (Math.abs(nodeState.getCapacity() - 1.0) > 0.0000001) { + score = Math.pow(score, 1.0 / nodeState.getCapacity()); + } + if (score > node.score) { + node = new ScoredNode(configuredNode.index(), 1, score); + } + } + if (node.reliability == 0) { + throw new NoDistributorsAvailableException( + "No available distributors in any of the given upstates '" + + upStates + "'."); + } + return node.index; + } + private boolean visitGroups(GroupVisitor visitor, Map<Integer, Group> groups) { + for (Group g : groups.values()) { + if (!visitor.visitGroup(g)) return false; + if (!g.isLeafGroup()) { + if (!visitGroups(visitor, g.getSubgroups())) { + return false; + } + } + } + return true; + } + public void visitGroups(GroupVisitor visitor) { + Map<Integer, Group> groups = new TreeMap<>(); + groups.put(nodeGraph.getIndex(), nodeGraph); + visitGroups(visitor, groups); + } + public Set<ConfiguredNode> getNodes() { + final Set<ConfiguredNode> nodes = new HashSet<>(); + GroupVisitor visitor = new GroupVisitor() { + @Override + public boolean visitGroup(Group g) { + if (g.isLeafGroup()) { + nodes.addAll(g.getNodes()); + } + return true; + } + }; + visitGroups(visitor); + return nodes; + } + + public static String getDefaultDistributionConfig(int redundancy, int nodeCount) { + return getDefaultDistributionConfig(redundancy, nodeCount, StorDistributionConfig.Disk_distribution.MODULO_BID); + } + + public static String getDefaultDistributionConfig(int redundancy, int nodeCount, StorDistributionConfig.Disk_distribution.Enum diskDistribution) { + StringBuilder sb = new StringBuilder(); + sb.append("raw:redundancy ").append(redundancy).append("\n") + .append("group[1]\n") + .append("group[0].index \"invalid\"\n") + .append("group[0].name \"invalid\"\n") + .append("group[0].partitions \"*\"\n") + .append("group[0].nodes[").append(nodeCount).append("]\n"); + for (int i=0; i<nodeCount; ++i) { + sb.append("group[0].nodes[").append(i).append("].index ").append(i).append("\n"); + } + sb.append("disk_distribution ").append(diskDistribution.toString()).append("\n"); + return sb.toString(); + } + public static String getSimpleGroupConfig(int redundancy, int nodeCount) { + return getSimpleGroupConfig(redundancy, nodeCount, StorDistributionConfig.Disk_distribution.Enum.MODULO_BID); + } + public static String getSimpleGroupConfig(int redundancy, int nodeCount, StorDistributionConfig.Disk_distribution.Enum diskDistribution) { + StringBuilder sb = new StringBuilder(); + sb.append("raw:redundancy ").append(redundancy).append("\n").append("group[4]\n"); + + int group = 0; + sb.append("group[" + group + "].index \"invalid\"\n") + .append("group[" + group + "].name \"invalid\"\n") + .append("group[" + group + "].partitions \"1|*\"\n"); + + ++group; + sb.append("group[" + group + "].index \"0\"\n") + .append("group[" + group + "].name \"east\"\n") + .append("group[" + group + "].partitions \"*\"\n"); + + ++group; + sb.append("group[" + group + "].index \"0.0\"\n") + .append("group[" + group + "].name \"g1\"\n") + .append("group[" + group + "].partitions \"*\"\n") + .append("group[" + group + "].nodes[").append((nodeCount + 1) / 2).append("]\n"); + for (int i=0; i<nodeCount; i += 2) { + sb.append("group[" + group + "].nodes[").append(i / 2).append("].index ").append(i).append("\n"); + } + + ++group; + sb.append("group[" + group + "].index \"0.1\"\n") + .append("group[" + group + "].name \"g2\"\n") + .append("group[" + group + "].partitions \"*\"\n") + .append("group[" + group + "].nodes[").append(nodeCount / 2).append("]\n"); + for (int i=1; i<nodeCount; i += 2) { + sb.append("group[" + group + "].nodes[").append(i / 2).append("].index ").append(i).append("\n"); + } + sb.append("disk_distribution ").append(diskDistribution.toString()).append("\n"); + return sb.toString(); + } +} + + diff --git a/vdslib/src/main/java/com/yahoo/vdslib/distribution/Group.java b/vdslib/src/main/java/com/yahoo/vdslib/distribution/Group.java new file mode 100644 index 00000000000..7601dd31972 --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/distribution/Group.java @@ -0,0 +1,300 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vdslib.distribution; + +import java.util.*; +import java.text.ParseException; + +/** + * Represent a group in the tree structure of groups in hierarchical setup of VDS nodes. + */ +public class Group implements Comparable<Group> { + + private String name; + private Group parent = null; + private int index; + private int distributionHash; + private Distribution distribution = null; + private double capacity; + private Map<Integer, Group> subgroups; + private List<ConfiguredNode> nodes; + + public Group(int index, String name) { + this.name = name; + this.index = index; + this.distributionHash = 0; + this.distribution = null; + this.capacity = 1; + this.nodes = new ArrayList<>(); + this.subgroups = null; + } + + public Group(int index, String name, Distribution d) { + this.name = name; + this.index = index; + this.distributionHash = 0; + this.distribution = d; + this.capacity = 1; + this.nodes = null; + this.subgroups = new TreeMap<>(); + } + + private String getPathWithSeparator(String separator) { + if (parent != null) { + final String prefix = parent.getPathWithSeparator(separator); + return prefix.isEmpty() ? name : prefix + separator + name; + } else { + return ""; + } + } + + public String getPath() { + return getPathWithSeparator("."); + } + + public String getUnixStylePath() { + return "/" + getPathWithSeparator("/"); + } + + @Override + public int compareTo(Group o) { + return new Integer(index).compareTo(o.getIndex()); + } + + @Override + public boolean equals(Object o) { + if (o == this) return true; + if ( ! (o instanceof Group)) { return false; } + Group other = (Group) o; + if ( ! name.equals(other.name) + || index != other.index + || (distribution == null ^ other.distribution == null) + || (distribution != null && ! distribution.equals(other.distribution)) + || Math.abs(capacity - other.capacity) > 0.0000001 + || (subgroups == null ^ other.subgroups == null) + || (subgroups != null && !subgroups.equals(other.subgroups)) + || (nodes == null ^ other.nodes == null) + || (nodes != null && !nodes.equals(other.nodes))) + { + return false; + } + return true; + } + + @Override + public int hashCode() { + return name.hashCode() + + 17 * index + + 23 * distribution.hashCode() + + 43 * subgroups.hashCode() + + 47 * nodes.hashCode(); + + } + + @Override + public String toString() { + return toString(""); + } + + public String toString(String indent) { + StringBuffer sb = new StringBuffer(); + sb.append("Group(name: ").append(name).append(", index: ").append(index); + if (distribution != null) sb.append(", distribution: ").append(distribution); + if (Math.abs(capacity - 1.0) > 0.0000001) sb.append(", capacity: ").append(capacity); + if (nodes != null) { + sb.append(", nodes( "); + for (ConfiguredNode node : nodes) { + sb.append(node.index()).append(' '); + } + sb.append(")"); + } + if (subgroups != null) { + sb.append(", subgroups: ").append(subgroups.size()); + } + sb.append(") {"); + if (subgroups != null && subgroups.size() > 0) { + for (Group g : subgroups.values()) { + sb.append("\n").append(indent).append(" "); + sb.append(g.toString(indent + " ")); + } + } + sb.append("\n").append(indent).append("}"); + return sb.toString(); + } + + public void addSubGroup(Group g) { + if (distribution == null) { + throw new IllegalStateException("Cannot add sub groups to a node without distribution set."); + } + if (subgroups.containsKey(g.getIndex())) { + throw new IllegalStateException("A subgroup with index " + g.getIndex() + " already exist."); + } + if (nodes != null) { + throw new IllegalStateException("Cannot add subgroup to leaf group with nodes"); + } + g.parent = this; + subgroups.put(g.getIndex(), g); + } + + public void setCapacity(double c) { capacity = c; } + + public void setNodes(List<ConfiguredNode> nodes) { + if (distribution != null) { + throw new IllegalStateException("Cannot add nodes to non-leaf group with distribution set"); + } + if (subgroups != null) { + throw new IllegalStateException("Cannot add nodes to group with children"); + } + this.nodes = new ArrayList<>(nodes); + Collections.sort(this.nodes); + } + + public String getName() { return name; } + public int getIndex() { return index; } + public List<ConfiguredNode> getNodes() { return Collections.unmodifiableList(nodes); } + public Map<Integer, Group> getSubgroups() { return Collections.unmodifiableMap(subgroups); } + public double getCapacity() { return capacity; } + public int getDistributionHash() { return distributionHash; } + public boolean isLeafGroup() { return (distribution == null); } + public Distribution getDistribution() { return distribution; } + + /** + * The distribution hashes is hopefully unique numbers for each group that is used to adjust the seed generated + * for groups. This is called by Distribution during configuration on the root node. It recursively generates all hashes. + */ + void calculateDistributionHashValues() { + calculateDistributionHashValues(0x8badf00d); + } + + private void calculateDistributionHashValues(int parentHash) { + distributionHash = parentHash ^ (1664525 * index + 1013904223); + if (subgroups == null) return; + for (Map.Entry<Integer, Group> entry : subgroups.entrySet()) { + entry.getValue().calculateDistributionHashValues(distributionHash); + } + } + + public Group getGroupForNode(int index) { + if (nodes != null) { + for (ConfiguredNode node : nodes) { + if (node.index() == index) { + return this; + } + } + } + + if (subgroups != null) { + for (Group group : subgroups.values()) { + Group retVal = group.getGroupForNode(index); + if (retVal != null) { + return retVal; + } + } + } + + return null; + } + + /** + * The distribution class keeps precalculated arrays for distributions for all legal redundancies. The class is + * immutable, such that it can be returned safely out from the group object. + */ + public static class Distribution { + + private final int[] distributionSpec; + private final int[][] preCalculatedResults; + + public Distribution(String serialized, int maxRedundancy) throws ParseException { + StringTokenizer st = new StringTokenizer(serialized, "|"); + // Create the distribution spec + int[] distributionSpec = new int[st.countTokens()]; + for (int i=0; i<distributionSpec.length; ++i) { + String token = st.nextToken(); + try{ + distributionSpec[i] = (token.equals("*") ? 0 : Integer.valueOf(token)); + } catch (NumberFormatException e) { + throw new ParseException("Illegal distribution spec \"" + serialized + "\". Copy counts must be integer values in the range 1-255.", i); + } + if (!token.equals("*") && distributionSpec[i] == 0) { + throw new ParseException("Illegal distribution spec \"" + serialized + "\". Copy counts must be in the range 1-255.", i); + } + } + // Verify sanity of the distribution spec + int firstAsterix = distributionSpec.length; + for (int i=0; i<distributionSpec.length; ++i) { + if (i > firstAsterix) { + if (distributionSpec[i] != 0) { + throw new ParseException("Illegal distribution spec \"" + serialized + "\". Asterix specification must be tailing the specification.", i); + } + continue; + } + if (i < firstAsterix && distributionSpec[i] == 0) { + firstAsterix = i; + continue; + } + if (distributionSpec[i] <= 0 || distributionSpec[i] >= 256) { + throw new ParseException("Illegal distribution spec \"" + serialized + "\". Copy counts must be in the range 1-255.", i); + } + } + this.distributionSpec = distributionSpec; + // Create the pre calculated results + if (maxRedundancy <= 0 || maxRedundancy > 255) throw new IllegalArgumentException("The max redundancy must be a positive number in the range 1-255."); + int asterixCount = distributionSpec.length - firstAsterix; + int[][] preCalculations = new int[maxRedundancy + 1][]; + for (int i=1; i<=maxRedundancy; ++i) { + List<Integer> spec = new ArrayList<Integer>(); + for (int j=0; j<distributionSpec.length; ++j) { + spec.add(distributionSpec[j]); + } + int remainingRedundancy = i; + for (int j=0; j<firstAsterix; ++j) { + spec.set(j, Math.min(remainingRedundancy, spec.get(j))); + remainingRedundancy -= spec.get(j); + } + int divided = remainingRedundancy / asterixCount; + remainingRedundancy = remainingRedundancy % asterixCount; + for (int j=firstAsterix; j<spec.size(); ++j) { + spec.set(j, divided + (j - firstAsterix < remainingRedundancy ? 1 : 0)); + } + while (spec.get(spec.size() - 1) == 0) { + spec.remove(spec.size() - 1); + } + preCalculations[i] = new int[spec.size()]; + Collections.sort(spec); + for (int j=0; j<spec.size(); ++j) preCalculations[i][j] = spec.get(spec.size() - 1 - j); + } + this.preCalculatedResults = preCalculations; + } + + public int[] getRedundancyArray(int redundancy) { + if (redundancy == 0 || redundancy >= preCalculatedResults.length) { + throw new IllegalArgumentException("Can only retrieve redundancy arrays in the inclusive range 1-" + (preCalculatedResults.length - 1) + "."); + } + return preCalculatedResults[redundancy]; + } + + @Override + public boolean equals(Object o) { + if (o == this) return true; + if ( ! (o instanceof Distribution)) return false; + Distribution other = (Distribution) o; + return (distributionSpec == other.distributionSpec && preCalculatedResults.length == other.preCalculatedResults.length); + } + + @Override + public int hashCode() { + return Arrays.hashCode(distributionSpec) + 13 * preCalculatedResults.length; + } + + @Override + public String toString() { + StringBuffer sb = new StringBuffer(); + for (int i=0; i<distributionSpec.length; ++i) { + if (i != 0) sb.append('|'); + if (distributionSpec[i] == 0) sb.append('*'); + else sb.append(distributionSpec[i]); + } + return sb.toString(); + } + } + +} diff --git a/vdslib/src/main/java/com/yahoo/vdslib/distribution/GroupVisitor.java b/vdslib/src/main/java/com/yahoo/vdslib/distribution/GroupVisitor.java new file mode 100644 index 00000000000..493413b4e36 --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/distribution/GroupVisitor.java @@ -0,0 +1,8 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vdslib.distribution; + +public interface GroupVisitor { + + public boolean visitGroup(Group g); + +} diff --git a/vdslib/src/main/java/com/yahoo/vdslib/distribution/RandomGen.java b/vdslib/src/main/java/com/yahoo/vdslib/distribution/RandomGen.java new file mode 100644 index 00000000000..9a0a1264529 --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/distribution/RandomGen.java @@ -0,0 +1,18 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vdslib.distribution; + +public class RandomGen extends java.util.Random { + + public RandomGen() { + super(); + } + + public RandomGen(long seed) { + super(seed); + } + + public void setSeed(long seed){ + super.setSeed(seed); + nextDouble(); + } +} diff --git a/vdslib/src/main/java/com/yahoo/vdslib/distribution/package-info.java b/vdslib/src/main/java/com/yahoo/vdslib/distribution/package-info.java new file mode 100644 index 00000000000..c4dfd2eca11 --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/distribution/package-info.java @@ -0,0 +1,5 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.vdslib.distribution; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/vdslib/src/main/java/com/yahoo/vdslib/loadtype/.gitignore b/vdslib/src/main/java/com/yahoo/vdslib/loadtype/.gitignore new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/loadtype/.gitignore diff --git a/vdslib/src/main/java/com/yahoo/vdslib/package-info.java b/vdslib/src/main/java/com/yahoo/vdslib/package-info.java new file mode 100644 index 00000000000..7f38edb7ad1 --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/package-info.java @@ -0,0 +1,5 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.vdslib; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/vdslib/src/main/java/com/yahoo/vdslib/state/ClusterState.java b/vdslib/src/main/java/com/yahoo/vdslib/state/ClusterState.java new file mode 100644 index 00000000000..b3d572e48ae --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/state/ClusterState.java @@ -0,0 +1,403 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vdslib.state; +import com.yahoo.text.StringUtilities; + +import java.text.ParseException; +import java.util.*; + +/** + * Be careful about changing this class, as it mirrors the ClusterState in C++. + * Please update both if you need to change anything. + */ +public class ClusterState implements Cloneable { + + private int version = 0; + private State state = State.DOWN; + // nodeStates maps each of the non-up nodes that have an index <= the node count for its type. + private Map<Node, NodeState> nodeStates = new TreeMap<>(); + + // TODO: Change to one count for distributor and one for storage, rather than an array + // TODO: Rename, this is not the highest node count but the highest index + private ArrayList<Integer> nodeCount = new ArrayList<>(2); + + private String description = ""; + private int distributionBits = 16; + private boolean official = false; + + public ClusterState(String serialized) throws ParseException { + nodeCount.add(0); + nodeCount.add(0); + deserialize(serialized); + } + + public ClusterState clone() { + try{ + ClusterState state = (ClusterState) super.clone(); + state.nodeStates = new TreeMap<>(); + for (Map.Entry<Node, NodeState> entry : nodeStates.entrySet()) { + state.nodeStates.put(entry.getKey(), entry.getValue().clone()); + } + state.nodeCount = new ArrayList<>(2); + state.nodeCount.add(nodeCount.get(0)); + state.nodeCount.add(nodeCount.get(1)); + return state; + } catch (CloneNotSupportedException e) { + assert(false); // Should never happen + return null; + } + } + + public boolean equals(Object o) { + if (!(o instanceof ClusterState)) { return false; } + ClusterState other = (ClusterState) o; + if (version != other.version + || !state.equals(other.state) + || distributionBits != other.distributionBits + || !nodeCount.equals(other.nodeCount) + || !nodeStates.equals(other.nodeStates)) + { + return false; + } + return true; + } + + public boolean similarTo(Object o) { + if (!(o instanceof ClusterState)) { return false; } + ClusterState other = (ClusterState) o; + + if (state.equals(State.DOWN) && other.state.equals(State.DOWN)) return true; // both down, means equal (why??) + if (version != other.version || !state.equals(other.state)) return false; + if (distributionBits != other.distributionBits) return false; + if ( ! nodeCount.equals(other.nodeCount)) return false; + + for (Map.Entry<Node, NodeState> nodeStateEntry : nodeStates.entrySet()) { + NodeState otherNodeState = other.nodeStates.get(nodeStateEntry.getKey()); + if (otherNodeState == null || ! otherNodeState.similarTo(nodeStateEntry.getValue())) return false; + } + return true; + } + + /** + * Fleet controller marks states that are actually sent out to nodes as official states. Only fleetcontroller + * should set this to official, and only just before sending it out. This state is currently not serialized with + * the system state, but only used internally in the fleetcontroller. Might be useful client side though, where + * clients modify states to mark nodes down that they cannot speak with. + */ + public void setOfficial(boolean official) { this.official = official; } + /** Whether this system state is an unmodified version of an official system state. */ + public boolean isOfficial() { return official; } + + /** Used during deserialization */ + private class NodeData { + + boolean empty = true; + Node node = new Node(NodeType.STORAGE, 0); + StringBuilder sb = new StringBuilder(); + + public void addNodeState() throws ParseException { + if (!empty) { + NodeState ns = NodeState.deserialize(node.getType(), sb.toString()); + if (!ns.equals(new NodeState(node.getType(), State.UP))) { + nodeStates.put(node, ns); + } + if (nodeCount.get(node.getType().ordinal()) <= node.getIndex()) { + nodeCount.set(node.getType().ordinal(), node.getIndex() + 1); + } + } + empty = true; + sb = new StringBuilder(); + } + } + + private void deserialize(String serialized) throws ParseException { + official = false; + StringTokenizer st = new StringTokenizer(serialized, " \t\n\f\r", false); + NodeData nodeData = new NodeData(); + String lastAbsolutePath = ""; + state = State.UP; + while (st.hasMoreTokens()) { + String token = st.nextToken(); + + int index = token.indexOf(':'); + if (index < 0) { + throw new ParseException("Token " + token + " does not contain ':': " + serialized, 0); + } + String key = token.substring(0, index); + String value = token.substring(index + 1); + if (key.length() > 0 && key.charAt(0) == '.') { + if (lastAbsolutePath.equals("")) { + throw new ParseException("The first path in system state string needs to be absolute, in state: " + serialized, 0); + } + key = lastAbsolutePath + key; + } else { + lastAbsolutePath = key; + } + if (key.length() > 0) switch (key.charAt(0)) { + case 'c': + if (key.equals("cluster")) { + setClusterState(State.get(value)); + continue; + } + break; + case 'b': + if (key.equals("bits")) { + distributionBits = Integer.parseInt(value); + continue; + } + break; + case 'v': + if (key.equals("version")) { + Integer version; + try{ + version = Integer.valueOf(value); + } catch (Exception e) { + throw new ParseException("Illegal version '" + value + "'. Must be an integer, in state: " + serialized, 0); + } + setVersion(version); + continue; + } + break; + case 'm': + if (key.length() > 1) break; + setDescription(StringUtilities.unescape(value)); + continue; + case 'd': + case 's': + NodeType nodeType = null; + int dot = key.indexOf('.'); + String type = (dot < 0 ? key : key.substring(0, dot)); + if (type.equals("storage")) { + nodeType = NodeType.STORAGE; + } else if (type.equals("distributor")) { + nodeType = NodeType.DISTRIBUTOR; + } + if (nodeType == null) break; + if (dot < 0) { + int nodeCount; + try{ + nodeCount = Integer.valueOf(value); + } catch (Exception e) { + throw new ParseException("Illegal node count '" + value + "' in state: " + serialized, 0); + } + if (nodeCount > this.nodeCount.get(nodeType.ordinal())) { + this.nodeCount.set(nodeType.ordinal(), nodeCount); + } + continue; + } + int dot2 = key.indexOf('.', dot + 1); + Node node; + if (dot2 < 0) { + node = new Node(nodeType, Integer.valueOf(key.substring(dot + 1))); + } else { + node = new Node(nodeType, Integer.valueOf(key.substring(dot + 1, dot2))); + } + if (node.getIndex() >= this.nodeCount.get(nodeType.ordinal())) { + throw new ParseException("Cannot index " + nodeType + " node " + node.getIndex() + " of " + this.nodeCount.get(nodeType.ordinal()) + " in state: " + serialized, 0); + } + if (!nodeData.node.equals(node)) { + nodeData.addNodeState(); + } + if (dot2 < 0) { + break; // No default key for nodeStates. + } else { + nodeData.sb.append(' ').append(key.substring(dot2 + 1)).append(':').append(value); + } + nodeData.node = node; + nodeData.empty = false; + continue; + default: + break; + } + // Ignore unknown nodeStates + } + nodeData.addNodeState(); + removeLastNodesDownWithoutReason(); + } + + public String getTextualDifference(ClusterState other) { + return getDiff(other).toString(); + } + public String getHtmlDifference(ClusterState other) { + return getDiff(other).toHtml(); + } + + public Diff getDiff(ClusterState other) { + Diff diff = new Diff(); + + if (version != other.version) { + diff.add(new Diff.Entry("version", version, other.version)); + } + if (!state.equals(other.state)) { + diff.add(new Diff.Entry("cluster", state, other.state)); + } + if (distributionBits != other.distributionBits) { + diff.add(new Diff.Entry("bits", distributionBits, other.distributionBits)); + } + if (official != other.official) { + diff.add(new Diff.Entry("official", official, other.official)); + } + for (NodeType type : NodeType.getTypes()) { + Diff typeDiff = new Diff(); + int maxCount = Math.max(getNodeCount(type), other.getNodeCount(type)); + for (int i = 0; i < maxCount; i++) { + Node n = new Node(type, i); + Diff d = getNodeState(n).getDiff(other.getNodeState(n)); + if (d.differs()) { + typeDiff.add(new Diff.Entry(i, d)); + } + } + if (typeDiff.differs()) { + diff.add(new Diff.Entry(type, typeDiff).splitLine()); + } + } + return diff; + } + + public int getVersion() { + return version; + } + + public void setVersion(int version) { + official = false; + this.version = version; + } + + public int getDistributionBitCount() { return distributionBits; } + public void setDistributionBits(int bits) { distributionBits = bits; } + + /** + * Returns the state of this cluster state. In particular, it does not return the cluster state, + * no matter what the function name says. + */ + public State getClusterState() { return state; } + + /** + * Sets the state of this cluster state. In particular, it does not set the cluster state, + * no matter what the function name says. + */ + public void setClusterState(State s) { + if (!s.validClusterState()) { + throw new IllegalArgumentException("Illegal cluster state " + s); + } + state = s; + } + + /** + * Take the distributor nodes as an example. Let X be the highest index of + * the distributor nodes added through setNodeState(). Let Y be the number + * of suffix nodes for which the state is down and without description. + * E.g. if node X is down and without description, but nodex X-1 is up, then Y is 1. + * The node count for distributors is then X + 1 - Y. + */ + public int getNodeCount(NodeType type) { return nodeCount.get(type.ordinal()); } + + /** + * Returns the state of a node. + * If the node is not known this returns a node in the state UP (never null) if it has lower index than the max + * and DOWN otherwise. + */ + public NodeState getNodeState(Node node) { + if (node.getIndex() >= nodeCount.get(node.getType().ordinal())) + return new NodeState(node.getType(), State.DOWN); + return nodeStates.getOrDefault(node, new NodeState(node.getType(), State.UP)); + } + + /** + * Set the node state of the given node. + * + * Automatically adjusts number of nodes of that given type if out of range of current nodes seen. + */ + public void setNodeState(Node node, NodeState newState) { + newState.verifyValidInSystemState(node.getType()); + if (node.getIndex() >= nodeCount.get(node.getType().ordinal())) { + for (int i= nodeCount.get(node.getType().ordinal()); i<node.getIndex(); ++i) { + nodeStates.put(new Node(node.getType(), i), new NodeState(node.getType(), State.DOWN)); + } + nodeCount.set(node.getType().ordinal(), node.getIndex() + 1); + } + if (newState.equals(new NodeState(node.getType(), State.UP))) { + nodeStates.remove(node); + } else { + nodeStates.put(node, newState); + } + if (newState.getState().equals(State.DOWN)) { + // We might be setting the last node down, so we can remove some states + removeLastNodesDownWithoutReason(); + } + } + + private void removeLastNodesDownWithoutReason() { + for (NodeType nodeType : NodeType.values()) { + for (int index = nodeCount.get(nodeType.ordinal()) - 1; index >= 0; --index) { + Node node = new Node(nodeType, index); + NodeState nodeState = nodeStates.get(node); + if (nodeState == null) break; // Node not existing is up + if ( ! nodeState.getState().equals(State.DOWN)) break; // Node not down can not be removed + if (nodeState.hasDescription()) break; // Node have reason to be down. Don't remove node as we will forget reason + nodeStates.remove(node); + nodeCount.set(nodeType.ordinal(), node.getIndex()); + } + } + } + + public String getDescription() { return description; } + + public void setDescription(String description) { + this.description = description; + } + + /** Returns the serialized form of this cluster state */ + // TODO: Don't rely on toString for that + @Override + public String toString() { return toString(false); } + + public String toString(boolean verbose) { + StringBuilder sb = new StringBuilder(); + + if (version != 0) { + sb.append(" version:").append(version); + } + + if (!state.equals(State.UP)) { + sb.append(" cluster:").append(state.serialize()); + } + + if (distributionBits != 16) { + sb.append(" bits:").append(distributionBits); + } + + int distributorNodeCount = getNodeCount(NodeType.DISTRIBUTOR); + int storageNodeCount = getNodeCount(NodeType.STORAGE); + // If not printing verbose, we're not printing descriptions, so we can remove tailing nodes that are down that has descriptions too + if (!verbose) { + while (distributorNodeCount > 0 && getNodeState(new Node(NodeType.DISTRIBUTOR, distributorNodeCount - 1)).getState().equals(State.DOWN)) --distributorNodeCount; + while (storageNodeCount > 0 && getNodeState(new Node(NodeType.STORAGE, storageNodeCount - 1)).getState().equals(State.DOWN)) --storageNodeCount; + } + if (distributorNodeCount > 0){ + sb.append(" distributor:").append(distributorNodeCount); + for (Map.Entry<Node, NodeState> entry : nodeStates.entrySet()) { + if (entry.getKey().getType().equals(NodeType.DISTRIBUTOR) && entry.getKey().getIndex() < distributorNodeCount) { + String nodeState = entry.getValue().serialize(entry.getKey().getIndex(), verbose); + if (!nodeState.isEmpty()) { + sb.append(' ').append(nodeState); + } + } + } + } + if (storageNodeCount > 0){ + sb.append(" storage:").append(storageNodeCount); + for (Map.Entry<Node, NodeState> entry : nodeStates.entrySet()) { + if (entry.getKey().getType().equals(NodeType.STORAGE) && entry.getKey().getIndex() < storageNodeCount) { + String nodeState = entry.getValue().serialize(entry.getKey().getIndex(), verbose); + if (!nodeState.isEmpty()) { + sb.append(' ').append(nodeState); + } + } + } + } + if (sb.length() > 0) { // Remove first space if not empty + sb.deleteCharAt(0); + } + return sb.toString(); + } +} diff --git a/vdslib/src/main/java/com/yahoo/vdslib/state/Diff.java b/vdslib/src/main/java/com/yahoo/vdslib/state/Diff.java new file mode 100644 index 00000000000..084c2bbb851 --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/state/Diff.java @@ -0,0 +1,121 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vdslib.state; + +import java.util.LinkedList; +import java.util.List; + +/** + * TODO: document this + */ +public class Diff { + public static class Entry { + String id; + // Values set for entries that contain diff themselves + String preContent; + String postContent; + boolean bold = false; // Print content in bold. Used if very important + // Values set for entries that contains subdiffs + Diff subDiff; + boolean splitLine = false; // If set, split this content on multiple lines + + public Entry(Object id, Object pre, Object post) { + this.id = id.toString(); + preContent = pre.toString(); + postContent = post.toString(); + } + + public Entry(Object id, Diff subDiff) { + this.id = id.toString(); + this.subDiff = subDiff; + } + + public Entry bold() { bold = true; return this; } + public Entry splitLine() { splitLine = true; return this; } + } + private List<Entry> diff = new LinkedList<Entry>(); + + public void add(Entry e) { diff.add(e); } + + public boolean differs() { return (!diff.isEmpty()); } + + class PrintProperties { + boolean insertLineBreaks = false; + boolean ommitGroupForSingleEntries = true; + String lineBreak = "\n"; + String entrySeparator = ", "; + String idValueSeparator = ": "; + String keyValueSeparator = " => "; + String singleGroupSeparator = ""; + String groupStart = "["; + String groupStop = "]"; + String indent = " "; + String boldStart = ""; + String boldStop = ""; + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + PrintProperties pp = new PrintProperties(); + print(sb, "", pp, false); + return sb.toString(); + } + public String toHtml() { + StringBuilder sb = new StringBuilder(); + PrintProperties pp = new PrintProperties(); + pp.lineBreak = "<br>\n"; + pp.indent = " "; + pp.keyValueSeparator = " => "; + pp.insertLineBreaks = true; + pp.boldStart = "<b>"; + pp.boldStop = "</b>"; + print(sb, "", pp, false); + return sb.toString(); + } + + public void print(StringBuilder sb, String indent, PrintProperties pp, boolean splitLines) { + boolean first = true; + for (Entry e : diff) { + if (first) { + first = false; + } else { + sb.append(pp.entrySeparator); + if (splitLines && pp.insertLineBreaks) { + sb.append(pp.lineBreak).append(indent); + } + } + sb.append(e.id); + if (e.subDiff != null) { + sb.append(pp.idValueSeparator); + if (e.subDiff.diff.size() > 1 || !pp.ommitGroupForSingleEntries) { + sb.append(pp.groupStart); + } else { + sb.append(pp.singleGroupSeparator); + } + if (e.splitLine && pp.insertLineBreaks) { + sb.append(pp.lineBreak).append(indent + pp.indent); + } + e.subDiff.print(sb, indent + pp.indent, pp, e.splitLine); + if (e.splitLine && pp.insertLineBreaks) { + sb.append(pp.lineBreak).append(indent); + } + if (e.subDiff.diff.size() > 1 || !pp.ommitGroupForSingleEntries) { + sb.append(pp.groupStop); + } + } else { + if (!e.id.isEmpty()) { + sb.append(pp.idValueSeparator); + } + if (e.bold) { + sb.append(pp.boldStart).append(e.preContent).append(pp.boldStop) + .append(pp.keyValueSeparator) + .append(pp.boldStart).append(e.postContent).append(pp.boldStop); + } else { + sb.append(e.preContent) + .append(pp.keyValueSeparator) + .append(e.postContent); + } + } + } + } + +} diff --git a/vdslib/src/main/java/com/yahoo/vdslib/state/DiskState.java b/vdslib/src/main/java/com/yahoo/vdslib/state/DiskState.java new file mode 100644 index 00000000000..fb91af14f90 --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/state/DiskState.java @@ -0,0 +1,128 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vdslib.state; + +import com.yahoo.text.StringUtilities; + +import java.util.StringTokenizer; +import java.text.ParseException; + +/** + * + */ +public class DiskState implements Cloneable { + private State state = State.UP; + private String description = ""; + private double capacity = 1.0; + + public DiskState() {} + public DiskState(State s) { + setState(s); + } + public DiskState(State s, String description, double capacity) { + setState(s); // Set via set methods, so we can have illegal argument checks only one place + setCapacity(capacity); + setDescription(description); + } + public DiskState clone() { + try{ + return (DiskState) super.clone(); + } catch (CloneNotSupportedException e) { + assert(false); // Should not happen + return null; + } + } + + public DiskState(String serialized) throws ParseException { + StringTokenizer st = new StringTokenizer(serialized, " \t\f\r\n"); + while (st.hasMoreTokens()) { + String token = st.nextToken(); + int index = token.indexOf(':'); + if (index < 0) { + throw new ParseException("Token " + token + " does not contain ':': " + serialized, 0); + } + String key = token.substring(0, index); + String value = token.substring(index + 1); + if (key.length() > 0) switch (key.charAt(0)) { + case 's': + if (key.length() > 1) break; + setState(State.get(value)); + continue; + case 'c': + if (key.length() > 1) break; + try{ + setCapacity(Double.valueOf(value)); + } catch (Exception e) { + throw new ParseException("Illegal disk capacity '" + value + "'. Capacity must be a positive floating point number", 0); + } + continue; + case 'm': + if (key.length() > 1) break; + description = StringUtilities.unescape(value); + continue; + default: + break; + } + // Ignore unknown tokens + } + } + + public String serialize(String prefix, boolean includeDescription) { + boolean empty = true; + StringBuilder sb = new StringBuilder(); + if (!state.equals(State.UP) || prefix.length() < 2) { + sb.append(prefix).append("s:").append(state.serialize()); + empty = false; + } + if (Math.abs(capacity - 1.0) > 0.000000001) { + if (empty) { empty = false; } else { sb.append(' '); } + sb.append(prefix).append("c:").append(capacity); + } + if (includeDescription && description.length() > 0) { + if (!empty) { sb.append(' '); } + sb.append(prefix).append("m:").append(StringUtilities.escape(description, ' ')); + } + return sb.toString(); + } + + public State getState() { return state; } + public double getCapacity() { return capacity; } + public String getDescription() { return description; } + + public void setState(State s) { + if (!s.validDiskState()) { + throw new IllegalArgumentException("State " + s + " is not a valid disk state."); + } + state = s; + } + public void setCapacity(double capacity) { + if (capacity < 0) { + throw new IllegalArgumentException("Negative capacity makes no sense."); + } + this.capacity = capacity; + } + public void setDescription(String desc) { description = desc; } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("DiskState(").append(state.serialize()); + if (Math.abs(capacity - 1.0) > 0.00000001) { + sb.append(", capacity ").append(capacity); + } + if (description.length() > 0) { + sb.append(": ").append(description); + } + sb.append(")"); + return sb.toString(); + } + + public boolean equals(Object o) { + if (!(o instanceof DiskState)) { return false; } + DiskState other = (DiskState) o; + if (state.equals(other.state) + && Math.abs(capacity - other.capacity) < 0.00000001) + { + return true; + } + return false; + } +} diff --git a/vdslib/src/main/java/com/yahoo/vdslib/state/Node.java b/vdslib/src/main/java/com/yahoo/vdslib/state/Node.java new file mode 100644 index 00000000000..832120d6b5b --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/state/Node.java @@ -0,0 +1,52 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vdslib.state; + +/** + * A node in a content cluster. This is immutable. + */ +public class Node implements Comparable<Node> { + + private final NodeType type; + private final int index; + + public Node(NodeType type, int index) { + this.type = type; + this.index = index; + } + + public Node(String serialized) { + int dot = serialized.lastIndexOf('.'); + if (dot < 0) throw new IllegalArgumentException("Not a legal node string '" + serialized + "'."); + type = NodeType.get(serialized.substring(0, dot)); + index = Integer.valueOf(serialized.substring(dot + 1)); + } + + public String toString() { + return type.toString() + "." + index; + } + + public NodeType getType() { return type; } + public int getIndex() { return index; } + + private int getOrdering() { + return (type.equals(NodeType.STORAGE) ? 65536 : 0) + index; + } + + @Override + public int compareTo(Node n) { + return getOrdering() - n.getOrdering(); + } + + @Override + public int hashCode() { + return type.hashCode() ^ index; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Node)) return false; + Node n = (Node) o; + return (type.equals(n.type) && index == n.index); + } + +} diff --git a/vdslib/src/main/java/com/yahoo/vdslib/state/NodeState.java b/vdslib/src/main/java/com/yahoo/vdslib/state/NodeState.java new file mode 100644 index 00000000000..8c31938dfaf --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/state/NodeState.java @@ -0,0 +1,499 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vdslib.state; + +import com.yahoo.text.StringUtilities; + +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.StringTokenizer; + +/** + * The state of a single node in the cluster state + * TODO: The config aspects of this should move to ConfiguredNode + * TODO: The type should be removed, as it is part of the owner. + * TODO: Monitoring aspects should move to NodeInfo + */ +public class NodeState implements Cloneable { + + private final NodeType type; + private State state = State.UP; + private String description = ""; + private double capacity = 1.0; + private int reliability = 1; + private double initProgress = 1.0; + private int minUsedBits = 16; + private List<DiskState> diskStates = new ArrayList<>(); + /** When generating ideal states, we want to cheaply check if any disks are down in the nodestate. */ + private boolean anyDiskDown = false; + private long startTimestamp = 0; + + public static double getListingBucketsInitProgressLimit() { return 0.01; } + + public NodeState(NodeType type, State state) { + this.type = type; + this.state = state; + updateAnyDiskDownFlag(); + } + + private void updateAnyDiskDownFlag() { + boolean anyDown = false; + for (DiskState ds : diskStates) { + if (!ds.getState().equals(State.UP)) { + anyDown = true; + break; + } + } + anyDiskDown = anyDown; + } + + public NodeState clone() { + try{ + NodeState ns = (NodeState) super.clone(); + ns.diskStates = new ArrayList<>(); + for (DiskState s : diskStates) { + ns.diskStates.add(s.clone()); + } + return ns; + } catch (CloneNotSupportedException e) { + throw new RuntimeException("Does not happen"); + } + } + + /** + * A state can not be forced to be in a state above it's reported state. + * For instance, a down being down, cannot be forced up, but a node being down can be forced in maintenance. + */ + public boolean above(NodeState other) { + return (state.ordinal() > other.state.ordinal()); + } + + public boolean equals(Object o) { + if (!(o instanceof NodeState)) { return false; } + NodeState ns = (NodeState) o; + if (state != ns.state + || Math.abs(capacity - ns.capacity) > 0.0000000001 + || Math.abs(reliability - ns.reliability) > 0.0000000001 + || Math.abs(initProgress - ns.initProgress) > 0.0000000001 + || startTimestamp != ns.startTimestamp + || minUsedBits != ns.minUsedBits) + { + return false; + } + if (diskStates.size() == 0 && ns.diskStates.size() == 0) { + // Everything is fine + } else if (diskStates.size() == 0 || ns.diskStates.size() == 0) { + NodeState nonEmptyState = (diskStates.size() == 0 ? ns : this); + for (int i=0; i<nonEmptyState.diskStates.size(); ++i) { + if (!nonEmptyState.diskStates.get(i).equals(new DiskState(State.UP))) { + return false; + } + } + } else if (diskStates.size() != ns.diskStates.size()) { + return false; + } else { + for (int i=0; i<diskStates.size(); ++i) { + if (!diskStates.get(i).equals(ns.diskStates.get(i))) { + return false; + } + } + } + return true; + } + public int hashCode() { + return state.hashCode() ^ diskStates.hashCode() ^ new Double(capacity).hashCode() ^ new Double(reliability).hashCode(); + } + + /** + * States are similar if the cluster state doesn't need to be updated due to a change. + * Note that min dist bits may need to alter cluster state, but as we don't know at this point, we ignore it. + * Cluster state will check for that. + */ + public boolean similarTo(Object o) { + if (!(o instanceof NodeState)) { return false; } + NodeState other = (NodeState) o; + + if (state != other.state) return false; + if (Math.abs(capacity - other.capacity) > 0.0000000001) return false; + if (Math.abs(reliability - other.reliability) > 0.0000000001) return false; + if (startTimestamp != other.startTimestamp) return false; + + // Init progress on different sides of the init progress limit boundary is not similar. + if (type.equals(NodeType.STORAGE) + && initProgress < getListingBucketsInitProgressLimit() ^ other.initProgress < getListingBucketsInitProgressLimit()) + { + return false; + } + + if (diskStates.size() == 0 && other.diskStates.size() == 0) { + // Everything is fine + } else if (diskStates.size() == 0 || other.diskStates.size() == 0) { + NodeState nonEmptyState = (diskStates.size() == 0 ? other : this); + for (int i=0; i<nonEmptyState.diskStates.size(); ++i) { + if (!nonEmptyState.diskStates.get(i).equals(new DiskState(State.UP))) { + return false; + } + } + } else if (diskStates.size() != other.diskStates.size()) { + return false; + } else { + for (int i=0; i<diskStates.size(); ++i) { + if (!diskStates.get(i).equals(other.diskStates.get(i))) { + return false; + } + } + } + return true; + } + + public Diff getDiff(NodeState other) { + Diff diff = new Diff(); + if (!state.equals(other.state)) { + diff.add(new Diff.Entry("", state, other.state).bold()); + } + if (Math.abs(capacity - other.capacity) > 0.000000001) { + diff.add(new Diff.Entry("capacity", capacity, other.capacity)); + } + if (Math.abs(reliability - other.reliability) > 0.000000001) { + diff.add(new Diff.Entry("reliability", reliability, other.reliability)); + } + if (minUsedBits != other.minUsedBits) { + diff.add(new Diff.Entry("minUsedBits", minUsedBits, other.minUsedBits)); + } + if (Math.abs(initProgress - other.initProgress) > 0.000000001 && state.equals(State.INITIALIZING) && other.state.equals(State.INITIALIZING)) { + diff.add(new Diff.Entry("initProgress", initProgress, other.initProgress)); + } + if (startTimestamp != other.startTimestamp) { + diff.add(new Diff.Entry("startTimestamp", startTimestamp, other.startTimestamp)); + } + if (diskStates.size() != other.diskStates.size()) { + diff.add(new Diff.Entry("disks", diskStates.size(), other.diskStates.size())); + } else { + Diff diskDiff = new Diff(); + for (int i=0; i<diskStates.size(); ++i) { + if (!diskStates.get(i).equals(other.diskStates.get(i))) { + diskDiff.add(new Diff.Entry(i, diskStates.get(i), other.diskStates.get(i))); + } + } + if (diskDiff.differs()) { + diff.add(new Diff.Entry("disks", diskDiff)); + } + } + if (!description.equals(other.description)) { + diff.add(new Diff.Entry("description", description, other.description)); + } + return diff; + } + + public String getTextualDifference(NodeState other) { + return getDiff(other).toString(); + } + + /** Capacity is set by deserializing a node state. This seems odd, as it is config */ + public NodeState setCapacity(double c) { this.capacity = c; return this; } + + public NodeState setReliability(int r) { this.reliability = r; return this; } + public NodeState setInitProgress(double p) { this.initProgress = p; return this; } + public NodeState setDescription(String desc) { this.description = desc; return this; } + public NodeState setMinUsedBits(int u) { this.minUsedBits = u; return this; } + public NodeState setState(State state) { this.state = state; return this; } + public NodeState setStartTimestamp(long ts) { this.startTimestamp = ts; return this; } + + public double getCapacity() { return this.capacity; } + public int getReliability() { return this.reliability; } + public double getInitProgress() { return this.initProgress; } + public boolean hasDescription() { return (description.length() > 0); } + public String getDescription() { return description; } + public State getState() { return this.state; } + public int getMinUsedBits() { return minUsedBits; } + public long getStartTimestamp() { return startTimestamp; } + + public boolean isAnyDiskDown() { return anyDiskDown; } + public int getDiskCount() { return diskStates.size(); } + public List<DiskState> getDiskStates() { return Collections.unmodifiableList(diskStates); } + + public String toString() { return toString(false); } + + public String toString(boolean compact) { + StringBuilder sb = new StringBuilder(); + if (compact) { + sb.append(state.serialize().toUpperCase()); + } else { + sb.append(state); + } + if (Math.abs(capacity - 1.0) > 0.000000001) { + sb.append(compact ? ", c " : ", capacity ").append(compact ? String.format(Locale.ENGLISH, "%.3g", capacity) : capacity); + } + if (Math.abs(reliability - 1.0) > 0.000000001) { + sb.append(compact ? ", r " : ", reliability ").append(reliability); + } + if (state.equals(State.INITIALIZING)) { + sb.append(compact ? ", i " : ", init progress ").append(compact ? String.format(Locale.ENGLISH, "%.3g", initProgress) : initProgress); + if (type.equals(NodeType.STORAGE)) { + if (initProgress < getListingBucketsInitProgressLimit()) { + sb.append(compact ? " (ls)" : " (listing files)"); + } else { + sb.append(compact ? " (read)" : " (reading file headers)"); + } + } + } + if (startTimestamp > 0) { + sb.append(compact ? ", t " : ", start timestamp ").append(startTimestamp); + } + if (minUsedBits != 16) { + sb.append(compact ? ", b " : ", minimum used bits ").append(minUsedBits); + } + + if (diskStates.size() > 0) { + if (compact) { + boolean anyNonDefault = false; + for (DiskState diskState : diskStates) { + anyNonDefault |= (!diskState.equals(new DiskState(State.UP))); + } + if (anyNonDefault) { + sb.append(","); + DiskState defaultDiskState = new DiskState(State.UP); + for (int i=0; i<diskStates.size(); ++i) { + if (!diskStates.get(i).equals(defaultDiskState)) { + sb.append(" d").append(i).append("(").append(diskStates.get(i).serialize("", false)).append(")"); + } + } + } + } else { + sb.append(", disk states:"); + for (int i=0; i<diskStates.size(); ++i) { + sb.append(" disk ").append(i).append(": ").append(diskStates.get(i).toString()); + } + } + } + if (description.length() > 0) { + sb.append(": ").append(description); + } + return sb.toString(); + } + + public NodeState setDiskCount(int count) { + if (count < 0) { + throw new IllegalArgumentException("Count must be positive. Was "+count+"."); + } + diskStates.clear(); + for(int i=0;i<count;i++) { + diskStates.add(new DiskState(State.UP, "", 1.0)); + } + return this; + } + + public NodeState setDiskState(int disk, DiskState state) throws IndexOutOfBoundsException { + diskStates.set(disk, state); + updateAnyDiskDownFlag(); + return this; + } + + public DiskState getDiskState(int disk) throws IndexOutOfBoundsException { + if (diskStates.isEmpty()) { // Zero disks, means unknown amount of disks, but all are up, + return new DiskState(); // in which case we don't need to know amount of disks. + } + return diskStates.get(disk); + } + + public String serialize() { return serialize(-1, false); } + public String serialize(boolean verbose) { return serialize(-1, verbose); } + public String serialize(int nodeIdx, boolean verbose) { + boolean empty = true; + StringBuilder sb = new StringBuilder(); + String prefix = (nodeIdx == -1 ? "" : "." + nodeIdx + "."); + if (state != State.UP){ + empty = false; + sb.append(prefix).append("s:").append(state.serialize()); + } + if (Math.abs(capacity - 1.0) > 0.000000001) { + if (empty) { empty = false; } else { sb.append(' '); } + sb.append(prefix).append("c:").append(capacity); + } + if (Math.abs(reliability - 1.0) > 0.000000001) { + if (empty) { empty = false; } else { sb.append(' '); } + sb.append(prefix).append("r:").append(reliability); + } + if (state == State.INITIALIZING) { + sb.append(' '); + sb.append(prefix).append("i:").append(initProgress); + } + if (startTimestamp != 0) { + if (empty) { empty = false; } else { sb.append(' '); } + sb.append(prefix).append("t:").append(startTimestamp); + } + if (nodeIdx == -1 && minUsedBits != 16) { + if (empty) { empty = false; } else { sb.append(' '); } + sb.append(prefix).append("b:").append(minUsedBits); + } + + if (diskStates.size() > 0) { + StringBuilder diskInfo = new StringBuilder(); + for(int i = 0; i < diskStates.size(); ++i) { + String diskPrefix = prefix + "d." + i + "."; + String disk = diskStates.get(i).serialize(diskPrefix, verbose); + if (disk.length() > 0) { + diskInfo.append(' ').append(disk); + } + } + String diskInfoStr = diskInfo.toString(); + if (verbose || diskInfoStr.length() > 0) { + if (empty) { empty = false; } else { sb.append(' '); } + sb.append(prefix).append("d:").append(diskStates.size()); + sb.append(diskInfoStr); + } else if (nodeIdx == -1) { + if (empty) { empty = false; } else { sb.append(' '); } + sb.append(prefix).append("d:").append(diskStates.size()); + } + } + if ((verbose || nodeIdx == -1) && description.length() > 0) { + if (!empty) { sb.append(' '); } + sb.append(prefix).append("m:").append(StringUtilities.escape(description, ' ')); + } + return sb.toString(); + } + + private static class DiskData { + + boolean empty = true; + int diskIndex = 0; + StringBuilder sb = new StringBuilder(); + + public void addDisk(NodeState ns) throws ParseException { + if (!empty) { + while (diskIndex >= ns.diskStates.size()) { + ns.diskStates.add(new DiskState()); + } + ns.diskStates.set(diskIndex, new DiskState(sb.toString())); + empty = true; + sb = new StringBuilder(); + } + } + } + + /** Creates an instance from the serialized form produced by serialize */ + public static NodeState deserialize(NodeType type, String serialized) throws ParseException { + NodeState newState = new NodeState(type, State.UP); + StringTokenizer st = new StringTokenizer(serialized, " \t\r\f\n", false); + DiskData diskData = new DiskData(); + while (st.hasMoreTokens()) { + String token = st.nextToken(); + int index = token.indexOf(':'); + if (index < 0) { + throw new ParseException("Token " + token + " does not contain ':': " + serialized, 0); + } + String key = token.substring(0, index); + String value = token.substring(index + 1); + if (key.length() > 0) switch (key.charAt(0)) { + case 's': + if (key.length() > 1) break; + newState.setState(State.get(value)); + continue; + case 'b': + if (key.length() > 1) break; + newState.setMinUsedBits(Integer.parseInt(value)); + continue; + case 'c': + if (key.length() > 1) break; + if (type != null && !type.equals(NodeType.STORAGE)) break; + try{ + newState.setCapacity(Double.valueOf(value)); + } catch (Exception e) { + throw new ParseException("Illegal capacity '" + value + "'. Capacity must be a positive floating point number", 0); + } + continue; + case 'r': + if (key.length() > 1) break; + if (type != null && !type.equals(NodeType.STORAGE)) break; + try{ + newState.setReliability(Integer.valueOf(value)); + } catch (Exception e) { + throw new ParseException("Illegal reliability '" + value + "'. Reliability must be a positive integer number", 0); + } + continue; + case 'i': + if (key.length() > 1) break; + try{ + newState.setInitProgress(Double.valueOf(value)); + } catch (Exception e) { + throw new ParseException("Illegal init progress '" + value + "'. Init progress must be a floating point number from 0.0 to 1.0", 0); + } + continue; + case 't': + if (key.length() > 1) break; + try{ + newState.setStartTimestamp(Long.valueOf(value)); + if (newState.getStartTimestamp() < 0) throw new Exception(); + } catch (Exception e) { + throw new ParseException("Illegal start timestamp " + value + ". Start timestamp must be 0 or a positive long.", 0); + } + continue; + case 'm': + if (key.length() > 1) break; + newState.setDescription(StringUtilities.unescape(value)); + continue; + case 'd': + if (type != null && !type.equals(NodeType.STORAGE)) break; + if (key.length() == 1) { + int size; + try{ + size = Integer.valueOf(value); + } catch (Exception e) { + throw new ParseException("Invalid disk count '" + value + "'. Need a positive integer value", 0); + } + while (newState.diskStates.size() < size) { + newState.diskStates.add(new DiskState()); + } + continue; + } + if (key.charAt(1) != '.') break; + int diskIndex; + int endp = key.indexOf('.', 2); + String indexStr = (endp < 0 ? key.substring(2) : key.substring(2, endp)); + try{ + diskIndex = Integer.valueOf(indexStr); + } catch (Exception e) { + throw new ParseException("Invalid disk index '" + indexStr + "'. need a positive integer value", 0); + } + if (diskIndex >= newState.diskStates.size()) { + throw new ParseException("Cannot index disk " + diskIndex + " of " + newState.diskStates.size(), 0); + } + if (diskData.diskIndex != diskIndex) { + diskData.addDisk(newState); + } + if (endp < 0) { + diskData.sb.append(" s:").append(value); + } else { + diskData.sb.append(" ").append(key.substring(endp + 1)).append(':').append(value); + } + diskData.diskIndex = diskIndex; + diskData.empty = false; + continue; + default: + break; + } + // Ignore unknown tokens + } + diskData.addDisk(newState); + newState.updateAnyDiskDownFlag(); + return newState; + } + + public void verifyValidInSystemState(NodeType type) { + if (!state.validCurrentNodeState(type)) { + throw new IllegalArgumentException("State " + state + " cannot fit in system state for node of type: " + type); + } + if (type.equals(NodeType.DISTRIBUTOR) && Math.abs(capacity - 1.0) > 0.000000001) { + throw new IllegalArgumentException("Capacity should not be set for a distributor node"); + } + if (type.equals(NodeType.DISTRIBUTOR) && Math.abs(reliability - 1.0) > 0.000000001) { + throw new IllegalArgumentException("Reliability should not be set for a distributor node"); + } + if (type.equals(NodeType.DISTRIBUTOR) && !diskStates.isEmpty()) { + throw new IllegalArgumentException("Disk states should not be set for a distributor node"); + } + } + +} diff --git a/vdslib/src/main/java/com/yahoo/vdslib/state/NodeType.java b/vdslib/src/main/java/com/yahoo/vdslib/state/NodeType.java new file mode 100644 index 00000000000..2c1db1c5cc2 --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/state/NodeType.java @@ -0,0 +1,32 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vdslib.state; + +public enum NodeType { + STORAGE("storage"), + DISTRIBUTOR("distributor"); + + private final String serializeAs; + + private NodeType(String serializeAs) { + this.serializeAs = serializeAs; + } + + public String toString() { + return serializeAs; + } + + public static NodeType get(String serialized) { + for(NodeType type : values()) { + if (type.serializeAs.equals(serialized)) return type; + } + throw new IllegalArgumentException("Unknown node type '" + serialized + "'. Legal values are 'storage' and 'distributor'."); + } + + public static NodeType[] getTypes() { + NodeType types[] = new NodeType[2]; + types[0] = STORAGE; + types[1] = DISTRIBUTOR; + return types; + } + +} diff --git a/vdslib/src/main/java/com/yahoo/vdslib/state/State.java b/vdslib/src/main/java/com/yahoo/vdslib/state/State.java new file mode 100644 index 00000000000..99323dbc4de --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/state/State.java @@ -0,0 +1,82 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vdslib.state; + +import java.util.ArrayList; + +/** + * + * Defines legal states for various uses. Split this into its own class such + * that we can easily see what states are legal to use in what situations. + * They double as disk states and node states nodes report they are in, and + * wanted states set external sources. + */ +public enum State { + + // The order declares the ordinals, and defines what states are above/below others + UNKNOWN ("-", false, true, true, false, false, false, false, false), // This state is used by the fleetcontroller to indicate + // that we have failed to contact the node. It should never be + // sent out of the fleetcontroller + MAINTENANCE ("m", false, false, false, true, true, false, true, true), + DOWN ("d", true, true, true, true, true, true, true, true), // Down is not valid reported state sent from the node itself. + STOPPING ("s", false, true, true, false, false, true, true, true), + INITIALIZING("i", false, true, true, false, false, true, true, true), + RETIRED ("r", false, false, false, false, true, false, true, true), + UP ("u", true, true, true, true, true, true, true, true); + + private final boolean validDiskState; + private final boolean validClusterState; + private final ArrayList<Boolean> validReportedNodeState = new ArrayList<>(); + private final ArrayList<Boolean> validWantedNodeState = new ArrayList<>(); + private final ArrayList<Boolean> validCurrentNodeState = new ArrayList<>(); + private final String serializedAs; + + private State(String serialized, boolean validDisk, boolean validDistReported, boolean validStorReported, + boolean validDistWanted, boolean validStorWanted, boolean validCluster, boolean validDistCurrent, + boolean validStorCurrent) + { + validDiskState = validDisk; + validClusterState = validCluster; + assert(NodeType.STORAGE.ordinal() == 0); + assert(NodeType.DISTRIBUTOR.ordinal() == 1); + validReportedNodeState.add(validStorReported); + validReportedNodeState.add(validDistReported); + validWantedNodeState.add(validStorWanted); + validWantedNodeState.add(validDistWanted); + validCurrentNodeState.add(validStorCurrent); + validCurrentNodeState.add(validDistCurrent); + this.serializedAs = serialized; + } + + public static State get(String serialized) { + for (State s : values()) { + if (s.serializedAs.equals(serialized)) { return s; } + } + throw new IllegalArgumentException("Invalid state '" + serialized + "'."); + } + + public String serialize() { return serializedAs; } + + public boolean validDiskState() { return validDiskState; } + public boolean validClusterState() { return validClusterState; } + public boolean validReportedNodeState(NodeType type) { return validReportedNodeState.get(type.ordinal()); } + public boolean validWantedNodeState(NodeType type) { return validWantedNodeState.get(type.ordinal()); } + public boolean validCurrentNodeState(NodeType type) { return validCurrentNodeState.get(type.ordinal()); } + + public boolean maySetWantedStateForThisNodeState(State s) { return (s.ordinal() <= ordinal()); } + + public boolean oneOf(String states) { + for (char c : states.toCharArray()) { + String s = "" + c; + if (s.equals(serializedAs)) return true; + } + return false; + } + + @Override + public String toString() { + String id = name(); + String lower = id.substring(1).toLowerCase(); + return id.charAt(0) + lower; + } + +} diff --git a/vdslib/src/main/java/com/yahoo/vdslib/state/package-info.java b/vdslib/src/main/java/com/yahoo/vdslib/state/package-info.java new file mode 100644 index 00000000000..78662d368f2 --- /dev/null +++ b/vdslib/src/main/java/com/yahoo/vdslib/state/package-info.java @@ -0,0 +1,5 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.vdslib.state; + +import com.yahoo.osgi.annotation.ExportPackage; |