diff --git a/.golangci.yml b/.golangci.yml index 3212fdc..08dd609 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -12,6 +12,7 @@ linters: - goimports - lll - golint + - gocyclo linters-settings: lll: line-length: 100 diff --git a/addbatch.go b/addbatch.go index 345d080..475b3db 100644 --- a/addbatch.go +++ b/addbatch.go @@ -170,28 +170,16 @@ func (t *Tree) AddBatchOpt(keys, values [][]byte) ([]int, error) { // nCPU nCPU := highestPowerOfTwo(runtime.NumCPU()) l := int(math.Log2(float64(nCPU))) + var invalids []int // CASE A: if nLeafs==0 (root==0) if bytes.Equal(t.root, t.emptyHash) { - // if len(kvs) is not a power of 2, cut at the bigger power - // of two under len(kvs), build the tree with that, and add - // later the excedents - kvsP2, kvsNonP2 := cutPowerOfTwo(kvs) - invalids, err := t.buildTreeBottomUp(nCPU, kvsP2) + invalids, err = t.caseA(nCPU, kvs) if err != nil { return nil, err } - for i := 0; i < len(kvsNonP2); i++ { - err = t.add(0, kvsNonP2[i].k, kvsNonP2[i].v) - if err != nil { - invalids = append(invalids, kvsNonP2[i].pos) - } - } - // store root to db - if err := t.tx.Put(dbKeyRoot, t.root); err != nil { - return nil, err - } - if err = t.tx.Commit(); err != nil { + + if err = t.finalizeAddBatch(); err != nil { return nil, err } return invalids, nil @@ -203,7 +191,8 @@ func (t *Tree) AddBatchOpt(keys, values [][]byte) ([]int, error) { return nil, err } if nLeafs < minLeafsThreshold { // CASE B - invalids, excedents, err := t.caseB(nCPU, 0, kvs) + var excedents []kv + invalids, excedents, err = t.caseB(nCPU, 0, kvs) if err != nil { return nil, err } @@ -214,11 +203,8 @@ func (t *Tree) AddBatchOpt(keys, values [][]byte) ([]int, error) { invalids = append(invalids, excedents[i].pos) } } - // store root to db - if err := t.tx.Put(dbKeyRoot, t.root); err != nil { - return nil, err - } - if err = t.tx.Commit(); err != nil { + + if err = t.finalizeAddBatch(); err != nil { return nil, err } return invalids, nil @@ -231,95 +217,40 @@ func (t *Tree) AddBatchOpt(keys, values [][]byte) ([]int, error) { // CASE C: if nLeafs>=minLeafsThreshold && (nLeafs/nBuckets) < minLeafsThreshold // available parallelization, will need to be a power of 2 (2**n) - var excedents []kv if nLeafs >= minLeafsThreshold && (nLeafs/nCPU) < minLeafsThreshold && len(keysAtL) == nCPU { - // TODO move to own function - // 1. go down until level L (L=log2(nBuckets)) - - buckets := splitInBuckets(kvs, nCPU) - - // 2. use keys at level L as roots of the subtrees under each one - excedentsInBucket := make([][]kv, nCPU) - subRoots := make([][]byte, nCPU) - txs := make([]db.Tx, nCPU) - var wg sync.WaitGroup - wg.Add(nCPU) - for i := 0; i < nCPU; i++ { - go func(cpu int) { - var err error - txs[cpu], err = t.db.NewTx() - if err != nil { - panic(err) // TODO WIP - } - bucketTree := Tree{tx: txs[cpu], db: t.db, maxLevels: t.maxLevels, - hashFunction: t.hashFunction, root: keysAtL[cpu]} - - // 3. and do CASE B (with 1 cpu) for each - _, bucketExcedents, err := bucketTree.caseB(1, l, buckets[cpu]) - if err != nil { - panic(err) - // return nil, err - } - excedentsInBucket[cpu] = bucketExcedents - subRoots[cpu] = bucketTree.root - wg.Done() - }(i) - } - wg.Wait() - - // merge buckets txs into Tree.tx - for i := 0; i < len(txs); i++ { - if err := t.tx.Add(txs[i]); err != nil { - return nil, err - } - } - for i := 0; i < len(excedentsInBucket); i++ { - excedents = append(excedents, excedentsInBucket[i]...) - } - - // 4. go upFromKeys from the new roots of the subtrees - newRoot, err := t.upFromKeys(subRoots) + invalids, err = t.caseC(nCPU, l, keysAtL, kvs) if err != nil { return nil, err } - t.root = newRoot - - // add the key-values that have not been used yet - var invalids []int - for i := 0; i < len(excedents); i++ { - // Add until the level L - err = t.add(0, excedents[i].k, excedents[i].v) - if err != nil { - invalids = append(invalids, excedents[i].pos) // TODO WIP - } - } - // store root to db - if err := t.tx.Put(dbKeyRoot, t.root); err != nil { - return nil, err - } - if err = t.tx.Commit(); err != nil { + if err = t.finalizeAddBatch(); err != nil { return nil, err } return invalids, nil } - var invalids []int // CASE E if len(keysAtL) != nCPU { // CASE E: add one key at each bucket, and then do CASE D buckets := splitInBuckets(kvs, nCPU) kvs = []kv{} for i := 0; i < len(buckets); i++ { - err = t.add(0, buckets[i][0].k, buckets[i][0].v) - if err != nil { - invalids = append(invalids, buckets[i][0].pos) - // TODO if err, add another key-value from the - // same bucket + // add one leaf of the bucket, if there is an error when + // adding the k-v, try to add the next one of the bucket + // (until one is added) + var inserted int + for j := 0; j < len(buckets[i]); j++ { + if err := t.add(0, buckets[i][j].k, buckets[i][j].v); err == nil { + inserted = j + break + } } - kvs = append(kvs, buckets[i][1:]...) + + // put the buckets elements except the inserted one + kvs = append(kvs, buckets[i][:inserted]...) + kvs = append(kvs, buckets[i][inserted+1:]...) } keysAtL, err = t.getKeysAtLevel(l + 1) if err != nil { @@ -327,24 +258,6 @@ func (t *Tree) AddBatchOpt(keys, values [][]byte) ([]int, error) { } } - if nCPU == 1 { // CASE D, but with 1 cpu - for i := 0; i < len(keys); i++ { - err = t.add(0, keys[i], values[i]) - if err != nil { - invalids = append(invalids, i) - } - } - // store root to db - if err := t.tx.Put(dbKeyRoot, t.root); err != nil { - return nil, err - } - - if err = t.tx.Commit(); err != nil { - return nil, err - } - return invalids, nil - } - // CASE D if len(keysAtL) == nCPU { // enter in CASE D if len(keysAtL)=nCPU, if not, CASE E invalidsCaseD, err := t.caseD(nCPU, l, keysAtL, kvs) @@ -352,12 +265,8 @@ func (t *Tree) AddBatchOpt(keys, values [][]byte) ([]int, error) { return nil, err } invalids = append(invalids, invalidsCaseD...) - // store root to db - if err := t.tx.Put(dbKeyRoot, t.root); err != nil { - return nil, err - } - if err = t.tx.Commit(); err != nil { + if err = t.finalizeAddBatch(); err != nil { return nil, err } return invalids, nil @@ -368,6 +277,35 @@ func (t *Tree) AddBatchOpt(keys, values [][]byte) ([]int, error) { return nil, fmt.Errorf("UNIMPLEMENTED") } +func (t *Tree) finalizeAddBatch() error { + // store root to db + if err := t.tx.Put(dbKeyRoot, t.root); err != nil { + return err + } + // commit db tx + if err := t.tx.Commit(); err != nil { + return err + } + return nil +} + +func (t *Tree) caseA(nCPU int, kvs []kv) ([]int, error) { + // if len(kvs) is not a power of 2, cut at the bigger power + // of two under len(kvs), build the tree with that, and add + // later the excedents + kvsP2, kvsNonP2 := cutPowerOfTwo(kvs) + invalids, err := t.buildTreeBottomUp(nCPU, kvsP2) + if err != nil { + return nil, err + } + for i := 0; i < len(kvsNonP2); i++ { + if err = t.add(0, kvsNonP2[i].k, kvsNonP2[i].v); err != nil { + invalids = append(invalids, kvsNonP2[i].pos) + } + } + return invalids, nil +} + func (t *Tree) caseB(nCPU, l int, kvs []kv) ([]int, []kv, error) { // get already existing keys aKs, aVs, err := t.getLeafs(t.root) @@ -402,7 +340,80 @@ func (t *Tree) caseB(nCPU, l int, kvs []kv) ([]int, []kv, error) { return invalids, kvsNonP2, nil } +func (t *Tree) caseC(nCPU, l int, keysAtL [][]byte, kvs []kv) ([]int, error) { + // 1. go down until level L (L=log2(nBuckets)): keysAtL + + var excedents []kv + buckets := splitInBuckets(kvs, nCPU) + + // 2. use keys at level L as roots of the subtrees under each one + excedentsInBucket := make([][]kv, nCPU) + subRoots := make([][]byte, nCPU) + txs := make([]db.Tx, nCPU) + var wg sync.WaitGroup + wg.Add(nCPU) + for i := 0; i < nCPU; i++ { + go func(cpu int) { + var err error + txs[cpu], err = t.db.NewTx() + if err != nil { + panic(err) // TODO WIP + } + bucketTree := Tree{tx: txs[cpu], db: t.db, maxLevels: t.maxLevels, + hashFunction: t.hashFunction, root: keysAtL[cpu]} + + // 3. do CASE B (with 1 cpu) for each key at level L + _, bucketExcedents, err := bucketTree.caseB(1, l, buckets[cpu]) + if err != nil { + panic(err) + // return nil, err + } + excedentsInBucket[cpu] = bucketExcedents + subRoots[cpu] = bucketTree.root + wg.Done() + }(i) + } + wg.Wait() + + // merge buckets txs into Tree.tx + for i := 0; i < len(txs); i++ { + if err := t.tx.Add(txs[i]); err != nil { + return nil, err + } + } + for i := 0; i < len(excedentsInBucket); i++ { + excedents = append(excedents, excedentsInBucket[i]...) + } + + // 4. go upFromKeys from the new roots of the subtrees + newRoot, err := t.upFromKeys(subRoots) + if err != nil { + return nil, err + } + t.root = newRoot + + // add the key-values that have not been used yet + var invalids []int + for i := 0; i < len(excedents); i++ { + // Add until the level L + if err = t.add(0, excedents[i].k, excedents[i].v); err != nil { + invalids = append(invalids, excedents[i].pos) // TODO WIP + } + } + return invalids, nil +} + func (t *Tree) caseD(nCPU, l int, keysAtL [][]byte, kvs []kv) ([]int, error) { + if nCPU == 1 { // CASE D, but with 1 cpu + var invalids []int + for i := 0; i < len(kvs); i++ { + if err := t.add(0, kvs[i].k, kvs[i].v); err != nil { + invalids = append(invalids, kvs[i].pos) + } + } + return invalids, nil + } + buckets := splitInBuckets(kvs, nCPU) subRoots := make([][]byte, nCPU) diff --git a/addbatch_test.go b/addbatch_test.go index a90d042..11a0fa8 100644 --- a/addbatch_test.go +++ b/addbatch_test.go @@ -517,3 +517,4 @@ func TestHighestPowerOfTwo(t *testing.T) { // adding the rest of keys with loop over normal Add, and with AddBatch // TODO test adding batch with repeated keys in the batch +// TODO test adding batch with multiple invalid keys