mirror of
https://github.com/arnaucube/arbo.git
synced 2026-01-09 07:21:28 +01:00
Update AddBatch to return invalids index + error
This commit is contained in:
11
tree.go
11
tree.go
@@ -165,10 +165,17 @@ func (t *Tree) editable() bool {
|
|||||||
return t.snapshotRoot == nil
|
return t.snapshotRoot == nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Invalid is used when a key-value can not be added trough AddBatch, and
|
||||||
|
// contains the index of the key-value and the error.
|
||||||
|
type Invalid struct {
|
||||||
|
Index int
|
||||||
|
Error error
|
||||||
|
}
|
||||||
|
|
||||||
// AddBatch adds a batch of key-values to the Tree. Returns an array containing
|
// AddBatch adds a batch of key-values to the Tree. Returns an array containing
|
||||||
// the indexes of the keys failed to add. Supports empty values as input
|
// the indexes of the keys failed to add. Supports empty values as input
|
||||||
// parameters, which is equivalent to 0 valued byte array.
|
// parameters, which is equivalent to 0 valued byte array.
|
||||||
func (t *Tree) AddBatch(keys, values [][]byte) ([]int, error) {
|
func (t *Tree) AddBatch(keys, values [][]byte) ([]Invalid, error) {
|
||||||
wTx := t.db.WriteTx()
|
wTx := t.db.WriteTx()
|
||||||
defer wTx.Discard()
|
defer wTx.Discard()
|
||||||
|
|
||||||
@@ -182,7 +189,7 @@ func (t *Tree) AddBatch(keys, values [][]byte) ([]int, error) {
|
|||||||
// AddBatchWithTx does the same than the AddBatch method, but allowing to pass
|
// AddBatchWithTx does the same than the AddBatch method, but allowing to pass
|
||||||
// the db.WriteTx that is used. The db.WriteTx will not be committed inside
|
// the db.WriteTx that is used. The db.WriteTx will not be committed inside
|
||||||
// this method.
|
// this method.
|
||||||
func (t *Tree) AddBatchWithTx(wTx db.WriteTx, keys, values [][]byte) ([]int, error) {
|
func (t *Tree) AddBatchWithTx(wTx db.WriteTx, keys, values [][]byte) ([]Invalid, error) {
|
||||||
t.Lock()
|
t.Lock()
|
||||||
defer t.Unlock()
|
defer t.Unlock()
|
||||||
|
|
||||||
|
|||||||
23
vt.go
23
vt.go
@@ -37,20 +37,17 @@ type kv struct {
|
|||||||
v []byte
|
v []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *params) keysValuesToKvs(ks, vs [][]byte) ([]kv, []int, error) {
|
func (p *params) keysValuesToKvs(ks, vs [][]byte) ([]kv, []Invalid, error) {
|
||||||
if len(ks) != len(vs) {
|
if len(ks) != len(vs) {
|
||||||
return nil, nil, fmt.Errorf("len(keys)!=len(values) (%d!=%d)",
|
return nil, nil, fmt.Errorf("len(keys)!=len(values) (%d!=%d)",
|
||||||
len(ks), len(vs))
|
len(ks), len(vs))
|
||||||
}
|
}
|
||||||
var invalids []int
|
var invalids []Invalid
|
||||||
var kvs []kv
|
var kvs []kv
|
||||||
for i := 0; i < len(ks); i++ {
|
for i := 0; i < len(ks); i++ {
|
||||||
keyPath, err := keyPathFromKey(p.maxLevels, ks[i])
|
keyPath, err := keyPathFromKey(p.maxLevels, ks[i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO in a future iteration, invalids will contain
|
invalids = append(invalids, Invalid{i, err})
|
||||||
// the reason of the error of why each index is
|
|
||||||
// invalid.
|
|
||||||
invalids = append(invalids, i)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -90,13 +87,13 @@ func newVT(maxLevels int, hash HashFunction) vt {
|
|||||||
// computation of hashes of the nodes neither the storage of the key-values of
|
// computation of hashes of the nodes neither the storage of the key-values of
|
||||||
// the tree into the db. After addBatch, vt.computeHashes should be called to
|
// the tree into the db. After addBatch, vt.computeHashes should be called to
|
||||||
// compute the hashes of all the nodes of the tree.
|
// compute the hashes of all the nodes of the tree.
|
||||||
func (t *vt) addBatch(ks, vs [][]byte) ([]int, error) {
|
func (t *vt) addBatch(ks, vs [][]byte) ([]Invalid, error) {
|
||||||
nCPU := flp2(runtime.NumCPU())
|
nCPU := flp2(runtime.NumCPU())
|
||||||
if nCPU == 1 || len(ks) < nCPU {
|
if nCPU == 1 || len(ks) < nCPU {
|
||||||
var invalids []int
|
var invalids []Invalid
|
||||||
for i := 0; i < len(ks); i++ {
|
for i := 0; i < len(ks); i++ {
|
||||||
if err := t.add(0, ks[i], vs[i]); err != nil {
|
if err := t.add(0, ks[i], vs[i]); err != nil {
|
||||||
invalids = append(invalids, i)
|
invalids = append(invalids, Invalid{i, err})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return invalids, nil
|
return invalids, nil
|
||||||
@@ -177,7 +174,7 @@ func (t *vt) addBatch(ks, vs [][]byte) ([]int, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
subRoots := make([]*node, nCPU)
|
subRoots := make([]*node, nCPU)
|
||||||
invalidsInBucket := make([][]int, nCPU)
|
invalidsInBucket := make([][]Invalid, nCPU)
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(nCPU)
|
wg.Add(nCPU)
|
||||||
@@ -186,8 +183,10 @@ func (t *vt) addBatch(ks, vs [][]byte) ([]int, error) {
|
|||||||
bucketVT := newVT(t.params.maxLevels, t.params.hashFunction)
|
bucketVT := newVT(t.params.maxLevels, t.params.hashFunction)
|
||||||
bucketVT.root = nodesAtL[cpu]
|
bucketVT.root = nodesAtL[cpu]
|
||||||
for j := 0; j < len(buckets[cpu]); j++ {
|
for j := 0; j < len(buckets[cpu]); j++ {
|
||||||
if err := bucketVT.add(l, buckets[cpu][j].k, buckets[cpu][j].v); err != nil {
|
if err := bucketVT.add(l, buckets[cpu][j].k,
|
||||||
invalidsInBucket[cpu] = append(invalidsInBucket[cpu], buckets[cpu][j].pos)
|
buckets[cpu][j].v); err != nil {
|
||||||
|
invalidsInBucket[cpu] = append(invalidsInBucket[cpu],
|
||||||
|
Invalid{buckets[cpu][j].pos, err})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
subRoots[cpu] = bucketVT.root
|
subRoots[cpu] = bucketVT.root
|
||||||
|
|||||||
Reference in New Issue
Block a user