
599 lines
15 KiB

Fix eth events query and sync inconsistent state
- kvdb
  - Fix path in Last when doing `setNew`
  - Only close if db != nil, and after closing, always set db to nil. This avoids a panic in the case where the db is closed, an error occurs soon after, and a future call tries to close again; pebble.Close() panics if the db is already closed.
  - Avoid calling pebble methods when the Storage interface already implements that method (like Close).
- statedb
  - In tests, avoid calling a KVDB method if the same method is available on the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
  - In *EventByBlock methods, take blockHash as an input argument and use it when querying the event logs. Previously the blockHash was taken from the log results *only if* there was at least one log, which caused the following issue: if there were no logs, it was impossible to know whether the result came from the expected block or an uncle block! By querying logs by blockHash we make sure that even if there are no logs, they are from the right block.
  - Note that the function can now be called with either a blockNum or a blockHash, but not both at the same time.
- sync
  - If there's an error during a call to Sync, call resetState, which internally resets the stateDB to avoid stale checkpoints (and a corresponding invalid increase in the StateDB batchNum).
  - During a Sync, after every processed batch, make sure that the StateDB currentBatch corresponds to the batchNum in the smart contract log/event.
3 years ago
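The safe-close pattern described in the kvdb changes can be sketched as below. The `storage` and `KVDB` types here are illustrative stand-ins, not the actual hermez-node kvdb types; the real code wraps a pebble handle, whose Close panics on a double close.

```go
package main

import "fmt"

// storage stands in for the underlying pebble handle (illustrative only).
type storage struct{ closed bool }

func (s *storage) Close() error {
	if s.closed {
		// Mimics pebble.Close(), which panics if the db is already closed.
		panic("close of closed db")
	}
	s.closed = true
	return nil
}

type KVDB struct{ db *storage }

// Close follows the pattern from the commit: only close if db != nil, and
// always set db to nil afterwards, so that a later Close (e.g. after an
// error shortly after the first close) is a no-op instead of a panic.
func (k *KVDB) Close() error {
	if k.db == nil {
		return nil
	}
	err := k.db.Close()
	k.db = nil
	return err
}

func main() {
	k := &KVDB{db: &storage{}}
	fmt.Println(k.Close()) // <nil>
	fmt.Println(k.Close()) // <nil>: the second close is safe
}
```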
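The blockNum-or-blockHash invariant from the eth changes matches the semantics of go-ethereum's `ethereum.FilterQuery`, where BlockHash is mutually exclusive with FromBlock/ToBlock. A minimal sketch of that invariant, using illustrative types rather than the actual hermez-node eth API:

```go
package main

import (
	"errors"
	"fmt"
)

// filterQuery loosely mirrors go-ethereum's ethereum.FilterQuery shape;
// the field and function names here are hypothetical.
type filterQuery struct {
	blockNum  *int64
	blockHash *string
}

// newEventQuery accepts either a block number or a block hash, but not
// both: querying by hash guarantees that the (possibly empty) set of logs
// comes from the expected block rather than an uncle.
func newEventQuery(blockNum *int64, blockHash *string) (*filterQuery, error) {
	if (blockNum == nil) == (blockHash == nil) {
		return nil, errors.New("exactly one of blockNum or blockHash must be set")
	}
	return &filterQuery{blockNum: blockNum, blockHash: blockHash}, nil
}

func main() {
	num := int64(42)
	q, err := newEventQuery(&num, nil)
	fmt.Println(q != nil, err) // true <nil>
	_, err = newEventQuery(nil, nil)
	fmt.Println(err != nil) // true: neither argument was set
}
```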
Update coordinator to work better under real net
- cli / node
  - Update the SIGINT handler so that after 3 SIGINTs, the process terminates unconditionally
- coordinator
  - Store stats without pointer
  - In all functions that send a variable via channel, check for context done to avoid a deadlock (due to no process reading from the channel, which has no queue) when the node is stopped
  - Abstract `canForge` so that it can be used outside of the `Coordinator`
  - In `canForge`, check the blockNumber in the current and next slot
  - Update tests due to smart contract changes in slot handling and minimum bid defaults
  - TxManager
    - Add consts, vars and stats to allow evaluating `canForge`
    - Add `canForge` method (not used yet)
    - Store batch and nonce status (last success and last pending)
    - Track nonces internally instead of relying on the ethereum node (this is required to work with ganache when there are pending txs)
    - Handle the (common) case of the receipt not being found right after the tx is sent
    - Don't start the main loop until we get an initial message for the stats and vars (so that in the loop the stats and vars are set to synchronizer values)
    - When a tx fails, check and discard all the failed transactions before sending the message to stop the pipeline. This avoids sending consecutive stop-the-pipeline messages when multiple txs are detected to have failed consecutively. Also, future txs of the same pipeline after a discarded tx are discarded, and their nonces reused
    - Robust handling of nonces:
      - If geth returns "nonce too low", increase it
      - If geth returns "nonce too high", decrease it
      - If geth returns "underpriced", increase the gas price
      - If geth returns "replacement underpriced", increase the gas price
    - Add support for resending transactions after a timeout
    - Store `BatchInfos` in a queue
  - Pipeline
    - When an error is found, stop forging batches and send a message to the coordinator to stop the pipeline, with the failed batch number, so that on a restart, non-failed batches are not repeated
    - When resetting the stateDB, if possible reset from the local checkpoint instead of from the synchronizer. This allows resetting from a batch that is valid but not yet sent / synced
    - Every time a pipeline is started, assign it a number from a counter. This allows the TxManager to ignore batches from stopped pipelines, via a message sent by the coordinator
  - Avoid forging when we haven't reached the rollup genesis block number
  - Add config parameter `StartSlotBlocksDelay`: the number of blocks of delay to wait before starting the pipeline when we reach a slot in which we can forge
  - When detecting a reorg, only reset the pipeline if the batch from which the pipeline started changed and wasn't sent by us
  - Add config parameter `ScheduleBatchBlocksAheadCheck`: the number of blocks ahead in which the forger address is checked to be allowed to forge (apart from checking the next block), used to decide when to stop scheduling new batches (by stopping the pipeline). For example, if we are at block 10 and ScheduleBatchBlocksAheadCheck is 5, even though at block 11 we canForge, the pipeline will be stopped if we can't forge at block 15. This value should be the expected number of blocks between scheduling a batch and having it mined
  - Add config parameter `SendBatchBlocksMarginCheck`: the number of margin blocks ahead in which the coordinator is also checked to be allowed to forge, apart from the next block; used to decide when to stop sending batches to the smart contract. For example, if we are at block 10 and SendBatchBlocksMarginCheck is 5, even though at block 11 we canForge, the batch will be discarded if we can't forge at block 15
  - Add config parameter `TxResendTimeout`: the timeout after which a non-mined ethereum transaction will be resent (reusing the nonce) with a newly calculated gas price
  - Add config parameter `MaxGasPrice`: the maximum gas price allowed for ethereum transactions
  - Add config parameter `NoReuseNonce`: disables reusing nonces of pending transactions for new replacement transactions; useful for testing with Ganache
  - Extend BatchInfo with more useful information for debugging
- eth / ethereum client
  - Add the methods needed to create the auth object for transactions manually, so that we can set the nonce, gas price, gas limit, etc. manually
  - Update `RollupForgeBatch` to take an auth object as input (so that the coordinator can set parameters manually)
- synchronizer
  - In stats, add `NextSlot`
  - In stats, store the full last batch instead of just the last batch number
  - Instead of calculating a nextSlot from scratch every time, update the current struct (only updating the forger info if we are Synced)
  - After every processed batch, check that the calculated StateDB MTRoot matches the StateRoot found in the forgeBatch event
3 years ago
Fix eth events query and sync inconsistent state - kvdb - Fix path in Last when doing `setNew` - Only close if db != nil, and after closing, always set db to nil - This will avoid a panic in the case where the db is closed but there's an error soon after, and a future call tries to close again. This is because pebble.Close() will panic if the db is already closed. - Avoid calling pebble methods when a the Storage interface already implements that method (like Close). - statedb - In test, avoid calling KVDB method if the same method is available for the StateDB (like MakeCheckpoint, CurrentBatch). - eth - In *EventByBlock methods, take blockHash as input argument and use it when querying the event logs. Previously the blockHash was only taken from the logs results *only if* there was any log. This caused the following issue: if there was no logs, it was not possible to know if the result was from the expected block or an uncle block! By querying logs by blockHash we make sure that even if there are no logs, they are from the right block. - Note that now the function can either be called with a blockNum or blockHash, but not both at the same time. - sync - If there's an error during call to Sync call resetState, which internally resets the stateDB to avoid stale checkpoints (and a corresponding invalid increase in the StateDB batchNum). - During a Sync, after very batch processed, make sure that the StateDB currentBatch corresponds to the batchNum in the smart contract log/event.
3 years ago
Update coordinator to work better under real net - cli / node - Update handler of SIGINT so that after 3 SIGINTs, the process terminates unconditionally - coordinator - Store stats without pointer - In all functions that send a variable via channel, check for context done to avoid deadlock (due to no process reading from the channel, which has no queue) when the node is stopped. - Abstract `canForge` so that it can be used outside of the `Coordinator` - In `canForge` check the blockNumber in current and next slot. - Update tests due to smart contract changes in slot handling, and minimum bid defaults - TxManager - Add consts, vars and stats to allow evaluating `canForge` - Add `canForge` method (not used yet) - Store batch and nonces status (last success and last pending) - Track nonces internally instead of relying on the ethereum node (this is required to work with ganache when there are pending txs) - Handle the (common) case of the receipt not being found after the tx is sent. - Don't start the main loop until we get an initial messae fo the stats and vars (so that in the loop the stats and vars are set to synchronizer values) - When a tx fails, check and discard all the failed transactions before sending the message to stop the pipeline. This will avoid sending consecutive messages of stop the pipeline when multiple txs are detected to be failed consecutively. Also, future txs of the same pipeline after a discarded txs are discarded, and their nonces reused. 
- Robust handling of nonces: - If geth returns nonce is too low, increase it - If geth returns nonce too hight, decrease it - If geth returns underpriced, increase gas price - If geth returns replace underpriced, increase gas price - Add support for resending transactions after a timeout - Store `BatchInfos` in a queue - Pipeline - When an error is found, stop forging batches and send a message to the coordinator to stop the pipeline with information of the failed batch number so that in a restart, non-failed batches are not repated. - When doing a reset of the stateDB, if possible reset from the local checkpoint instead of resetting from the synchronizer. This allows resetting from a batch that is valid but not yet sent / synced. - Every time a pipeline is started, assign it a number from a counter. This allows the TxManager to ignore batches from stopped pipelines, via a message sent by the coordinator. - Avoid forging when we haven't reached the rollup genesis block number. - Add config parameter `StartSlotBlocksDelay`: StartSlotBlocksDelay is the number of blocks of delay to wait before starting the pipeline when we reach a slot in which we can forge. - When detecting a reorg, only reset the pipeline if the batch from which the pipeline started changed and wasn't sent by us. - Add config parameter `ScheduleBatchBlocksAheadCheck`: ScheduleBatchBlocksAheadCheck is the number of blocks ahead in which the forger address is checked to be allowed to forge (apart from checking the next block), used to decide when to stop scheduling new batches (by stopping the pipeline). For example, if we are at block 10 and ScheduleBatchBlocksAheadCheck is 5, eventhough at block 11 we canForge, the pipeline will be stopped if we can't forge at block 15. This value should be the expected number of blocks it takes between scheduling a batch and having it mined. 
- Add config parameter `SendBatchBlocksMarginCheck`: SendBatchBlocksMarginCheck is the number of margin blocks ahead in which the coordinator is also checked to be allowed to forge, apart from the next block; used to decide when to stop sending batches to the smart contract. For example, if we are at block 10 and SendBatchBlocksMarginCheck is 5, eventhough at block 11 we canForge, the batch will be discarded if we can't forge at block 15. - Add config parameter `TxResendTimeout`: TxResendTimeout is the timeout after which a non-mined ethereum transaction will be resent (reusing the nonce) with a newly calculated gas price - Add config parameter `MaxGasPrice`: MaxGasPrice is the maximum gas price allowed for ethereum transactions - Add config parameter `NoReuseNonce`: NoReuseNonce disables reusing nonces of pending transactions for new replacement transactions. This is useful for testing with Ganache. - Extend BatchInfo with more useful information for debugging - eth / ethereum client - Add necessary methods to create the auth object for transactions manually so that we can set the nonce, gas price, gas limit, etc manually - Update `RollupForgeBatch` to take an auth object as input (so that the coordinator can set parameters manually) - synchronizer - In stats, add `NextSlot` - In stats, store full last batch instead of just last batch number - Instead of calculating a nextSlot from scratch every time, update the current struct (only updating the forger info if we are Synced) - Afer every processed batch, check that the calculated StateDB MTRoot matches the StateRoot found in the forgeBatch event.
3 years ago
Update coordinator to work better under real net - cli / node - Update handler of SIGINT so that after 3 SIGINTs, the process terminates unconditionally - coordinator - Store stats without pointer - In all functions that send a variable via channel, check for context done to avoid deadlock (due to no process reading from the channel, which has no queue) when the node is stopped. - Abstract `canForge` so that it can be used outside of the `Coordinator` - In `canForge` check the blockNumber in current and next slot. - Update tests due to smart contract changes in slot handling, and minimum bid defaults - TxManager - Add consts, vars and stats to allow evaluating `canForge` - Add `canForge` method (not used yet) - Store batch and nonces status (last success and last pending) - Track nonces internally instead of relying on the ethereum node (this is required to work with ganache when there are pending txs) - Handle the (common) case of the receipt not being found after the tx is sent. - Don't start the main loop until we get an initial messae fo the stats and vars (so that in the loop the stats and vars are set to synchronizer values) - When a tx fails, check and discard all the failed transactions before sending the message to stop the pipeline. This will avoid sending consecutive messages of stop the pipeline when multiple txs are detected to be failed consecutively. Also, future txs of the same pipeline after a discarded txs are discarded, and their nonces reused. 
- Robust handling of nonces: - If geth returns nonce is too low, increase it - If geth returns nonce too hight, decrease it - If geth returns underpriced, increase gas price - If geth returns replace underpriced, increase gas price - Add support for resending transactions after a timeout - Store `BatchInfos` in a queue - Pipeline - When an error is found, stop forging batches and send a message to the coordinator to stop the pipeline with information of the failed batch number so that in a restart, non-failed batches are not repated. - When doing a reset of the stateDB, if possible reset from the local checkpoint instead of resetting from the synchronizer. This allows resetting from a batch that is valid but not yet sent / synced. - Every time a pipeline is started, assign it a number from a counter. This allows the TxManager to ignore batches from stopped pipelines, via a message sent by the coordinator. - Avoid forging when we haven't reached the rollup genesis block number. - Add config parameter `StartSlotBlocksDelay`: StartSlotBlocksDelay is the number of blocks of delay to wait before starting the pipeline when we reach a slot in which we can forge. - When detecting a reorg, only reset the pipeline if the batch from which the pipeline started changed and wasn't sent by us. - Add config parameter `ScheduleBatchBlocksAheadCheck`: ScheduleBatchBlocksAheadCheck is the number of blocks ahead in which the forger address is checked to be allowed to forge (apart from checking the next block), used to decide when to stop scheduling new batches (by stopping the pipeline). For example, if we are at block 10 and ScheduleBatchBlocksAheadCheck is 5, eventhough at block 11 we canForge, the pipeline will be stopped if we can't forge at block 15. This value should be the expected number of blocks it takes between scheduling a batch and having it mined. 
- Add config parameter `SendBatchBlocksMarginCheck`: SendBatchBlocksMarginCheck is the number of margin blocks ahead in which the coordinator is also checked to be allowed to forge, apart from the next block; used to decide when to stop sending batches to the smart contract. For example, if we are at block 10 and SendBatchBlocksMarginCheck is 5, eventhough at block 11 we canForge, the batch will be discarded if we can't forge at block 15. - Add config parameter `TxResendTimeout`: TxResendTimeout is the timeout after which a non-mined ethereum transaction will be resent (reusing the nonce) with a newly calculated gas price - Add config parameter `MaxGasPrice`: MaxGasPrice is the maximum gas price allowed for ethereum transactions - Add config parameter `NoReuseNonce`: NoReuseNonce disables reusing nonces of pending transactions for new replacement transactions. This is useful for testing with Ganache. - Extend BatchInfo with more useful information for debugging - eth / ethereum client - Add necessary methods to create the auth object for transactions manually so that we can set the nonce, gas price, gas limit, etc manually - Update `RollupForgeBatch` to take an auth object as input (so that the coordinator can set parameters manually) - synchronizer - In stats, add `NextSlot` - In stats, store full last batch instead of just last batch number - Instead of calculating a nextSlot from scratch every time, update the current struct (only updating the forger info if we are Synced) - Afer every processed batch, check that the calculated StateDB MTRoot matches the StateRoot found in the forgeBatch event.
3 years ago
Update coordinator to work better under real net - cli / node - Update handler of SIGINT so that after 3 SIGINTs, the process terminates unconditionally - coordinator - Store stats without pointer - In all functions that send a variable via channel, check for context done to avoid deadlock (due to no process reading from the channel, which has no queue) when the node is stopped. - Abstract `canForge` so that it can be used outside of the `Coordinator` - In `canForge` check the blockNumber in current and next slot. - Update tests due to smart contract changes in slot handling, and minimum bid defaults - TxManager - Add consts, vars and stats to allow evaluating `canForge` - Add `canForge` method (not used yet) - Store batch and nonces status (last success and last pending) - Track nonces internally instead of relying on the ethereum node (this is required to work with ganache when there are pending txs) - Handle the (common) case of the receipt not being found after the tx is sent. - Don't start the main loop until we get an initial messae fo the stats and vars (so that in the loop the stats and vars are set to synchronizer values) - When a tx fails, check and discard all the failed transactions before sending the message to stop the pipeline. This will avoid sending consecutive messages of stop the pipeline when multiple txs are detected to be failed consecutively. Also, future txs of the same pipeline after a discarded txs are discarded, and their nonces reused. 
Update coordinator to work better under real net
- cli / node
  - Update the SIGINT handler so that after 3 SIGINTs, the process terminates unconditionally
- coordinator
  - Store stats without pointer
  - In all functions that send a variable via channel, check for context done to avoid a deadlock (due to no process reading from the channel, which has no queue) when the node is stopped
  - Abstract `canForge` so that it can be used outside of the `Coordinator`
  - In `canForge`, check the blockNumber in the current and next slot
  - Update tests due to smart contract changes in slot handling and minimum bid defaults
  - TxManager
    - Add consts, vars and stats to allow evaluating `canForge`
    - Add `canForge` method (not used yet)
    - Store batch and nonce status (last success and last pending)
    - Track nonces internally instead of relying on the ethereum node (this is required to work with ganache when there are pending txs)
    - Handle the (common) case of the receipt not being found after the tx is sent
    - Don't start the main loop until we get an initial message of the stats and vars (so that in the loop the stats and vars are set to synchronizer values)
    - When a tx fails, check and discard all the failed transactions before sending the message to stop the pipeline. This avoids sending consecutive stop-pipeline messages when multiple txs are detected to have failed consecutively. Also, future txs of the same pipeline after a discarded tx are discarded, and their nonces reused
    - Robust handling of nonces:
      - If geth returns nonce is too low, increase it
      - If geth returns nonce too high, decrease it
      - If geth returns underpriced, increase the gas price
      - If geth returns replace underpriced, increase the gas price
    - Add support for resending transactions after a timeout
    - Store `BatchInfos` in a queue
  - Pipeline
    - When an error is found, stop forging batches and send a message to the coordinator to stop the pipeline with the failed batch number, so that on restart, non-failed batches are not repeated
    - When doing a reset of the stateDB, if possible reset from the local checkpoint instead of from the synchronizer. This allows resetting from a batch that is valid but not yet sent / synced
  - Every time a pipeline is started, assign it a number from a counter. This allows the TxManager to ignore batches from stopped pipelines, via a message sent by the coordinator
  - Avoid forging when we haven't reached the rollup genesis block number
  - Add config parameter `StartSlotBlocksDelay`: the number of blocks of delay to wait before starting the pipeline when we reach a slot in which we can forge
  - When detecting a reorg, only reset the pipeline if the batch from which the pipeline started changed and wasn't sent by us
  - Add config parameter `ScheduleBatchBlocksAheadCheck`: the number of blocks ahead in which the forger address is checked to be allowed to forge (apart from checking the next block), used to decide when to stop scheduling new batches (by stopping the pipeline). For example, if we are at block 10 and ScheduleBatchBlocksAheadCheck is 5, even though at block 11 we canForge, the pipeline will be stopped if we can't forge at block 15. This value should be the expected number of blocks it takes between scheduling a batch and having it mined
  - Add config parameter `SendBatchBlocksMarginCheck`: the number of margin blocks ahead in which the coordinator is also checked to be allowed to forge, apart from the next block; used to decide when to stop sending batches to the smart contract. For example, if we are at block 10 and SendBatchBlocksMarginCheck is 5, even though at block 11 we canForge, the batch will be discarded if we can't forge at block 15
  - Add config parameter `TxResendTimeout`: the timeout after which a non-mined ethereum transaction will be resent (reusing the nonce) with a newly calculated gas price
  - Add config parameter `MaxGasPrice`: the maximum gas price allowed for ethereum transactions
  - Add config parameter `NoReuseNonce`: disables reusing nonces of pending transactions for new replacement transactions. This is useful for testing with Ganache
  - Extend BatchInfo with more useful information for debugging
- eth / ethereum client
  - Add the necessary methods to create the auth object for transactions manually, so that we can set the nonce, gas price, gas limit, etc. manually
  - Update `RollupForgeBatch` to take an auth object as input (so that the coordinator can set parameters manually)
- synchronizer
  - In stats, add `NextSlot`
  - In stats, store the full last batch instead of just the last batch number
  - Instead of calculating a nextSlot from scratch every time, update the current struct (only updating the forger info if we are Synced)
  - After every processed batch, check that the calculated StateDB MTRoot matches the StateRoot found in the forgeBatch event
3 years ago
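The blockNum-or-blockHash contract from the events fix (query logs by hash so that even an empty result is pinned to the expected block, never an uncle) can be illustrated with a small guard. The function name and returned strings are hypothetical stand-ins, not the real eth package types:

```go
package main

import (
	"errors"
	"fmt"
)

// buildFilterQuery accepts either a block number or a block hash, but
// never both (and never neither), mirroring the *EventByBlock contract:
// querying by hash guarantees the logs belong to the intended block even
// when the result set is empty.
func buildFilterQuery(blockNum int64, blockHash string) (string, error) {
	if (blockNum != 0) == (blockHash != "") {
		return "", errors.New("expected either blockNum or blockHash, but not both")
	}
	if blockHash != "" {
		return fmt.Sprintf("FilterQuery{BlockHash: %s}", blockHash), nil
	}
	return fmt.Sprintf("FilterQuery{FromBlock: %d, ToBlock: %d}", blockNum, blockNum), nil
}

func main() {
	q, err := buildFilterQuery(0, "0xabc")
	fmt.Println(q, err)
}
```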
// Package kvdb provides a key-value database with Checkpoints & Resets system
package kvdb

import (
	"fmt"
	"io/ioutil"
	"os"
	"path"
	"sort"
	"strings"
	"sync"

	"github.com/hermeznetwork/hermez-node/common"
	"github.com/hermeznetwork/hermez-node/log"
	"github.com/hermeznetwork/tracerr"
	"github.com/iden3/go-merkletree/db"
	"github.com/iden3/go-merkletree/db/pebble"
)
const (
	// PathBatchNum defines the subpath of the Batch Checkpoint in the
	// subpath of the KVDB
	PathBatchNum = "BatchNum"
	// PathCurrent defines the subpath of the current Batch in the subpath
	// of the KVDB
	PathCurrent = "current"
	// PathLast defines the subpath of the last Batch in the subpath
	// of the StateDB
	PathLast = "last"
	// DefaultKeep is the default value for the Keep parameter
	DefaultKeep = 128
)

var (
	// KeyCurrentBatch is used as key in the db to store the current BatchNum
	KeyCurrentBatch = []byte("k:currentbatch")
	// keyCurrentIdx is used as key in the db to store the CurrentIdx
	keyCurrentIdx = []byte("k:idx")
	// ErrNoLast is returned when the KVDB has been configured to not have
	// a Last checkpoint but a Last method is used
	ErrNoLast = fmt.Errorf("no last checkpoint")
)

// KVDB represents the Key-Value DB object
type KVDB struct {
	cfg Config
	db  *pebble.Storage
	// CurrentIdx holds the current Idx that the BatchBuilder is using
	CurrentIdx   common.Idx
	CurrentBatch common.BatchNum
	m            sync.Mutex
	mutexDelOld  sync.Mutex
	wg           sync.WaitGroup
	last         *Last
}

// Last is a consistent view to the last batch of the stateDB that can
// be queried concurrently.
type Last struct {
	db   *pebble.Storage
	path string
	rw   sync.RWMutex
}
func (k *Last) setNew() error {
	k.rw.Lock()
	defer k.rw.Unlock()
	if k.db != nil {
		k.db.Close()
		k.db = nil
	}
	lastPath := path.Join(k.path, PathLast)
	if err := os.RemoveAll(lastPath); err != nil {
		return tracerr.Wrap(err)
	}
	db, err := pebble.NewPebbleStorage(lastPath, false)
	if err != nil {
		return tracerr.Wrap(err)
	}
	k.db = db
	return nil
}

func (k *Last) set(kvdb *KVDB, batchNum common.BatchNum) error {
	k.rw.Lock()
	defer k.rw.Unlock()
	if k.db != nil {
		k.db.Close()
		k.db = nil
	}
	lastPath := path.Join(k.path, PathLast)
	if err := kvdb.MakeCheckpointFromTo(batchNum, lastPath); err != nil {
		return tracerr.Wrap(err)
	}
	db, err := pebble.NewPebbleStorage(lastPath, false)
	if err != nil {
		return tracerr.Wrap(err)
	}
	k.db = db
	return nil
}

func (k *Last) close() {
	k.rw.Lock()
	defer k.rw.Unlock()
	if k.db != nil {
		k.db.Close()
		k.db = nil
	}
}
// Config of the KVDB
type Config struct {
	// Path where the checkpoints will be stored
	Path string
	// Keep is the number of old checkpoints to keep. If 0, all
	// checkpoints are kept.
	Keep int
	// At every checkpoint, check that there are no gaps between the
	// checkpoints
	NoGapsCheck bool
	// NoLast skips having an opened DB with a checkpoint to the last
	// batchNum for thread-safe reads.
	NoLast bool
}

// NewKVDB creates a new KVDB, allowing to use an in-memory or in-disk storage.
// Checkpoints older than the value defined by `Keep` will be deleted.
func NewKVDB(cfg Config) (*KVDB, error) {
	var sto *pebble.Storage
	var err error
	sto, err = pebble.NewPebbleStorage(path.Join(cfg.Path, PathCurrent), false)
	if err != nil {
		return nil, tracerr.Wrap(err)
	}
	var last *Last
	if !cfg.NoLast {
		last = &Last{
			path: cfg.Path,
		}
	}

	kvdb := &KVDB{
		cfg:  cfg,
		db:   sto,
		last: last,
	}
	// load currentBatch
	kvdb.CurrentBatch, err = kvdb.GetCurrentBatch()
	if err != nil {
		return nil, tracerr.Wrap(err)
	}

	// make reset (get checkpoint) at currentBatch
	err = kvdb.reset(kvdb.CurrentBatch, true)
	if err != nil {
		return nil, tracerr.Wrap(err)
	}

	return kvdb, nil
}
// LastRead is a thread-safe method to query the last KVDB
func (k *KVDB) LastRead(fn func(db *pebble.Storage) error) error {
	if k.last == nil {
		return tracerr.Wrap(ErrNoLast)
	}
	k.last.rw.RLock()
	defer k.last.rw.RUnlock()
	return fn(k.last.db)
}

// DB returns the *pebble.Storage from the KVDB
func (k *KVDB) DB() *pebble.Storage {
	return k.db
}

// StorageWithPrefix returns the db.Storage with the given prefix from the
// current KVDB
func (k *KVDB) StorageWithPrefix(prefix []byte) db.Storage {
	return k.db.WithPrefix(prefix)
}

// Reset resets the KVDB to the checkpoint at the given batchNum. Reset does
// not delete the checkpoints between old current and the new current, those
// checkpoints will remain in the storage, and eventually will be deleted when
// MakeCheckpoint overwrites them.
func (k *KVDB) Reset(batchNum common.BatchNum) error {
	return k.reset(batchNum, true)
}
// reset resets the KVDB to the checkpoint at the given batchNum. Reset does
// not delete the checkpoints between old current and the new current, those
// checkpoints will remain in the storage, and eventually will be deleted when
// MakeCheckpoint overwrites them. `closeCurrent` will close the currently
// opened db before doing the reset.
func (k *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error {
	currentPath := path.Join(k.cfg.Path, PathCurrent)
	if closeCurrent && k.db != nil {
		k.db.Close()
		k.db = nil
	}
	// remove 'current'
	if err := os.RemoveAll(currentPath); err != nil {
		return tracerr.Wrap(err)
	}
	// remove all checkpoints > batchNum
	list, err := k.ListCheckpoints()
	if err != nil {
		return tracerr.Wrap(err)
	}
	// Find the first batch that is greater than batchNum, and delete
	// everything after that
	start := 0
	for ; start < len(list); start++ {
		if common.BatchNum(list[start]) > batchNum {
			break
		}
	}
	for _, bn := range list[start:] {
		if err := k.DeleteCheckpoint(common.BatchNum(bn)); err != nil {
			return tracerr.Wrap(err)
		}
	}

	if batchNum == 0 {
		// if batchNum == 0, open the new fresh 'current'
		sto, err := pebble.NewPebbleStorage(currentPath, false)
		if err != nil {
			return tracerr.Wrap(err)
		}
		k.db = sto
		k.CurrentIdx = common.RollupConstReservedIDx // 255
		k.CurrentBatch = 0
		if k.last != nil {
			if err := k.last.setNew(); err != nil {
				return tracerr.Wrap(err)
			}
		}
		return nil
	}

	// copy 'batchNum' to 'current'
	if err := k.MakeCheckpointFromTo(batchNum, currentPath); err != nil {
		return tracerr.Wrap(err)
	}
	// copy 'batchNum' to 'last'
	if k.last != nil {
		if err := k.last.set(k, batchNum); err != nil {
			return tracerr.Wrap(err)
		}
	}

	// open the new 'current'
	sto, err := pebble.NewPebbleStorage(currentPath, false)
	if err != nil {
		return tracerr.Wrap(err)
	}
	k.db = sto

	// get currentBatch num
	k.CurrentBatch, err = k.GetCurrentBatch()
	if err != nil {
		return tracerr.Wrap(err)
	}
	// idx is obtained from the statedb reset
	k.CurrentIdx, err = k.GetCurrentIdx()
	if err != nil {
		return tracerr.Wrap(err)
	}

	return nil
}
// ResetFromSynchronizer performs a reset in the KVDB getting the state from
// synchronizerKVDB for the given batchNum.
func (k *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKVDB *KVDB) error {
	if synchronizerKVDB == nil {
		return tracerr.Wrap(fmt.Errorf("synchronizerKVDB can not be nil"))
	}

	currentPath := path.Join(k.cfg.Path, PathCurrent)
	if k.db != nil {
		k.db.Close()
		k.db = nil
	}

	// remove 'current'
	if err := os.RemoveAll(currentPath); err != nil {
		return tracerr.Wrap(err)
	}
	// remove all checkpoints
	list, err := k.ListCheckpoints()
	if err != nil {
		return tracerr.Wrap(err)
	}
	for _, bn := range list {
		if err := k.DeleteCheckpoint(common.BatchNum(bn)); err != nil {
			return tracerr.Wrap(err)
		}
	}

	if batchNum == 0 {
		// if batchNum == 0, open the new fresh 'current'
		sto, err := pebble.NewPebbleStorage(currentPath, false)
		if err != nil {
			return tracerr.Wrap(err)
		}
		k.db = sto
		k.CurrentIdx = common.RollupConstReservedIDx // 255
		k.CurrentBatch = 0

		return nil
	}

	checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))

	// copy synchronizer 'BatchNumX' to 'BatchNumX'
	if err := synchronizerKVDB.MakeCheckpointFromTo(batchNum, checkpointPath); err != nil {
		return tracerr.Wrap(err)
	}

	// copy 'BatchNumX' to 'current'
	err = k.MakeCheckpointFromTo(batchNum, currentPath)
	if err != nil {
		return tracerr.Wrap(err)
	}

	// open the new 'current'
	sto, err := pebble.NewPebbleStorage(currentPath, false)
	if err != nil {
		return tracerr.Wrap(err)
	}
	k.db = sto

	// get currentBatch num
	k.CurrentBatch, err = k.GetCurrentBatch()
	if err != nil {
		return tracerr.Wrap(err)
	}
	// get currentIdx
	k.CurrentIdx, err = k.GetCurrentIdx()
	if err != nil {
		return tracerr.Wrap(err)
	}

	return nil
}
// GetCurrentBatch returns the current BatchNum stored in the KVDB
func (k *KVDB) GetCurrentBatch() (common.BatchNum, error) {
	cbBytes, err := k.db.Get(KeyCurrentBatch)
	if tracerr.Unwrap(err) == db.ErrNotFound {
		return 0, nil
	}
	if err != nil {
		return 0, tracerr.Wrap(err)
	}
	return common.BatchNumFromBytes(cbBytes)
}

// setCurrentBatch stores the current BatchNum in the KVDB
func (k *KVDB) setCurrentBatch() error {
	tx, err := k.db.NewTx()
	if err != nil {
		return tracerr.Wrap(err)
	}
	err = tx.Put(KeyCurrentBatch, k.CurrentBatch.Bytes())
	if err != nil {
		return tracerr.Wrap(err)
	}
	if err := tx.Commit(); err != nil {
		return tracerr.Wrap(err)
	}
	return nil
}

// GetCurrentIdx returns the stored Idx from the KVDB, which is the last Idx
// used for an Account in the KVDB.
func (k *KVDB) GetCurrentIdx() (common.Idx, error) {
	idxBytes, err := k.db.Get(keyCurrentIdx)
	if tracerr.Unwrap(err) == db.ErrNotFound {
		return common.RollupConstReservedIDx, nil // 255, nil
	}
	if err != nil {
		return 0, tracerr.Wrap(err)
	}
	return common.IdxFromBytes(idxBytes[:])
}

// SetCurrentIdx stores Idx in the KVDB
func (k *KVDB) SetCurrentIdx(idx common.Idx) error {
	k.CurrentIdx = idx

	tx, err := k.db.NewTx()
	if err != nil {
		return tracerr.Wrap(err)
	}
	idxBytes, err := idx.Bytes()
	if err != nil {
		return tracerr.Wrap(err)
	}
	err = tx.Put(keyCurrentIdx, idxBytes[:])
	if err != nil {
		return tracerr.Wrap(err)
	}
	if err := tx.Commit(); err != nil {
		return tracerr.Wrap(err)
	}
	return nil
}
// MakeCheckpoint does a checkpoint at the given batchNum in the defined path.
// Internally this advances & stores the current BatchNum, and then stores a
// Checkpoint of the current state of the KVDB.
func (k *KVDB) MakeCheckpoint() error {
	// advance currentBatch
	k.CurrentBatch++

	checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, k.CurrentBatch))

	if err := k.setCurrentBatch(); err != nil {
		return tracerr.Wrap(err)
	}

	// if the checkpoint BatchNum already exists on disk, delete it
	if _, err := os.Stat(checkpointPath); os.IsNotExist(err) {
	} else if err != nil {
		return tracerr.Wrap(err)
	} else {
		if err := os.RemoveAll(checkpointPath); err != nil {
			return tracerr.Wrap(err)
		}
	}
	// execute Checkpoint
	if err := k.db.Pebble().Checkpoint(checkpointPath); err != nil {
		return tracerr.Wrap(err)
	}
	// copy 'CurrentBatch' to 'last'
	if k.last != nil {
		if err := k.last.set(k, k.CurrentBatch); err != nil {
			return tracerr.Wrap(err)
		}
	}

	k.wg.Add(1)
	go func() {
		delErr := k.DeleteOldCheckpoints()
		if delErr != nil {
			log.Errorw("delete old checkpoints failed", "err", delErr)
		}
		k.wg.Done()
	}()

	return nil
}
// CheckpointExists returns true if the checkpoint exists
func (k *KVDB) CheckpointExists(batchNum common.BatchNum) (bool, error) {
	source := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
	if _, err := os.Stat(source); os.IsNotExist(err) {
		return false, nil
	} else if err != nil {
		return false, tracerr.Wrap(err)
	}
	return true, nil
}

// DeleteCheckpoint removes the checkpoint of the given batchNum, if it exists
func (k *KVDB) DeleteCheckpoint(batchNum common.BatchNum) error {
	checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))

	if _, err := os.Stat(checkpointPath); os.IsNotExist(err) {
		return tracerr.Wrap(fmt.Errorf("Checkpoint with batchNum %d does not exist in DB", batchNum))
	} else if err != nil {
		return tracerr.Wrap(err)
	}

	return os.RemoveAll(checkpointPath)
}

// ListCheckpoints returns the list of batchNums of the checkpoints, sorted.
// If there's a gap between the list of checkpoints, an error is returned.
func (k *KVDB) ListCheckpoints() ([]int, error) {
	files, err := ioutil.ReadDir(k.cfg.Path)
	if err != nil {
		return nil, tracerr.Wrap(err)
	}
	checkpoints := []int{}
	var checkpoint int
	pattern := fmt.Sprintf("%s%%d", PathBatchNum)
	for _, file := range files {
		fileName := file.Name()
		if file.IsDir() && strings.HasPrefix(fileName, PathBatchNum) {
			if _, err := fmt.Sscanf(fileName, pattern, &checkpoint); err != nil {
				return nil, tracerr.Wrap(err)
			}
			checkpoints = append(checkpoints, checkpoint)
		}
	}
	sort.Ints(checkpoints)
	if !k.cfg.NoGapsCheck && len(checkpoints) > 0 {
		first := checkpoints[0]
		for _, checkpoint := range checkpoints[1:] {
			first++
			if checkpoint != first {
				log.Errorw("gap between checkpoints", "checkpoints", checkpoints)
				return nil, tracerr.Wrap(fmt.Errorf("checkpoint gap at %v", checkpoint))
			}
		}
	}
	return checkpoints, nil
}

// DeleteOldCheckpoints deletes old checkpoints when there are more than
// `k.cfg.Keep` checkpoints
func (k *KVDB) DeleteOldCheckpoints() error {
	k.mutexDelOld.Lock()
	defer k.mutexDelOld.Unlock()

	list, err := k.ListCheckpoints()
	if err != nil {
		return tracerr.Wrap(err)
	}
	if k.cfg.Keep > 0 && len(list) > k.cfg.Keep {
		for _, checkpoint := range list[:len(list)-k.cfg.Keep] {
			if err := k.DeleteCheckpoint(common.BatchNum(checkpoint)); err != nil {
				return tracerr.Wrap(err)
			}
		}
	}
	return nil
}
// MakeCheckpointFromTo makes a checkpoint from the current db at fromBatchNum
// to the dest folder. This method is locking, so it can be called from
// multiple places at the same time.
func (k *KVDB) MakeCheckpointFromTo(fromBatchNum common.BatchNum, dest string) error {
	source := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, fromBatchNum))
	if _, err := os.Stat(source); os.IsNotExist(err) {
		// if kvdb does not have checkpoint at batchNum, return err
		return tracerr.Wrap(fmt.Errorf("Checkpoint \"%v\" does not exist", source))
	} else if err != nil {
		return tracerr.Wrap(err)
	}
	// By locking we allow calling MakeCheckpointFromTo from multiple
	// places at the same time for the same stateDB. This allows the
	// synchronizer to do a reset to a batchNum at the same time as the
	// pipeline is doing a txSelector.Reset and batchBuilder.Reset from
	// synchronizer to the same batchNum
	k.m.Lock()
	defer k.m.Unlock()
	return PebbleMakeCheckpoint(source, dest)
}

// PebbleMakeCheckpoint is a helper function to make a pebble checkpoint from
// source to dest.
func PebbleMakeCheckpoint(source, dest string) error {
	// Remove dest folder (if it exists) before doing the checkpoint
	if _, err := os.Stat(dest); os.IsNotExist(err) {
	} else if err != nil {
		return tracerr.Wrap(err)
	} else {
		if err := os.RemoveAll(dest); err != nil {
			return tracerr.Wrap(err)
		}
	}
	sto, err := pebble.NewPebbleStorage(source, false)
	if err != nil {
		return tracerr.Wrap(err)
	}
	defer sto.Close()

	// execute Checkpoint
	err = sto.Pebble().Checkpoint(dest)
	if err != nil {
		return tracerr.Wrap(err)
	}

	return nil
}
// Close the DB
func (k *KVDB) Close() {
	if k.db != nil {
		k.db.Close()
		k.db = nil
	}
	if k.last != nil {
		k.last.close()
	}
	// wait for deletion of old checkpoints
	k.wg.Wait()
}
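The gap check inside ListCheckpoints can be isolated into a small, stdlib-only sketch: checkpoint directories encode consecutive batch numbers, and any missing number in the sorted list is treated as corruption. `checkNoGaps` is illustrative and not part of this package:

```go
package main

import (
	"fmt"
	"sort"
)

// checkNoGaps applies the same rule as the NoGapsCheck pass in
// ListCheckpoints: after sorting, every batch number must be exactly
// one greater than its predecessor.
func checkNoGaps(batchNums []int) error {
	sort.Ints(batchNums)
	for i := 1; i < len(batchNums); i++ {
		if batchNums[i] != batchNums[i-1]+1 {
			return fmt.Errorf("checkpoint gap at %v", batchNums[i])
		}
	}
	return nil
}

func main() {
	fmt.Println(checkNoGaps([]int{3, 4, 5})) // <nil>
	fmt.Println(checkNoGaps([]int{3, 5, 6})) // checkpoint gap at 5
}
```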