You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

688 lines
15 KiB

// Package arbo > vt.go implements the Virtual Tree, which computes a tree
// without computing any hash. The idea is that once all the leaves are placed
// in their positions, the hashes can be computed, so that no node hash is
// computed more than once.
  5. package arbo
  6. import (
  7. "bytes"
  8. "encoding/hex"
  9. "fmt"
  10. "io"
  11. "math"
  12. "runtime"
  13. "sync"
  14. )
  15. type node struct {
  16. l *node
  17. r *node
  18. k []byte
  19. v []byte
  20. path []bool
  21. h []byte
  22. }
  23. type params struct {
  24. maxLevels int
  25. hashFunction HashFunction
  26. emptyHash []byte
  27. dbg *dbgStats
  28. }
  29. type kv struct {
  30. pos int // original position in the inputted array
  31. keyPath []byte
  32. k []byte
  33. v []byte
  34. }
  35. func (p *params) keysValuesToKvs(ks, vs [][]byte) ([]kv, error) {
  36. if len(ks) != len(vs) {
  37. return nil, fmt.Errorf("len(keys)!=len(values) (%d!=%d)",
  38. len(ks), len(vs))
  39. }
  40. kvs := make([]kv, len(ks))
  41. for i := 0; i < len(ks); i++ {
  42. keyPath := make([]byte, p.hashFunction.Len())
  43. copy(keyPath[:], ks[i])
  44. kvs[i].pos = i
  45. kvs[i].keyPath = keyPath
  46. kvs[i].k = ks[i]
  47. kvs[i].v = vs[i]
  48. }
  49. return kvs, nil
  50. }
  51. // vt stands for virtual tree. It's a tree that does not have any computed hash
  52. // while placing the leafs. Once all the leafs are placed, it computes all the
  53. // hashes. In this way, each node hash is only computed one time (at the end)
  54. // and the tree is computed in memory.
  55. type vt struct {
  56. root *node
  57. params *params
  58. }
  59. func newVT(maxLevels int, hash HashFunction) vt {
  60. return vt{
  61. root: nil,
  62. params: &params{
  63. maxLevels: maxLevels,
  64. hashFunction: hash,
  65. emptyHash: make([]byte, hash.Len()), // empty
  66. },
  67. }
  68. }
  69. // addBatch adds a batch of key-values to the VirtualTree. Returns an array
  70. // containing the indexes of the keys failed to add. Does not include the
  71. // computation of hashes of the nodes neither the storage of the key-values of
  72. // the tree into the db. After addBatch, vt.computeHashes should be called to
  73. // compute the hashes of all the nodes of the tree.
  74. func (t *vt) addBatch(ks, vs [][]byte) ([]int, error) {
  75. nCPU := flp2(runtime.NumCPU())
  76. if nCPU == 1 || len(ks) < nCPU {
  77. var invalids []int
  78. for i := 0; i < len(ks); i++ {
  79. if err := t.add(0, ks[i], vs[i]); err != nil {
  80. invalids = append(invalids, i)
  81. }
  82. }
  83. return invalids, nil
  84. }
  85. l := int(math.Log2(float64(nCPU)))
  86. kvs, err := t.params.keysValuesToKvs(ks, vs)
  87. if err != nil {
  88. return nil, err
  89. }
  90. buckets := splitInBuckets(kvs, nCPU)
  91. nodesAtL, err := t.getNodesAtLevel(l)
  92. if err != nil {
  93. return nil, err
  94. }
  95. if len(nodesAtL) != nCPU && t.root != nil {
  96. /*
  97. Already populated Tree but Unbalanced
  98. - Need to fill M1 and M2, and then will be able to continue with the flow
  99. - Search for M1 & M2 in the inputed Keys
  100. - Add M1 & M2 to the Tree
  101. - From here can continue with the flow
  102. R
  103. / \
  104. / \
  105. / \
  106. * *
  107. | \
  108. | \
  109. | \
  110. L: M1 * M2 * (where M1 and M2 are empty)
  111. / | /
  112. / | /
  113. / | /
  114. A * *
  115. / \ | \
  116. / \ | \
  117. / \ | \
  118. B * * C
  119. / \ |\
  120. ... ... | \
  121. | \
  122. D E
  123. */
  124. // add one key at each bucket, and then continue with the flow
  125. for i := 0; i < len(buckets); i++ {
  126. // add one leaf of the bucket, if there is an error when
  127. // adding the k-v, try to add the next one of the bucket
  128. // (until one is added)
  129. inserted := -1
  130. for j := 0; j < len(buckets[i]); j++ {
  131. if err := t.add(0, buckets[i][j].k, buckets[i][j].v); err == nil {
  132. inserted = j
  133. break
  134. }
  135. }
  136. // remove the inserted element from buckets[i]
  137. if inserted != -1 {
  138. buckets[i] = append(buckets[i][:inserted], buckets[i][inserted+1:]...)
  139. }
  140. }
  141. nodesAtL, err = t.getNodesAtLevel(l)
  142. if err != nil {
  143. return nil, err
  144. }
  145. }
  146. if len(nodesAtL) != nCPU {
  147. panic("should not happen") // TODO TMP
  148. }
  149. subRoots := make([]*node, nCPU)
  150. invalidsInBucket := make([][]int, nCPU)
  151. var wg sync.WaitGroup
  152. wg.Add(nCPU)
  153. for i := 0; i < nCPU; i++ {
  154. go func(cpu int) {
  155. bucketVT := newVT(t.params.maxLevels-l, t.params.hashFunction)
  156. bucketVT.root = nodesAtL[cpu]
  157. for j := 0; j < len(buckets[cpu]); j++ {
  158. if err = bucketVT.add(l, buckets[cpu][j].k, buckets[cpu][j].v); err != nil {
  159. invalidsInBucket[cpu] = append(invalidsInBucket[cpu], buckets[cpu][j].pos)
  160. }
  161. }
  162. subRoots[cpu] = bucketVT.root
  163. wg.Done()
  164. }(i)
  165. }
  166. wg.Wait()
  167. var invalids []int
  168. for i := 0; i < len(invalidsInBucket); i++ {
  169. invalids = append(invalids, invalidsInBucket[i]...)
  170. }
  171. newRootNode, err := upFromNodes(subRoots)
  172. if err != nil {
  173. return nil, err
  174. }
  175. t.root = newRootNode
  176. return invalids, nil
  177. }
  178. func (t *vt) getNodesAtLevel(l int) ([]*node, error) {
  179. if t.root == nil {
  180. var r []*node
  181. nChilds := int(math.Pow(2, float64(l))) //nolint:gomnd
  182. for i := 0; i < nChilds; i++ {
  183. r = append(r, nil)
  184. }
  185. return r, nil
  186. }
  187. return t.root.getNodesAtLevel(0, l)
  188. }
  189. func (n *node) getNodesAtLevel(currLvl, l int) ([]*node, error) {
  190. if n == nil {
  191. var r []*node
  192. nChilds := int(math.Pow(2, float64(l-currLvl))) //nolint:gomnd
  193. for i := 0; i < nChilds; i++ {
  194. r = append(r, nil)
  195. }
  196. return r, nil
  197. }
  198. typ := n.typ()
  199. if currLvl == l && typ != vtEmpty {
  200. return []*node{n}, nil
  201. }
  202. if currLvl >= l {
  203. panic("should not reach this point") // TODO TMP
  204. }
  205. var nodes []*node
  206. nodesL, err := n.l.getNodesAtLevel(currLvl+1, l)
  207. if err != nil {
  208. return nil, err
  209. }
  210. nodes = append(nodes, nodesL...)
  211. nodesR, err := n.r.getNodesAtLevel(currLvl+1, l)
  212. if err != nil {
  213. return nil, err
  214. }
  215. nodes = append(nodes, nodesR...)
  216. return nodes, nil
  217. }
  218. // upFromNodes builds the tree from the bottom to up
  219. func upFromNodes(ns []*node) (*node, error) {
  220. if len(ns) == 1 {
  221. return ns[0], nil
  222. }
  223. var res []*node
  224. for i := 0; i < len(ns); i += 2 {
  225. if (ns[i].typ() == vtEmpty && ns[i+1].typ() == vtEmpty) ||
  226. (ns[i].typ() == vtLeaf && ns[i+1].typ() == vtEmpty) {
  227. // when both sub nodes are empty, the parent is also empty
  228. // or
  229. // when 1st sub node is a leaf but the 2nd is empty, the
  230. // leaf is used as parent
  231. res = append(res, ns[i])
  232. continue
  233. }
  234. if ns[i].typ() == vtEmpty && ns[i+1].typ() == vtLeaf {
  235. // when 2nd sub node is a leaf but the 1st is empty, the
  236. // leaf is used as 'parent'
  237. res = append(res, ns[i+1])
  238. continue
  239. }
  240. n := &node{
  241. l: ns[i],
  242. r: ns[i+1],
  243. }
  244. res = append(res, n)
  245. }
  246. return upFromNodes(res)
  247. }
  248. // add adds a key&value as a leaf in the VirtualTree
  249. func (t *vt) add(fromLvl int, k, v []byte) error {
  250. leaf := newLeafNode(t.params, k, v)
  251. if t.root == nil {
  252. t.root = leaf
  253. return nil
  254. }
  255. if err := t.root.add(t.params, fromLvl, leaf); err != nil {
  256. return err
  257. }
  258. return nil
  259. }
  260. // computeHashes should be called after all the vt.add is used, once all the
  261. // leafs are in the tree. Computes the hashes of the tree, parallelizing in the
  262. // available CPUs.
  263. func (t *vt) computeHashes() ([][2][]byte, error) {
  264. var err error
  265. nCPU := flp2(runtime.NumCPU())
  266. l := int(math.Log2(float64(nCPU)))
  267. nodesAtL, err := t.getNodesAtLevel(l)
  268. if err != nil {
  269. return nil, err
  270. }
  271. subRoots := make([]*node, nCPU)
  272. bucketPairs := make([][][2][]byte, nCPU)
  273. dbgStatsPerBucket := make([]*dbgStats, nCPU)
  274. var wg sync.WaitGroup
  275. wg.Add(nCPU)
  276. for i := 0; i < nCPU; i++ {
  277. go func(cpu int) {
  278. bucketVT := newVT(t.params.maxLevels-l, t.params.hashFunction)
  279. bucketVT.params.dbg = newDbgStats()
  280. bucketVT.root = nodesAtL[cpu]
  281. bucketPairs[cpu], err = bucketVT.root.computeHashes(l,
  282. t.params.maxLevels, bucketVT.params, bucketPairs[cpu])
  283. if err != nil {
  284. // TODO WIP
  285. panic("TODO" + err.Error())
  286. }
  287. subRoots[cpu] = bucketVT.root
  288. dbgStatsPerBucket[cpu] = bucketVT.params.dbg
  289. wg.Done()
  290. }(i)
  291. }
  292. wg.Wait()
  293. for i := 0; i < len(dbgStatsPerBucket); i++ {
  294. t.params.dbg.add(dbgStatsPerBucket[i])
  295. }
  296. var pairs [][2][]byte
  297. for i := 0; i < len(bucketPairs); i++ {
  298. pairs = append(pairs, bucketPairs[i]...)
  299. }
  300. nodesAtL, err = t.getNodesAtLevel(l)
  301. if err != nil {
  302. return nil, err
  303. }
  304. for i := 0; i < len(nodesAtL); i++ {
  305. nodesAtL = subRoots
  306. }
  307. pairs, err = t.root.computeHashes(0, l, t.params, pairs)
  308. if err != nil {
  309. return nil, err
  310. }
  311. return pairs, nil
  312. }
  313. func newLeafNode(p *params, k, v []byte) *node {
  314. keyPath := make([]byte, p.hashFunction.Len())
  315. copy(keyPath[:], k)
  316. path := getPath(p.maxLevels, keyPath)
  317. n := &node{
  318. k: k,
  319. v: v,
  320. path: path,
  321. }
  322. return n
  323. }
  324. type virtualNodeType int
  325. const (
  326. vtEmpty = 0 // for convenience uses same value that PrefixValueEmpty
  327. vtLeaf = 1 // for convenience uses same value that PrefixValueLeaf
  328. vtMid = 2 // for convenience uses same value that PrefixValueIntermediate
  329. )
  330. func (n *node) typ() virtualNodeType {
  331. if n == nil {
  332. return vtEmpty // TODO decide if return 'vtEmpty' or an error
  333. }
  334. if n.l == nil && n.r == nil && n.k != nil {
  335. return vtLeaf
  336. }
  337. if n.l != nil || n.r != nil {
  338. return vtMid
  339. }
  340. return vtEmpty
  341. }
  342. func (n *node) add(p *params, currLvl int, leaf *node) error {
  343. if currLvl > p.maxLevels-1 {
  344. return ErrMaxVirtualLevel
  345. }
  346. if n == nil {
  347. // n = leaf // TMP!
  348. return nil
  349. }
  350. t := n.typ()
  351. switch t {
  352. case vtMid:
  353. if leaf.path[currLvl] {
  354. //right
  355. if n.r == nil {
  356. // empty sub-node, add the leaf here
  357. n.r = leaf
  358. return nil
  359. }
  360. if err := n.r.add(p, currLvl+1, leaf); err != nil {
  361. return err
  362. }
  363. } else {
  364. if n.l == nil {
  365. // empty sub-node, add the leaf here
  366. n.l = leaf
  367. return nil
  368. }
  369. if err := n.l.add(p, currLvl+1, leaf); err != nil {
  370. return err
  371. }
  372. }
  373. case vtLeaf:
  374. if bytes.Equal(n.k, leaf.k) {
  375. return fmt.Errorf("%s. Existing node: %s, trying to add node: %s",
  376. ErrKeyAlreadyExists, hex.EncodeToString(n.k),
  377. hex.EncodeToString(leaf.k))
  378. }
  379. oldLeaf := &node{
  380. k: n.k,
  381. v: n.v,
  382. path: n.path,
  383. }
  384. // remove values from current node (converting it to mid node)
  385. n.k = nil
  386. n.v = nil
  387. n.h = nil
  388. n.path = nil
  389. if err := n.downUntilDivergence(p, currLvl, oldLeaf, leaf); err != nil {
  390. return err
  391. }
  392. case vtEmpty:
  393. panic(fmt.Errorf("EMPTY %v", n)) // TODO TMP
  394. default:
  395. return fmt.Errorf("ERR") // TODO TMP
  396. }
  397. return nil
  398. }
  399. func (n *node) downUntilDivergence(p *params, currLvl int, oldLeaf, newLeaf *node) error {
  400. if currLvl > p.maxLevels-1 {
  401. return ErrMaxVirtualLevel
  402. }
  403. if oldLeaf.path[currLvl] != newLeaf.path[currLvl] {
  404. // reached divergence in next level
  405. if newLeaf.path[currLvl] {
  406. n.l = oldLeaf
  407. n.r = newLeaf
  408. } else {
  409. n.l = newLeaf
  410. n.r = oldLeaf
  411. }
  412. return nil
  413. }
  414. // no divergence yet, continue going down
  415. if newLeaf.path[currLvl] {
  416. // right
  417. n.r = &node{}
  418. if err := n.r.downUntilDivergence(p, currLvl+1, oldLeaf, newLeaf); err != nil {
  419. return err
  420. }
  421. } else {
  422. // left
  423. n.l = &node{}
  424. if err := n.l.downUntilDivergence(p, currLvl+1, oldLeaf, newLeaf); err != nil {
  425. return err
  426. }
  427. }
  428. return nil
  429. }
  430. func splitInBuckets(kvs []kv, nBuckets int) [][]kv {
  431. buckets := make([][]kv, nBuckets)
  432. // 1. classify the keyvalues into buckets
  433. for i := 0; i < len(kvs); i++ {
  434. pair := kvs[i]
  435. // bucketnum := keyToBucket(pair.k, nBuckets)
  436. bucketnum := keyToBucket(pair.keyPath, nBuckets)
  437. buckets[bucketnum] = append(buckets[bucketnum], pair)
  438. }
  439. return buckets
  440. }
  441. // TODO rename in a more 'real' name (calculate bucket from/for key)
  442. func keyToBucket(k []byte, nBuckets int) int {
  443. nLevels := int(math.Log2(float64(nBuckets)))
  444. b := make([]int, nBuckets)
  445. for i := 0; i < nBuckets; i++ {
  446. b[i] = i
  447. }
  448. r := b
  449. mid := len(r) / 2 //nolint:gomnd
  450. for i := 0; i < nLevels; i++ {
  451. if int(k[i/8]&(1<<(i%8))) != 0 {
  452. r = r[mid:]
  453. mid = len(r) / 2 //nolint:gomnd
  454. } else {
  455. r = r[:mid]
  456. mid = len(r) / 2 //nolint:gomnd
  457. }
  458. }
  459. return r[0]
  460. }
  461. // flp2 computes the floor power of 2, the highest power of 2 under the given
  462. // value.
  463. func flp2(n int) int {
  464. res := 0
  465. for i := n; i >= 1; i-- {
  466. if (i & (i - 1)) == 0 {
  467. res = i
  468. break
  469. }
  470. }
  471. return res
  472. }
  473. // computeHashes computes the hashes under the node from which is called the
  474. // method. Returns an array of key-values to store in the db
  475. func (n *node) computeHashes(currLvl, maxLvl int, p *params, pairs [][2][]byte) (
  476. [][2][]byte, error) {
  477. if n == nil || currLvl >= maxLvl {
  478. // no need to compute any hash
  479. return pairs, nil
  480. }
  481. if pairs == nil {
  482. pairs = [][2][]byte{}
  483. }
  484. var err error
  485. t := n.typ()
  486. switch t {
  487. case vtLeaf:
  488. p.dbg.incHash()
  489. leafKey, leafValue, err := newLeafValue(p.hashFunction, n.k, n.v)
  490. if err != nil {
  491. return pairs, err
  492. }
  493. n.h = leafKey
  494. kv := [2][]byte{leafKey, leafValue}
  495. pairs = append(pairs, kv)
  496. case vtMid:
  497. if n.l != nil {
  498. pairs, err = n.l.computeHashes(currLvl+1, maxLvl, p, pairs)
  499. if err != nil {
  500. return pairs, err
  501. }
  502. } else {
  503. n.l = &node{
  504. h: p.emptyHash,
  505. }
  506. }
  507. if n.r != nil {
  508. pairs, err = n.r.computeHashes(currLvl+1, maxLvl, p, pairs)
  509. if err != nil {
  510. return pairs, err
  511. }
  512. } else {
  513. n.r = &node{
  514. h: p.emptyHash,
  515. }
  516. }
  517. // once the sub nodes are computed, can compute the current node
  518. // hash
  519. p.dbg.incHash()
  520. k, v, err := newIntermediate(p.hashFunction, n.l.h, n.r.h)
  521. if err != nil {
  522. return nil, err
  523. }
  524. n.h = k
  525. kv := [2][]byte{k, v}
  526. pairs = append(pairs, kv)
  527. case vtEmpty:
  528. default:
  529. return nil, fmt.Errorf("ERR:n.computeHashes type (%d) no match", t) // TODO TMP
  530. }
  531. return pairs, nil
  532. }
  533. //nolint:unused
  534. func (t *vt) graphviz(w io.Writer) error {
  535. fmt.Fprintf(w, `digraph hierarchy {
  536. node [fontname=Monospace,fontsize=10,shape=box]
  537. `)
  538. if _, err := t.root.graphviz(w, t.params, 0); err != nil {
  539. return err
  540. }
  541. fmt.Fprintf(w, "}\n")
  542. return nil
  543. }
  544. //nolint:unused
  545. func (n *node) graphviz(w io.Writer, p *params, nEmpties int) (int, error) {
  546. if n == nil {
  547. return nEmpties, nil
  548. }
  549. t := n.typ()
  550. switch t {
  551. case vtLeaf:
  552. leafKey, _, err := newLeafValue(p.hashFunction, n.k, n.v)
  553. if err != nil {
  554. return nEmpties, err
  555. }
  556. fmt.Fprintf(w, "\"%p\" [style=filled,label=\"%v\"];\n", n, hex.EncodeToString(leafKey[:nChars]))
  557. k := n.k
  558. v := n.v
  559. if len(n.k) >= nChars {
  560. k = n.k[:nChars]
  561. }
  562. if len(n.v) >= nChars {
  563. v = n.v[:nChars]
  564. }
  565. fmt.Fprintf(w, "\"%p\" -> {\"k:%v\\nv:%v\"}\n", n,
  566. hex.EncodeToString(k),
  567. hex.EncodeToString(v))
  568. fmt.Fprintf(w, "\"k:%v\\nv:%v\" [style=dashed]\n",
  569. hex.EncodeToString(k),
  570. hex.EncodeToString(v))
  571. case vtMid:
  572. fmt.Fprintf(w, "\"%p\" [label=\"\"];\n", n)
  573. lStr := fmt.Sprintf("%p", n.l)
  574. rStr := fmt.Sprintf("%p", n.r)
  575. eStr := ""
  576. if n.l == nil {
  577. lStr = fmt.Sprintf("empty%v", nEmpties)
  578. eStr += fmt.Sprintf("\"%v\" [style=dashed,label=0];\n",
  579. lStr)
  580. nEmpties++
  581. }
  582. if n.r == nil {
  583. rStr = fmt.Sprintf("empty%v", nEmpties)
  584. eStr += fmt.Sprintf("\"%v\" [style=dashed,label=0];\n",
  585. rStr)
  586. nEmpties++
  587. }
  588. fmt.Fprintf(w, "\"%p\" -> {\"%v\" \"%v\"}\n", n, lStr, rStr)
  589. fmt.Fprint(w, eStr)
  590. nEmpties, err := n.l.graphviz(w, p, nEmpties)
  591. if err != nil {
  592. return nEmpties, err
  593. }
  594. nEmpties, err = n.r.graphviz(w, p, nEmpties)
  595. if err != nil {
  596. return nEmpties, err
  597. }
  598. case vtEmpty:
  599. default:
  600. return nEmpties, fmt.Errorf("ERR")
  601. }
  602. return nEmpties, nil
  603. }
  604. //nolint:unused
  605. func (t *vt) printGraphviz() error {
  606. w := bytes.NewBufferString("")
  607. fmt.Fprintf(w,
  608. "--------\nGraphviz:\n")
  609. err := t.graphviz(w)
  610. if err != nil {
  611. fmt.Println(w)
  612. return err
  613. }
  614. fmt.Fprintf(w,
  615. "End of Graphviz --------\n")
  616. fmt.Println(w)
  617. return nil
  618. }