You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

585 lines
15 KiB

Fix eth events query and sync inconsistent state - kvdb - Fix path in Last when doing `setNew` - Only close if db != nil, and after closing, always set db to nil - This will avoid a panic in the case where the db is closed but there's an error soon after, and a future call tries to close again. This is because pebble.Close() will panic if the db is already closed. - Avoid calling pebble methods when a the Storage interface already implements that method (like Close). - statedb - In test, avoid calling KVDB method if the same method is available for the StateDB (like MakeCheckpoint, CurrentBatch). - eth - In *EventByBlock methods, take blockHash as input argument and use it when querying the event logs. Previously the blockHash was only taken from the logs results *only if* there was any log. This caused the following issue: if there was no logs, it was not possible to know if the result was from the expected block or an uncle block! By querying logs by blockHash we make sure that even if there are no logs, they are from the right block. - Note that now the function can either be called with a blockNum or blockHash, but not both at the same time. - sync - If there's an error during call to Sync call resetState, which internally resets the stateDB to avoid stale checkpoints (and a corresponding invalid increase in the StateDB batchNum). - During a Sync, after very batch processed, make sure that the StateDB currentBatch corresponds to the batchNum in the smart contract log/event.
3 years ago
Fix eth events query and sync inconsistent state - kvdb - Fix path in Last when doing `setNew` - Only close if db != nil, and after closing, always set db to nil - This will avoid a panic in the case where the db is closed but there's an error soon after, and a future call tries to close again. This is because pebble.Close() will panic if the db is already closed. - Avoid calling pebble methods when a the Storage interface already implements that method (like Close). - statedb - In test, avoid calling KVDB method if the same method is available for the StateDB (like MakeCheckpoint, CurrentBatch). - eth - In *EventByBlock methods, take blockHash as input argument and use it when querying the event logs. Previously the blockHash was only taken from the logs results *only if* there was any log. This caused the following issue: if there was no logs, it was not possible to know if the result was from the expected block or an uncle block! By querying logs by blockHash we make sure that even if there are no logs, they are from the right block. - Note that now the function can either be called with a blockNum or blockHash, but not both at the same time. - sync - If there's an error during call to Sync call resetState, which internally resets the stateDB to avoid stale checkpoints (and a corresponding invalid increase in the StateDB batchNum). - During a Sync, after very batch processed, make sure that the StateDB currentBatch corresponds to the batchNum in the smart contract log/event.
3 years ago
Fix eth events query and sync inconsistent state - kvdb - Fix path in Last when doing `setNew` - Only close if db != nil, and after closing, always set db to nil - This will avoid a panic in the case where the db is closed but there's an error soon after, and a future call tries to close again. This is because pebble.Close() will panic if the db is already closed. - Avoid calling pebble methods when a the Storage interface already implements that method (like Close). - statedb - In test, avoid calling KVDB method if the same method is available for the StateDB (like MakeCheckpoint, CurrentBatch). - eth - In *EventByBlock methods, take blockHash as input argument and use it when querying the event logs. Previously the blockHash was only taken from the logs results *only if* there was any log. This caused the following issue: if there was no logs, it was not possible to know if the result was from the expected block or an uncle block! By querying logs by blockHash we make sure that even if there are no logs, they are from the right block. - Note that now the function can either be called with a blockNum or blockHash, but not both at the same time. - sync - If there's an error during call to Sync call resetState, which internally resets the stateDB to avoid stale checkpoints (and a corresponding invalid increase in the StateDB batchNum). - During a Sync, after very batch processed, make sure that the StateDB currentBatch corresponds to the batchNum in the smart contract log/event.
3 years ago
Fix eth events query and sync inconsistent state - kvdb - Fix path in Last when doing `setNew` - Only close if db != nil, and after closing, always set db to nil - This will avoid a panic in the case where the db is closed but there's an error soon after, and a future call tries to close again. This is because pebble.Close() will panic if the db is already closed. - Avoid calling pebble methods when a the Storage interface already implements that method (like Close). - statedb - In test, avoid calling KVDB method if the same method is available for the StateDB (like MakeCheckpoint, CurrentBatch). - eth - In *EventByBlock methods, take blockHash as input argument and use it when querying the event logs. Previously the blockHash was only taken from the logs results *only if* there was any log. This caused the following issue: if there was no logs, it was not possible to know if the result was from the expected block or an uncle block! By querying logs by blockHash we make sure that even if there are no logs, they are from the right block. - Note that now the function can either be called with a blockNum or blockHash, but not both at the same time. - sync - If there's an error during call to Sync call resetState, which internally resets the stateDB to avoid stale checkpoints (and a corresponding invalid increase in the StateDB batchNum). - During a Sync, after very batch processed, make sure that the StateDB currentBatch corresponds to the batchNum in the smart contract log/event.
3 years ago
Fix eth events query and sync inconsistent state - kvdb - Fix path in Last when doing `setNew` - Only close if db != nil, and after closing, always set db to nil - This will avoid a panic in the case where the db is closed but there's an error soon after, and a future call tries to close again. This is because pebble.Close() will panic if the db is already closed. - Avoid calling pebble methods when a the Storage interface already implements that method (like Close). - statedb - In test, avoid calling KVDB method if the same method is available for the StateDB (like MakeCheckpoint, CurrentBatch). - eth - In *EventByBlock methods, take blockHash as input argument and use it when querying the event logs. Previously the blockHash was only taken from the logs results *only if* there was any log. This caused the following issue: if there was no logs, it was not possible to know if the result was from the expected block or an uncle block! By querying logs by blockHash we make sure that even if there are no logs, they are from the right block. - Note that now the function can either be called with a blockNum or blockHash, but not both at the same time. - sync - If there's an error during call to Sync call resetState, which internally resets the stateDB to avoid stale checkpoints (and a corresponding invalid increase in the StateDB batchNum). - During a Sync, after very batch processed, make sure that the StateDB currentBatch corresponds to the batchNum in the smart contract log/event.
3 years ago
Fix eth events query and sync inconsistent state - kvdb - Fix path in Last when doing `setNew` - Only close if db != nil, and after closing, always set db to nil - This will avoid a panic in the case where the db is closed but there's an error soon after, and a future call tries to close again. This is because pebble.Close() will panic if the db is already closed. - Avoid calling pebble methods when a the Storage interface already implements that method (like Close). - statedb - In test, avoid calling KVDB method if the same method is available for the StateDB (like MakeCheckpoint, CurrentBatch). - eth - In *EventByBlock methods, take blockHash as input argument and use it when querying the event logs. Previously the blockHash was only taken from the logs results *only if* there was any log. This caused the following issue: if there was no logs, it was not possible to know if the result was from the expected block or an uncle block! By querying logs by blockHash we make sure that even if there are no logs, they are from the right block. - Note that now the function can either be called with a blockNum or blockHash, but not both at the same time. - sync - If there's an error during call to Sync call resetState, which internally resets the stateDB to avoid stale checkpoints (and a corresponding invalid increase in the StateDB batchNum). - During a Sync, after very batch processed, make sure that the StateDB currentBatch corresponds to the batchNum in the smart contract log/event.
3 years ago
Fix eth events query and sync inconsistent state - kvdb - Fix path in Last when doing `setNew` - Only close if db != nil, and after closing, always set db to nil - This will avoid a panic in the case where the db is closed but there's an error soon after, and a future call tries to close again. This is because pebble.Close() will panic if the db is already closed. - Avoid calling pebble methods when a the Storage interface already implements that method (like Close). - statedb - In test, avoid calling KVDB method if the same method is available for the StateDB (like MakeCheckpoint, CurrentBatch). - eth - In *EventByBlock methods, take blockHash as input argument and use it when querying the event logs. Previously the blockHash was only taken from the logs results *only if* there was any log. This caused the following issue: if there was no logs, it was not possible to know if the result was from the expected block or an uncle block! By querying logs by blockHash we make sure that even if there are no logs, they are from the right block. - Note that now the function can either be called with a blockNum or blockHash, but not both at the same time. - sync - If there's an error during call to Sync call resetState, which internally resets the stateDB to avoid stale checkpoints (and a corresponding invalid increase in the StateDB batchNum). - During a Sync, after very batch processed, make sure that the StateDB currentBatch corresponds to the batchNum in the smart contract log/event.
3 years ago
3 years ago
Fix eth events query and sync inconsistent state - kvdb - Fix path in Last when doing `setNew` - Only close if db != nil, and after closing, always set db to nil - This will avoid a panic in the case where the db is closed but there's an error soon after, and a future call tries to close again. This is because pebble.Close() will panic if the db is already closed. - Avoid calling pebble methods when a the Storage interface already implements that method (like Close). - statedb - In test, avoid calling KVDB method if the same method is available for the StateDB (like MakeCheckpoint, CurrentBatch). - eth - In *EventByBlock methods, take blockHash as input argument and use it when querying the event logs. Previously the blockHash was only taken from the logs results *only if* there was any log. This caused the following issue: if there was no logs, it was not possible to know if the result was from the expected block or an uncle block! By querying logs by blockHash we make sure that even if there are no logs, they are from the right block. - Note that now the function can either be called with a blockNum or blockHash, but not both at the same time. - sync - If there's an error during call to Sync call resetState, which internally resets the stateDB to avoid stale checkpoints (and a corresponding invalid increase in the StateDB batchNum). - During a Sync, after very batch processed, make sure that the StateDB currentBatch corresponds to the batchNum in the smart contract log/event.
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
Fix eth events query and sync inconsistent state - kvdb - Fix path in Last when doing `setNew` - Only close if db != nil, and after closing, always set db to nil - This will avoid a panic in the case where the db is closed but there's an error soon after, and a future call tries to close again. This is because pebble.Close() will panic if the db is already closed. - Avoid calling pebble methods when a the Storage interface already implements that method (like Close). - statedb - In test, avoid calling KVDB method if the same method is available for the StateDB (like MakeCheckpoint, CurrentBatch). - eth - In *EventByBlock methods, take blockHash as input argument and use it when querying the event logs. Previously the blockHash was only taken from the logs results *only if* there was any log. This caused the following issue: if there was no logs, it was not possible to know if the result was from the expected block or an uncle block! By querying logs by blockHash we make sure that even if there are no logs, they are from the right block. - Note that now the function can either be called with a blockNum or blockHash, but not both at the same time. - sync - If there's an error during call to Sync call resetState, which internally resets the stateDB to avoid stale checkpoints (and a corresponding invalid increase in the StateDB batchNum). - During a Sync, after very batch processed, make sure that the StateDB currentBatch corresponds to the batchNum in the smart contract log/event.
3 years ago
Fix eth events query and sync inconsistent state - kvdb - Fix path in Last when doing `setNew` - Only close if db != nil, and after closing, always set db to nil - This will avoid a panic in the case where the db is closed but there's an error soon after, and a future call tries to close again. This is because pebble.Close() will panic if the db is already closed. - Avoid calling pebble methods when a the Storage interface already implements that method (like Close). - statedb - In test, avoid calling KVDB method if the same method is available for the StateDB (like MakeCheckpoint, CurrentBatch). - eth - In *EventByBlock methods, take blockHash as input argument and use it when querying the event logs. Previously the blockHash was only taken from the logs results *only if* there was any log. This caused the following issue: if there was no logs, it was not possible to know if the result was from the expected block or an uncle block! By querying logs by blockHash we make sure that even if there are no logs, they are from the right block. - Note that now the function can either be called with a blockNum or blockHash, but not both at the same time. - sync - If there's an error during call to Sync call resetState, which internally resets the stateDB to avoid stale checkpoints (and a corresponding invalid increase in the StateDB batchNum). - During a Sync, after very batch processed, make sure that the StateDB currentBatch corresponds to the batchNum in the smart contract log/event.
3 years ago
Fix eth events query and sync inconsistent state - kvdb - Fix path in Last when doing `setNew` - Only close if db != nil, and after closing, always set db to nil - This will avoid a panic in the case where the db is closed but there's an error soon after, and a future call tries to close again. This is because pebble.Close() will panic if the db is already closed. - Avoid calling pebble methods when a the Storage interface already implements that method (like Close). - statedb - In test, avoid calling KVDB method if the same method is available for the StateDB (like MakeCheckpoint, CurrentBatch). - eth - In *EventByBlock methods, take blockHash as input argument and use it when querying the event logs. Previously the blockHash was only taken from the logs results *only if* there was any log. This caused the following issue: if there was no logs, it was not possible to know if the result was from the expected block or an uncle block! By querying logs by blockHash we make sure that even if there are no logs, they are from the right block. - Note that now the function can either be called with a blockNum or blockHash, but not both at the same time. - sync - If there's an error during call to Sync call resetState, which internally resets the stateDB to avoid stale checkpoints (and a corresponding invalid increase in the StateDB batchNum). - During a Sync, after very batch processed, make sure that the StateDB currentBatch corresponds to the batchNum in the smart contract log/event.
3 years ago
  1. // Package kvdb provides a key-value database with Checkpoints & Resets system
  2. package kvdb
  3. import (
  4. "fmt"
  5. "io/ioutil"
  6. "os"
  7. "path"
  8. "sort"
  9. "strings"
  10. "sync"
  11. "github.com/hermeznetwork/hermez-node/common"
  12. "github.com/hermeznetwork/hermez-node/log"
  13. "github.com/hermeznetwork/tracerr"
  14. "github.com/iden3/go-merkletree/db"
  15. "github.com/iden3/go-merkletree/db/pebble"
  16. )
  17. const (
  18. // PathBatchNum defines the subpath of the Batch Checkpoint in the
  19. // subpath of the KVDB
  20. PathBatchNum = "BatchNum"
  21. // PathCurrent defines the subpath of the current Batch in the subpath
  22. // of the KVDB
  23. PathCurrent = "current"
  24. // PathLast defines the subpath of the last Batch in the subpath
  25. // of the StateDB
  26. PathLast = "last"
  27. // DefaultKeep is the default value for the Keep parameter
  28. DefaultKeep = 128
  29. )
  30. var (
  31. // KeyCurrentBatch is used as key in the db to store the current BatchNum
  32. KeyCurrentBatch = []byte("k:currentbatch")
  33. // keyCurrentIdx is used as key in the db to store the CurrentIdx
  34. keyCurrentIdx = []byte("k:idx")
  35. // ErrNoLast is returned when the KVDB has been configured to not have
  36. // a Last checkpoint but a Last method is used
  37. ErrNoLast = fmt.Errorf("no last checkpoint")
  38. )
  39. // KVDB represents the Key-Value DB object
  40. type KVDB struct {
  41. cfg Config
  42. db *pebble.Storage
  43. // CurrentIdx holds the current Idx that the BatchBuilder is using
  44. CurrentIdx common.Idx
  45. CurrentBatch common.BatchNum
  46. m sync.Mutex
  47. last *Last
  48. }
  49. // Last is a consistent view to the last batch of the stateDB that can
  50. // be queried concurrently.
  51. type Last struct {
  52. db *pebble.Storage
  53. path string
  54. rw sync.RWMutex
  55. }
  56. func (k *Last) setNew() error {
  57. k.rw.Lock()
  58. defer k.rw.Unlock()
  59. if k.db != nil {
  60. k.db.Close()
  61. k.db = nil
  62. }
  63. lastPath := path.Join(k.path, PathLast)
  64. if err := os.RemoveAll(lastPath); err != nil {
  65. return tracerr.Wrap(err)
  66. }
  67. db, err := pebble.NewPebbleStorage(lastPath, false)
  68. if err != nil {
  69. return tracerr.Wrap(err)
  70. }
  71. k.db = db
  72. return nil
  73. }
  74. func (k *Last) set(kvdb *KVDB, batchNum common.BatchNum) error {
  75. k.rw.Lock()
  76. defer k.rw.Unlock()
  77. if k.db != nil {
  78. k.db.Close()
  79. k.db = nil
  80. }
  81. lastPath := path.Join(k.path, PathLast)
  82. if err := kvdb.MakeCheckpointFromTo(batchNum, lastPath); err != nil {
  83. return tracerr.Wrap(err)
  84. }
  85. db, err := pebble.NewPebbleStorage(lastPath, false)
  86. if err != nil {
  87. return tracerr.Wrap(err)
  88. }
  89. k.db = db
  90. return nil
  91. }
  92. func (k *Last) close() {
  93. k.rw.Lock()
  94. defer k.rw.Unlock()
  95. if k.db != nil {
  96. k.db.Close()
  97. k.db = nil
  98. }
  99. }
  100. // Config of the KVDB
  101. type Config struct {
  102. // Path where the checkpoints will be stored
  103. Path string
  104. // Keep is the number of old checkpoints to keep. If 0, all
  105. // checkpoints are kept.
  106. Keep int
  107. // At every checkpoint, check that there are no gaps between the
  108. // checkpoints
  109. NoGapsCheck bool
  110. // NoLast skips having an opened DB with a checkpoint to the last
  111. // batchNum for thread-safe reads.
  112. NoLast bool
  113. }
  114. // NewKVDB creates a new KVDB, allowing to use an in-memory or in-disk storage.
  115. // Checkpoints older than the value defined by `keep` will be deleted.
  116. // func NewKVDB(pathDB string, keep int) (*KVDB, error) {
  117. func NewKVDB(cfg Config) (*KVDB, error) {
  118. var sto *pebble.Storage
  119. var err error
  120. sto, err = pebble.NewPebbleStorage(path.Join(cfg.Path, PathCurrent), false)
  121. if err != nil {
  122. return nil, tracerr.Wrap(err)
  123. }
  124. var last *Last
  125. if !cfg.NoLast {
  126. last = &Last{
  127. path: cfg.Path,
  128. }
  129. }
  130. kvdb := &KVDB{
  131. cfg: cfg,
  132. db: sto,
  133. last: last,
  134. }
  135. // load currentBatch
  136. kvdb.CurrentBatch, err = kvdb.GetCurrentBatch()
  137. if err != nil {
  138. return nil, tracerr.Wrap(err)
  139. }
  140. // make reset (get checkpoint) at currentBatch
  141. err = kvdb.reset(kvdb.CurrentBatch, true)
  142. if err != nil {
  143. return nil, tracerr.Wrap(err)
  144. }
  145. return kvdb, nil
  146. }
  147. // LastRead is a thread-safe method to query the last KVDB
  148. func (k *KVDB) LastRead(fn func(db *pebble.Storage) error) error {
  149. if k.last == nil {
  150. return tracerr.Wrap(ErrNoLast)
  151. }
  152. k.last.rw.RLock()
  153. defer k.last.rw.RUnlock()
  154. return fn(k.last.db)
  155. }
  156. // DB returns the *pebble.Storage from the KVDB
  157. func (k *KVDB) DB() *pebble.Storage {
  158. return k.db
  159. }
  160. // StorageWithPrefix returns the db.Storage with the given prefix from the
  161. // current KVDB
  162. func (k *KVDB) StorageWithPrefix(prefix []byte) db.Storage {
  163. return k.db.WithPrefix(prefix)
  164. }
  165. // Reset resets the KVDB to the checkpoint at the given batchNum. Reset does
  166. // not delete the checkpoints between old current and the new current, those
  167. // checkpoints will remain in the storage, and eventually will be deleted when
  168. // MakeCheckpoint overwrites them.
  169. func (k *KVDB) Reset(batchNum common.BatchNum) error {
  170. return k.reset(batchNum, true)
  171. }
  172. // reset resets the KVDB to the checkpoint at the given batchNum. Reset does
  173. // not delete the checkpoints between old current and the new current, those
  174. // checkpoints will remain in the storage, and eventually will be deleted when
  175. // MakeCheckpoint overwrites them. `closeCurrent` will close the currently
  176. // opened db before doing the reset.
  177. func (k *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error {
  178. currentPath := path.Join(k.cfg.Path, PathCurrent)
  179. if closeCurrent && k.db != nil {
  180. k.db.Close()
  181. k.db = nil
  182. }
  183. // remove 'current'
  184. if err := os.RemoveAll(currentPath); err != nil {
  185. return tracerr.Wrap(err)
  186. }
  187. // remove all checkpoints > batchNum
  188. list, err := k.ListCheckpoints()
  189. if err != nil {
  190. return tracerr.Wrap(err)
  191. }
  192. // Find first batch that is greater than batchNum, and delete
  193. // everything after that
  194. start := 0
  195. for ; start < len(list); start++ {
  196. if common.BatchNum(list[start]) > batchNum {
  197. break
  198. }
  199. }
  200. for _, bn := range list[start:] {
  201. if err := k.DeleteCheckpoint(common.BatchNum(bn)); err != nil {
  202. return tracerr.Wrap(err)
  203. }
  204. }
  205. if batchNum == 0 {
  206. // if batchNum == 0, open the new fresh 'current'
  207. sto, err := pebble.NewPebbleStorage(currentPath, false)
  208. if err != nil {
  209. return tracerr.Wrap(err)
  210. }
  211. k.db = sto
  212. k.CurrentIdx = common.RollupConstReservedIDx // 255
  213. k.CurrentBatch = 0
  214. if k.last != nil {
  215. if err := k.last.setNew(); err != nil {
  216. return tracerr.Wrap(err)
  217. }
  218. }
  219. return nil
  220. }
  221. // copy 'batchNum' to 'current'
  222. if err := k.MakeCheckpointFromTo(batchNum, currentPath); err != nil {
  223. return tracerr.Wrap(err)
  224. }
  225. // copy 'batchNum' to 'last'
  226. if k.last != nil {
  227. if err := k.last.set(k, batchNum); err != nil {
  228. return tracerr.Wrap(err)
  229. }
  230. }
  231. // open the new 'current'
  232. sto, err := pebble.NewPebbleStorage(currentPath, false)
  233. if err != nil {
  234. return tracerr.Wrap(err)
  235. }
  236. k.db = sto
  237. // get currentBatch num
  238. k.CurrentBatch, err = k.GetCurrentBatch()
  239. if err != nil {
  240. return tracerr.Wrap(err)
  241. }
  242. // idx is obtained from the statedb reset
  243. k.CurrentIdx, err = k.GetCurrentIdx()
  244. if err != nil {
  245. return tracerr.Wrap(err)
  246. }
  247. return nil
  248. }
  249. // ResetFromSynchronizer performs a reset in the KVDB getting the state from
  250. // synchronizerKVDB for the given batchNum.
  251. func (k *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKVDB *KVDB) error {
  252. if synchronizerKVDB == nil {
  253. return tracerr.Wrap(fmt.Errorf("synchronizerKVDB can not be nil"))
  254. }
  255. currentPath := path.Join(k.cfg.Path, PathCurrent)
  256. if k.db != nil {
  257. k.db.Close()
  258. k.db = nil
  259. }
  260. // remove 'current'
  261. if err := os.RemoveAll(currentPath); err != nil {
  262. return tracerr.Wrap(err)
  263. }
  264. // remove all checkpoints
  265. list, err := k.ListCheckpoints()
  266. if err != nil {
  267. return tracerr.Wrap(err)
  268. }
  269. for _, bn := range list {
  270. if err := k.DeleteCheckpoint(common.BatchNum(bn)); err != nil {
  271. return tracerr.Wrap(err)
  272. }
  273. }
  274. if batchNum == 0 {
  275. // if batchNum == 0, open the new fresh 'current'
  276. sto, err := pebble.NewPebbleStorage(currentPath, false)
  277. if err != nil {
  278. return tracerr.Wrap(err)
  279. }
  280. k.db = sto
  281. k.CurrentIdx = common.RollupConstReservedIDx // 255
  282. k.CurrentBatch = 0
  283. return nil
  284. }
  285. checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
  286. // copy synchronizer'BatchNumX' to 'BatchNumX'
  287. if err := synchronizerKVDB.MakeCheckpointFromTo(batchNum, checkpointPath); err != nil {
  288. return tracerr.Wrap(err)
  289. }
  290. // copy 'BatchNumX' to 'current'
  291. err = k.MakeCheckpointFromTo(batchNum, currentPath)
  292. if err != nil {
  293. return tracerr.Wrap(err)
  294. }
  295. // open the new 'current'
  296. sto, err := pebble.NewPebbleStorage(currentPath, false)
  297. if err != nil {
  298. return tracerr.Wrap(err)
  299. }
  300. k.db = sto
  301. // get currentBatch num
  302. k.CurrentBatch, err = k.GetCurrentBatch()
  303. if err != nil {
  304. return tracerr.Wrap(err)
  305. }
  306. // get currentIdx
  307. k.CurrentIdx, err = k.GetCurrentIdx()
  308. if err != nil {
  309. return tracerr.Wrap(err)
  310. }
  311. return nil
  312. }
  313. // GetCurrentBatch returns the current BatchNum stored in the KVDB
  314. func (k *KVDB) GetCurrentBatch() (common.BatchNum, error) {
  315. cbBytes, err := k.db.Get(KeyCurrentBatch)
  316. if tracerr.Unwrap(err) == db.ErrNotFound {
  317. return 0, nil
  318. }
  319. if err != nil {
  320. return 0, tracerr.Wrap(err)
  321. }
  322. return common.BatchNumFromBytes(cbBytes)
  323. }
  324. // setCurrentBatch stores the current BatchNum in the KVDB
  325. func (k *KVDB) setCurrentBatch() error {
  326. tx, err := k.db.NewTx()
  327. if err != nil {
  328. return tracerr.Wrap(err)
  329. }
  330. err = tx.Put(KeyCurrentBatch, k.CurrentBatch.Bytes())
  331. if err != nil {
  332. return tracerr.Wrap(err)
  333. }
  334. if err := tx.Commit(); err != nil {
  335. return tracerr.Wrap(err)
  336. }
  337. return nil
  338. }
  339. // GetCurrentIdx returns the stored Idx from the KVDB, which is the last Idx
  340. // used for an Account in the k.
  341. func (k *KVDB) GetCurrentIdx() (common.Idx, error) {
  342. idxBytes, err := k.db.Get(keyCurrentIdx)
  343. if tracerr.Unwrap(err) == db.ErrNotFound {
  344. return common.RollupConstReservedIDx, nil // 255, nil
  345. }
  346. if err != nil {
  347. return 0, tracerr.Wrap(err)
  348. }
  349. return common.IdxFromBytes(idxBytes[:])
  350. }
  351. // SetCurrentIdx stores Idx in the KVDB
  352. func (k *KVDB) SetCurrentIdx(idx common.Idx) error {
  353. k.CurrentIdx = idx
  354. tx, err := k.db.NewTx()
  355. if err != nil {
  356. return tracerr.Wrap(err)
  357. }
  358. idxBytes, err := idx.Bytes()
  359. if err != nil {
  360. return tracerr.Wrap(err)
  361. }
  362. err = tx.Put(keyCurrentIdx, idxBytes[:])
  363. if err != nil {
  364. return tracerr.Wrap(err)
  365. }
  366. if err := tx.Commit(); err != nil {
  367. return tracerr.Wrap(err)
  368. }
  369. return nil
  370. }
  371. // MakeCheckpoint does a checkpoint at the given batchNum in the defined path.
  372. // Internally this advances & stores the current BatchNum, and then stores a
  373. // Checkpoint of the current state of the k.
  374. func (k *KVDB) MakeCheckpoint() error {
  375. // advance currentBatch
  376. k.CurrentBatch++
  377. checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, k.CurrentBatch))
  378. if err := k.setCurrentBatch(); err != nil {
  379. return tracerr.Wrap(err)
  380. }
  381. // if checkpoint BatchNum already exist in disk, delete it
  382. if _, err := os.Stat(checkpointPath); os.IsNotExist(err) {
  383. } else if err != nil {
  384. return tracerr.Wrap(err)
  385. } else {
  386. if err := os.RemoveAll(checkpointPath); err != nil {
  387. return tracerr.Wrap(err)
  388. }
  389. }
  390. // execute Checkpoint
  391. if err := k.db.Pebble().Checkpoint(checkpointPath); err != nil {
  392. return tracerr.Wrap(err)
  393. }
  394. // copy 'CurrentBatch' to 'last'
  395. if k.last != nil {
  396. if err := k.last.set(k, k.CurrentBatch); err != nil {
  397. return tracerr.Wrap(err)
  398. }
  399. }
  400. // delete old checkpoints
  401. if err := k.deleteOldCheckpoints(); err != nil {
  402. return tracerr.Wrap(err)
  403. }
  404. return nil
  405. }
  406. // CheckpointExists returns true if the checkpoint exists
  407. func (k *KVDB) CheckpointExists(batchNum common.BatchNum) (bool, error) {
  408. source := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
  409. if _, err := os.Stat(source); os.IsNotExist(err) {
  410. return false, nil
  411. } else if err != nil {
  412. return false, err
  413. }
  414. return true, nil
  415. }
  416. // DeleteCheckpoint removes if exist the checkpoint of the given batchNum
  417. func (k *KVDB) DeleteCheckpoint(batchNum common.BatchNum) error {
  418. checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
  419. if _, err := os.Stat(checkpointPath); os.IsNotExist(err) {
  420. return tracerr.Wrap(fmt.Errorf("Checkpoint with batchNum %d does not exist in DB", batchNum))
  421. } else if err != nil {
  422. return tracerr.Wrap(err)
  423. }
  424. return os.RemoveAll(checkpointPath)
  425. }
  426. // ListCheckpoints returns the list of batchNums of the checkpoints, sorted.
  427. // If there's a gap between the list of checkpoints, an error is returned.
  428. func (k *KVDB) ListCheckpoints() ([]int, error) {
  429. files, err := ioutil.ReadDir(k.cfg.Path)
  430. if err != nil {
  431. return nil, tracerr.Wrap(err)
  432. }
  433. checkpoints := []int{}
  434. var checkpoint int
  435. pattern := fmt.Sprintf("%s%%d", PathBatchNum)
  436. for _, file := range files {
  437. fileName := file.Name()
  438. if file.IsDir() && strings.HasPrefix(fileName, PathBatchNum) {
  439. if _, err := fmt.Sscanf(fileName, pattern, &checkpoint); err != nil {
  440. return nil, tracerr.Wrap(err)
  441. }
  442. checkpoints = append(checkpoints, checkpoint)
  443. }
  444. }
  445. sort.Ints(checkpoints)
  446. if !k.cfg.NoGapsCheck && len(checkpoints) > 0 {
  447. first := checkpoints[0]
  448. for _, checkpoint := range checkpoints[1:] {
  449. first++
  450. if checkpoint != first {
  451. log.Errorw("gap between checkpoints", "checkpoints", checkpoints)
  452. return nil, tracerr.Wrap(fmt.Errorf("checkpoint gap at %v", checkpoint))
  453. }
  454. }
  455. }
  456. return checkpoints, nil
  457. }
  458. // deleteOldCheckpoints deletes old checkpoints when there are more than
  459. // `s.keep` checkpoints
  460. func (k *KVDB) deleteOldCheckpoints() error {
  461. list, err := k.ListCheckpoints()
  462. if err != nil {
  463. return tracerr.Wrap(err)
  464. }
  465. if k.cfg.Keep > 0 && len(list) > k.cfg.Keep {
  466. for _, checkpoint := range list[:len(list)-k.cfg.Keep] {
  467. if err := k.DeleteCheckpoint(common.BatchNum(checkpoint)); err != nil {
  468. return tracerr.Wrap(err)
  469. }
  470. }
  471. }
  472. return nil
  473. }
  474. // MakeCheckpointFromTo makes a checkpoint from the current db at fromBatchNum
  475. // to the dest folder. This method is locking, so it can be called from
  476. // multiple places at the same time.
  477. func (k *KVDB) MakeCheckpointFromTo(fromBatchNum common.BatchNum, dest string) error {
  478. source := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, fromBatchNum))
  479. if _, err := os.Stat(source); os.IsNotExist(err) {
  480. // if kvdb does not have checkpoint at batchNum, return err
  481. return tracerr.Wrap(fmt.Errorf("Checkpoint \"%v\" does not exist", source))
  482. } else if err != nil {
  483. return tracerr.Wrap(err)
  484. }
  485. // By locking we allow calling MakeCheckpointFromTo from multiple
  486. // places at the same time for the same stateDB. This allows the
  487. // synchronizer to do a reset to a batchNum at the same time as the
  488. // pipeline is doing a txSelector.Reset and batchBuilder.Reset from
  489. // synchronizer to the same batchNum
  490. k.m.Lock()
  491. defer k.m.Unlock()
  492. return pebbleMakeCheckpoint(source, dest)
  493. }
  494. func pebbleMakeCheckpoint(source, dest string) error {
  495. // Remove dest folder (if it exists) before doing the checkpoint
  496. if _, err := os.Stat(dest); os.IsNotExist(err) {
  497. } else if err != nil {
  498. return tracerr.Wrap(err)
  499. } else {
  500. if err := os.RemoveAll(dest); err != nil {
  501. return tracerr.Wrap(err)
  502. }
  503. }
  504. sto, err := pebble.NewPebbleStorage(source, false)
  505. if err != nil {
  506. return tracerr.Wrap(err)
  507. }
  508. defer sto.Close()
  509. // execute Checkpoint
  510. err = sto.Pebble().Checkpoint(dest)
  511. if err != nil {
  512. return tracerr.Wrap(err)
  513. }
  514. return nil
  515. }
  516. // Close the DB
  517. func (k *KVDB) Close() {
  518. if k.db != nil {
  519. k.db.Close()
  520. k.db = nil
  521. }
  522. if k.last != nil {
  523. k.last.close()
  524. }
  525. }