mirror of
https://github.com/arnaucube/arbo.git
synced 2026-01-23 20:23:44 +01:00
Add AddBatch CaseC
CASE C: ALMOST CASE B --> if Tree has few Leafs (but numLeafs>=minLeafsThreshold)
==============================================================================
- Use A, B, G, F as Roots of subtrees
- Do CASE B for each subtree
- Then go from L to the Root
R
/ \
/ \
/ \
* *
/ | / \
/ | / \
/ | / \
L: A B G D
/ \
/ \
/ \
C *
/ \
/ \
/ \
... ... (nLeafs >= minLeafsThreshold)
This commit is contained in:
227
addbatch.go
227
addbatch.go
@@ -3,6 +3,7 @@ package arbo
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
)
|
||||
|
||||
@@ -25,11 +26,24 @@ the leafs)
|
||||
- Do CASE A for the new Tree, giving the already existing key&values (leafs)
|
||||
from the original Tree + the new key&values to be added from the AddBatch call
|
||||
|
||||
R
|
||||
/ \
|
||||
A *
|
||||
/ \
|
||||
B C
|
||||
|
||||
R R
|
||||
/ \ / \
|
||||
A * / \
|
||||
/ \ / \
|
||||
B C * *
|
||||
/ | / \
|
||||
/ | / \
|
||||
/ | / \
|
||||
L: A B G D
|
||||
/ \
|
||||
/ \
|
||||
/ \
|
||||
C *
|
||||
/ \
|
||||
/ \
|
||||
/ \
|
||||
... ... (nLeafs < minLeafsThreshold)
|
||||
|
||||
|
||||
CASE C: ALMOST CASE B --> if Tree has few Leafs (but numLeafs>=minLeafsThreshold)
|
||||
@@ -54,7 +68,7 @@ L: A B G D
|
||||
/ \
|
||||
/ \
|
||||
/ \
|
||||
D E
|
||||
... ... (nLeafs >= minLeafsThreshold)
|
||||
|
||||
|
||||
|
||||
@@ -123,6 +137,11 @@ Algorithm decision
|
||||
|
||||
*/
|
||||
|
||||
const (
|
||||
minLeafsThreshold = uint64(100) // nolint:gomnd // TMP WIP this will be autocalculated
|
||||
nBuckets = uint64(4) // TMP WIP this will be autocalculated from
|
||||
)
|
||||
|
||||
// AddBatchOpt is the WIP implementation of the AddBatch method in a more
|
||||
// optimized approach.
|
||||
func (t *Tree) AddBatchOpt(keys, values [][]byte) ([]int, error) {
|
||||
@@ -141,44 +160,151 @@ func (t *Tree) AddBatchOpt(keys, values [][]byte) ([]int, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
t.tx, err = t.db.NewTx()
|
||||
t.tx, err = t.db.NewTx() // TODO add t.tx.Commit()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// if nLeafs==0 (root==0): CASE A
|
||||
// CASE A: if nLeafs==0 (root==0)
|
||||
if bytes.Equal(t.root, t.emptyHash) {
|
||||
// sort keys & values by path
|
||||
sortKvs(kvs)
|
||||
return t.buildTreeBottomUp(kvs)
|
||||
}
|
||||
|
||||
// if nLeafs<nBuckets: CASE B
|
||||
// CASE B: if nLeafs<nBuckets
|
||||
nLeafs, err := t.GetNLeafs()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
minLeafsThreshold := uint64(100) // nolint:gomnd // TMP WIP
|
||||
if nLeafs < minLeafsThreshold {
|
||||
// get already existing keys
|
||||
aKs, aVs, err := t.getLeafs()
|
||||
if nLeafs < minLeafsThreshold { // CASE B
|
||||
invalids, excedents, err := t.caseB(0, kvs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
aKvs, err := t.keysValuesToKvs(aKs, aVs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// add the excedents
|
||||
for i := 0; i < len(excedents); i++ {
|
||||
err = t.add(0, excedents[i].k, excedents[i].v)
|
||||
if err != nil {
|
||||
invalids = append(invalids, excedents[i].pos)
|
||||
}
|
||||
}
|
||||
// add already existing key-values to the inputted key-values
|
||||
kvs = append(kvs, aKvs...)
|
||||
// proceed with CASE A
|
||||
sortKvs(kvs)
|
||||
return t.buildTreeBottomUp(kvs)
|
||||
return invalids, nil
|
||||
}
|
||||
|
||||
// CASE C: if nLeafs>=minLeafsThreshold && (nLeafs/nBuckets) < minLeafsThreshold
|
||||
// available parallelization, will need to be a power of 2 (2**n)
|
||||
var excedents []kv
|
||||
l := int(math.Log2(float64(nBuckets)))
|
||||
if nLeafs >= minLeafsThreshold && (nLeafs/nBuckets) < minLeafsThreshold {
|
||||
// TODO move to own function
|
||||
// 1. go down until level L (L=log2(nBuckets))
|
||||
keysAtL, err := t.getKeysAtLevel(l + 1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
buckets := splitInBuckets(kvs, nBuckets)
|
||||
|
||||
// 2. use keys at level L as roots of the subtrees under each one
|
||||
var subRoots [][]byte
|
||||
// TODO parallelize
|
||||
for i := 0; i < len(keysAtL); i++ {
|
||||
bucketTree := Tree{tx: t.tx, db: t.db, maxLevels: t.maxLevels,
|
||||
hashFunction: t.hashFunction, root: keysAtL[i]}
|
||||
|
||||
// 3. and do CASE B for each
|
||||
_, bucketExcedents, err := bucketTree.caseB(l, buckets[i])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
excedents = append(excedents, bucketExcedents...)
|
||||
subRoots = append(subRoots, bucketTree.root)
|
||||
}
|
||||
// 4. go upFromKeys from the new roots of the subtrees
|
||||
newRoot, err := t.upFromKeys(subRoots)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
t.root = newRoot
|
||||
|
||||
var invalids []int
|
||||
for i := 0; i < len(excedents); i++ {
|
||||
// Add until the level L
|
||||
err = t.add(0, excedents[i].k, excedents[i].v)
|
||||
if err != nil {
|
||||
invalids = append(invalids, excedents[i].pos) // TODO WIP
|
||||
}
|
||||
}
|
||||
|
||||
return invalids, nil
|
||||
}
|
||||
|
||||
// TODO store t.root into DB
|
||||
// TODO update NLeafs from DB
|
||||
|
||||
return nil, fmt.Errorf("UNIMPLEMENTED")
|
||||
}
|
||||
|
||||
func (t *Tree) caseB(l int, kvs []kv) ([]int, []kv, error) {
|
||||
// get already existing keys
|
||||
aKs, aVs, err := t.getLeafs(t.root)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
aKvs, err := t.keysValuesToKvs(aKs, aVs)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
// add already existing key-values to the inputted key-values
|
||||
kvs = append(kvs, aKvs...)
|
||||
|
||||
// proceed with CASE A
|
||||
sortKvs(kvs)
|
||||
|
||||
// cutPowerOfTwo, the excedent add it as normal Tree.Add
|
||||
kvsP2, kvsNonP2 := cutPowerOfTwo(kvs)
|
||||
invalids, err := t.buildTreeBottomUp(kvsP2)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
// return the excedents which will be added at the full tree at the end
|
||||
return invalids, kvsNonP2, nil
|
||||
}
|
||||
|
||||
func splitInBuckets(kvs []kv, nBuckets uint64) [][]kv {
|
||||
buckets := make([][]kv, nBuckets)
|
||||
// 1. classify the keyvalues into buckets
|
||||
for i := 0; i < len(kvs); i++ {
|
||||
pair := kvs[i]
|
||||
|
||||
bucketnum := keyToBucket(pair.k, int(nBuckets))
|
||||
buckets[bucketnum] = append(buckets[bucketnum], pair)
|
||||
}
|
||||
return buckets
|
||||
}
|
||||
|
||||
// TODO rename in a more 'real' name (calculate bucket from/for key)
|
||||
func keyToBucket(k []byte, nBuckets int) int {
|
||||
nLevels := int(math.Log2(float64(nBuckets)))
|
||||
b := make([]int, nBuckets)
|
||||
for i := 0; i < nBuckets; i++ {
|
||||
b[i] = i
|
||||
}
|
||||
r := b
|
||||
mid := len(r) / 2 //nolint:gomnd
|
||||
for i := 0; i < nLevels; i++ {
|
||||
if int(k[i/8]&(1<<(i%8))) != 0 {
|
||||
r = r[mid:]
|
||||
mid = len(r) / 2 //nolint:gomnd
|
||||
} else {
|
||||
r = r[:mid]
|
||||
mid = len(r) / 2 //nolint:gomnd
|
||||
}
|
||||
}
|
||||
return r[0]
|
||||
}
|
||||
|
||||
type kv struct {
|
||||
pos int // original position in the array
|
||||
keyPath []byte
|
||||
@@ -241,7 +367,8 @@ func (t *Tree) kvsToKeysValues(kvs []kv) ([][]byte, [][]byte) {
|
||||
}
|
||||
*/
|
||||
|
||||
// keys & values must be sorted by path, and must be length multiple of 2
|
||||
// keys & values must be sorted by path, and the array ks must be length
|
||||
// multiple of 2
|
||||
// TODO return index of failed keyvaules
|
||||
func (t *Tree) buildTreeBottomUp(kvs []kv) ([]int, error) {
|
||||
// build the leafs
|
||||
@@ -258,6 +385,7 @@ func (t *Tree) buildTreeBottomUp(kvs []kv) ([]int, error) {
|
||||
}
|
||||
leafKeys[i] = leafKey
|
||||
}
|
||||
// TODO parallelize t.upFromKeys until level log2(nBuckets) is reached
|
||||
r, err := t.upFromKeys(leafKeys)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -266,6 +394,8 @@ func (t *Tree) buildTreeBottomUp(kvs []kv) ([]int, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// keys & values must be sorted by path, and the array ks must be length
|
||||
// multiple of 2
|
||||
func (t *Tree) upFromKeys(ks [][]byte) ([]byte, error) {
|
||||
if len(ks) == 1 {
|
||||
return ks[0], nil
|
||||
@@ -287,9 +417,9 @@ func (t *Tree) upFromKeys(ks [][]byte) ([]byte, error) {
|
||||
return t.upFromKeys(rKs)
|
||||
}
|
||||
|
||||
func (t *Tree) getLeafs() ([][]byte, [][]byte, error) {
|
||||
func (t *Tree) getLeafs(root []byte) ([][]byte, [][]byte, error) {
|
||||
var ks, vs [][]byte
|
||||
err := t.Iterate(func(k, v []byte) {
|
||||
err := t.iter(root, func(k, v []byte) {
|
||||
if v[0] != PrefixValueLeaf {
|
||||
return
|
||||
}
|
||||
@@ -299,3 +429,52 @@ func (t *Tree) getLeafs() ([][]byte, [][]byte, error) {
|
||||
})
|
||||
return ks, vs, err
|
||||
}
|
||||
|
||||
func (t *Tree) getKeysAtLevel(l int) ([][]byte, error) {
|
||||
var keys [][]byte
|
||||
err := t.iterWithStop(t.root, 0, func(currLvl int, k, v []byte) bool {
|
||||
if currLvl == l {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
if currLvl >= l {
|
||||
return true // to stop the iter from going down
|
||||
}
|
||||
return false
|
||||
})
|
||||
|
||||
return keys, err
|
||||
}
|
||||
|
||||
// cutPowerOfTwo returns []kv of length that is a power of 2, and a second []kv
|
||||
// with the extra elements that don't fit in a power of 2 length
|
||||
func cutPowerOfTwo(kvs []kv) ([]kv, []kv) {
|
||||
x := len(kvs)
|
||||
if (x & (x - 1)) != 0 {
|
||||
p2 := highestPowerOfTwo(x)
|
||||
return kvs[:p2], kvs[p2:]
|
||||
}
|
||||
return kvs, nil
|
||||
}
|
||||
|
||||
func highestPowerOfTwo(n int) int {
|
||||
res := 0
|
||||
for i := n; i >= 1; i-- {
|
||||
if (i & (i - 1)) == 0 {
|
||||
res = i
|
||||
break
|
||||
}
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
// func computeSimpleAddCost(nLeafs int) int {
|
||||
// // nLvls 2^nLvls
|
||||
// nLvls := int(math.Log2(float64(nLeafs)))
|
||||
// return nLvls * int(math.Pow(2, float64(nLvls)))
|
||||
// }
|
||||
//
|
||||
// func computeBottomUpAddCost(nLeafs int) int {
|
||||
// // 2^nLvls * 2 - 1
|
||||
// nLvls := int(math.Log2(float64(nLeafs)))
|
||||
// return (int(math.Pow(2, float64(nLvls))) * 2) - 1
|
||||
// }
|
||||
|
||||
Reference in New Issue
Block a user