mirror of
https://github.com/arnaucube/arbo.git
synced 2026-01-09 07:21:28 +01:00
Simplify cyclomatic complexity of AddBatch
This commit is contained in:
@@ -12,6 +12,7 @@ linters:
|
||||
- goimports
|
||||
- lll
|
||||
- golint
|
||||
- gocyclo
|
||||
linters-settings:
|
||||
lll:
|
||||
line-length: 100
|
||||
|
||||
241
addbatch.go
241
addbatch.go
@@ -170,28 +170,16 @@ func (t *Tree) AddBatchOpt(keys, values [][]byte) ([]int, error) {
|
||||
// nCPU
|
||||
nCPU := highestPowerOfTwo(runtime.NumCPU())
|
||||
l := int(math.Log2(float64(nCPU)))
|
||||
var invalids []int
|
||||
|
||||
// CASE A: if nLeafs==0 (root==0)
|
||||
if bytes.Equal(t.root, t.emptyHash) {
|
||||
// if len(kvs) is not a power of 2, cut at the bigger power
|
||||
// of two under len(kvs), build the tree with that, and add
|
||||
// later the excedents
|
||||
kvsP2, kvsNonP2 := cutPowerOfTwo(kvs)
|
||||
invalids, err := t.buildTreeBottomUp(nCPU, kvsP2)
|
||||
invalids, err = t.caseA(nCPU, kvs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := 0; i < len(kvsNonP2); i++ {
|
||||
err = t.add(0, kvsNonP2[i].k, kvsNonP2[i].v)
|
||||
if err != nil {
|
||||
invalids = append(invalids, kvsNonP2[i].pos)
|
||||
}
|
||||
}
|
||||
// store root to db
|
||||
if err := t.tx.Put(dbKeyRoot, t.root); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = t.tx.Commit(); err != nil {
|
||||
|
||||
if err = t.finalizeAddBatch(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return invalids, nil
|
||||
@@ -203,7 +191,8 @@ func (t *Tree) AddBatchOpt(keys, values [][]byte) ([]int, error) {
|
||||
return nil, err
|
||||
}
|
||||
if nLeafs < minLeafsThreshold { // CASE B
|
||||
invalids, excedents, err := t.caseB(nCPU, 0, kvs)
|
||||
var excedents []kv
|
||||
invalids, excedents, err = t.caseB(nCPU, 0, kvs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -214,11 +203,8 @@ func (t *Tree) AddBatchOpt(keys, values [][]byte) ([]int, error) {
|
||||
invalids = append(invalids, excedents[i].pos)
|
||||
}
|
||||
}
|
||||
// store root to db
|
||||
if err := t.tx.Put(dbKeyRoot, t.root); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = t.tx.Commit(); err != nil {
|
||||
|
||||
if err = t.finalizeAddBatch(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return invalids, nil
|
||||
@@ -231,95 +217,40 @@ func (t *Tree) AddBatchOpt(keys, values [][]byte) ([]int, error) {
|
||||
|
||||
// CASE C: if nLeafs>=minLeafsThreshold && (nLeafs/nBuckets) < minLeafsThreshold
|
||||
// available parallelization, will need to be a power of 2 (2**n)
|
||||
var excedents []kv
|
||||
if nLeafs >= minLeafsThreshold &&
|
||||
(nLeafs/nCPU) < minLeafsThreshold &&
|
||||
len(keysAtL) == nCPU {
|
||||
// TODO move to own function
|
||||
// 1. go down until level L (L=log2(nBuckets))
|
||||
|
||||
buckets := splitInBuckets(kvs, nCPU)
|
||||
|
||||
// 2. use keys at level L as roots of the subtrees under each one
|
||||
excedentsInBucket := make([][]kv, nCPU)
|
||||
subRoots := make([][]byte, nCPU)
|
||||
txs := make([]db.Tx, nCPU)
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(nCPU)
|
||||
for i := 0; i < nCPU; i++ {
|
||||
go func(cpu int) {
|
||||
var err error
|
||||
txs[cpu], err = t.db.NewTx()
|
||||
if err != nil {
|
||||
panic(err) // TODO WIP
|
||||
}
|
||||
bucketTree := Tree{tx: txs[cpu], db: t.db, maxLevels: t.maxLevels,
|
||||
hashFunction: t.hashFunction, root: keysAtL[cpu]}
|
||||
|
||||
// 3. and do CASE B (with 1 cpu) for each
|
||||
_, bucketExcedents, err := bucketTree.caseB(1, l, buckets[cpu])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
// return nil, err
|
||||
}
|
||||
excedentsInBucket[cpu] = bucketExcedents
|
||||
subRoots[cpu] = bucketTree.root
|
||||
wg.Done()
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// merge buckets txs into Tree.tx
|
||||
for i := 0; i < len(txs); i++ {
|
||||
if err := t.tx.Add(txs[i]); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
for i := 0; i < len(excedentsInBucket); i++ {
|
||||
excedents = append(excedents, excedentsInBucket[i]...)
|
||||
}
|
||||
|
||||
// 4. go upFromKeys from the new roots of the subtrees
|
||||
newRoot, err := t.upFromKeys(subRoots)
|
||||
invalids, err = t.caseC(nCPU, l, keysAtL, kvs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
t.root = newRoot
|
||||
|
||||
// add the key-values that have not been used yet
|
||||
var invalids []int
|
||||
for i := 0; i < len(excedents); i++ {
|
||||
// Add until the level L
|
||||
err = t.add(0, excedents[i].k, excedents[i].v)
|
||||
if err != nil {
|
||||
invalids = append(invalids, excedents[i].pos) // TODO WIP
|
||||
}
|
||||
}
|
||||
// store root to db
|
||||
if err := t.tx.Put(dbKeyRoot, t.root); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = t.tx.Commit(); err != nil {
|
||||
if err = t.finalizeAddBatch(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return invalids, nil
|
||||
}
|
||||
|
||||
var invalids []int
|
||||
// CASE E
|
||||
if len(keysAtL) != nCPU {
|
||||
// CASE E: add one key at each bucket, and then do CASE D
|
||||
buckets := splitInBuckets(kvs, nCPU)
|
||||
kvs = []kv{}
|
||||
for i := 0; i < len(buckets); i++ {
|
||||
err = t.add(0, buckets[i][0].k, buckets[i][0].v)
|
||||
if err != nil {
|
||||
invalids = append(invalids, buckets[i][0].pos)
|
||||
// TODO if err, add another key-value from the
|
||||
// same bucket
|
||||
// add one leaf of the bucket, if there is an error when
|
||||
// adding the k-v, try to add the next one of the bucket
|
||||
// (until one is added)
|
||||
var inserted int
|
||||
for j := 0; j < len(buckets[i]); j++ {
|
||||
if err := t.add(0, buckets[i][j].k, buckets[i][j].v); err == nil {
|
||||
inserted = j
|
||||
break
|
||||
}
|
||||
kvs = append(kvs, buckets[i][1:]...)
|
||||
}
|
||||
|
||||
// put the buckets elements except the inserted one
|
||||
kvs = append(kvs, buckets[i][:inserted]...)
|
||||
kvs = append(kvs, buckets[i][inserted+1:]...)
|
||||
}
|
||||
keysAtL, err = t.getKeysAtLevel(l + 1)
|
||||
if err != nil {
|
||||
@@ -327,24 +258,6 @@ func (t *Tree) AddBatchOpt(keys, values [][]byte) ([]int, error) {
|
||||
}
|
||||
}
|
||||
|
||||
if nCPU == 1 { // CASE D, but with 1 cpu
|
||||
for i := 0; i < len(keys); i++ {
|
||||
err = t.add(0, keys[i], values[i])
|
||||
if err != nil {
|
||||
invalids = append(invalids, i)
|
||||
}
|
||||
}
|
||||
// store root to db
|
||||
if err := t.tx.Put(dbKeyRoot, t.root); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = t.tx.Commit(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return invalids, nil
|
||||
}
|
||||
|
||||
// CASE D
|
||||
if len(keysAtL) == nCPU { // enter in CASE D if len(keysAtL)=nCPU, if not, CASE E
|
||||
invalidsCaseD, err := t.caseD(nCPU, l, keysAtL, kvs)
|
||||
@@ -352,12 +265,8 @@ func (t *Tree) AddBatchOpt(keys, values [][]byte) ([]int, error) {
|
||||
return nil, err
|
||||
}
|
||||
invalids = append(invalids, invalidsCaseD...)
|
||||
// store root to db
|
||||
if err := t.tx.Put(dbKeyRoot, t.root); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = t.tx.Commit(); err != nil {
|
||||
if err = t.finalizeAddBatch(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return invalids, nil
|
||||
@@ -368,6 +277,35 @@ func (t *Tree) AddBatchOpt(keys, values [][]byte) ([]int, error) {
|
||||
return nil, fmt.Errorf("UNIMPLEMENTED")
|
||||
}
|
||||
|
||||
func (t *Tree) finalizeAddBatch() error {
|
||||
// store root to db
|
||||
if err := t.tx.Put(dbKeyRoot, t.root); err != nil {
|
||||
return err
|
||||
}
|
||||
// commit db tx
|
||||
if err := t.tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *Tree) caseA(nCPU int, kvs []kv) ([]int, error) {
|
||||
// if len(kvs) is not a power of 2, cut at the bigger power
|
||||
// of two under len(kvs), build the tree with that, and add
|
||||
// later the excedents
|
||||
kvsP2, kvsNonP2 := cutPowerOfTwo(kvs)
|
||||
invalids, err := t.buildTreeBottomUp(nCPU, kvsP2)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := 0; i < len(kvsNonP2); i++ {
|
||||
if err = t.add(0, kvsNonP2[i].k, kvsNonP2[i].v); err != nil {
|
||||
invalids = append(invalids, kvsNonP2[i].pos)
|
||||
}
|
||||
}
|
||||
return invalids, nil
|
||||
}
|
||||
|
||||
func (t *Tree) caseB(nCPU, l int, kvs []kv) ([]int, []kv, error) {
|
||||
// get already existing keys
|
||||
aKs, aVs, err := t.getLeafs(t.root)
|
||||
@@ -402,7 +340,80 @@ func (t *Tree) caseB(nCPU, l int, kvs []kv) ([]int, []kv, error) {
|
||||
return invalids, kvsNonP2, nil
|
||||
}
|
||||
|
||||
func (t *Tree) caseC(nCPU, l int, keysAtL [][]byte, kvs []kv) ([]int, error) {
|
||||
// 1. go down until level L (L=log2(nBuckets)): keysAtL
|
||||
|
||||
var excedents []kv
|
||||
buckets := splitInBuckets(kvs, nCPU)
|
||||
|
||||
// 2. use keys at level L as roots of the subtrees under each one
|
||||
excedentsInBucket := make([][]kv, nCPU)
|
||||
subRoots := make([][]byte, nCPU)
|
||||
txs := make([]db.Tx, nCPU)
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(nCPU)
|
||||
for i := 0; i < nCPU; i++ {
|
||||
go func(cpu int) {
|
||||
var err error
|
||||
txs[cpu], err = t.db.NewTx()
|
||||
if err != nil {
|
||||
panic(err) // TODO WIP
|
||||
}
|
||||
bucketTree := Tree{tx: txs[cpu], db: t.db, maxLevels: t.maxLevels,
|
||||
hashFunction: t.hashFunction, root: keysAtL[cpu]}
|
||||
|
||||
// 3. do CASE B (with 1 cpu) for each key at level L
|
||||
_, bucketExcedents, err := bucketTree.caseB(1, l, buckets[cpu])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
// return nil, err
|
||||
}
|
||||
excedentsInBucket[cpu] = bucketExcedents
|
||||
subRoots[cpu] = bucketTree.root
|
||||
wg.Done()
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// merge buckets txs into Tree.tx
|
||||
for i := 0; i < len(txs); i++ {
|
||||
if err := t.tx.Add(txs[i]); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
for i := 0; i < len(excedentsInBucket); i++ {
|
||||
excedents = append(excedents, excedentsInBucket[i]...)
|
||||
}
|
||||
|
||||
// 4. go upFromKeys from the new roots of the subtrees
|
||||
newRoot, err := t.upFromKeys(subRoots)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
t.root = newRoot
|
||||
|
||||
// add the key-values that have not been used yet
|
||||
var invalids []int
|
||||
for i := 0; i < len(excedents); i++ {
|
||||
// Add until the level L
|
||||
if err = t.add(0, excedents[i].k, excedents[i].v); err != nil {
|
||||
invalids = append(invalids, excedents[i].pos) // TODO WIP
|
||||
}
|
||||
}
|
||||
return invalids, nil
|
||||
}
|
||||
|
||||
func (t *Tree) caseD(nCPU, l int, keysAtL [][]byte, kvs []kv) ([]int, error) {
|
||||
if nCPU == 1 { // CASE D, but with 1 cpu
|
||||
var invalids []int
|
||||
for i := 0; i < len(kvs); i++ {
|
||||
if err := t.add(0, kvs[i].k, kvs[i].v); err != nil {
|
||||
invalids = append(invalids, kvs[i].pos)
|
||||
}
|
||||
}
|
||||
return invalids, nil
|
||||
}
|
||||
|
||||
buckets := splitInBuckets(kvs, nCPU)
|
||||
|
||||
subRoots := make([][]byte, nCPU)
|
||||
|
||||
@@ -517,3 +517,4 @@ func TestHighestPowerOfTwo(t *testing.T) {
|
||||
// adding the rest of keys with loop over normal Add, and with AddBatch
|
||||
|
||||
// TODO test adding batch with repeated keys in the batch
|
||||
// TODO test adding batch with multiple invalid keys
|
||||
|
||||
Reference in New Issue
Block a user