Fix eth events query and sync inconsistent state
- kvdb
- Fix path in Last when doing `setNew`
- Only close if db != nil, and after closing, always set db to nil
- This will avoid a panic in the case where the db is closed but
there's an error soon after, and a future call tries to close
again. This is because pebble.Close() will panic if the db is
already closed.
- Avoid calling pebble methods when a the Storage interface already
implements that method (like Close).
- statedb
- In test, avoid calling KVDB method if the same method is available for
the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
- In *EventByBlock methods, take blockHash as input argument and use it
when querying the event logs. Previously the blockHash was only taken
from the logs results *only if* there was any log. This caused the
following issue: if there was no logs, it was not possible to know if
the result was from the expected block or an uncle block! By querying
logs by blockHash we make sure that even if there are no logs, they
are from the right block.
- Note that now the function can either be called with a
blockNum or blockHash, but not both at the same time.
- sync
- If there's an error during call to Sync call resetState, which
internally resets the stateDB to avoid stale checkpoints (and a
corresponding invalid increase in the StateDB batchNum).
- During a Sync, after very batch processed, make sure that the StateDB
currentBatch corresponds to the batchNum in the smart contract
log/event.
3 years ago Fix eth events query and sync inconsistent state
- kvdb
- Fix path in Last when doing `setNew`
- Only close if db != nil, and after closing, always set db to nil
- This will avoid a panic in the case where the db is closed but
there's an error soon after, and a future call tries to close
again. This is because pebble.Close() will panic if the db is
already closed.
- Avoid calling pebble methods when a the Storage interface already
implements that method (like Close).
- statedb
- In test, avoid calling KVDB method if the same method is available for
the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
- In *EventByBlock methods, take blockHash as input argument and use it
when querying the event logs. Previously the blockHash was only taken
from the logs results *only if* there was any log. This caused the
following issue: if there was no logs, it was not possible to know if
the result was from the expected block or an uncle block! By querying
logs by blockHash we make sure that even if there are no logs, they
are from the right block.
- Note that now the function can either be called with a
blockNum or blockHash, but not both at the same time.
- sync
- If there's an error during call to Sync call resetState, which
internally resets the stateDB to avoid stale checkpoints (and a
corresponding invalid increase in the StateDB batchNum).
- During a Sync, after very batch processed, make sure that the StateDB
currentBatch corresponds to the batchNum in the smart contract
log/event.
3 years ago Fix eth events query and sync inconsistent state
- kvdb
- Fix path in Last when doing `setNew`
- Only close if db != nil, and after closing, always set db to nil
- This will avoid a panic in the case where the db is closed but
there's an error soon after, and a future call tries to close
again. This is because pebble.Close() will panic if the db is
already closed.
- Avoid calling pebble methods when a the Storage interface already
implements that method (like Close).
- statedb
- In test, avoid calling KVDB method if the same method is available for
the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
- In *EventByBlock methods, take blockHash as input argument and use it
when querying the event logs. Previously the blockHash was only taken
from the logs results *only if* there was any log. This caused the
following issue: if there was no logs, it was not possible to know if
the result was from the expected block or an uncle block! By querying
logs by blockHash we make sure that even if there are no logs, they
are from the right block.
- Note that now the function can either be called with a
blockNum or blockHash, but not both at the same time.
- sync
- If there's an error during call to Sync call resetState, which
internally resets the stateDB to avoid stale checkpoints (and a
corresponding invalid increase in the StateDB batchNum).
- During a Sync, after very batch processed, make sure that the StateDB
currentBatch corresponds to the batchNum in the smart contract
log/event.
3 years ago Fix eth events query and sync inconsistent state
- kvdb
- Fix path in Last when doing `setNew`
- Only close if db != nil, and after closing, always set db to nil
- This will avoid a panic in the case where the db is closed but
there's an error soon after, and a future call tries to close
again. This is because pebble.Close() will panic if the db is
already closed.
- Avoid calling pebble methods when a the Storage interface already
implements that method (like Close).
- statedb
- In test, avoid calling KVDB method if the same method is available for
the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
- In *EventByBlock methods, take blockHash as input argument and use it
when querying the event logs. Previously the blockHash was only taken
from the logs results *only if* there was any log. This caused the
following issue: if there was no logs, it was not possible to know if
the result was from the expected block or an uncle block! By querying
logs by blockHash we make sure that even if there are no logs, they
are from the right block.
- Note that now the function can either be called with a
blockNum or blockHash, but not both at the same time.
- sync
- If there's an error during call to Sync call resetState, which
internally resets the stateDB to avoid stale checkpoints (and a
corresponding invalid increase in the StateDB batchNum).
- During a Sync, after very batch processed, make sure that the StateDB
currentBatch corresponds to the batchNum in the smart contract
log/event.
3 years ago Fix eth events query and sync inconsistent state
- kvdb
- Fix path in Last when doing `setNew`
- Only close if db != nil, and after closing, always set db to nil
- This will avoid a panic in the case where the db is closed but
there's an error soon after, and a future call tries to close
again. This is because pebble.Close() will panic if the db is
already closed.
- Avoid calling pebble methods when a the Storage interface already
implements that method (like Close).
- statedb
- In test, avoid calling KVDB method if the same method is available for
the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
- In *EventByBlock methods, take blockHash as input argument and use it
when querying the event logs. Previously the blockHash was only taken
from the logs results *only if* there was any log. This caused the
following issue: if there was no logs, it was not possible to know if
the result was from the expected block or an uncle block! By querying
logs by blockHash we make sure that even if there are no logs, they
are from the right block.
- Note that now the function can either be called with a
blockNum or blockHash, but not both at the same time.
- sync
- If there's an error during call to Sync call resetState, which
internally resets the stateDB to avoid stale checkpoints (and a
corresponding invalid increase in the StateDB batchNum).
- During a Sync, after very batch processed, make sure that the StateDB
currentBatch corresponds to the batchNum in the smart contract
log/event.
3 years ago Fix eth events query and sync inconsistent state
- kvdb
- Fix path in Last when doing `setNew`
- Only close if db != nil, and after closing, always set db to nil
- This will avoid a panic in the case where the db is closed but
there's an error soon after, and a future call tries to close
again. This is because pebble.Close() will panic if the db is
already closed.
- Avoid calling pebble methods when a the Storage interface already
implements that method (like Close).
- statedb
- In test, avoid calling KVDB method if the same method is available for
the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
- In *EventByBlock methods, take blockHash as input argument and use it
when querying the event logs. Previously the blockHash was only taken
from the logs results *only if* there was any log. This caused the
following issue: if there was no logs, it was not possible to know if
the result was from the expected block or an uncle block! By querying
logs by blockHash we make sure that even if there are no logs, they
are from the right block.
- Note that now the function can either be called with a
blockNum or blockHash, but not both at the same time.
- sync
- If there's an error during call to Sync call resetState, which
internally resets the stateDB to avoid stale checkpoints (and a
corresponding invalid increase in the StateDB batchNum).
- During a Sync, after very batch processed, make sure that the StateDB
currentBatch corresponds to the batchNum in the smart contract
log/event.
3 years ago Fix eth events query and sync inconsistent state
- kvdb
- Fix path in Last when doing `setNew`
- Only close if db != nil, and after closing, always set db to nil
- This will avoid a panic in the case where the db is closed but
there's an error soon after, and a future call tries to close
again. This is because pebble.Close() will panic if the db is
already closed.
- Avoid calling pebble methods when a the Storage interface already
implements that method (like Close).
- statedb
- In test, avoid calling KVDB method if the same method is available for
the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
- In *EventByBlock methods, take blockHash as input argument and use it
when querying the event logs. Previously the blockHash was only taken
from the logs results *only if* there was any log. This caused the
following issue: if there was no logs, it was not possible to know if
the result was from the expected block or an uncle block! By querying
logs by blockHash we make sure that even if there are no logs, they
are from the right block.
- Note that now the function can either be called with a
blockNum or blockHash, but not both at the same time.
- sync
- If there's an error during call to Sync call resetState, which
internally resets the stateDB to avoid stale checkpoints (and a
corresponding invalid increase in the StateDB batchNum).
- During a Sync, after very batch processed, make sure that the StateDB
currentBatch corresponds to the batchNum in the smart contract
log/event.
3 years ago Fix eth events query and sync inconsistent state
- kvdb
- Fix path in Last when doing `setNew`
- Only close if db != nil, and after closing, always set db to nil
- This will avoid a panic in the case where the db is closed but
there's an error soon after, and a future call tries to close
again. This is because pebble.Close() will panic if the db is
already closed.
- Avoid calling pebble methods when a the Storage interface already
implements that method (like Close).
- statedb
- In test, avoid calling KVDB method if the same method is available for
the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
- In *EventByBlock methods, take blockHash as input argument and use it
when querying the event logs. Previously the blockHash was only taken
from the logs results *only if* there was any log. This caused the
following issue: if there was no logs, it was not possible to know if
the result was from the expected block or an uncle block! By querying
logs by blockHash we make sure that even if there are no logs, they
are from the right block.
- Note that now the function can either be called with a
blockNum or blockHash, but not both at the same time.
- sync
- If there's an error during call to Sync call resetState, which
internally resets the stateDB to avoid stale checkpoints (and a
corresponding invalid increase in the StateDB batchNum).
- During a Sync, after very batch processed, make sure that the StateDB
currentBatch corresponds to the batchNum in the smart contract
log/event.
3 years ago Fix eth events query and sync inconsistent state
- kvdb
- Fix path in Last when doing `setNew`
- Only close if db != nil, and after closing, always set db to nil
- This will avoid a panic in the case where the db is closed but
there's an error soon after, and a future call tries to close
again. This is because pebble.Close() will panic if the db is
already closed.
- Avoid calling pebble methods when a the Storage interface already
implements that method (like Close).
- statedb
- In test, avoid calling KVDB method if the same method is available for
the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
- In *EventByBlock methods, take blockHash as input argument and use it
when querying the event logs. Previously the blockHash was only taken
from the logs results *only if* there was any log. This caused the
following issue: if there was no logs, it was not possible to know if
the result was from the expected block or an uncle block! By querying
logs by blockHash we make sure that even if there are no logs, they
are from the right block.
- Note that now the function can either be called with a
blockNum or blockHash, but not both at the same time.
- sync
- If there's an error during call to Sync call resetState, which
internally resets the stateDB to avoid stale checkpoints (and a
corresponding invalid increase in the StateDB batchNum).
- During a Sync, after very batch processed, make sure that the StateDB
currentBatch corresponds to the batchNum in the smart contract
log/event.
3 years ago Fix eth events query and sync inconsistent state
- kvdb
- Fix path in Last when doing `setNew`
- Only close if db != nil, and after closing, always set db to nil
- This will avoid a panic in the case where the db is closed but
there's an error soon after, and a future call tries to close
again. This is because pebble.Close() will panic if the db is
already closed.
- Avoid calling pebble methods when a the Storage interface already
implements that method (like Close).
- statedb
- In test, avoid calling KVDB method if the same method is available for
the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
- In *EventByBlock methods, take blockHash as input argument and use it
when querying the event logs. Previously the blockHash was only taken
from the logs results *only if* there was any log. This caused the
following issue: if there was no logs, it was not possible to know if
the result was from the expected block or an uncle block! By querying
logs by blockHash we make sure that even if there are no logs, they
are from the right block.
- Note that now the function can either be called with a
blockNum or blockHash, but not both at the same time.
- sync
- If there's an error during call to Sync call resetState, which
internally resets the stateDB to avoid stale checkpoints (and a
corresponding invalid increase in the StateDB batchNum).
- During a Sync, after very batch processed, make sure that the StateDB
currentBatch corresponds to the batchNum in the smart contract
log/event.
3 years ago Fix eth events query and sync inconsistent state
- kvdb
- Fix path in Last when doing `setNew`
- Only close if db != nil, and after closing, always set db to nil
- This will avoid a panic in the case where the db is closed but
there's an error soon after, and a future call tries to close
again. This is because pebble.Close() will panic if the db is
already closed.
- Avoid calling pebble methods when a the Storage interface already
implements that method (like Close).
- statedb
- In test, avoid calling KVDB method if the same method is available for
the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
- In *EventByBlock methods, take blockHash as input argument and use it
when querying the event logs. Previously the blockHash was only taken
from the logs results *only if* there was any log. This caused the
following issue: if there was no logs, it was not possible to know if
the result was from the expected block or an uncle block! By querying
logs by blockHash we make sure that even if there are no logs, they
are from the right block.
- Note that now the function can either be called with a
blockNum or blockHash, but not both at the same time.
- sync
- If there's an error during call to Sync call resetState, which
internally resets the stateDB to avoid stale checkpoints (and a
corresponding invalid increase in the StateDB batchNum).
- During a Sync, after very batch processed, make sure that the StateDB
currentBatch corresponds to the batchNum in the smart contract
log/event.
3 years ago Fix eth events query and sync inconsistent state
- kvdb
- Fix path in Last when doing `setNew`
- Only close if db != nil, and after closing, always set db to nil
- This will avoid a panic in the case where the db is closed but
there's an error soon after, and a future call tries to close
again. This is because pebble.Close() will panic if the db is
already closed.
- Avoid calling pebble methods when a the Storage interface already
implements that method (like Close).
- statedb
- In test, avoid calling KVDB method if the same method is available for
the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
- In *EventByBlock methods, take blockHash as input argument and use it
when querying the event logs. Previously the blockHash was only taken
from the logs results *only if* there was any log. This caused the
following issue: if there was no logs, it was not possible to know if
the result was from the expected block or an uncle block! By querying
logs by blockHash we make sure that even if there are no logs, they
are from the right block.
- Note that now the function can either be called with a
blockNum or blockHash, but not both at the same time.
- sync
- If there's an error during call to Sync call resetState, which
internally resets the stateDB to avoid stale checkpoints (and a
corresponding invalid increase in the StateDB batchNum).
- During a Sync, after very batch processed, make sure that the StateDB
currentBatch corresponds to the batchNum in the smart contract
log/event.
3 years ago Fix eth events query and sync inconsistent state
- kvdb
- Fix path in Last when doing `setNew`
- Only close if db != nil, and after closing, always set db to nil
- This will avoid a panic in the case where the db is closed but
there's an error soon after, and a future call tries to close
again. This is because pebble.Close() will panic if the db is
already closed.
- Avoid calling pebble methods when a the Storage interface already
implements that method (like Close).
- statedb
- In test, avoid calling KVDB method if the same method is available for
the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
- In *EventByBlock methods, take blockHash as input argument and use it
when querying the event logs. Previously the blockHash was only taken
from the logs results *only if* there was any log. This caused the
following issue: if there was no logs, it was not possible to know if
the result was from the expected block or an uncle block! By querying
logs by blockHash we make sure that even if there are no logs, they
are from the right block.
- Note that now the function can either be called with a
blockNum or blockHash, but not both at the same time.
- sync
- If there's an error during call to Sync call resetState, which
internally resets the stateDB to avoid stale checkpoints (and a
corresponding invalid increase in the StateDB batchNum).
- During a Sync, after very batch processed, make sure that the StateDB
currentBatch corresponds to the batchNum in the smart contract
log/event.
3 years ago |
|
// Package kvdb provides a key-value database with Checkpoints & Resets system
package kvdb
import ( "fmt" "io/ioutil" "os" "path" "sort" "strings" "sync"
"github.com/hermeznetwork/hermez-node/common" "github.com/hermeznetwork/hermez-node/log" "github.com/hermeznetwork/tracerr" "github.com/iden3/go-merkletree/db" "github.com/iden3/go-merkletree/db/pebble" )
const ( // PathBatchNum defines the subpath of the Batch Checkpoint in the
// subpath of the KVDB
PathBatchNum = "BatchNum" // PathCurrent defines the subpath of the current Batch in the subpath
// of the KVDB
PathCurrent = "current" // PathLast defines the subpath of the last Batch in the subpath
// of the StateDB
PathLast = "last" )
var ( // KeyCurrentBatch is used as key in the db to store the current BatchNum
KeyCurrentBatch = []byte("k:currentbatch") // keyCurrentIdx is used as key in the db to store the CurrentIdx
keyCurrentIdx = []byte("k:idx") )
// KVDB represents the Key-Value DB object
type KVDB struct { path string db *pebble.Storage // CurrentIdx holds the current Idx that the BatchBuilder is using
CurrentIdx common.Idx CurrentBatch common.BatchNum keep int m sync.Mutex last *Last }
// Last is a consistent view to the last batch of the stateDB that can
// be queried concurrently.
type Last struct { db *pebble.Storage path string rw sync.RWMutex }
func (k *Last) setNew() error { k.rw.Lock() defer k.rw.Unlock() if k.db != nil { k.db.Close() k.db = nil } lastPath := path.Join(k.path, PathLast) if err := os.RemoveAll(lastPath); err != nil { return tracerr.Wrap(err) } db, err := pebble.NewPebbleStorage(lastPath, false) if err != nil { return tracerr.Wrap(err) } k.db = db return nil }
func (k *Last) set(kvdb *KVDB, batchNum common.BatchNum) error { k.rw.Lock() defer k.rw.Unlock() if k.db != nil { k.db.Close() k.db = nil } lastPath := path.Join(k.path, PathLast) if err := kvdb.MakeCheckpointFromTo(batchNum, lastPath); err != nil { return tracerr.Wrap(err) } db, err := pebble.NewPebbleStorage(lastPath, false) if err != nil { return tracerr.Wrap(err) } k.db = db return nil }
func (k *Last) close() { k.rw.Lock() defer k.rw.Unlock() if k.db != nil { k.db.Close() k.db = nil } }
// NewKVDB creates a new KVDB, allowing to use an in-memory or in-disk storage.
// Checkpoints older than the value defined by `keep` will be deleted.
func NewKVDB(pathDB string, keep int) (*KVDB, error) { var sto *pebble.Storage var err error sto, err = pebble.NewPebbleStorage(path.Join(pathDB, PathCurrent), false) if err != nil { return nil, tracerr.Wrap(err) }
kvdb := &KVDB{ path: pathDB, db: sto, keep: keep, last: &Last{ path: pathDB, }, } // load currentBatch
kvdb.CurrentBatch, err = kvdb.GetCurrentBatch() if err != nil { return nil, tracerr.Wrap(err) }
// make reset (get checkpoint) at currentBatch
err = kvdb.reset(kvdb.CurrentBatch, true) if err != nil { return nil, tracerr.Wrap(err) }
return kvdb, nil }
// LastRead is a thread-safe method to query the last KVDB
func (kvdb *KVDB) LastRead(fn func(db *pebble.Storage) error) error { kvdb.last.rw.RLock() defer kvdb.last.rw.RUnlock() return fn(kvdb.last.db) }
// DB returns the *pebble.Storage from the KVDB
func (kvdb *KVDB) DB() *pebble.Storage { return kvdb.db }
// StorageWithPrefix returns the db.Storage with the given prefix from the
// current KVDB
func (kvdb *KVDB) StorageWithPrefix(prefix []byte) db.Storage { return kvdb.db.WithPrefix(prefix) }
// Reset resets the KVDB to the checkpoint at the given batchNum. Reset does
// not delete the checkpoints between old current and the new current, those
// checkpoints will remain in the storage, and eventually will be deleted when
// MakeCheckpoint overwrites them.
func (kvdb *KVDB) Reset(batchNum common.BatchNum) error { return kvdb.reset(batchNum, true) }
// reset resets the KVDB to the checkpoint at the given batchNum. Reset does
// not delete the checkpoints between old current and the new current, those
// checkpoints will remain in the storage, and eventually will be deleted when
// MakeCheckpoint overwrites them. `closeCurrent` will close the currently
// opened db before doing the reset.
func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error { currentPath := path.Join(kvdb.path, PathCurrent)
if closeCurrent && kvdb.db != nil { kvdb.db.Close() kvdb.db = nil } // remove 'current'
if err := os.RemoveAll(currentPath); err != nil { return tracerr.Wrap(err) } // remove all checkpoints > batchNum
list, err := kvdb.ListCheckpoints() if err != nil { return tracerr.Wrap(err) } // Find first batch that is greater than batchNum, and delete
// everything after that
start := 0 for ; start < len(list); start++ { if common.BatchNum(list[start]) > batchNum { break } } for _, bn := range list[start:] { if err := kvdb.DeleteCheckpoint(common.BatchNum(bn)); err != nil { return tracerr.Wrap(err) } }
if batchNum == 0 { // if batchNum == 0, open the new fresh 'current'
sto, err := pebble.NewPebbleStorage(currentPath, false) if err != nil { return tracerr.Wrap(err) } kvdb.db = sto kvdb.CurrentIdx = common.RollupConstReservedIDx // 255
kvdb.CurrentBatch = 0 if err := kvdb.last.setNew(); err != nil { return tracerr.Wrap(err) }
return nil }
// copy 'batchNum' to 'current'
if err := kvdb.MakeCheckpointFromTo(batchNum, currentPath); err != nil { return tracerr.Wrap(err) } // copy 'batchNum' to 'last'
if err := kvdb.last.set(kvdb, batchNum); err != nil { return tracerr.Wrap(err) }
// open the new 'current'
sto, err := pebble.NewPebbleStorage(currentPath, false) if err != nil { return tracerr.Wrap(err) } kvdb.db = sto
// get currentBatch num
kvdb.CurrentBatch, err = kvdb.GetCurrentBatch() if err != nil { return tracerr.Wrap(err) } // idx is obtained from the statedb reset
kvdb.CurrentIdx, err = kvdb.GetCurrentIdx() if err != nil { return tracerr.Wrap(err) }
return nil }
// ResetFromSynchronizer performs a reset in the KVDB getting the state from
// synchronizerKVDB for the given batchNum.
func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKVDB *KVDB) error { if synchronizerKVDB == nil { return tracerr.Wrap(fmt.Errorf("synchronizerKVDB can not be nil")) }
currentPath := path.Join(kvdb.path, PathCurrent) if kvdb.db != nil { kvdb.db.Close() kvdb.db = nil }
// remove 'current'
if err := os.RemoveAll(currentPath); err != nil { return tracerr.Wrap(err) } // remove all checkpoints
list, err := kvdb.ListCheckpoints() if err != nil { return tracerr.Wrap(err) } for _, bn := range list { if err := kvdb.DeleteCheckpoint(common.BatchNum(bn)); err != nil { return tracerr.Wrap(err) } }
if batchNum == 0 { // if batchNum == 0, open the new fresh 'current'
sto, err := pebble.NewPebbleStorage(currentPath, false) if err != nil { return tracerr.Wrap(err) } kvdb.db = sto kvdb.CurrentIdx = common.RollupConstReservedIDx // 255
kvdb.CurrentBatch = 0
return nil }
checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
// copy synchronizer'BatchNumX' to 'BatchNumX'
if err := synchronizerKVDB.MakeCheckpointFromTo(batchNum, checkpointPath); err != nil { return tracerr.Wrap(err) }
// copy 'BatchNumX' to 'current'
err = kvdb.MakeCheckpointFromTo(batchNum, currentPath) if err != nil { return tracerr.Wrap(err) }
// open the new 'current'
sto, err := pebble.NewPebbleStorage(currentPath, false) if err != nil { return tracerr.Wrap(err) } kvdb.db = sto
// get currentBatch num
kvdb.CurrentBatch, err = kvdb.GetCurrentBatch() if err != nil { return tracerr.Wrap(err) } // get currentIdx
kvdb.CurrentIdx, err = kvdb.GetCurrentIdx() if err != nil { return tracerr.Wrap(err) }
return nil }
// GetCurrentBatch returns the current BatchNum stored in the KVDB
func (kvdb *KVDB) GetCurrentBatch() (common.BatchNum, error) { cbBytes, err := kvdb.db.Get(KeyCurrentBatch) if tracerr.Unwrap(err) == db.ErrNotFound { return 0, nil } if err != nil { return 0, tracerr.Wrap(err) } return common.BatchNumFromBytes(cbBytes) }
// setCurrentBatch stores the current BatchNum in the KVDB
func (kvdb *KVDB) setCurrentBatch() error { tx, err := kvdb.db.NewTx() if err != nil { return tracerr.Wrap(err) } err = tx.Put(KeyCurrentBatch, kvdb.CurrentBatch.Bytes()) if err != nil { return tracerr.Wrap(err) } if err := tx.Commit(); err != nil { return tracerr.Wrap(err) } return nil }
// GetCurrentIdx returns the stored Idx from the KVDB, which is the last Idx
// used for an Account in the KVDB.
func (kvdb *KVDB) GetCurrentIdx() (common.Idx, error) { idxBytes, err := kvdb.db.Get(keyCurrentIdx) if tracerr.Unwrap(err) == db.ErrNotFound { return common.RollupConstReservedIDx, nil // 255, nil
} if err != nil { return 0, tracerr.Wrap(err) } return common.IdxFromBytes(idxBytes[:]) }
// SetCurrentIdx stores Idx in the KVDB
func (kvdb *KVDB) SetCurrentIdx(idx common.Idx) error { kvdb.CurrentIdx = idx
tx, err := kvdb.db.NewTx() if err != nil { return tracerr.Wrap(err) } idxBytes, err := idx.Bytes() if err != nil { return tracerr.Wrap(err) } err = tx.Put(keyCurrentIdx, idxBytes[:]) if err != nil { return tracerr.Wrap(err) } if err := tx.Commit(); err != nil { return tracerr.Wrap(err) } return nil }
// MakeCheckpoint does a checkpoint at the given batchNum in the defined path.
// Internally this advances & stores the current BatchNum, and then stores a
// Checkpoint of the current state of the KVDB.
func (kvdb *KVDB) MakeCheckpoint() error { // advance currentBatch
kvdb.CurrentBatch++
checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, kvdb.CurrentBatch))
if err := kvdb.setCurrentBatch(); err != nil { return tracerr.Wrap(err) }
// if checkpoint BatchNum already exist in disk, delete it
if _, err := os.Stat(checkpointPath); !os.IsNotExist(err) { if err := os.RemoveAll(checkpointPath); err != nil { return tracerr.Wrap(err) } } else if err != nil && !os.IsNotExist(err) { return tracerr.Wrap(err) }
// execute Checkpoint
if err := kvdb.db.Pebble().Checkpoint(checkpointPath); err != nil { return tracerr.Wrap(err) } // copy 'CurrentBatch' to 'last'
if err := kvdb.last.set(kvdb, kvdb.CurrentBatch); err != nil { return tracerr.Wrap(err) } // delete old checkpoints
if err := kvdb.deleteOldCheckpoints(); err != nil { return tracerr.Wrap(err) }
return nil }
// DeleteCheckpoint removes if exist the checkpoint of the given batchNum
func (kvdb *KVDB) DeleteCheckpoint(batchNum common.BatchNum) error { checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
if _, err := os.Stat(checkpointPath); os.IsNotExist(err) { return tracerr.Wrap(fmt.Errorf("Checkpoint with batchNum %d does not exist in DB", batchNum)) }
return os.RemoveAll(checkpointPath) }
// ListCheckpoints returns the list of batchNums of the checkpoints, sorted.
// If there's a gap between the list of checkpoints, an error is returned.
func (kvdb *KVDB) ListCheckpoints() ([]int, error) { files, err := ioutil.ReadDir(kvdb.path) if err != nil { return nil, tracerr.Wrap(err) } checkpoints := []int{} var checkpoint int pattern := fmt.Sprintf("%s%%d", PathBatchNum) for _, file := range files { fileName := file.Name() if file.IsDir() && strings.HasPrefix(fileName, PathBatchNum) { if _, err := fmt.Sscanf(fileName, pattern, &checkpoint); err != nil { return nil, tracerr.Wrap(err) } checkpoints = append(checkpoints, checkpoint) } } sort.Ints(checkpoints) if len(checkpoints) > 0 { first := checkpoints[0] for _, checkpoint := range checkpoints[1:] { first++ if checkpoint != first { log.Errorw("GAP", "checkpoints", checkpoints) return nil, tracerr.Wrap(fmt.Errorf("checkpoint gap at %v", checkpoint)) } } } return checkpoints, nil }
// deleteOldCheckpoints deletes old checkpoints when there are more than
// `s.keep` checkpoints
func (kvdb *KVDB) deleteOldCheckpoints() error { list, err := kvdb.ListCheckpoints() if err != nil { return tracerr.Wrap(err) } if len(list) > kvdb.keep { for _, checkpoint := range list[:len(list)-kvdb.keep] { if err := kvdb.DeleteCheckpoint(common.BatchNum(checkpoint)); err != nil { return tracerr.Wrap(err) } } } return nil }
// MakeCheckpointFromTo makes a checkpoint from the current db at fromBatchNum
// to the dest folder. This method is locking, so it can be called from
// multiple places at the same time.
func (kvdb *KVDB) MakeCheckpointFromTo(fromBatchNum common.BatchNum, dest string) error { source := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, fromBatchNum)) if _, err := os.Stat(source); os.IsNotExist(err) { // if kvdb does not have checkpoint at batchNum, return err
return tracerr.Wrap(fmt.Errorf("Checkpoint \"%v\" does not exist", source)) } // By locking we allow calling MakeCheckpointFromTo from multiple
// places at the same time for the same stateDB. This allows the
// synchronizer to do a reset to a batchNum at the same time as the
// pipeline is doing a txSelector.Reset and batchBuilder.Reset from
// synchronizer to the same batchNum
kvdb.m.Lock() defer kvdb.m.Unlock() return pebbleMakeCheckpoint(source, dest) }
func pebbleMakeCheckpoint(source, dest string) error { // Remove dest folder (if it exists) before doing the checkpoint
if _, err := os.Stat(dest); !os.IsNotExist(err) { if err := os.RemoveAll(dest); err != nil { return tracerr.Wrap(err) } } else if err != nil && !os.IsNotExist(err) { return tracerr.Wrap(err) }
sto, err := pebble.NewPebbleStorage(source, false) if err != nil { return tracerr.Wrap(err) } defer sto.Close()
// execute Checkpoint
err = sto.Pebble().Checkpoint(dest) if err != nil { return tracerr.Wrap(err) }
return nil }
// Close the DB
func (kvdb *KVDB) Close() { if kvdb.db != nil { kvdb.db.Close() kvdb.db = nil } kvdb.last.close() }
|