Browse Source

Merge pull request #423 from hermeznetwork/feature/integration27

Delete old checkpoints in stateDB automatically & Don't log errors when context done
feature/sql-semaphore1
arnau 3 years ago
committed by GitHub
parent
commit
cbcd387f98
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 216 additions and 81 deletions
  1. +1
    -1
      api/api_test.go
  2. +2
    -1
      batchbuilder/batchbuilder.go
  3. +1
    -1
      batchbuilder/batchbuilder_test.go
  4. +1
    -0
      cli/node/cfg.buidler.toml
  5. +2
    -1
      config/config.go
  6. +1
    -1
      coordinator/coordinator_test.go
  7. +2
    -2
      coordinator/purger_test.go
  8. +87
    -21
      db/statedb/statedb.go
  9. +67
    -14
      db/statedb/statedb_test.go
  10. +6
    -6
      db/statedb/txprocessors_test.go
  11. +1
    -1
      db/statedb/utils_test.go
  12. +10
    -10
      db/statedb/zkinputsgen_test.go
  13. +27
    -14
      node/node.go
  14. +1
    -1
      priceupdater/priceupdater.go
  15. +3
    -3
      prover/prover.go
  16. +1
    -1
      synchronizer/synchronizer_test.go
  17. +1
    -1
      test/debugapi/debugapi_test.go
  18. +1
    -1
      txselector/txselector.go
  19. +1
    -1
      txselector/txselector_test.go

+ 1
- 1
api/api_test.go

@ -212,7 +212,7 @@ func TestMain(m *testing.M) {
} }
}() }()
chainID := uint16(0) chainID := uint16(0)
sdb, err := statedb.NewStateDB(dir, statedb.TypeTxSelector, 0, chainID)
sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeTxSelector, 0, chainID)
if err != nil { if err != nil {
panic(err) panic(err)
} }

+ 2
- 1
batchbuilder/batchbuilder.go

@ -29,7 +29,8 @@ type ConfigBatch struct {
// NewBatchBuilder constructs a new BatchBuilder, and executes the bb.Reset // NewBatchBuilder constructs a new BatchBuilder, and executes the bb.Reset
// method // method
func NewBatchBuilder(dbpath string, synchronizerStateDB *statedb.StateDB, configCircuits []ConfigCircuit, batchNum common.BatchNum, nLevels uint64) (*BatchBuilder, error) { func NewBatchBuilder(dbpath string, synchronizerStateDB *statedb.StateDB, configCircuits []ConfigCircuit, batchNum common.BatchNum, nLevels uint64) (*BatchBuilder, error) {
localStateDB, err := statedb.NewLocalStateDB(dbpath, synchronizerStateDB, statedb.TypeBatchBuilder, int(nLevels))
localStateDB, err := statedb.NewLocalStateDB(dbpath, 128, synchronizerStateDB,
statedb.TypeBatchBuilder, int(nLevels))
if err != nil { if err != nil {
return nil, tracerr.Wrap(err) return nil, tracerr.Wrap(err)
} }

+ 1
- 1
batchbuilder/batchbuilder_test.go

@ -16,7 +16,7 @@ func TestBatchBuilder(t *testing.T) {
defer assert.Nil(t, os.RemoveAll(dir)) defer assert.Nil(t, os.RemoveAll(dir))
chainID := uint16(0) chainID := uint16(0)
synchDB, err := statedb.NewStateDB(dir, statedb.TypeBatchBuilder, 0, chainID)
synchDB, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, 0, chainID)
assert.Nil(t, err) assert.Nil(t, err)
bbDir, err := ioutil.TempDir("", "tmpBatchBuilderDB") bbDir, err := ioutil.TempDir("", "tmpBatchBuilderDB")

+ 1
- 0
cli/node/cfg.buidler.toml

@ -14,6 +14,7 @@ APIAddress = "localhost:12345"
[StateDB] [StateDB]
Path = "/tmp/iden3-test/hermez/statedb" Path = "/tmp/iden3-test/hermez/statedb"
Keep = 256
[PostgreSQL] [PostgreSQL]
Port = 5432 Port = 5432

+ 2
- 1
config/config.go

@ -102,7 +102,8 @@ type Node struct {
Type string `valudate:"required"` Type string `valudate:"required"`
} `validate:"required"` } `validate:"required"`
StateDB struct { StateDB struct {
Path string
Path string `validate:"required"`
Keep int `validate:"required"`
} `validate:"required"` } `validate:"required"`
PostgreSQL struct { PostgreSQL struct {
Port int `validate:"required"` Port int `validate:"required"`

+ 1
- 1
coordinator/coordinator_test.go

@ -98,7 +98,7 @@ func newTestModules(t *testing.T) modules {
syncDBPath, err = ioutil.TempDir("", "tmpSyncDB") syncDBPath, err = ioutil.TempDir("", "tmpSyncDB")
require.NoError(t, err) require.NoError(t, err)
deleteme = append(deleteme, syncDBPath) deleteme = append(deleteme, syncDBPath)
syncStateDB, err := statedb.NewStateDB(syncDBPath, statedb.TypeSynchronizer, 48, chainID)
syncStateDB, err := statedb.NewStateDB(syncDBPath, 128, statedb.TypeSynchronizer, 48, chainID)
assert.NoError(t, err) assert.NoError(t, err)
pass := os.Getenv("POSTGRES_PASS") pass := os.Getenv("POSTGRES_PASS")

+ 2
- 2
coordinator/purger_test.go

@ -28,12 +28,12 @@ func newStateDB(t *testing.T) *statedb.LocalStateDB {
syncDBPath, err := ioutil.TempDir("", "tmpSyncDB") syncDBPath, err := ioutil.TempDir("", "tmpSyncDB")
require.NoError(t, err) require.NoError(t, err)
deleteme = append(deleteme, syncDBPath) deleteme = append(deleteme, syncDBPath)
syncStateDB, err := statedb.NewStateDB(syncDBPath, statedb.TypeSynchronizer, 48, chainID)
syncStateDB, err := statedb.NewStateDB(syncDBPath, 128, statedb.TypeSynchronizer, 48, chainID)
assert.NoError(t, err) assert.NoError(t, err)
stateDBPath, err := ioutil.TempDir("", "tmpStateDB") stateDBPath, err := ioutil.TempDir("", "tmpStateDB")
require.NoError(t, err) require.NoError(t, err)
deleteme = append(deleteme, stateDBPath) deleteme = append(deleteme, stateDBPath)
stateDB, err := statedb.NewLocalStateDB(stateDBPath, syncStateDB, statedb.TypeTxSelector, 0)
stateDB, err := statedb.NewLocalStateDB(stateDBPath, 128, syncStateDB, statedb.TypeTxSelector, 0)
require.NoError(t, err) require.NoError(t, err)
return stateDB return stateDB
} }

+ 87
- 21
db/statedb/statedb.go

@ -3,9 +3,12 @@ package statedb
import ( import (
"errors" "errors"
"fmt" "fmt"
"io/ioutil"
"math/big" "math/big"
"os" "os"
"strconv"
"path"
"sort"
"strings"
"github.com/hermeznetwork/hermez-node/common" "github.com/hermeznetwork/hermez-node/common"
"github.com/hermeznetwork/hermez-node/log" "github.com/hermeznetwork/hermez-node/log"
@ -51,10 +54,10 @@ var (
const ( const (
// PathBatchNum defines the subpath of the Batch Checkpoint in the // PathBatchNum defines the subpath of the Batch Checkpoint in the
// subpath of the StateDB // subpath of the StateDB
PathBatchNum = "/BatchNum"
PathBatchNum = "BatchNum"
// PathCurrent defines the subpath of the current Batch in the subpath // PathCurrent defines the subpath of the current Batch in the subpath
// of the StateDB // of the StateDB
PathCurrent = "/current"
PathCurrent = "current"
// TypeSynchronizer defines a StateDB used by the Synchronizer, that // TypeSynchronizer defines a StateDB used by the Synchronizer, that
// generates the ExitTree when processing the txs // generates the ExitTree when processing the txs
TypeSynchronizer = "synchronizer" TypeSynchronizer = "synchronizer"
@ -85,14 +88,16 @@ type StateDB struct {
// AccumulatedFees contains the accumulated fees for each token (Coord // AccumulatedFees contains the accumulated fees for each token (Coord
// Idx) in the processed batch // Idx) in the processed batch
AccumulatedFees map[common.Idx]*big.Int AccumulatedFees map[common.Idx]*big.Int
keep int
} }
// NewStateDB creates a new StateDB, allowing to use an in-memory or in-disk // NewStateDB creates a new StateDB, allowing to use an in-memory or in-disk
// storage
func NewStateDB(path string, typ TypeStateDB, nLevels int, chainID uint16) (*StateDB, error) {
// storage. Checkpoints older than the value defined by `keep` will be
// deleted.
func NewStateDB(pathDB string, keep int, typ TypeStateDB, nLevels int, chainID uint16) (*StateDB, error) {
var sto *pebble.PebbleStorage var sto *pebble.PebbleStorage
var err error var err error
sto, err = pebble.NewPebbleStorage(path+PathCurrent, false)
sto, err = pebble.NewPebbleStorage(path.Join(pathDB, PathCurrent), false)
if err != nil { if err != nil {
return nil, tracerr.Wrap(err) return nil, tracerr.Wrap(err)
} }
@ -109,11 +114,12 @@ func NewStateDB(path string, typ TypeStateDB, nLevels int, chainID uint16) (*Sta
} }
sdb := &StateDB{ sdb := &StateDB{
path: path,
path: pathDB,
db: sto, db: sto,
mt: mt, mt: mt,
typ: typ, typ: typ,
chainID: chainID, chainID: chainID,
keep: keep,
} }
// load currentBatch // load currentBatch
@ -170,10 +176,9 @@ func (s *StateDB) MakeCheckpoint() error {
s.currentBatch++ s.currentBatch++
log.Debugw("Making StateDB checkpoint", "batch", s.currentBatch, "type", s.typ) log.Debugw("Making StateDB checkpoint", "batch", s.currentBatch, "type", s.typ)
checkpointPath := s.path + PathBatchNum + strconv.Itoa(int(s.currentBatch))
checkpointPath := path.Join(s.path, fmt.Sprintf("%s%d", PathBatchNum, s.currentBatch))
err := s.setCurrentBatch()
if err != nil {
if err := s.setCurrentBatch(); err != nil {
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
@ -188,8 +193,11 @@ func (s *StateDB) MakeCheckpoint() error {
} }
// execute Checkpoint // execute Checkpoint
err = s.db.Pebble().Checkpoint(checkpointPath)
if err != nil {
if err := s.db.Pebble().Checkpoint(checkpointPath); err != nil {
return tracerr.Wrap(err)
}
// delete old checkpoints
if err := s.deleteOldCheckpoints(); err != nil {
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
@ -198,7 +206,7 @@ func (s *StateDB) MakeCheckpoint() error {
// DeleteCheckpoint removes if exist the checkpoint of the given batchNum // DeleteCheckpoint removes if exist the checkpoint of the given batchNum
func (s *StateDB) DeleteCheckpoint(batchNum common.BatchNum) error { func (s *StateDB) DeleteCheckpoint(batchNum common.BatchNum) error {
checkpointPath := s.path + PathBatchNum + strconv.Itoa(int(batchNum))
checkpointPath := path.Join(s.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
if _, err := os.Stat(checkpointPath); os.IsNotExist(err) { if _, err := os.Stat(checkpointPath); os.IsNotExist(err) {
return tracerr.Wrap(fmt.Errorf("Checkpoint with batchNum %d does not exist in DB", batchNum)) return tracerr.Wrap(fmt.Errorf("Checkpoint with batchNum %d does not exist in DB", batchNum))
@ -207,6 +215,55 @@ func (s *StateDB) DeleteCheckpoint(batchNum common.BatchNum) error {
return os.RemoveAll(checkpointPath) return os.RemoveAll(checkpointPath)
} }
// listCheckpoints returns the list of batchNums of the checkpoints, sorted.
// If there's a gap between the list of checkpoints, an error is returned.
func (s *StateDB) listCheckpoints() ([]int, error) {
files, err := ioutil.ReadDir(s.path)
if err != nil {
return nil, tracerr.Wrap(err)
}
checkpoints := []int{}
var checkpoint int
pattern := fmt.Sprintf("%s%%d", PathBatchNum)
for _, file := range files {
fileName := file.Name()
if file.IsDir() && strings.HasPrefix(fileName, PathBatchNum) {
if _, err := fmt.Sscanf(fileName, pattern, &checkpoint); err != nil {
return nil, tracerr.Wrap(err)
}
checkpoints = append(checkpoints, checkpoint)
}
}
sort.Ints(checkpoints)
if len(checkpoints) > 0 {
first := checkpoints[0]
for _, checkpoint := range checkpoints[1:] {
first++
if checkpoint != first {
return nil, tracerr.Wrap(fmt.Errorf("checkpoint gap at %v", checkpoint))
}
}
}
return checkpoints, nil
}
// deleteOldCheckpoints deletes old checkpoints when there are more than
// `s.keep` checkpoints
func (s *StateDB) deleteOldCheckpoints() error {
list, err := s.listCheckpoints()
if err != nil {
return tracerr.Wrap(err)
}
if len(list) > s.keep {
for _, checkpoint := range list[:len(list)-s.keep] {
if err := s.DeleteCheckpoint(common.BatchNum(checkpoint)); err != nil {
return tracerr.Wrap(err)
}
}
}
return nil
}
func pebbleMakeCheckpoint(source, dest string) error { func pebbleMakeCheckpoint(source, dest string) error {
// Remove dest folder (if it exists) before doing the checkpoint // Remove dest folder (if it exists) before doing the checkpoint
if _, err := os.Stat(dest); !os.IsNotExist(err) { if _, err := os.Stat(dest); !os.IsNotExist(err) {
@ -252,7 +309,7 @@ func (s *StateDB) Reset(batchNum common.BatchNum) error {
// deleted when MakeCheckpoint overwrites them. `closeCurrent` will close the // deleted when MakeCheckpoint overwrites them. `closeCurrent` will close the
// currently opened db before doing the reset. // currently opened db before doing the reset.
func (s *StateDB) reset(batchNum common.BatchNum, closeCurrent bool) error { func (s *StateDB) reset(batchNum common.BatchNum, closeCurrent bool) error {
currentPath := s.path + PathCurrent
currentPath := path.Join(s.path, PathCurrent)
if closeCurrent { if closeCurrent {
if err := s.db.Pebble().Close(); err != nil { if err := s.db.Pebble().Close(); err != nil {
@ -264,6 +321,12 @@ func (s *StateDB) reset(batchNum common.BatchNum, closeCurrent bool) error {
if err != nil { if err != nil {
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
// remove all checkpoints > batchNum
for i := batchNum + 1; i <= s.currentBatch; i++ {
if err := s.DeleteCheckpoint(i); err != nil {
return tracerr.Wrap(err)
}
}
if batchNum == 0 { if batchNum == 0 {
// if batchNum == 0, open the new fresh 'current' // if batchNum == 0, open the new fresh 'current'
sto, err := pebble.NewPebbleStorage(currentPath, false) sto, err := pebble.NewPebbleStorage(currentPath, false)
@ -285,7 +348,7 @@ func (s *StateDB) reset(batchNum common.BatchNum, closeCurrent bool) error {
return nil return nil
} }
checkpointPath := s.path + PathBatchNum + strconv.Itoa(int(batchNum))
checkpointPath := path.Join(s.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
// copy 'BatchNumX' to 'current' // copy 'BatchNumX' to 'current'
err = pebbleMakeCheckpoint(checkpointPath, currentPath) err = pebbleMakeCheckpoint(checkpointPath, currentPath)
if err != nil { if err != nil {
@ -521,9 +584,11 @@ type LocalStateDB struct {
} }
// NewLocalStateDB returns a new LocalStateDB connected to the given // NewLocalStateDB returns a new LocalStateDB connected to the given
// synchronizerDB
func NewLocalStateDB(path string, synchronizerDB *StateDB, typ TypeStateDB, nLevels int) (*LocalStateDB, error) {
s, err := NewStateDB(path, typ, nLevels, synchronizerDB.chainID)
// synchronizerDB. Checkpoints older than the value defined by `keep` will be
// deleted.
func NewLocalStateDB(path string, keep int, synchronizerDB *StateDB, typ TypeStateDB,
nLevels int) (*LocalStateDB, error) {
s, err := NewStateDB(path, keep, typ, nLevels, synchronizerDB.chainID)
if err != nil { if err != nil {
return nil, tracerr.Wrap(err) return nil, tracerr.Wrap(err)
} }
@ -541,9 +606,10 @@ func (l *LocalStateDB) Reset(batchNum common.BatchNum, fromSynchronizer bool) er
return nil return nil
} }
synchronizerCheckpointPath := l.synchronizerStateDB.path + PathBatchNum + strconv.Itoa(int(batchNum))
checkpointPath := l.path + PathBatchNum + strconv.Itoa(int(batchNum))
currentPath := l.path + PathCurrent
synchronizerCheckpointPath := path.Join(l.synchronizerStateDB.path,
fmt.Sprintf("%s%d", PathBatchNum, batchNum))
checkpointPath := path.Join(l.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
currentPath := path.Join(l.path, PathCurrent)
if fromSynchronizer { if fromSynchronizer {
// use checkpoint from SynchronizerStateDB // use checkpoint from SynchronizerStateDB

+ 67
- 14
db/statedb/statedb_test.go

@ -46,7 +46,7 @@ func TestNewStateDBIntermediateState(t *testing.T) {
defer assert.NoError(t, os.RemoveAll(dir)) defer assert.NoError(t, os.RemoveAll(dir))
chainID := uint16(0) chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeTxSelector, 0, chainID)
sdb, err := NewStateDB(dir, 128, TypeTxSelector, 0, chainID)
assert.NoError(t, err) assert.NoError(t, err)
// test values // test values
@ -68,7 +68,7 @@ func TestNewStateDBIntermediateState(t *testing.T) {
// call NewStateDB which should get the db at the last checkpoint state // call NewStateDB which should get the db at the last checkpoint state
// executing a Reset (discarding the last 'testkey0'&'testvalue0' data) // executing a Reset (discarding the last 'testkey0'&'testvalue0' data)
sdb, err = NewStateDB(dir, TypeTxSelector, 0, chainID)
sdb, err = NewStateDB(dir, 128, TypeTxSelector, 0, chainID)
assert.NoError(t, err) assert.NoError(t, err)
v, err = sdb.db.Get(k0) v, err = sdb.db.Get(k0)
assert.NotNil(t, err) assert.NotNil(t, err)
@ -110,7 +110,7 @@ func TestNewStateDBIntermediateState(t *testing.T) {
// call NewStateDB which should get the db at the last checkpoint state // call NewStateDB which should get the db at the last checkpoint state
// executing a Reset (discarding the last 'testkey1'&'testvalue1' data) // executing a Reset (discarding the last 'testkey1'&'testvalue1' data)
sdb, err = NewStateDB(dir, TypeTxSelector, 0, chainID)
sdb, err = NewStateDB(dir, 128, TypeTxSelector, 0, chainID)
assert.NoError(t, err) assert.NoError(t, err)
v, err = sdb.db.Get(k0) v, err = sdb.db.Get(k0)
@ -129,7 +129,7 @@ func TestStateDBWithoutMT(t *testing.T) {
defer assert.NoError(t, os.RemoveAll(dir)) defer assert.NoError(t, os.RemoveAll(dir))
chainID := uint16(0) chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeTxSelector, 0, chainID)
sdb, err := NewStateDB(dir, 128, TypeTxSelector, 0, chainID)
assert.NoError(t, err) assert.NoError(t, err)
// create test accounts // create test accounts
@ -184,7 +184,7 @@ func TestStateDBWithMT(t *testing.T) {
defer assert.NoError(t, os.RemoveAll(dir)) defer assert.NoError(t, os.RemoveAll(dir))
chainID := uint16(0) chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeSynchronizer, 32, chainID)
sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32, chainID)
assert.NoError(t, err) assert.NoError(t, err)
// create test accounts // create test accounts
@ -237,7 +237,7 @@ func TestCheckpoints(t *testing.T) {
defer assert.NoError(t, os.RemoveAll(dir)) defer assert.NoError(t, os.RemoveAll(dir))
chainID := uint16(0) chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeSynchronizer, 32, chainID)
sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32, chainID)
assert.NoError(t, err) assert.NoError(t, err)
// create test accounts // create test accounts
@ -291,20 +291,20 @@ func TestCheckpoints(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, common.BatchNum(4), cb) assert.Equal(t, common.BatchNum(4), cb)
err = sdb.DeleteCheckpoint(common.BatchNum(9))
err = sdb.DeleteCheckpoint(common.BatchNum(1))
assert.NoError(t, err) assert.NoError(t, err)
err = sdb.DeleteCheckpoint(common.BatchNum(10))
err = sdb.DeleteCheckpoint(common.BatchNum(2))
assert.NoError(t, err) assert.NoError(t, err)
err = sdb.DeleteCheckpoint(common.BatchNum(9)) // does not exist, should return err
err = sdb.DeleteCheckpoint(common.BatchNum(1)) // does not exist, should return err
assert.NotNil(t, err) assert.NotNil(t, err)
err = sdb.DeleteCheckpoint(common.BatchNum(11)) // does not exist, should return err
err = sdb.DeleteCheckpoint(common.BatchNum(2)) // does not exist, should return err
assert.NotNil(t, err) assert.NotNil(t, err)
// Create a LocalStateDB from the initial StateDB // Create a LocalStateDB from the initial StateDB
dirLocal, err := ioutil.TempDir("", "ldb") dirLocal, err := ioutil.TempDir("", "ldb")
require.NoError(t, err) require.NoError(t, err)
defer assert.NoError(t, os.RemoveAll(dirLocal)) defer assert.NoError(t, os.RemoveAll(dirLocal))
ldb, err := NewLocalStateDB(dirLocal, sdb, TypeBatchBuilder, 32)
ldb, err := NewLocalStateDB(dirLocal, 128, sdb, TypeBatchBuilder, 32)
assert.NoError(t, err) assert.NoError(t, err)
// get checkpoint 4 from sdb (StateDB) to ldb (LocalStateDB) // get checkpoint 4 from sdb (StateDB) to ldb (LocalStateDB)
@ -325,7 +325,7 @@ func TestCheckpoints(t *testing.T) {
dirLocal2, err := ioutil.TempDir("", "ldb2") dirLocal2, err := ioutil.TempDir("", "ldb2")
require.NoError(t, err) require.NoError(t, err)
defer assert.NoError(t, os.RemoveAll(dirLocal2)) defer assert.NoError(t, os.RemoveAll(dirLocal2))
ldb2, err := NewLocalStateDB(dirLocal2, sdb, TypeBatchBuilder, 32)
ldb2, err := NewLocalStateDB(dirLocal2, 128, sdb, TypeBatchBuilder, 32)
assert.NoError(t, err) assert.NoError(t, err)
// get checkpoint 4 from sdb (StateDB) to ldb (LocalStateDB) // get checkpoint 4 from sdb (StateDB) to ldb (LocalStateDB)
@ -355,7 +355,7 @@ func TestStateDBGetAccounts(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
chainID := uint16(0) chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeTxSelector, 0, chainID)
sdb, err := NewStateDB(dir, 128, TypeTxSelector, 0, chainID)
assert.NoError(t, err) assert.NoError(t, err)
// create test accounts // create test accounts
@ -403,7 +403,7 @@ func TestCheckAccountsTreeTestVectors(t *testing.T) {
defer assert.NoError(t, os.RemoveAll(dir)) defer assert.NoError(t, os.RemoveAll(dir))
chainID := uint16(0) chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeSynchronizer, 32, chainID)
sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32, chainID)
require.NoError(t, err) require.NoError(t, err)
ay0 := new(big.Int).Sub(new(big.Int).Exp(big.NewInt(2), big.NewInt(253), nil), big.NewInt(1)) ay0 := new(big.Int).Sub(new(big.Int).Exp(big.NewInt(2), big.NewInt(253), nil), big.NewInt(1))
@ -469,3 +469,56 @@ func TestCheckAccountsTreeTestVectors(t *testing.T) {
// root value generated by js version: // root value generated by js version:
assert.Equal(t, "17298264051379321456969039521810887093935433569451713402227686942080129181291", sdb.mt.Root().BigInt().String()) assert.Equal(t, "17298264051379321456969039521810887093935433569451713402227686942080129181291", sdb.mt.Root().BigInt().String())
} }
func TestListCheckpoints(t *testing.T) {
dir, err := ioutil.TempDir("", "tmpdb")
require.NoError(t, err)
defer assert.NoError(t, os.RemoveAll(dir))
chainID := uint16(0)
sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32, chainID)
require.NoError(t, err)
numCheckpoints := 16
// do checkpoints
for i := 0; i < numCheckpoints; i++ {
err = sdb.MakeCheckpoint()
require.NoError(t, err)
}
list, err := sdb.listCheckpoints()
require.NoError(t, err)
assert.Equal(t, numCheckpoints, len(list))
assert.Equal(t, 1, list[0])
assert.Equal(t, numCheckpoints, list[len(list)-1])
numReset := 10
err = sdb.Reset(common.BatchNum(numReset))
require.NoError(t, err)
list, err = sdb.listCheckpoints()
require.NoError(t, err)
assert.Equal(t, numReset, len(list))
assert.Equal(t, 1, list[0])
assert.Equal(t, numReset, list[len(list)-1])
}
func TestDeleteOldCheckpoints(t *testing.T) {
dir, err := ioutil.TempDir("", "tmpdb")
require.NoError(t, err)
defer assert.NoError(t, os.RemoveAll(dir))
chainID := uint16(0)
keep := 16
sdb, err := NewStateDB(dir, keep, TypeSynchronizer, 32, chainID)
require.NoError(t, err)
numCheckpoints := 32
// do checkpoints and check that we never have more than `keep`
// checkpoints
for i := 0; i < numCheckpoints; i++ {
err = sdb.MakeCheckpoint()
require.NoError(t, err)
checkpoints, err := sdb.listCheckpoints()
require.NoError(t, err)
assert.LessOrEqual(t, len(checkpoints), keep)
}
}

+ 6
- 6
db/statedb/txprocessors_test.go

@ -29,7 +29,7 @@ func TestComputeEffectiveAmounts(t *testing.T) {
defer assert.NoError(t, os.RemoveAll(dir)) defer assert.NoError(t, os.RemoveAll(dir))
chainID := uint16(0) chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeSynchronizer, 32, chainID)
sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32, chainID)
assert.NoError(t, err) assert.NoError(t, err)
set := ` set := `
@ -203,7 +203,7 @@ func TestProcessTxsBalances(t *testing.T) {
defer assert.NoError(t, os.RemoveAll(dir)) defer assert.NoError(t, os.RemoveAll(dir))
chainID := uint16(0) chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeSynchronizer, 32, chainID)
sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32, chainID)
assert.NoError(t, err) assert.NoError(t, err)
// generate test transactions from test.SetBlockchain0 code // generate test transactions from test.SetBlockchain0 code
@ -336,7 +336,7 @@ func TestProcessTxsSynchronizer(t *testing.T) {
defer assert.NoError(t, os.RemoveAll(dir)) defer assert.NoError(t, os.RemoveAll(dir))
chainID := uint16(0) chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeSynchronizer, 32, chainID)
sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32, chainID)
assert.NoError(t, err) assert.NoError(t, err)
// generate test transactions from test.SetBlockchain0 code // generate test transactions from test.SetBlockchain0 code
@ -465,7 +465,7 @@ func TestProcessTxsBatchBuilder(t *testing.T) {
defer assert.NoError(t, os.RemoveAll(dir)) defer assert.NoError(t, os.RemoveAll(dir))
chainID := uint16(0) chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeBatchBuilder, 32, chainID)
sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, 32, chainID)
assert.NoError(t, err) assert.NoError(t, err)
// generate test transactions from test.SetBlockchain0 code // generate test transactions from test.SetBlockchain0 code
@ -554,7 +554,7 @@ func TestProcessTxsRootTestVectors(t *testing.T) {
defer assert.NoError(t, os.RemoveAll(dir)) defer assert.NoError(t, os.RemoveAll(dir))
chainID := uint16(0) chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeBatchBuilder, 32, chainID)
sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, 32, chainID)
assert.NoError(t, err) assert.NoError(t, err)
// same values than in the js test // same values than in the js test
@ -603,7 +603,7 @@ func TestCreateAccountDepositMaxValue(t *testing.T) {
nLevels := 16 nLevels := 16
chainID := uint16(0) chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID)
sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID)
assert.NoError(t, err) assert.NoError(t, err)
users := generateJsUsers(t) users := generateJsUsers(t)

+ 1
- 1
db/statedb/utils_test.go

@ -21,7 +21,7 @@ func TestGetIdx(t *testing.T) {
defer assert.NoError(t, os.RemoveAll(dir)) defer assert.NoError(t, os.RemoveAll(dir))
chainID := uint16(0) chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeTxSelector, 0, chainID)
sdb, err := NewStateDB(dir, 128, TypeTxSelector, 0, chainID)
assert.NoError(t, err) assert.NoError(t, err)
var sk babyjub.PrivateKey var sk babyjub.PrivateKey

+ 10
- 10
db/statedb/zkinputsgen_test.go

@ -79,7 +79,7 @@ func TestZKInputsHashTestVector0(t *testing.T) {
defer assert.Nil(t, os.RemoveAll(dir)) defer assert.Nil(t, os.RemoveAll(dir))
chainID := uint16(0) chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeBatchBuilder, 32, chainID)
sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, 32, chainID)
assert.Nil(t, err) assert.Nil(t, err)
// same values than in the js test // same values than in the js test
@ -154,7 +154,7 @@ func TestZKInputsHashTestVector1(t *testing.T) {
defer assert.Nil(t, os.RemoveAll(dir)) defer assert.Nil(t, os.RemoveAll(dir))
chainID := uint16(0) chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeBatchBuilder, 32, chainID)
sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, 32, chainID)
assert.Nil(t, err) assert.Nil(t, err)
// same values than in the js test // same values than in the js test
@ -254,7 +254,7 @@ func TestZKInputsEmpty(t *testing.T) {
nLevels := 16 nLevels := 16
chainID := uint16(0) chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID)
sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID)
assert.Nil(t, err) assert.Nil(t, err)
ptc := ProcessTxsConfig{ ptc := ProcessTxsConfig{
@ -403,7 +403,7 @@ func TestZKInputs0(t *testing.T) {
nLevels := 16 nLevels := 16
chainID := uint16(0) chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID)
sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID)
assert.Nil(t, err) assert.Nil(t, err)
// same values than in the js test // same values than in the js test
@ -491,7 +491,7 @@ func TestZKInputs1(t *testing.T) {
nLevels := 16 nLevels := 16
chainID := uint16(0) chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID)
sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID)
assert.Nil(t, err) assert.Nil(t, err)
// same values than in the js test // same values than in the js test
@ -598,7 +598,7 @@ func TestZKInputs2(t *testing.T) {
nLevels := 16 nLevels := 16
chainID := uint16(0) chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID)
sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID)
assert.Nil(t, err) assert.Nil(t, err)
// same values than in the js test // same values than in the js test
@ -742,7 +742,7 @@ func TestZKInputs3(t *testing.T) {
nLevels := 16 nLevels := 16
chainID := uint16(0) chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID)
sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID)
assert.Nil(t, err) assert.Nil(t, err)
// same values than in the js test // same values than in the js test
@ -886,7 +886,7 @@ func TestZKInputs4(t *testing.T) {
nLevels := 16 nLevels := 16
chainID := uint16(0) chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID)
sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID)
assert.Nil(t, err) assert.Nil(t, err)
// same values than in the js test // same values than in the js test
@ -1040,7 +1040,7 @@ func TestZKInputs5(t *testing.T) {
nLevels := 16 nLevels := 16
chainID := uint16(0) chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID)
sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID)
assert.Nil(t, err) assert.Nil(t, err)
// same values than in the js test // same values than in the js test
@ -1164,7 +1164,7 @@ func TestZKInputs6(t *testing.T) {
nLevels := 16 nLevels := 16
chainID := uint16(0) chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID)
sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID)
assert.Nil(t, err) assert.Nil(t, err)
// Coordinator Idx where to send the fees // Coordinator Idx where to send the fees

+ 27
- 14
node/node.go

@ -4,7 +4,6 @@ import (
"context" "context"
"fmt" "fmt"
"net/http" "net/http"
"strings"
"sync" "sync"
"time" "time"
@ -131,7 +130,13 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
} }
chainIDU16 := uint16(chainIDU64) chainIDU16 := uint16(chainIDU64)
stateDB, err := statedb.NewStateDB(cfg.StateDB.Path, statedb.TypeSynchronizer, 32, chainIDU16)
const safeStateDBKeep = 128
if cfg.StateDB.Keep < safeStateDBKeep {
return nil, tracerr.Wrap(fmt.Errorf("cfg.StateDB.Keep = %v < %v, which is unsafe",
cfg.StateDB.Keep, safeStateDBKeep))
}
stateDB, err := statedb.NewStateDB(cfg.StateDB.Path, cfg.StateDB.Keep,
statedb.TypeSynchronizer, 32, chainIDU16)
if err != nil { if err != nil {
return nil, tracerr.Wrap(err) return nil, tracerr.Wrap(err)
} }
@ -408,22 +413,17 @@ func (n *Node) handleReorg(stats *synchronizer.Stats) {
// TODO(Edu): Consider keeping the `lastBlock` inside synchronizer so that we // TODO(Edu): Consider keeping the `lastBlock` inside synchronizer so that we
// don't have to pass it around. // don't have to pass it around.
func (n *Node) syncLoopFn(lastBlock *common.Block) (*common.Block, time.Duration) {
blockData, discarded, err := n.sync.Sync2(n.ctx, lastBlock)
func (n *Node) syncLoopFn(ctx context.Context, lastBlock *common.Block) (*common.Block, time.Duration, error) {
blockData, discarded, err := n.sync.Sync2(ctx, lastBlock)
stats := n.sync.Stats() stats := n.sync.Stats()
if err != nil { if err != nil {
// case: error // case: error
if strings.Contains(err.Error(), "context canceled") {
log.Warnw("Synchronizer.Sync", "err", err)
} else {
log.Errorw("Synchronizer.Sync", "err", err)
}
return nil, n.cfg.Synchronizer.SyncLoopInterval.Duration
return nil, n.cfg.Synchronizer.SyncLoopInterval.Duration, tracerr.Wrap(err)
} else if discarded != nil { } else if discarded != nil {
// case: reorg // case: reorg
log.Infow("Synchronizer.Sync reorg", "discarded", *discarded) log.Infow("Synchronizer.Sync reorg", "discarded", *discarded)
n.handleReorg(stats) n.handleReorg(stats)
return nil, time.Duration(0)
return nil, time.Duration(0), nil
} else if blockData != nil { } else if blockData != nil {
// case: new block // case: new block
n.handleNewBlock(stats, synchronizer.SCVariablesPtr{ n.handleNewBlock(stats, synchronizer.SCVariablesPtr{
@ -431,10 +431,10 @@ func (n *Node) syncLoopFn(lastBlock *common.Block) (*common.Block, time.Duration
Auction: blockData.Auction.Vars, Auction: blockData.Auction.Vars,
WDelayer: blockData.WDelayer.Vars, WDelayer: blockData.WDelayer.Vars,
}, blockData.Rollup.Batches) }, blockData.Rollup.Batches)
return &blockData.Block, time.Duration(0)
return &blockData.Block, time.Duration(0), nil
} else { } else {
// case: no block // case: no block
return lastBlock, n.cfg.Synchronizer.SyncLoopInterval.Duration
return lastBlock, n.cfg.Synchronizer.SyncLoopInterval.Duration, nil
} }
} }
@ -453,6 +453,7 @@ func (n *Node) StartSynchronizer() {
n.wg.Add(1) n.wg.Add(1)
go func() { go func() {
var err error
var lastBlock *common.Block var lastBlock *common.Block
waitDuration := time.Duration(0) waitDuration := time.Duration(0)
for { for {
@ -462,7 +463,13 @@ func (n *Node) StartSynchronizer() {
n.wg.Done() n.wg.Done()
return return
case <-time.After(waitDuration): case <-time.After(waitDuration):
lastBlock, waitDuration = n.syncLoopFn(lastBlock)
if lastBlock, waitDuration, err = n.syncLoopFn(n.ctx,
lastBlock); err != nil {
if n.ctx.Err() != nil {
continue
}
log.Errorw("Synchronizer.Sync", "err", err)
}
} }
} }
}() }()
@ -495,6 +502,9 @@ func (n *Node) StartDebugAPI() {
n.wg.Done() n.wg.Done()
}() }()
if err := n.debugAPI.Run(n.ctx); err != nil { if err := n.debugAPI.Run(n.ctx); err != nil {
if n.ctx.Err() != nil {
return
}
log.Fatalw("DebugAPI.Run", "err", err) log.Fatalw("DebugAPI.Run", "err", err)
} }
}() }()
@ -510,6 +520,9 @@ func (n *Node) StartNodeAPI() {
n.wg.Done() n.wg.Done()
}() }()
if err := n.nodeAPI.Run(n.ctx); err != nil { if err := n.nodeAPI.Run(n.ctx); err != nil {
if n.ctx.Err() != nil {
return
}
log.Fatalw("NodeAPI.Run", "err", err) log.Fatalw("NodeAPI.Run", "err", err)
} }
}() }()

+ 1
- 1
priceupdater/priceupdater.go

@ -68,7 +68,7 @@ func getTokenPriceBitfinex(ctx context.Context, client *sling.Sling,
return 0, tracerr.Wrap(err) return 0, tracerr.Wrap(err)
} }
if res.StatusCode != http.StatusOK { if res.StatusCode != http.StatusOK {
return 0, fmt.Errorf("http response is not is %v", res.StatusCode)
return 0, tracerr.Wrap(fmt.Errorf("http response is not is %v", res.StatusCode))
} }
return state[6], nil return state[6], nil
} }

+ 3
- 3
prover/prover.go

@ -48,7 +48,7 @@ func (p *Proof) UnmarshalJSON(data []byte) error {
p.PiA[1] = (*big.Int)(proof.PiA[1]) p.PiA[1] = (*big.Int)(proof.PiA[1])
p.PiA[2] = (*big.Int)(proof.PiA[2]) p.PiA[2] = (*big.Int)(proof.PiA[2])
if p.PiA[2].Int64() != 1 { if p.PiA[2].Int64() != 1 {
return fmt.Errorf("Expected PiA[2] == 1, but got %v", p.PiA[2])
return tracerr.Wrap(fmt.Errorf("Expected PiA[2] == 1, but got %v", p.PiA[2]))
} }
p.PiB[0][0] = (*big.Int)(proof.PiB[0][0]) p.PiB[0][0] = (*big.Int)(proof.PiB[0][0])
p.PiB[0][1] = (*big.Int)(proof.PiB[0][1]) p.PiB[0][1] = (*big.Int)(proof.PiB[0][1])
@ -57,13 +57,13 @@ func (p *Proof) UnmarshalJSON(data []byte) error {
p.PiB[2][0] = (*big.Int)(proof.PiB[2][0]) p.PiB[2][0] = (*big.Int)(proof.PiB[2][0])
p.PiB[2][1] = (*big.Int)(proof.PiB[2][1]) p.PiB[2][1] = (*big.Int)(proof.PiB[2][1])
if p.PiB[2][0].Int64() != 1 || p.PiB[2][1].Int64() != 0 { if p.PiB[2][0].Int64() != 1 || p.PiB[2][1].Int64() != 0 {
return fmt.Errorf("Expected PiB[2] == [1, 0], but got %v", p.PiB[2])
return tracerr.Wrap(fmt.Errorf("Expected PiB[2] == [1, 0], but got %v", p.PiB[2]))
} }
p.PiC[0] = (*big.Int)(proof.PiC[0]) p.PiC[0] = (*big.Int)(proof.PiC[0])
p.PiC[1] = (*big.Int)(proof.PiC[1]) p.PiC[1] = (*big.Int)(proof.PiC[1])
p.PiC[2] = (*big.Int)(proof.PiC[2]) p.PiC[2] = (*big.Int)(proof.PiC[2])
if p.PiC[2].Int64() != 1 { if p.PiC[2].Int64() != 1 {
return fmt.Errorf("Expected PiC[2] == 1, but got %v", p.PiC[2])
return tracerr.Wrap(fmt.Errorf("Expected PiC[2] == 1, but got %v", p.PiC[2]))
} }
// TODO: Assert ones and zeroes // TODO: Assert ones and zeroes
p.Protocol = proof.Protocol p.Protocol = proof.Protocol

+ 1
- 1
synchronizer/synchronizer_test.go

@ -290,7 +290,7 @@ func newTestModules(t *testing.T) (*statedb.StateDB, *historydb.HistoryDB) {
require.NoError(t, err) require.NoError(t, err)
deleteme = append(deleteme, dir) deleteme = append(deleteme, dir)
stateDB, err := statedb.NewStateDB(dir, statedb.TypeSynchronizer, 32, chainID)
stateDB, err := statedb.NewStateDB(dir, 128, statedb.TypeSynchronizer, 32, chainID)
require.NoError(t, err) require.NoError(t, err)
// Init History DB // Init History DB

+ 1
- 1
test/debugapi/debugapi_test.go

@ -45,7 +45,7 @@ func TestDebugAPI(t *testing.T) {
require.Nil(t, err) require.Nil(t, err)
chainID := uint16(0) chainID := uint16(0)
sdb, err := statedb.NewStateDB(dir, statedb.TypeSynchronizer, 32, chainID)
sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeSynchronizer, 32, chainID)
require.Nil(t, err) require.Nil(t, err)
err = sdb.MakeCheckpoint() // Make a checkpoint to increment the batchNum err = sdb.MakeCheckpoint() // Make a checkpoint to increment the batchNum
require.Nil(t, err) require.Nil(t, err)

+ 1
- 1
txselector/txselector.go

@ -72,7 +72,7 @@ type TxSelector struct {
// NewTxSelector returns a *TxSelector // NewTxSelector returns a *TxSelector
func NewTxSelector(coordAccount *CoordAccount, dbpath string, func NewTxSelector(coordAccount *CoordAccount, dbpath string,
synchronizerStateDB *statedb.StateDB, l2 *l2db.L2DB) (*TxSelector, error) { synchronizerStateDB *statedb.StateDB, l2 *l2db.L2DB) (*TxSelector, error) {
localAccountsDB, err := statedb.NewLocalStateDB(dbpath,
localAccountsDB, err := statedb.NewLocalStateDB(dbpath, 128,
synchronizerStateDB, statedb.TypeTxSelector, 0) // without merkletree synchronizerStateDB, statedb.TypeTxSelector, 0) // without merkletree
if err != nil { if err != nil {
return nil, tracerr.Wrap(err) return nil, tracerr.Wrap(err)

+ 1
- 1
txselector/txselector_test.go

@ -29,7 +29,7 @@ func initTest(t *testing.T, chainID uint16, testSet string) *TxSelector {
dir, err := ioutil.TempDir("", "tmpdb") dir, err := ioutil.TempDir("", "tmpdb")
require.NoError(t, err) require.NoError(t, err)
defer assert.NoError(t, os.RemoveAll(dir)) defer assert.NoError(t, os.RemoveAll(dir))
sdb, err := statedb.NewStateDB(dir, statedb.TypeTxSelector, 0, chainID)
sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeTxSelector, 0, chainID)
require.NoError(t, err) require.NoError(t, err)
txselDir, err := ioutil.TempDir("", "tmpTxSelDB") txselDir, err := ioutil.TempDir("", "tmpTxSelDB")

Loading…
Cancel
Save