Compare commits

..

2 Commits

Author SHA1 Message Date
Oleksandr Brezhniev
5aa2b0e977 Faster synchronization with asynchronous deletion of old checkpoints 2021-03-30 15:00:27 +03:00
Oleksandr Brezhniev
2125812e90 Faster synchronization with usage of HeaderByNumber instead of BlockByNumber 2021-03-23 23:09:31 +02:00
6 changed files with 130 additions and 22 deletions

View File

@@ -49,6 +49,8 @@ type KVDB struct {
CurrentIdx common.Idx CurrentIdx common.Idx
CurrentBatch common.BatchNum CurrentBatch common.BatchNum
m sync.Mutex m sync.Mutex
mutexDelOld sync.Mutex
wg sync.WaitGroup
last *Last last *Last
} }
@@ -444,10 +446,15 @@ func (k *KVDB) MakeCheckpoint() error {
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
} }
// delete old checkpoints
if err := k.deleteOldCheckpoints(); err != nil { k.wg.Add(1)
return tracerr.Wrap(err) go func() {
delErr := k.DeleteOldCheckpoints()
if delErr != nil {
log.Errorw("delete old checkpoints failed", "err", delErr)
} }
k.wg.Done()
}()
return nil return nil
} }
@@ -509,9 +516,12 @@ func (k *KVDB) ListCheckpoints() ([]int, error) {
return checkpoints, nil return checkpoints, nil
} }
// deleteOldCheckpoints deletes old checkpoints when there are more than // DeleteOldCheckpoints deletes old checkpoints when there are more than
// `s.keep` checkpoints // `s.keep` checkpoints
func (k *KVDB) deleteOldCheckpoints() error { func (k *KVDB) DeleteOldCheckpoints() error {
k.mutexDelOld.Lock()
defer k.mutexDelOld.Unlock()
list, err := k.ListCheckpoints() list, err := k.ListCheckpoints()
if err != nil { if err != nil {
return tracerr.Wrap(err) return tracerr.Wrap(err)
@@ -584,4 +594,6 @@ func (k *KVDB) Close() {
if k.last != nil { if k.last != nil {
k.last.close() k.last.close()
} }
// wait for deletion of old checkpoints
k.wg.Wait()
} }

View File

@@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os" "os"
"sync"
"testing" "testing"
"github.com/hermeznetwork/hermez-node/common" "github.com/hermeznetwork/hermez-node/common"
@@ -190,12 +191,67 @@ func TestDeleteOldCheckpoints(t *testing.T) {
for i := 0; i < numCheckpoints; i++ { for i := 0; i < numCheckpoints; i++ {
err = db.MakeCheckpoint() err = db.MakeCheckpoint()
require.NoError(t, err) require.NoError(t, err)
err = db.DeleteOldCheckpoints()
require.NoError(t, err)
checkpoints, err := db.ListCheckpoints() checkpoints, err := db.ListCheckpoints()
require.NoError(t, err) require.NoError(t, err)
assert.LessOrEqual(t, len(checkpoints), keep) assert.LessOrEqual(t, len(checkpoints), keep)
} }
} }
func TestConcurrentDeleteOldCheckpoints(t *testing.T) {
dir, err := ioutil.TempDir("", "tmpdb")
require.NoError(t, err)
defer require.NoError(t, os.RemoveAll(dir))
keep := 16
db, err := NewKVDB(Config{Path: dir, Keep: keep})
require.NoError(t, err)
numCheckpoints := 32
var wg sync.WaitGroup
wg.Add(numCheckpoints)
// do checkpoints and check that we never have more than `keep`
// checkpoints.
// 1 async DeleteOldCheckpoint after 1 MakeCheckpoint
for i := 0; i < numCheckpoints; i++ {
err = db.MakeCheckpoint()
require.NoError(t, err)
go func() {
err = db.DeleteOldCheckpoints()
require.NoError(t, err)
wg.Done()
}()
}
wg.Wait()
checkpoints, err := db.ListCheckpoints()
require.NoError(t, err)
assert.LessOrEqual(t, len(checkpoints), keep)
wg.Add(numCheckpoints)
// do checkpoints and check that we never have more than `keep`
// checkpoints
// 32 concurrent DeleteOldCheckpoint after 32 MakeCheckpoint
for i := 0; i < numCheckpoints; i++ {
err = db.MakeCheckpoint()
require.NoError(t, err)
}
for i := 0; i < numCheckpoints; i++ {
go func() {
err = db.DeleteOldCheckpoints()
require.NoError(t, err)
wg.Done()
}()
}
wg.Wait()
checkpoints, err = db.ListCheckpoints()
require.NoError(t, err)
assert.LessOrEqual(t, len(checkpoints), keep)
}
func TestGetCurrentIdx(t *testing.T) { func TestGetCurrentIdx(t *testing.T) {
dir, err := ioutil.TempDir("", "tmpdb") dir, err := ioutil.TempDir("", "tmpdb")
require.NoError(t, err) require.NoError(t, err)

View File

@@ -227,6 +227,12 @@ func (s *StateDB) MakeCheckpoint() error {
return s.db.MakeCheckpoint() return s.db.MakeCheckpoint()
} }
// DeleteOldCheckpoints deletes old checkpoints when there are more than
// `cfg.keep` checkpoints
func (s *StateDB) DeleteOldCheckpoints() error {
return s.db.DeleteOldCheckpoints()
}
// CurrentBatch returns the current in-memory CurrentBatch of the StateDB.db // CurrentBatch returns the current in-memory CurrentBatch of the StateDB.db
func (s *StateDB) CurrentBatch() common.BatchNum { func (s *StateDB) CurrentBatch() common.BatchNum {
return s.db.CurrentBatch return s.db.CurrentBatch

View File

@@ -7,6 +7,7 @@ import (
"math/big" "math/big"
"os" "os"
"strings" "strings"
"sync"
"testing" "testing"
ethCommon "github.com/ethereum/go-ethereum/common" ethCommon "github.com/ethereum/go-ethereum/common"
@@ -588,6 +589,48 @@ func TestDeleteOldCheckpoints(t *testing.T) {
for i := 0; i < numCheckpoints; i++ { for i := 0; i < numCheckpoints; i++ {
err = sdb.MakeCheckpoint() err = sdb.MakeCheckpoint()
require.NoError(t, err) require.NoError(t, err)
err := sdb.DeleteOldCheckpoints()
require.NoError(t, err)
checkpoints, err := sdb.db.ListCheckpoints()
require.NoError(t, err)
assert.LessOrEqual(t, len(checkpoints), keep)
}
}
// TestConcurrentDeleteOldCheckpoints performs almost the same test than
// kvdb/kvdb_test.go TestConcurrentDeleteOldCheckpoints, but over the StateDB
func TestConcurrentDeleteOldCheckpoints(t *testing.T) {
dir, err := ioutil.TempDir("", "tmpdb")
require.NoError(t, err)
defer require.NoError(t, os.RemoveAll(dir))
keep := 16
sdb, err := NewStateDB(Config{Path: dir, Keep: keep, Type: TypeSynchronizer, NLevels: 32})
require.NoError(t, err)
numCheckpoints := 32
// do checkpoints and check that we never have more than `keep`
// checkpoints
for i := 0; i < numCheckpoints; i++ {
err = sdb.MakeCheckpoint()
require.NoError(t, err)
wg := sync.WaitGroup{}
n := 10
wg.Add(n)
for j := 0; j < n; j++ {
go func() {
err := sdb.DeleteOldCheckpoints()
require.NoError(t, err)
checkpoints, err := sdb.db.ListCheckpoints()
require.NoError(t, err)
assert.LessOrEqual(t, len(checkpoints), keep)
wg.Done()
}()
_, err := sdb.db.ListCheckpoints()
// only checking here for absence of errors, not the count of checkpoints
require.NoError(t, err)
}
wg.Wait()
checkpoints, err := sdb.db.ListCheckpoints() checkpoints, err := sdb.db.ListCheckpoints()
require.NoError(t, err) require.NoError(t, err)
assert.LessOrEqual(t, len(checkpoints), keep) assert.LessOrEqual(t, len(checkpoints), keep)

View File

@@ -245,15 +245,15 @@ func (c *EthereumClient) EthBlockByNumber(ctx context.Context, number int64) (*c
if number == -1 { if number == -1 {
blockNum = nil blockNum = nil
} }
block, err := c.client.BlockByNumber(ctx, blockNum) header, err := c.client.HeaderByNumber(ctx, blockNum)
if err != nil { if err != nil {
return nil, tracerr.Wrap(err) return nil, tracerr.Wrap(err)
} }
b := &common.Block{ b := &common.Block{
Num: block.Number().Int64(), Num: header.Number.Int64(),
Timestamp: time.Unix(int64(block.Time()), 0), Timestamp: time.Unix(int64(header.Time), 0),
ParentHash: block.ParentHash(), ParentHash: header.ParentHash,
Hash: block.Hash(), Hash: header.Hash(),
} }
return b, nil return b, nil
} }

View File

@@ -23,12 +23,6 @@ const (
// errStrUnknownBlock is the string returned by geth when querying an // errStrUnknownBlock is the string returned by geth when querying an
// unknown block // unknown block
errStrUnknownBlock = "unknown block" errStrUnknownBlock = "unknown block"
// updateEthBlockNumThreshold is a threshold of number of ethereum blocks left to synchronize, such that
// if we have more blocks to sync than the defined value we can aggressively skip calling UpdateEth
updateEthBlockNumThreshold = 100
// While having more blocks to sync than updateEthBlockNumThreshold, UpdateEth will be called once in a
// defined number of blocks
updateEthFrequencyDivider = 100
) )
var ( var (
@@ -534,12 +528,9 @@ func (s *Synchronizer) Sync(ctx context.Context,
log.Debugf("ethBlock: num: %v, parent: %v, hash: %v", log.Debugf("ethBlock: num: %v, parent: %v, hash: %v",
ethBlock.Num, ethBlock.ParentHash.String(), ethBlock.Hash.String()) ethBlock.Num, ethBlock.ParentHash.String(), ethBlock.Hash.String())
if nextBlockNum+updateEthBlockNumThreshold >= s.stats.Eth.LastBlock.Num ||
nextBlockNum%updateEthFrequencyDivider == 0 {
if err := s.stats.UpdateEth(s.ethClient); err != nil { if err := s.stats.UpdateEth(s.ethClient); err != nil {
return nil, nil, tracerr.Wrap(err) return nil, nil, tracerr.Wrap(err)
} }
}
log.Debugw("Syncing...", log.Debugw("Syncing...",
"block", nextBlockNum, "block", nextBlockNum,