diff --git a/db/kvdb/kvdb.go b/db/kvdb/kvdb.go index a470ccb..3535376 100644 --- a/db/kvdb/kvdb.go +++ b/db/kvdb/kvdb.go @@ -49,6 +49,8 @@ type KVDB struct { CurrentIdx common.Idx CurrentBatch common.BatchNum m sync.Mutex + mutexDelOld sync.Mutex + wg sync.WaitGroup last *Last } @@ -444,10 +446,15 @@ func (k *KVDB) MakeCheckpoint() error { return tracerr.Wrap(err) } } - // delete old checkpoints - if err := k.deleteOldCheckpoints(); err != nil { - return tracerr.Wrap(err) - } + + k.wg.Add(1) + go func() { + delErr := k.DeleteOldCheckpoints() + if delErr != nil { + log.Errorw("delete old checkpoints failed", "err", delErr) + } + k.wg.Done() + }() return nil } @@ -509,9 +516,12 @@ func (k *KVDB) ListCheckpoints() ([]int, error) { return checkpoints, nil } -// deleteOldCheckpoints deletes old checkpoints when there are more than +// DeleteOldCheckpoints deletes old checkpoints when there are more than // `s.keep` checkpoints -func (k *KVDB) deleteOldCheckpoints() error { +func (k *KVDB) DeleteOldCheckpoints() error { + k.mutexDelOld.Lock() + defer k.mutexDelOld.Unlock() + list, err := k.ListCheckpoints() if err != nil { return tracerr.Wrap(err) @@ -584,4 +594,6 @@ func (k *KVDB) Close() { if k.last != nil { k.last.close() } + // wait for deletion of old checkpoints + k.wg.Wait() } diff --git a/db/kvdb/kvdb_test.go b/db/kvdb/kvdb_test.go index 5bbb485..9aacb31 100644 --- a/db/kvdb/kvdb_test.go +++ b/db/kvdb/kvdb_test.go @@ -4,6 +4,7 @@ import ( "fmt" "io/ioutil" "os" + "sync" "testing" "github.com/hermeznetwork/hermez-node/common" @@ -190,12 +191,67 @@ func TestDeleteOldCheckpoints(t *testing.T) { for i := 0; i < numCheckpoints; i++ { err = db.MakeCheckpoint() require.NoError(t, err) + err = db.DeleteOldCheckpoints() + require.NoError(t, err) checkpoints, err := db.ListCheckpoints() require.NoError(t, err) assert.LessOrEqual(t, len(checkpoints), keep) } } +func TestConcurrentDeleteOldCheckpoints(t *testing.T) { + dir, err := ioutil.TempDir("", "tmpdb") + require.NoError(t, err) + defer require.NoError(t, os.RemoveAll(dir)) + + keep := 16 + db, err := NewKVDB(Config{Path: dir, Keep: keep}) + require.NoError(t, err) + + numCheckpoints := 32 + + var wg sync.WaitGroup + wg.Add(numCheckpoints) + + // do checkpoints and check that we never have more than `keep` + // checkpoints. + // 1 async DeleteOldCheckpoint after 1 MakeCheckpoint + for i := 0; i < numCheckpoints; i++ { + err = db.MakeCheckpoint() + require.NoError(t, err) + go func() { + err = db.DeleteOldCheckpoints() + require.NoError(t, err) + wg.Done() + }() + } + wg.Wait() + checkpoints, err := db.ListCheckpoints() + require.NoError(t, err) + assert.LessOrEqual(t, len(checkpoints), keep) + + wg.Add(numCheckpoints) + + // do checkpoints and check that we never have more than `keep` + // checkpoints + // 32 concurrent DeleteOldCheckpoint after 32 MakeCheckpoint + for i := 0; i < numCheckpoints; i++ { + err = db.MakeCheckpoint() + require.NoError(t, err) + } + for i := 0; i < numCheckpoints; i++ { + go func() { + err = db.DeleteOldCheckpoints() + require.NoError(t, err) + wg.Done() + }() + } + wg.Wait() + checkpoints, err = db.ListCheckpoints() + require.NoError(t, err) + assert.LessOrEqual(t, len(checkpoints), keep) +} + func TestGetCurrentIdx(t *testing.T) { dir, err := ioutil.TempDir("", "tmpdb") require.NoError(t, err) diff --git a/db/statedb/statedb.go b/db/statedb/statedb.go index 53370a1..b11e74a 100644 --- a/db/statedb/statedb.go +++ b/db/statedb/statedb.go @@ -227,6 +227,12 @@ func (s *StateDB) MakeCheckpoint() error { return s.db.MakeCheckpoint() } +// DeleteOldCheckpoints deletes old checkpoints when there are more than +// `cfg.keep` checkpoints +func (s *StateDB) DeleteOldCheckpoints() error { + return s.db.DeleteOldCheckpoints() +} + // CurrentBatch returns the current in-memory CurrentBatch of the StateDB.db func (s *StateDB) CurrentBatch() common.BatchNum { return s.db.CurrentBatch diff --git a/db/statedb/statedb_test.go b/db/statedb/statedb_test.go index 0cf46ab..bdc4320 100644 --- a/db/statedb/statedb_test.go +++ b/db/statedb/statedb_test.go @@ -7,6 +7,7 @@ import ( "math/big" "os" "strings" + "sync" "testing" ethCommon "github.com/ethereum/go-ethereum/common" @@ -588,6 +589,48 @@ func TestDeleteOldCheckpoints(t *testing.T) { for i := 0; i < numCheckpoints; i++ { err = sdb.MakeCheckpoint() require.NoError(t, err) + err := sdb.DeleteOldCheckpoints() + require.NoError(t, err) + checkpoints, err := sdb.db.ListCheckpoints() + require.NoError(t, err) + assert.LessOrEqual(t, len(checkpoints), keep) + } +} + +// TestConcurrentDeleteOldCheckpoints performs almost the same test than +// kvdb/kvdb_test.go TestConcurrentDeleteOldCheckpoints, but over the StateDB +func TestConcurrentDeleteOldCheckpoints(t *testing.T) { + dir, err := ioutil.TempDir("", "tmpdb") + require.NoError(t, err) + defer require.NoError(t, os.RemoveAll(dir)) + + keep := 16 + sdb, err := NewStateDB(Config{Path: dir, Keep: keep, Type: TypeSynchronizer, NLevels: 32}) + require.NoError(t, err) + + numCheckpoints := 32 + // do checkpoints and check that we never have more than `keep` + // checkpoints + for i := 0; i < numCheckpoints; i++ { + err = sdb.MakeCheckpoint() + require.NoError(t, err) + wg := sync.WaitGroup{} + n := 10 + wg.Add(n) + for j := 0; j < n; j++ { + go func() { + err := sdb.DeleteOldCheckpoints() + require.NoError(t, err) + checkpoints, err := sdb.db.ListCheckpoints() + require.NoError(t, err) + assert.LessOrEqual(t, len(checkpoints), keep) + wg.Done() + }() + _, err := sdb.db.ListCheckpoints() + // only checking here for absence of errors, not the count of checkpoints + require.NoError(t, err) + } + wg.Wait() checkpoints, err := sdb.db.ListCheckpoints() require.NoError(t, err) assert.LessOrEqual(t, len(checkpoints), keep)