mirror of
https://github.com/arnaucube/hermez-node.git
synced 2026-02-08 11:56:46 +01:00
Compare commits
1 Commits
feature/fa
...
feature/fa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e68c64db75 |
@@ -49,8 +49,6 @@ type KVDB struct {
|
|||||||
CurrentIdx common.Idx
|
CurrentIdx common.Idx
|
||||||
CurrentBatch common.BatchNum
|
CurrentBatch common.BatchNum
|
||||||
m sync.Mutex
|
m sync.Mutex
|
||||||
mutexDelOld sync.Mutex
|
|
||||||
wg sync.WaitGroup
|
|
||||||
last *Last
|
last *Last
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -446,15 +444,10 @@ func (k *KVDB) MakeCheckpoint() error {
|
|||||||
return tracerr.Wrap(err)
|
return tracerr.Wrap(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// delete old checkpoints
|
||||||
k.wg.Add(1)
|
if err := k.deleteOldCheckpoints(); err != nil {
|
||||||
go func() {
|
return tracerr.Wrap(err)
|
||||||
delErr := k.DeleteOldCheckpoints()
|
|
||||||
if delErr != nil {
|
|
||||||
log.Errorw("delete old checkpoints failed", "err", delErr)
|
|
||||||
}
|
}
|
||||||
k.wg.Done()
|
|
||||||
}()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -516,12 +509,9 @@ func (k *KVDB) ListCheckpoints() ([]int, error) {
|
|||||||
return checkpoints, nil
|
return checkpoints, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteOldCheckpoints deletes old checkpoints when there are more than
|
// deleteOldCheckpoints deletes old checkpoints when there are more than
|
||||||
// `s.keep` checkpoints
|
// `s.keep` checkpoints
|
||||||
func (k *KVDB) DeleteOldCheckpoints() error {
|
func (k *KVDB) deleteOldCheckpoints() error {
|
||||||
k.mutexDelOld.Lock()
|
|
||||||
defer k.mutexDelOld.Unlock()
|
|
||||||
|
|
||||||
list, err := k.ListCheckpoints()
|
list, err := k.ListCheckpoints()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return tracerr.Wrap(err)
|
return tracerr.Wrap(err)
|
||||||
@@ -594,6 +584,4 @@ func (k *KVDB) Close() {
|
|||||||
if k.last != nil {
|
if k.last != nil {
|
||||||
k.last.close()
|
k.last.close()
|
||||||
}
|
}
|
||||||
// wait for deletion of old checkpoints
|
|
||||||
k.wg.Wait()
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/hermeznetwork/hermez-node/common"
|
"github.com/hermeznetwork/hermez-node/common"
|
||||||
@@ -191,67 +190,12 @@ func TestDeleteOldCheckpoints(t *testing.T) {
|
|||||||
for i := 0; i < numCheckpoints; i++ {
|
for i := 0; i < numCheckpoints; i++ {
|
||||||
err = db.MakeCheckpoint()
|
err = db.MakeCheckpoint()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
err = db.DeleteOldCheckpoints()
|
|
||||||
require.NoError(t, err)
|
|
||||||
checkpoints, err := db.ListCheckpoints()
|
checkpoints, err := db.ListCheckpoints()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.LessOrEqual(t, len(checkpoints), keep)
|
assert.LessOrEqual(t, len(checkpoints), keep)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConcurrentDeleteOldCheckpoints(t *testing.T) {
|
|
||||||
dir, err := ioutil.TempDir("", "tmpdb")
|
|
||||||
require.NoError(t, err)
|
|
||||||
defer require.NoError(t, os.RemoveAll(dir))
|
|
||||||
|
|
||||||
keep := 16
|
|
||||||
db, err := NewKVDB(Config{Path: dir, Keep: keep})
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
numCheckpoints := 32
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(numCheckpoints)
|
|
||||||
|
|
||||||
// do checkpoints and check that we never have more than `keep`
|
|
||||||
// checkpoints.
|
|
||||||
// 1 async DeleteOldCheckpoint after 1 MakeCheckpoint
|
|
||||||
for i := 0; i < numCheckpoints; i++ {
|
|
||||||
err = db.MakeCheckpoint()
|
|
||||||
require.NoError(t, err)
|
|
||||||
go func() {
|
|
||||||
err = db.DeleteOldCheckpoints()
|
|
||||||
require.NoError(t, err)
|
|
||||||
wg.Done()
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
checkpoints, err := db.ListCheckpoints()
|
|
||||||
require.NoError(t, err)
|
|
||||||
assert.LessOrEqual(t, len(checkpoints), keep)
|
|
||||||
|
|
||||||
wg.Add(numCheckpoints)
|
|
||||||
|
|
||||||
// do checkpoints and check that we never have more than `keep`
|
|
||||||
// checkpoints
|
|
||||||
// 32 concurrent DeleteOldCheckpoint after 32 MakeCheckpoint
|
|
||||||
for i := 0; i < numCheckpoints; i++ {
|
|
||||||
err = db.MakeCheckpoint()
|
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
|
||||||
for i := 0; i < numCheckpoints; i++ {
|
|
||||||
go func() {
|
|
||||||
err = db.DeleteOldCheckpoints()
|
|
||||||
require.NoError(t, err)
|
|
||||||
wg.Done()
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
checkpoints, err = db.ListCheckpoints()
|
|
||||||
require.NoError(t, err)
|
|
||||||
assert.LessOrEqual(t, len(checkpoints), keep)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGetCurrentIdx(t *testing.T) {
|
func TestGetCurrentIdx(t *testing.T) {
|
||||||
dir, err := ioutil.TempDir("", "tmpdb")
|
dir, err := ioutil.TempDir("", "tmpdb")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|||||||
@@ -227,12 +227,6 @@ func (s *StateDB) MakeCheckpoint() error {
|
|||||||
return s.db.MakeCheckpoint()
|
return s.db.MakeCheckpoint()
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteOldCheckpoints deletes old checkpoints when there are more than
|
|
||||||
// `cfg.keep` checkpoints
|
|
||||||
func (s *StateDB) DeleteOldCheckpoints() error {
|
|
||||||
return s.db.DeleteOldCheckpoints()
|
|
||||||
}
|
|
||||||
|
|
||||||
// CurrentBatch returns the current in-memory CurrentBatch of the StateDB.db
|
// CurrentBatch returns the current in-memory CurrentBatch of the StateDB.db
|
||||||
func (s *StateDB) CurrentBatch() common.BatchNum {
|
func (s *StateDB) CurrentBatch() common.BatchNum {
|
||||||
return s.db.CurrentBatch
|
return s.db.CurrentBatch
|
||||||
|
|||||||
@@ -7,7 +7,6 @@ import (
|
|||||||
"math/big"
|
"math/big"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
ethCommon "github.com/ethereum/go-ethereum/common"
|
ethCommon "github.com/ethereum/go-ethereum/common"
|
||||||
@@ -589,48 +588,6 @@ func TestDeleteOldCheckpoints(t *testing.T) {
|
|||||||
for i := 0; i < numCheckpoints; i++ {
|
for i := 0; i < numCheckpoints; i++ {
|
||||||
err = sdb.MakeCheckpoint()
|
err = sdb.MakeCheckpoint()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
err := sdb.DeleteOldCheckpoints()
|
|
||||||
require.NoError(t, err)
|
|
||||||
checkpoints, err := sdb.db.ListCheckpoints()
|
|
||||||
require.NoError(t, err)
|
|
||||||
assert.LessOrEqual(t, len(checkpoints), keep)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestConcurrentDeleteOldCheckpoints performs almost the same test than
|
|
||||||
// kvdb/kvdb_test.go TestConcurrentDeleteOldCheckpoints, but over the StateDB
|
|
||||||
func TestConcurrentDeleteOldCheckpoints(t *testing.T) {
|
|
||||||
dir, err := ioutil.TempDir("", "tmpdb")
|
|
||||||
require.NoError(t, err)
|
|
||||||
defer require.NoError(t, os.RemoveAll(dir))
|
|
||||||
|
|
||||||
keep := 16
|
|
||||||
sdb, err := NewStateDB(Config{Path: dir, Keep: keep, Type: TypeSynchronizer, NLevels: 32})
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
numCheckpoints := 32
|
|
||||||
// do checkpoints and check that we never have more than `keep`
|
|
||||||
// checkpoints
|
|
||||||
for i := 0; i < numCheckpoints; i++ {
|
|
||||||
err = sdb.MakeCheckpoint()
|
|
||||||
require.NoError(t, err)
|
|
||||||
wg := sync.WaitGroup{}
|
|
||||||
n := 10
|
|
||||||
wg.Add(n)
|
|
||||||
for j := 0; j < n; j++ {
|
|
||||||
go func() {
|
|
||||||
err := sdb.DeleteOldCheckpoints()
|
|
||||||
require.NoError(t, err)
|
|
||||||
checkpoints, err := sdb.db.ListCheckpoints()
|
|
||||||
require.NoError(t, err)
|
|
||||||
assert.LessOrEqual(t, len(checkpoints), keep)
|
|
||||||
wg.Done()
|
|
||||||
}()
|
|
||||||
_, err := sdb.db.ListCheckpoints()
|
|
||||||
// only checking here for absence of errors, not the count of checkpoints
|
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
checkpoints, err := sdb.db.ListCheckpoints()
|
checkpoints, err := sdb.db.ListCheckpoints()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.LessOrEqual(t, len(checkpoints), keep)
|
assert.LessOrEqual(t, len(checkpoints), keep)
|
||||||
|
|||||||
@@ -245,15 +245,15 @@ func (c *EthereumClient) EthBlockByNumber(ctx context.Context, number int64) (*c
|
|||||||
if number == -1 {
|
if number == -1 {
|
||||||
blockNum = nil
|
blockNum = nil
|
||||||
}
|
}
|
||||||
header, err := c.client.HeaderByNumber(ctx, blockNum)
|
block, err := c.client.BlockByNumber(ctx, blockNum)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, tracerr.Wrap(err)
|
return nil, tracerr.Wrap(err)
|
||||||
}
|
}
|
||||||
b := &common.Block{
|
b := &common.Block{
|
||||||
Num: header.Number.Int64(),
|
Num: block.Number().Int64(),
|
||||||
Timestamp: time.Unix(int64(header.Time), 0),
|
Timestamp: time.Unix(int64(block.Time()), 0),
|
||||||
ParentHash: header.ParentHash,
|
ParentHash: block.ParentHash(),
|
||||||
Hash: header.Hash(),
|
Hash: block.Hash(),
|
||||||
}
|
}
|
||||||
return b, nil
|
return b, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -23,6 +23,12 @@ const (
|
|||||||
// errStrUnknownBlock is the string returned by geth when querying an
|
// errStrUnknownBlock is the string returned by geth when querying an
|
||||||
// unknown block
|
// unknown block
|
||||||
errStrUnknownBlock = "unknown block"
|
errStrUnknownBlock = "unknown block"
|
||||||
|
// updateEthBlockNumThreshold is a threshold of number of ethereum blocks left to synchronize, such that
|
||||||
|
// if we have more blocks to sync than the defined value we can aggressively skip calling UpdateEth
|
||||||
|
updateEthBlockNumThreshold = 100
|
||||||
|
// While having more blocks to sync than updateEthBlockNumThreshold, UpdateEth will be called once in a
|
||||||
|
// defined number of blocks
|
||||||
|
updateEthFrequencyDivider = 100
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -528,9 +534,12 @@ func (s *Synchronizer) Sync(ctx context.Context,
|
|||||||
log.Debugf("ethBlock: num: %v, parent: %v, hash: %v",
|
log.Debugf("ethBlock: num: %v, parent: %v, hash: %v",
|
||||||
ethBlock.Num, ethBlock.ParentHash.String(), ethBlock.Hash.String())
|
ethBlock.Num, ethBlock.ParentHash.String(), ethBlock.Hash.String())
|
||||||
|
|
||||||
|
if nextBlockNum+updateEthBlockNumThreshold >= s.stats.Eth.LastBlock.Num ||
|
||||||
|
nextBlockNum%updateEthFrequencyDivider == 0 {
|
||||||
if err := s.stats.UpdateEth(s.ethClient); err != nil {
|
if err := s.stats.UpdateEth(s.ethClient); err != nil {
|
||||||
return nil, nil, tracerr.Wrap(err)
|
return nil, nil, tracerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
log.Debugw("Syncing...",
|
log.Debugw("Syncing...",
|
||||||
"block", nextBlockNum,
|
"block", nextBlockNum,
|
||||||
|
|||||||
Reference in New Issue
Block a user