Fix eth events query and sync inconsistent state
- kvdb
- Fix path in Last when doing `setNew`
- Only close if db != nil, and after closing, always set db to nil
- This will avoid a panic in the case where the db is closed but
there's an error soon after, and a future call tries to close
again. This is because pebble.Close() will panic if the db is
already closed.
- Avoid calling pebble methods when the Storage interface already
implements that method (like Close).
- statedb
- In test, avoid calling KVDB method if the same method is available for
the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
- In *EventByBlock methods, take blockHash as input argument and use it
when querying the event logs. Previously the blockHash was only taken
from the logs results *only if* there was any log. This caused the
following issue: if there were no logs, it was not possible to know if
the result was from the expected block or an uncle block! By querying
logs by blockHash we make sure that even if there are no logs, they
are from the right block.
- Note that now the function can either be called with a
blockNum or blockHash, but not both at the same time.
- sync
- If there's an error during call to Sync call resetState, which
internally resets the stateDB to avoid stale checkpoints (and a
corresponding invalid increase in the StateDB batchNum).
- During a Sync, after every batch is processed, make sure that the StateDB
currentBatch corresponds to the batchNum in the smart contract
log/event.
3 years ago Fix eth events query and sync inconsistent state
- kvdb
- Fix path in Last when doing `setNew`
- Only close if db != nil, and after closing, always set db to nil
- This will avoid a panic in the case where the db is closed but
there's an error soon after, and a future call tries to close
again. This is because pebble.Close() will panic if the db is
already closed.
- Avoid calling pebble methods when a the Storage interface already
implements that method (like Close).
- statedb
- In test, avoid calling KVDB method if the same method is available for
the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
- In *EventByBlock methods, take blockHash as input argument and use it
when querying the event logs. Previously the blockHash was only taken
from the logs results *only if* there was any log. This caused the
following issue: if there was no logs, it was not possible to know if
the result was from the expected block or an uncle block! By querying
logs by blockHash we make sure that even if there are no logs, they
are from the right block.
- Note that now the function can either be called with a
blockNum or blockHash, but not both at the same time.
- sync
- If there's an error during call to Sync call resetState, which
internally resets the stateDB to avoid stale checkpoints (and a
corresponding invalid increase in the StateDB batchNum).
- During a Sync, after very batch processed, make sure that the StateDB
currentBatch corresponds to the batchNum in the smart contract
log/event.
3 years ago Fix eth events query and sync inconsistent state
- kvdb
- Fix path in Last when doing `setNew`
- Only close if db != nil, and after closing, always set db to nil
- This will avoid a panic in the case where the db is closed but
there's an error soon after, and a future call tries to close
again. This is because pebble.Close() will panic if the db is
already closed.
- Avoid calling pebble methods when a the Storage interface already
implements that method (like Close).
- statedb
- In test, avoid calling KVDB method if the same method is available for
the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
- In *EventByBlock methods, take blockHash as input argument and use it
when querying the event logs. Previously the blockHash was only taken
from the logs results *only if* there was any log. This caused the
following issue: if there was no logs, it was not possible to know if
the result was from the expected block or an uncle block! By querying
logs by blockHash we make sure that even if there are no logs, they
are from the right block.
- Note that now the function can either be called with a
blockNum or blockHash, but not both at the same time.
- sync
- If there's an error during call to Sync call resetState, which
internally resets the stateDB to avoid stale checkpoints (and a
corresponding invalid increase in the StateDB batchNum).
- During a Sync, after very batch processed, make sure that the StateDB
currentBatch corresponds to the batchNum in the smart contract
log/event.
3 years ago Fix eth events query and sync inconsistent state
- kvdb
- Fix path in Last when doing `setNew`
- Only close if db != nil, and after closing, always set db to nil
- This will avoid a panic in the case where the db is closed but
there's an error soon after, and a future call tries to close
again. This is because pebble.Close() will panic if the db is
already closed.
- Avoid calling pebble methods when a the Storage interface already
implements that method (like Close).
- statedb
- In test, avoid calling KVDB method if the same method is available for
the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
- In *EventByBlock methods, take blockHash as input argument and use it
when querying the event logs. Previously the blockHash was only taken
from the logs results *only if* there was any log. This caused the
following issue: if there was no logs, it was not possible to know if
the result was from the expected block or an uncle block! By querying
logs by blockHash we make sure that even if there are no logs, they
are from the right block.
- Note that now the function can either be called with a
blockNum or blockHash, but not both at the same time.
- sync
- If there's an error during call to Sync call resetState, which
internally resets the stateDB to avoid stale checkpoints (and a
corresponding invalid increase in the StateDB batchNum).
- During a Sync, after very batch processed, make sure that the StateDB
currentBatch corresponds to the batchNum in the smart contract
log/event.
3 years ago Fix eth events query and sync inconsistent state
- kvdb
- Fix path in Last when doing `setNew`
- Only close if db != nil, and after closing, always set db to nil
- This will avoid a panic in the case where the db is closed but
there's an error soon after, and a future call tries to close
again. This is because pebble.Close() will panic if the db is
already closed.
- Avoid calling pebble methods when a the Storage interface already
implements that method (like Close).
- statedb
- In test, avoid calling KVDB method if the same method is available for
the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
- In *EventByBlock methods, take blockHash as input argument and use it
when querying the event logs. Previously the blockHash was only taken
from the logs results *only if* there was any log. This caused the
following issue: if there was no logs, it was not possible to know if
the result was from the expected block or an uncle block! By querying
logs by blockHash we make sure that even if there are no logs, they
are from the right block.
- Note that now the function can either be called with a
blockNum or blockHash, but not both at the same time.
- sync
- If there's an error during call to Sync call resetState, which
internally resets the stateDB to avoid stale checkpoints (and a
corresponding invalid increase in the StateDB batchNum).
- During a Sync, after very batch processed, make sure that the StateDB
currentBatch corresponds to the batchNum in the smart contract
log/event.
3 years ago Fix eth events query and sync inconsistent state
- kvdb
- Fix path in Last when doing `setNew`
- Only close if db != nil, and after closing, always set db to nil
- This will avoid a panic in the case where the db is closed but
there's an error soon after, and a future call tries to close
again. This is because pebble.Close() will panic if the db is
already closed.
- Avoid calling pebble methods when a the Storage interface already
implements that method (like Close).
- statedb
- In test, avoid calling KVDB method if the same method is available for
the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
- In *EventByBlock methods, take blockHash as input argument and use it
when querying the event logs. Previously the blockHash was only taken
from the logs results *only if* there was any log. This caused the
following issue: if there was no logs, it was not possible to know if
the result was from the expected block or an uncle block! By querying
logs by blockHash we make sure that even if there are no logs, they
are from the right block.
- Note that now the function can either be called with a
blockNum or blockHash, but not both at the same time.
- sync
- If there's an error during call to Sync call resetState, which
internally resets the stateDB to avoid stale checkpoints (and a
corresponding invalid increase in the StateDB batchNum).
- During a Sync, after very batch processed, make sure that the StateDB
currentBatch corresponds to the batchNum in the smart contract
log/event.
3 years ago Fix eth events query and sync inconsistent state
- kvdb
- Fix path in Last when doing `setNew`
- Only close if db != nil, and after closing, always set db to nil
- This will avoid a panic in the case where the db is closed but
there's an error soon after, and a future call tries to close
again. This is because pebble.Close() will panic if the db is
already closed.
- Avoid calling pebble methods when a the Storage interface already
implements that method (like Close).
- statedb
- In test, avoid calling KVDB method if the same method is available for
the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
- In *EventByBlock methods, take blockHash as input argument and use it
when querying the event logs. Previously the blockHash was only taken
from the logs results *only if* there was any log. This caused the
following issue: if there was no logs, it was not possible to know if
the result was from the expected block or an uncle block! By querying
logs by blockHash we make sure that even if there are no logs, they
are from the right block.
- Note that now the function can either be called with a
blockNum or blockHash, but not both at the same time.
- sync
- If there's an error during call to Sync call resetState, which
internally resets the stateDB to avoid stale checkpoints (and a
corresponding invalid increase in the StateDB batchNum).
- During a Sync, after very batch processed, make sure that the StateDB
currentBatch corresponds to the batchNum in the smart contract
log/event.
3 years ago Fix eth events query and sync inconsistent state
- kvdb
- Fix path in Last when doing `setNew`
- Only close if db != nil, and after closing, always set db to nil
- This will avoid a panic in the case where the db is closed but
there's an error soon after, and a future call tries to close
again. This is because pebble.Close() will panic if the db is
already closed.
- Avoid calling pebble methods when a the Storage interface already
implements that method (like Close).
- statedb
- In test, avoid calling KVDB method if the same method is available for
the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
- In *EventByBlock methods, take blockHash as input argument and use it
when querying the event logs. Previously the blockHash was only taken
from the logs results *only if* there was any log. This caused the
following issue: if there was no logs, it was not possible to know if
the result was from the expected block or an uncle block! By querying
logs by blockHash we make sure that even if there are no logs, they
are from the right block.
- Note that now the function can either be called with a
blockNum or blockHash, but not both at the same time.
- sync
- If there's an error during call to Sync call resetState, which
internally resets the stateDB to avoid stale checkpoints (and a
corresponding invalid increase in the StateDB batchNum).
- During a Sync, after very batch processed, make sure that the StateDB
currentBatch corresponds to the batchNum in the smart contract
log/event.
3 years ago Fix eth events query and sync inconsistent state
- kvdb
- Fix path in Last when doing `setNew`
- Only close if db != nil, and after closing, always set db to nil
- This will avoid a panic in the case where the db is closed but
there's an error soon after, and a future call tries to close
again. This is because pebble.Close() will panic if the db is
already closed.
- Avoid calling pebble methods when a the Storage interface already
implements that method (like Close).
- statedb
- In test, avoid calling KVDB method if the same method is available for
the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
- In *EventByBlock methods, take blockHash as input argument and use it
when querying the event logs. Previously the blockHash was only taken
from the logs results *only if* there was any log. This caused the
following issue: if there was no logs, it was not possible to know if
the result was from the expected block or an uncle block! By querying
logs by blockHash we make sure that even if there are no logs, they
are from the right block.
- Note that now the function can either be called with a
blockNum or blockHash, but not both at the same time.
- sync
- If there's an error during call to Sync call resetState, which
internally resets the stateDB to avoid stale checkpoints (and a
corresponding invalid increase in the StateDB batchNum).
- During a Sync, after very batch processed, make sure that the StateDB
currentBatch corresponds to the batchNum in the smart contract
log/event.
3 years ago Fix eth events query and sync inconsistent state
- kvdb
- Fix path in Last when doing `setNew`
- Only close if db != nil, and after closing, always set db to nil
- This will avoid a panic in the case where the db is closed but
there's an error soon after, and a future call tries to close
again. This is because pebble.Close() will panic if the db is
already closed.
- Avoid calling pebble methods when a the Storage interface already
implements that method (like Close).
- statedb
- In test, avoid calling KVDB method if the same method is available for
the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
- In *EventByBlock methods, take blockHash as input argument and use it
when querying the event logs. Previously the blockHash was only taken
from the logs results *only if* there was any log. This caused the
following issue: if there was no logs, it was not possible to know if
the result was from the expected block or an uncle block! By querying
logs by blockHash we make sure that even if there are no logs, they
are from the right block.
- Note that now the function can either be called with a
blockNum or blockHash, but not both at the same time.
- sync
- If there's an error during call to Sync call resetState, which
internally resets the stateDB to avoid stale checkpoints (and a
corresponding invalid increase in the StateDB batchNum).
- During a Sync, after very batch processed, make sure that the StateDB
currentBatch corresponds to the batchNum in the smart contract
log/event.
3 years ago Fix eth events query and sync inconsistent state
- kvdb
- Fix path in Last when doing `setNew`
- Only close if db != nil, and after closing, always set db to nil
- This will avoid a panic in the case where the db is closed but
there's an error soon after, and a future call tries to close
again. This is because pebble.Close() will panic if the db is
already closed.
- Avoid calling pebble methods when a the Storage interface already
implements that method (like Close).
- statedb
- In test, avoid calling KVDB method if the same method is available for
the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
- In *EventByBlock methods, take blockHash as input argument and use it
when querying the event logs. Previously the blockHash was only taken
from the logs results *only if* there was any log. This caused the
following issue: if there was no logs, it was not possible to know if
the result was from the expected block or an uncle block! By querying
logs by blockHash we make sure that even if there are no logs, they
are from the right block.
- Note that now the function can either be called with a
blockNum or blockHash, but not both at the same time.
- sync
- If there's an error during call to Sync call resetState, which
internally resets the stateDB to avoid stale checkpoints (and a
corresponding invalid increase in the StateDB batchNum).
- During a Sync, after very batch processed, make sure that the StateDB
currentBatch corresponds to the batchNum in the smart contract
log/event.
3 years ago |
|
// Package kvdb provides a key-value database with Checkpoints & Resets system
package kvdb
import ( "fmt" "io/ioutil" "os" "path" "sort" "strings" "sync"
"github.com/hermeznetwork/hermez-node/common" "github.com/hermeznetwork/hermez-node/log" "github.com/hermeznetwork/tracerr" "github.com/iden3/go-merkletree/db" "github.com/iden3/go-merkletree/db/pebble" )
const (
	// PathBatchNum is the prefix of the directory name of a batch
	// checkpoint inside the KVDB path; the batch number is appended to
	// it.
	PathBatchNum = "BatchNum"
	// PathCurrent is the name of the directory, inside the KVDB path,
	// that holds the current batch.
	PathCurrent = "current"
	// PathLast is the name of the directory, inside the KVDB path, that
	// holds the copy of the last batch.
	PathLast = "last"
	// DefaultKeep is the default value for the Keep parameter.
	DefaultKeep = 128
)
var (
	// KeyCurrentBatch is the db key under which the current BatchNum is
	// stored.
	KeyCurrentBatch = []byte("k:currentbatch")
	// keyCurrentIdx is the db key under which the CurrentIdx is stored.
	keyCurrentIdx = []byte("k:idx")
	// ErrNoLast is the error returned when a Last method is used on a
	// KVDB that has been configured without a Last checkpoint.
	ErrNoLast = fmt.Errorf("no last checkpoint")
)
// KVDB represents the Key-Value DB object
type KVDB struct { cfg Config db *pebble.Storage // CurrentIdx holds the current Idx that the BatchBuilder is using
CurrentIdx common.Idx CurrentBatch common.BatchNum m sync.Mutex last *Last }
// Last is a consistent view to the last batch of the stateDB that can
// be queried concurrently.
type Last struct { db *pebble.Storage path string rw sync.RWMutex }
func (k *Last) setNew() error { k.rw.Lock() defer k.rw.Unlock() if k.db != nil { k.db.Close() k.db = nil } lastPath := path.Join(k.path, PathLast) if err := os.RemoveAll(lastPath); err != nil { return tracerr.Wrap(err) } db, err := pebble.NewPebbleStorage(lastPath, false) if err != nil { return tracerr.Wrap(err) } k.db = db return nil }
func (k *Last) set(kvdb *KVDB, batchNum common.BatchNum) error { k.rw.Lock() defer k.rw.Unlock() if k.db != nil { k.db.Close() k.db = nil } lastPath := path.Join(k.path, PathLast) if err := kvdb.MakeCheckpointFromTo(batchNum, lastPath); err != nil { return tracerr.Wrap(err) } db, err := pebble.NewPebbleStorage(lastPath, false) if err != nil { return tracerr.Wrap(err) } k.db = db return nil }
func (k *Last) close() { k.rw.Lock() defer k.rw.Unlock() if k.db != nil { k.db.Close() k.db = nil } }
// Config holds the parameters of a KVDB.
type Config struct {
	// Path is the directory where 'current', 'last' and the checkpoints
	// are stored.
	Path string
	// Keep is the number of old checkpoints to keep. If 0, all
	// checkpoints are kept.
	Keep int
	// NoGapsCheck disables the check done when listing the checkpoints
	// that verifies that there are no gaps between them.
	NoGapsCheck bool
	// NoLast skips having an opened DB with a checkpoint to the last
	// batchNum for thread-safe reads.
	NoLast bool
}
// NewKVDB creates a new KVDB, allowing to use an in-memory or in-disk storage.
// Checkpoints older than the value defined by `keep` will be deleted.
// func NewKVDB(pathDB string, keep int) (*KVDB, error) {
func NewKVDB(cfg Config) (*KVDB, error) { var sto *pebble.Storage var err error sto, err = pebble.NewPebbleStorage(path.Join(cfg.Path, PathCurrent), false) if err != nil { return nil, tracerr.Wrap(err) }
var last *Last if !cfg.NoLast { last = &Last{ path: cfg.Path, } } kvdb := &KVDB{ cfg: cfg, db: sto, last: last, } // load currentBatch
kvdb.CurrentBatch, err = kvdb.GetCurrentBatch() if err != nil { return nil, tracerr.Wrap(err) }
// make reset (get checkpoint) at currentBatch
err = kvdb.reset(kvdb.CurrentBatch, true) if err != nil { return nil, tracerr.Wrap(err) }
return kvdb, nil }
// LastRead is a thread-safe method to query the last KVDB
func (k *KVDB) LastRead(fn func(db *pebble.Storage) error) error { if k.last == nil { return tracerr.Wrap(ErrNoLast) } k.last.rw.RLock() defer k.last.rw.RUnlock() return fn(k.last.db) }
// DB returns the *pebble.Storage currently held by the KVDB. The result is
// nil if the storage has been closed.
func (k *KVDB) DB() *pebble.Storage {
	sto := k.db
	return sto
}
// StorageWithPrefix returns the result of calling WithPrefix(prefix) on the
// current storage of the KVDB.
func (k *KVDB) StorageWithPrefix(prefix []byte) db.Storage {
	current := k.db
	return current.WithPrefix(prefix)
}
// Reset resets the KVDB to the checkpoint at the given batchNum, closing the
// currently opened db first. See reset for the details: checkpoints with a
// batch number greater than batchNum are deleted.
func (k *KVDB) Reset(batchNum common.BatchNum) error {
	const closeCurrent = true
	return k.reset(batchNum, closeCurrent)
}
// reset resets the KVDB to the checkpoint at the given batchNum. Reset does
// not delete the checkpoints between old current and the new current, those
// checkpoints will remain in the storage, and eventually will be deleted when
// MakeCheckpoint overwrites them. `closeCurrent` will close the currently
// opened db before doing the reset.
func (k *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error { currentPath := path.Join(k.cfg.Path, PathCurrent)
if closeCurrent && k.db != nil { k.db.Close() k.db = nil } // remove 'current'
if err := os.RemoveAll(currentPath); err != nil { return tracerr.Wrap(err) } // remove all checkpoints > batchNum
list, err := k.ListCheckpoints() if err != nil { return tracerr.Wrap(err) } // Find first batch that is greater than batchNum, and delete
// everything after that
start := 0 for ; start < len(list); start++ { if common.BatchNum(list[start]) > batchNum { break } } for _, bn := range list[start:] { if err := k.DeleteCheckpoint(common.BatchNum(bn)); err != nil { return tracerr.Wrap(err) } }
if batchNum == 0 { // if batchNum == 0, open the new fresh 'current'
sto, err := pebble.NewPebbleStorage(currentPath, false) if err != nil { return tracerr.Wrap(err) } k.db = sto k.CurrentIdx = common.RollupConstReservedIDx // 255
k.CurrentBatch = 0 if k.last != nil { if err := k.last.setNew(); err != nil { return tracerr.Wrap(err) } }
return nil }
// copy 'batchNum' to 'current'
if err := k.MakeCheckpointFromTo(batchNum, currentPath); err != nil { return tracerr.Wrap(err) } // copy 'batchNum' to 'last'
if k.last != nil { if err := k.last.set(k, batchNum); err != nil { return tracerr.Wrap(err) } }
// open the new 'current'
sto, err := pebble.NewPebbleStorage(currentPath, false) if err != nil { return tracerr.Wrap(err) } k.db = sto
// get currentBatch num
k.CurrentBatch, err = k.GetCurrentBatch() if err != nil { return tracerr.Wrap(err) } // idx is obtained from the statedb reset
k.CurrentIdx, err = k.GetCurrentIdx() if err != nil { return tracerr.Wrap(err) }
return nil }
// ResetFromSynchronizer performs a reset in the KVDB getting the state from
// synchronizerKVDB for the given batchNum.
func (k *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKVDB *KVDB) error { if synchronizerKVDB == nil { return tracerr.Wrap(fmt.Errorf("synchronizerKVDB can not be nil")) }
currentPath := path.Join(k.cfg.Path, PathCurrent) if k.db != nil { k.db.Close() k.db = nil }
// remove 'current'
if err := os.RemoveAll(currentPath); err != nil { return tracerr.Wrap(err) } // remove all checkpoints
list, err := k.ListCheckpoints() if err != nil { return tracerr.Wrap(err) } for _, bn := range list { if err := k.DeleteCheckpoint(common.BatchNum(bn)); err != nil { return tracerr.Wrap(err) } }
if batchNum == 0 { // if batchNum == 0, open the new fresh 'current'
sto, err := pebble.NewPebbleStorage(currentPath, false) if err != nil { return tracerr.Wrap(err) } k.db = sto k.CurrentIdx = common.RollupConstReservedIDx // 255
k.CurrentBatch = 0
return nil }
checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
// copy synchronizer'BatchNumX' to 'BatchNumX'
if err := synchronizerKVDB.MakeCheckpointFromTo(batchNum, checkpointPath); err != nil { return tracerr.Wrap(err) }
// copy 'BatchNumX' to 'current'
err = k.MakeCheckpointFromTo(batchNum, currentPath) if err != nil { return tracerr.Wrap(err) }
// open the new 'current'
sto, err := pebble.NewPebbleStorage(currentPath, false) if err != nil { return tracerr.Wrap(err) } k.db = sto
// get currentBatch num
k.CurrentBatch, err = k.GetCurrentBatch() if err != nil { return tracerr.Wrap(err) } // get currentIdx
k.CurrentIdx, err = k.GetCurrentIdx() if err != nil { return tracerr.Wrap(err) }
return nil }
// GetCurrentBatch returns the current BatchNum stored in the KVDB
func (k *KVDB) GetCurrentBatch() (common.BatchNum, error) { cbBytes, err := k.db.Get(KeyCurrentBatch) if tracerr.Unwrap(err) == db.ErrNotFound { return 0, nil } if err != nil { return 0, tracerr.Wrap(err) } return common.BatchNumFromBytes(cbBytes) }
// setCurrentBatch stores the current BatchNum in the KVDB
func (k *KVDB) setCurrentBatch() error { tx, err := k.db.NewTx() if err != nil { return tracerr.Wrap(err) } err = tx.Put(KeyCurrentBatch, k.CurrentBatch.Bytes()) if err != nil { return tracerr.Wrap(err) } if err := tx.Commit(); err != nil { return tracerr.Wrap(err) } return nil }
// GetCurrentIdx returns the stored Idx from the KVDB, which is the last Idx
// used for an Account in the k.
func (k *KVDB) GetCurrentIdx() (common.Idx, error) { idxBytes, err := k.db.Get(keyCurrentIdx) if tracerr.Unwrap(err) == db.ErrNotFound { return common.RollupConstReservedIDx, nil // 255, nil
} if err != nil { return 0, tracerr.Wrap(err) } return common.IdxFromBytes(idxBytes[:]) }
// SetCurrentIdx stores Idx in the KVDB
func (k *KVDB) SetCurrentIdx(idx common.Idx) error { k.CurrentIdx = idx
tx, err := k.db.NewTx() if err != nil { return tracerr.Wrap(err) } idxBytes, err := idx.Bytes() if err != nil { return tracerr.Wrap(err) } err = tx.Put(keyCurrentIdx, idxBytes[:]) if err != nil { return tracerr.Wrap(err) } if err := tx.Commit(); err != nil { return tracerr.Wrap(err) } return nil }
// MakeCheckpoint advances the current BatchNum by one, stores it in the db,
// and then makes a checkpoint of the current state in the checkpoint
// directory of the new BatchNum (replacing it if it already exists on disk).
// Afterwards the 'last' view (if enabled) is updated to the new checkpoint
// and old checkpoints are deleted.
//
// Note that k.CurrentBatch is incremented first, so it stays incremented if
// any of the later steps fails.
func (k *KVDB) MakeCheckpoint() error {
	// advance currentBatch
	k.CurrentBatch++

	dirName := fmt.Sprintf("%s%d", PathBatchNum, k.CurrentBatch)
	checkpointDir := path.Join(k.cfg.Path, dirName)

	if err := k.setCurrentBatch(); err != nil {
		return tracerr.Wrap(err)
	}

	// if checkpoint BatchNum already exist in disk, delete it
	_, statErr := os.Stat(checkpointDir)
	switch {
	case os.IsNotExist(statErr):
		// nothing to delete
	case statErr != nil:
		return tracerr.Wrap(statErr)
	default:
		if err := os.RemoveAll(checkpointDir); err != nil {
			return tracerr.Wrap(err)
		}
	}

	// execute Checkpoint
	if err := k.db.Pebble().Checkpoint(checkpointDir); err != nil {
		return tracerr.Wrap(err)
	}
	// copy 'CurrentBatch' to 'last'
	if k.last != nil {
		if err := k.last.set(k, k.CurrentBatch); err != nil {
			return tracerr.Wrap(err)
		}
	}
	// delete old checkpoints
	if err := k.deleteOldCheckpoints(); err != nil {
		return tracerr.Wrap(err)
	}

	return nil
}
// CheckpointExists returns true if the checkpoint exists
func (k *KVDB) CheckpointExists(batchNum common.BatchNum) (bool, error) { source := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum)) if _, err := os.Stat(source); os.IsNotExist(err) { return false, nil } else if err != nil { return false, err } return true, nil }
// DeleteCheckpoint removes the checkpoint directory of the given batchNum.
// It returns an error if the checkpoint does not exist. All returned errors,
// including the one from the removal itself, are wrapped with tracerr.
func (k *KVDB) DeleteCheckpoint(batchNum common.BatchNum) error {
	checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))

	if _, err := os.Stat(checkpointPath); os.IsNotExist(err) {
		return tracerr.Wrap(fmt.Errorf("Checkpoint with batchNum %d does not exist in DB", batchNum))
	} else if err != nil {
		return tracerr.Wrap(err)
	}

	if err := os.RemoveAll(checkpointPath); err != nil {
		return tracerr.Wrap(err)
	}
	return nil
}
// ListCheckpoints returns the list of batchNums of the checkpoints, sorted.
// If there's a gap between the list of checkpoints, an error is returned.
func (k *KVDB) ListCheckpoints() ([]int, error) { files, err := ioutil.ReadDir(k.cfg.Path) if err != nil { return nil, tracerr.Wrap(err) } checkpoints := []int{} var checkpoint int pattern := fmt.Sprintf("%s%%d", PathBatchNum) for _, file := range files { fileName := file.Name() if file.IsDir() && strings.HasPrefix(fileName, PathBatchNum) { if _, err := fmt.Sscanf(fileName, pattern, &checkpoint); err != nil { return nil, tracerr.Wrap(err) } checkpoints = append(checkpoints, checkpoint) } } sort.Ints(checkpoints) if !k.cfg.NoGapsCheck && len(checkpoints) > 0 { first := checkpoints[0] for _, checkpoint := range checkpoints[1:] { first++ if checkpoint != first { log.Errorw("gap between checkpoints", "checkpoints", checkpoints) return nil, tracerr.Wrap(fmt.Errorf("checkpoint gap at %v", checkpoint)) } } } return checkpoints, nil }
// deleteOldCheckpoints deletes old checkpoints when there are more than
// `s.keep` checkpoints
func (k *KVDB) deleteOldCheckpoints() error { list, err := k.ListCheckpoints() if err != nil { return tracerr.Wrap(err) } if k.cfg.Keep > 0 && len(list) > k.cfg.Keep { for _, checkpoint := range list[:len(list)-k.cfg.Keep] { if err := k.DeleteCheckpoint(common.BatchNum(checkpoint)); err != nil { return tracerr.Wrap(err) } } } return nil }
// MakeCheckpointFromTo makes a checkpoint from the current db at fromBatchNum
// to the dest folder. This method is locking, so it can be called from
// multiple places at the same time.
func (k *KVDB) MakeCheckpointFromTo(fromBatchNum common.BatchNum, dest string) error { source := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, fromBatchNum)) if _, err := os.Stat(source); os.IsNotExist(err) { // if kvdb does not have checkpoint at batchNum, return err
return tracerr.Wrap(fmt.Errorf("Checkpoint \"%v\" does not exist", source)) } else if err != nil { return tracerr.Wrap(err) } // By locking we allow calling MakeCheckpointFromTo from multiple
// places at the same time for the same stateDB. This allows the
// synchronizer to do a reset to a batchNum at the same time as the
// pipeline is doing a txSelector.Reset and batchBuilder.Reset from
// synchronizer to the same batchNum
k.m.Lock() defer k.m.Unlock() return pebbleMakeCheckpoint(source, dest) }
func pebbleMakeCheckpoint(source, dest string) error { // Remove dest folder (if it exists) before doing the checkpoint
if _, err := os.Stat(dest); os.IsNotExist(err) { } else if err != nil { return tracerr.Wrap(err) } else { if err := os.RemoveAll(dest); err != nil { return tracerr.Wrap(err) } }
sto, err := pebble.NewPebbleStorage(source, false) if err != nil { return tracerr.Wrap(err) } defer sto.Close()
// execute Checkpoint
err = sto.Pebble().Checkpoint(dest) if err != nil { return tracerr.Wrap(err) }
return nil }
// Close the DB
func (k *KVDB) Close() { if k.db != nil { k.db.Close() k.db = nil } if k.last != nil { k.last.close() } }
|