Add AddBatch CaseD

CASE D: Already populated Tree
==============================
- Use A, B, C, D as subtrees
- Sort the keys into buckets that share the initial part of the path (see the sketch below the diagram)
- For each subtree, add the new leaves of its bucket there

              R
             /  \
            /    \
           /      \
          *        *
         / |      / \
        /  |     /   \
       /   |    /     \
L:    A    B   C       D
     /\   /\  / \     / \
    ...  ... ... ... ... ...
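
A minimal sketch of the bucketing idea, assuming an MSB-first key path and
nBuckets = 2^l; this is only an illustration, not arbo's keyToBucket:

// Keys whose paths share their first l bits fall into the same bucket,
// and therefore under the same level-l subtree (A, B, C or D above).
package main

import "fmt"

func bucketOfKey(keyPath []byte, nBuckets int) int {
	l := 0
	for (1 << l) < nBuckets { // l = log2(nBuckets)
		l++
	}
	bucket := 0
	for bit := 0; bit < l; bit++ {
		b := (keyPath[bit/8] >> (7 - bit%8)) & 1 // read the path MSB-first
		bucket = bucket<<1 | int(b)
	}
	return bucket
}

func main() {
	// With 4 buckets (l=2) the first two path bits select the subtree.
	for _, k := range [][]byte{{0x00}, {0x40}, {0x80}, {0xc0}} {
		fmt.Printf("path %08b -> bucket %d\n", k[0], bucketOfKey(k, 4))
	}
}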
arnaucube
2021-04-24 23:35:06 +02:00
parent b3007a057e
commit 890057cd82
4 changed files with 157 additions and 7 deletions


@@ -168,7 +168,10 @@ func (t *Tree) AddBatchOpt(keys, values [][]byte) ([]int, error) {
		return nil, err
	}
	// TODO if nCPU is not a power of two, cut at the highest power of two
	// under nCPU
	nCPU := runtime.NumCPU()
	l := int(math.Log2(float64(nCPU)))
	// CASE A: if nLeafs==0 (root==0)
	if bytes.Equal(t.root, t.emptyHash) {
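
The TODO above is about runtime.NumCPU() not always being a power of two. A
hedged sketch of that clamping, using a hypothetical flooredPow2 helper that
is not part of arbo:

// Clamp the worker count to the highest power of two <= NumCPU, so that
// l = log2(nCPU) is an integer and the buckets line up with the 2^l
// subtrees at level l.
package main

import (
	"fmt"
	"math"
	"runtime"
)

func flooredPow2(n int) int {
	if n < 1 {
		return 1
	}
	return 1 << int(math.Log2(float64(n)))
}

func main() {
	nCPU := flooredPow2(runtime.NumCPU()) // e.g. 6 cores -> 4 workers
	l := int(math.Log2(float64(nCPU)))    // split level: 2^l subtrees
	fmt.Println("nCPU:", nCPU, "l:", l)
}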
@@ -212,7 +215,6 @@ func (t *Tree) AddBatchOpt(keys, values [][]byte) ([]int, error) {
	// CASE C: if nLeafs>=minLeafsThreshold && (nLeafs/nBuckets) < minLeafsThreshold
	// available parallelization, will need to be a power of 2 (2**n)
	var excedents []kv
	l := int(math.Log2(float64(nCPU)))
	if nLeafs >= minLeafsThreshold && (nLeafs/nCPU) < minLeafsThreshold {
		// TODO move to own function
		// 1. go down until level L (L=log2(nBuckets))
@@ -257,6 +259,11 @@ func (t *Tree) AddBatchOpt(keys, values [][]byte) ([]int, error) {
		return invalids, nil
	}
	// CASE D
	if true { // TODO enter in CASE D if len(keysAtL)=nCPU, if not, CASE E
		return t.caseD(nCPU, l, kvs)
	}
	// TODO store t.root into DB
	// TODO update NLeafs from DB
@@ -289,13 +296,47 @@ func (t *Tree) caseB(l int, kvs []kv) ([]int, []kv, error) {
	return invalids, kvsNonP2, nil
}
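// caseD (already populated tree): fetch the existing keys at level l+1 and
// use them as subtree roots (A, B, C, D in the diagram), split the new
// key/values into one bucket per subtree, add each bucket's leaves under its
// subtree, and rebuild the levels above from the new subtree roots to obtain
// the new tree root.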
func (t *Tree) caseD(nCPU, l int, kvs []kv) ([]int, error) {
	fmt.Println("CASE D", nCPU)
	keysAtL, err := t.getKeysAtLevel(l + 1)
	if err != nil {
		return nil, err
	}
	buckets := splitInBuckets(kvs, nCPU)
	var subRoots [][]byte
	var invalids []int
	for i := 0; i < len(keysAtL); i++ {
		bucketTree := Tree{tx: t.tx, db: t.db, maxLevels: t.maxLevels, // maxLevels-l
			hashFunction: t.hashFunction, root: keysAtL[i]}
		for j := 0; j < len(buckets[i]); j++ {
			if err = bucketTree.add(l, buckets[i][j].k, buckets[i][j].v); err != nil {
				fmt.Println("failed", buckets[i][j].k[:4])
				panic(err)
				// invalids = append(invalids, buckets[i][j].pos)
			}
		}
		subRoots = append(subRoots, bucketTree.root)
	}
	newRoot, err := t.upFromKeys(subRoots)
	if err != nil {
		return nil, err
	}
	t.root = newRoot
	return invalids, nil
}
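// splitInBuckets classifies the key/values into nBuckets buckets using the
// initial part of each key's path (via keyToBucket), so that every bucket
// maps to exactly one subtree at level l.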
func splitInBuckets(kvs []kv, nBuckets int) [][]kv {
	buckets := make([][]kv, nBuckets)
	// 1. classify the keyvalues into buckets
	for i := 0; i < len(kvs); i++ {
		pair := kvs[i]
		// bucketnum := keyToBucket(pair.k, nBuckets)
		bucketnum := keyToBucket(pair.keyPath, nBuckets)
		buckets[bucketnum] = append(buckets[bucketnum], pair)
	}
	return buckets
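
The bucket split is what enables the parallelization mentioned in the
comments above: each bucket only touches its own level-l subtree, so the
buckets can be processed concurrently. A minimal, self-contained sketch of
that idea, with a simplified kv type and a hypothetical addBucket stand-in
(not arbo's code):

// One goroutine per bucket, each building its own subtree independently;
// the resulting subRoots would then be combined upwards into the new root,
// which is what caseD does (sequentially, for now) via upFromKeys.
package main

import (
	"fmt"
	"sync"
)

type kv struct {
	k, v []byte
}

// addBucket stands in for "add every leaf of the bucket under its subtree
// root and return the new subtree root".
func addBucket(bucket []kv) []byte {
	return []byte{byte(len(bucket))} // placeholder result
}

func main() {
	buckets := [][]kv{
		{{k: []byte{0x00}}, {k: []byte{0x01}}},
		{{k: []byte{0x40}}},
		{{k: []byte{0x80}}, {k: []byte{0x81}}},
		{{k: []byte{0xc0}}},
	}
	subRoots := make([][]byte, len(buckets))
	var wg sync.WaitGroup
	for i := range buckets {
		wg.Add(1)
		go func(i int) { // one worker per bucket (nBuckets == nCPU)
			defer wg.Done()
			subRoots[i] = addBucket(buckets[i])
		}(i)
	}
	wg.Wait()
	fmt.Println(subRoots)
}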