mirror of
https://github.com/arnaucube/arbo.git
synced 2026-01-08 15:01:29 +01:00
AddBatchOpt return invalid key positions
This commit is contained in:
47
addbatch.go
47
addbatch.go
@@ -172,10 +172,21 @@ func (t *Tree) AddBatchOpt(keys, values [][]byte) ([]int, error) {
|
|||||||
|
|
||||||
// CASE A: if nLeafs==0 (root==0)
|
// CASE A: if nLeafs==0 (root==0)
|
||||||
if bytes.Equal(t.root, t.emptyHash) {
|
if bytes.Equal(t.root, t.emptyHash) {
|
||||||
// TODO if len(kvs) is not a power of 2, cut at the bigger power
|
// if len(kvs) is not a power of 2, cut at the bigger power
|
||||||
// of two under len(kvs), build the tree with that, and add
|
// of two under len(kvs), build the tree with that, and add
|
||||||
// later the excedents
|
// later the excedents
|
||||||
return t.buildTreeBottomUp(nCPU, kvs)
|
kvsP2, kvsNonP2 := cutPowerOfTwo(kvs)
|
||||||
|
invalids, err := t.buildTreeBottomUp(nCPU, kvsP2)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for i := 0; i < len(kvsNonP2); i++ {
|
||||||
|
err = t.add(0, kvsNonP2[i].k, kvsNonP2[i].v)
|
||||||
|
if err != nil {
|
||||||
|
invalids = append(invalids, kvsNonP2[i].pos)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return invalids, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CASE B: if nLeafs<nBuckets
|
// CASE B: if nLeafs<nBuckets
|
||||||
@@ -381,6 +392,7 @@ func (t *Tree) kvsToKeysValues(kvs []kv) ([][]byte, [][]byte) {
|
|||||||
func (t *Tree) buildTreeBottomUp(nCPU int, kvs []kv) ([]int, error) {
|
func (t *Tree) buildTreeBottomUp(nCPU int, kvs []kv) ([]int, error) {
|
||||||
buckets := splitInBuckets(kvs, nCPU)
|
buckets := splitInBuckets(kvs, nCPU)
|
||||||
subRoots := make([][]byte, nCPU)
|
subRoots := make([][]byte, nCPU)
|
||||||
|
invalidsInBucket := make([][]int, nCPU)
|
||||||
txs := make([]db.Tx, nCPU)
|
txs := make([]db.Tx, nCPU)
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
@@ -397,53 +409,60 @@ func (t *Tree) buildTreeBottomUp(nCPU int, kvs []kv) ([]int, error) {
|
|||||||
bucketTree := Tree{tx: txs[cpu], db: t.db, maxLevels: t.maxLevels,
|
bucketTree := Tree{tx: txs[cpu], db: t.db, maxLevels: t.maxLevels,
|
||||||
hashFunction: t.hashFunction, root: t.emptyHash}
|
hashFunction: t.hashFunction, root: t.emptyHash}
|
||||||
|
|
||||||
// TODO use invalids array
|
currInvalids, err := bucketTree.buildTreeBottomUpSingleThread(buckets[cpu])
|
||||||
_, err = bucketTree.buildTreeBottomUpSingleThread(buckets[cpu])
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err) // TODO
|
panic(err) // TODO
|
||||||
}
|
}
|
||||||
|
invalidsInBucket[cpu] = currInvalids
|
||||||
subRoots[cpu] = bucketTree.root
|
subRoots[cpu] = bucketTree.root
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}(i)
|
}(i)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
newRoot, err := t.upFromKeys(subRoots)
|
newRoot, err := t.upFromKeys(subRoots)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
t.root = newRoot
|
t.root = newRoot
|
||||||
return nil, err
|
|
||||||
|
var invalids []int
|
||||||
|
for i := 0; i < len(invalidsInBucket); i++ {
|
||||||
|
invalids = append(invalids, invalidsInBucket[i]...)
|
||||||
|
}
|
||||||
|
return invalids, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// keys & values must be sorted by path, and the array ks must be length
|
// buildTreeBottomUpSingleThread builds the tree with the given []kv from bottom
|
||||||
// multiple of 2
|
// to the root. keys & values must be sorted by path, and the array ks must be
|
||||||
// TODO return index of failed keyvaules
|
// length multiple of 2
|
||||||
func (t *Tree) buildTreeBottomUpSingleThread(kvs []kv) ([]int, error) {
|
func (t *Tree) buildTreeBottomUpSingleThread(kvs []kv) ([]int, error) {
|
||||||
// TODO check that log2(len(leafs)) < t.maxLevels, if not, maxLevels
|
// TODO check that log2(len(leafs)) < t.maxLevels, if not, maxLevels
|
||||||
// would be reached and should return error
|
// would be reached and should return error
|
||||||
|
|
||||||
|
var invalids []int
|
||||||
// build the leafs
|
// build the leafs
|
||||||
leafKeys := make([][]byte, len(kvs))
|
leafKeys := make([][]byte, len(kvs))
|
||||||
for i := 0; i < len(kvs); i++ {
|
for i := 0; i < len(kvs); i++ {
|
||||||
// TODO handle the case where Key&Value == 0
|
// TODO handle the case where Key&Value == 0
|
||||||
leafKey, leafValue, err := newLeafValue(t.hashFunction, kvs[i].k, kvs[i].v)
|
leafKey, leafValue, err := newLeafValue(t.hashFunction, kvs[i].k, kvs[i].v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
// return nil, err
|
||||||
|
invalids = append(invalids, kvs[i].pos)
|
||||||
}
|
}
|
||||||
// store leafKey & leafValue to db
|
// store leafKey & leafValue to db
|
||||||
if err := t.tx.Put(leafKey, leafValue); err != nil {
|
if err := t.tx.Put(leafKey, leafValue); err != nil {
|
||||||
return nil, err
|
// return nil, err
|
||||||
|
invalids = append(invalids, kvs[i].pos)
|
||||||
}
|
}
|
||||||
leafKeys[i] = leafKey
|
leafKeys[i] = leafKey
|
||||||
}
|
}
|
||||||
// TODO parallelize t.upFromKeys until level log2(nBuckets) is reached
|
|
||||||
r, err := t.upFromKeys(leafKeys)
|
r, err := t.upFromKeys(leafKeys)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return invalids, err
|
||||||
}
|
}
|
||||||
t.root = r
|
t.root = r
|
||||||
return nil, nil
|
return invalids, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// keys & values must be sorted by path, and the array ks must be length
|
// keys & values must be sorted by path, and the array ks must be length
|
||||||
|
|||||||
Reference in New Issue
Block a user