diff --git a/common/account.go b/common/account.go index 4472854..6ebde88 100644 --- a/common/account.go +++ b/common/account.go @@ -263,3 +263,13 @@ type IdxNonce struct { Idx Idx `db:"idx"` Nonce Nonce `db:"nonce"` } + +// AccountUpdate represents an account balance and/or nonce update after a +// processed batch +type AccountUpdate struct { + EthBlockNum int64 `meddler:"eth_block_num"` + BatchNum BatchNum `meddler:"batch_num"` + Idx Idx `meddler:"idx"` + Nonce Nonce `meddler:"nonce"` + Balance *big.Int `meddler:"balance,bigint"` +} diff --git a/common/batch.go b/common/batch.go index 972b53a..b6c146b 100644 --- a/common/batch.go +++ b/common/batch.go @@ -77,6 +77,7 @@ type BatchData struct { L1CoordinatorTxs []L1Tx L2Txs []L2Tx CreatedAccounts []Account + UpdatedAccounts []AccountUpdate ExitTree []ExitInfo Batch Batch } diff --git a/config/config.go b/config/config.go index 81f33ed..76a1f13 100644 --- a/config/config.go +++ b/config/config.go @@ -232,6 +232,11 @@ type Node struct { // `Eth.LastBatch`). This value only affects the reported % of // synchronization of blocks and batches, nothing else. StatsRefreshPeriod Duration `validate:"required"` + // StoreAccountUpdates when set to true makes the synchronizer + // store every account update in the account_update SQL table. + // This allows querying nonces and balances from the HistoryDB + // via SQL. + StoreAccountUpdates bool } `validate:"required"` SmartContracts struct { // Rollup is the address of the Hermez.sol smart contract diff --git a/db/historydb/historydb.go b/db/historydb/historydb.go index b59d96f..2f67c57 100644 --- a/db/historydb/historydb.go +++ b/db/historydb/historydb.go @@ -528,6 +528,37 @@ func (hdb *HistoryDB) GetAllAccounts() ([]common.Account, error) { return db.SlicePtrsToSlice(accs).([]common.Account), tracerr.Wrap(err) } +// AddAccountUpdates inserts accUpdates into the DB +func (hdb *HistoryDB) AddAccountUpdates(accUpdates []common.AccountUpdate) error { + return tracerr.Wrap(hdb.addAccountUpdates(hdb.db, accUpdates)) +} +func (hdb *HistoryDB) addAccountUpdates(d meddler.DB, accUpdates []common.AccountUpdate) error { + if len(accUpdates) == 0 { + return nil + } + return tracerr.Wrap(db.BulkInsert( + d, + `INSERT INTO account_update ( + eth_block_num, + batch_num, + idx, + nonce, + balance + ) VALUES %s;`, + accUpdates, + )) +} + +// GetAllAccountUpdates returns all the AccountUpdate from the DB +func (hdb *HistoryDB) GetAllAccountUpdates() ([]common.AccountUpdate, error) { + var accUpdates []*common.AccountUpdate + err := meddler.QueryAll( + hdb.db, &accUpdates, + "SELECT eth_block_num, batch_num, idx, nonce, balance FROM account_update ORDER BY idx;", + ) + return db.SlicePtrsToSlice(accUpdates).([]common.AccountUpdate), tracerr.Wrap(err) +} + // AddL1Txs inserts L1 txs to the DB. USD and DepositAmountUSD will be set automatically before storing the tx. // If the tx is originated by a coordinator, BatchNum must be provided. If it's originated by a user, // BatchNum should be null, and the value will be setted by a trigger when a batch forges the tx. @@ -1018,6 +1049,11 @@ func (hdb *HistoryDB) AddBlockSCData(blockData *common.BlockData) (err error) { return tracerr.Wrap(err) } + // Add accountBalances if it exists + if err := hdb.addAccountUpdates(txn, batch.UpdatedAccounts); err != nil { + return tracerr.Wrap(err) + } + // Set the EffectiveAmount and EffectiveDepositAmount of all the // L1UserTxs that have been forged in this batch if err = hdb.setExtraInfoForgedL1UserTxs(txn, batch.L1UserTxs); err != nil { diff --git a/db/historydb/historydb_test.go b/db/historydb/historydb_test.go index ddc38eb..da797f0 100644 --- a/db/historydb/historydb_test.go +++ b/db/historydb/historydb_test.go @@ -377,6 +377,22 @@ func TestAccounts(t *testing.T) { accs[i].Balance = nil assert.Equal(t, accs[i], acc) } + // Test AccountBalances + accUpdates := make([]common.AccountUpdate, len(accs)) + for i, acc := range accs { + accUpdates[i] = common.AccountUpdate{ + EthBlockNum: batches[acc.BatchNum-1].EthBlockNum, + BatchNum: acc.BatchNum, + Idx: acc.Idx, + Nonce: common.Nonce(i), + Balance: big.NewInt(int64(i)), + } + } + err = historyDB.AddAccountUpdates(accUpdates) + require.NoError(t, err) + fetchedAccBalances, err := historyDB.GetAllAccountUpdates() + require.NoError(t, err) + assert.Equal(t, accUpdates, fetchedAccBalances) } func TestTxs(t *testing.T) { diff --git a/db/migrations/0001.sql b/db/migrations/0001.sql index 1b2854f..6064089 100644 --- a/db/migrations/0001.sql +++ b/db/migrations/0001.sql @@ -100,6 +100,15 @@ CREATE TABLE account ( eth_addr BYTEA NOT NULL ); +CREATE TABLE account_update ( + item_id SERIAL, + eth_block_num BIGINT NOT NULL REFERENCES block (eth_block_num) ON DELETE CASCADE, + batch_num BIGINT NOT NULL REFERENCES batch (batch_num) ON DELETE CASCADE, + idx BIGINT NOT NULL REFERENCES account (idx) ON DELETE CASCADE, + nonce BIGINT NOT NULL, + balance BYTEA NOT NULL +); + CREATE TABLE exit_tree ( item_id SERIAL PRIMARY KEY, batch_num BIGINT REFERENCES batch (batch_num) ON DELETE CASCADE, @@ -674,6 +683,7 @@ DROP TABLE token_exchange; DROP TABLE wdelayer_vars; DROP TABLE tx; DROP TABLE exit_tree; +DROP TABLE account_update; DROP TABLE account; DROP TABLE token; DROP TABLE bid; diff --git a/node/node.go b/node/node.go index 4d28ec8..5bc7f0f 100644 --- a/node/node.go +++ b/node/node.go @@ -183,8 +183,9 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) { } sync, err := synchronizer.NewSynchronizer(client, historyDB, stateDB, synchronizer.Config{ - StatsRefreshPeriod: cfg.Synchronizer.StatsRefreshPeriod.Duration, - ChainID: chainIDU16, + StatsRefreshPeriod: cfg.Synchronizer.StatsRefreshPeriod.Duration, + StoreAccountUpdates: cfg.Synchronizer.StoreAccountUpdates, + ChainID: chainIDU16, }) if err != nil { return nil, tracerr.Wrap(err) diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 9606188..f2814c0 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -206,8 +206,9 @@ type SCConsts struct { // Config is the Synchronizer configuration type Config struct { - StatsRefreshPeriod time.Duration - ChainID uint16 + StatsRefreshPeriod time.Duration + StoreAccountUpdates bool + ChainID uint16 } // Synchronizer implements the Synchronizer type @@ -993,6 +994,21 @@ func (s *Synchronizer) rollupSync(ethBlock *common.Block) (*common.RollupData, e } batchData.CreatedAccounts = processTxsOut.CreatedAccounts + if s.cfg.StoreAccountUpdates { + batchData.UpdatedAccounts = make([]common.AccountUpdate, 0, + len(processTxsOut.UpdatedAccounts)) + for _, acc := range processTxsOut.UpdatedAccounts { + batchData.UpdatedAccounts = append(batchData.UpdatedAccounts, + common.AccountUpdate{ + EthBlockNum: blockNum, + BatchNum: batchNum, + Idx: acc.Idx, + Nonce: acc.Nonce, + Balance: acc.Balance, + }) + } + } + slotNum := int64(0) if ethBlock.Num >= s.consts.Auction.GenesisBlockNum { slotNum = (ethBlock.Num - s.consts.Auction.GenesisBlockNum) / diff --git a/synchronizer/synchronizer_test.go b/synchronizer/synchronizer_test.go index a0aa818..f22b3d1 100644 --- a/synchronizer/synchronizer_test.go +++ b/synchronizer/synchronizer_test.go @@ -171,6 +171,8 @@ func checkSyncBlock(t *testing.T, s *Synchronizer, blockNum int, block, syncBloc *exit = syncBatch.ExitTree[j] } assert.Equal(t, batch.Batch, syncBatch.Batch) + // Ignore updated accounts + syncBatch.UpdatedAccounts = nil assert.Equal(t, batch, syncBatch) assert.Equal(t, &batch.Batch, dbBatch) //nolint:gosec @@ -344,7 +346,8 @@ func TestSyncGeneral(t *testing.T) { // Create Synchronizer s, err := NewSynchronizer(client, historyDB, stateDB, Config{ - StatsRefreshPeriod: 0 * time.Second, + StatsRefreshPeriod: 0 * time.Second, + StoreAccountUpdates: true, }) require.NoError(t, err) @@ -735,7 +738,8 @@ func TestSyncForgerCommitment(t *testing.T) { // Create Synchronizer s, err := NewSynchronizer(client, historyDB, stateDB, Config{ - StatsRefreshPeriod: 0 * time.Second, + StatsRefreshPeriod: 0 * time.Second, + StoreAccountUpdates: true, }) require.NoError(t, err) @@ -835,7 +839,8 @@ func TestSyncForgerCommitment(t *testing.T) { syncCommitment[syncBlock.Block.Num] = stats.Sync.Auction.CurrentSlot.ForgerCommitment s2, err := NewSynchronizer(client, historyDB, stateDB, Config{ - StatsRefreshPeriod: 0 * time.Second, + StatsRefreshPeriod: 0 * time.Second, + StoreAccountUpdates: true, }) require.NoError(t, err) stats = s2.Stats() diff --git a/txprocessor/txprocessor.go b/txprocessor/txprocessor.go index da3632e..d1dc417 100644 --- a/txprocessor/txprocessor.go +++ b/txprocessor/txprocessor.go @@ -27,6 +27,9 @@ type TxProcessor struct { // AccumulatedFees contains the accumulated fees for each token (Coord // Idx) in the processed batch AccumulatedFees map[common.Idx]*big.Int + // updatedAccounts stores the last version of the account when it has + // been created/updated by any of the processed transactions. + updatedAccounts map[common.Idx]*common.Account config Config } @@ -55,6 +58,9 @@ type ProcessTxOutput struct { CreatedAccounts []common.Account CoordinatorIdxsMap map[common.TokenID]common.Idx CollectedFees map[common.TokenID]*big.Int + // UpdatedAccounts returns the current state of each account + // created/updated by any of the processed transactions. + UpdatedAccounts map[common.Idx]*common.Account } func newErrorNotEnoughBalance(tx common.Tx) error { @@ -127,6 +133,10 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat return nil, tracerr.Wrap(fmt.Errorf("L1UserTx + L1CoordinatorTx (%d) can not be bigger than MaxL1Tx (%d)", len(l1usertxs)+len(l1coordinatortxs), tp.config.MaxTx)) } + if tp.s.Type() == statedb.TypeSynchronizer { + tp.updatedAccounts = make(map[common.Idx]*common.Account) + } + exits := make([]processedExit, nTx) if tp.s.Type() == statedb.TypeBatchBuilder { @@ -382,7 +392,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat tp.zki.EthAddr3[iFee] = common.EthAddrToBigInt(accCoord.EthAddr) } accCoord.Balance = new(big.Int).Add(accCoord.Balance, accumulatedFee) - pFee, err := tp.s.UpdateAccount(idx, accCoord) + pFee, err := tp.updateAccount(idx, accCoord) if err != nil { log.Error(err) return nil, tracerr.Wrap(err) @@ -439,7 +449,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat } } - // retuTypeexitInfos, createdAccounts and collectedFees, so Synchronizer will + // retun exitInfos, createdAccounts and collectedFees, so Synchronizer will // be able to store it into HistoryDB for the concrete BatchNum return &ProcessTxOutput{ ZKInputs: nil, @@ -447,6 +457,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat CreatedAccounts: createdAccounts, CoordinatorIdxsMap: coordIdxsMap, CollectedFees: collectedFees, + UpdatedAccounts: tp.updatedAccounts, }, nil } @@ -741,7 +752,7 @@ func (tp *TxProcessor) applyCreateAccount(tx *common.L1Tx) error { EthAddr: tx.FromEthAddr, } - p, err := tp.s.CreateAccount(common.Idx(tp.s.CurrentIdx()+1), account) + p, err := tp.createAccount(common.Idx(tp.s.CurrentIdx()+1), account) if err != nil { return tracerr.Wrap(err) } @@ -776,6 +787,28 @@ func (tp *TxProcessor) applyCreateAccount(tx *common.L1Tx) error { return tp.s.SetCurrentIdx(tp.s.CurrentIdx() + 1) } +// createAccount is a wrapper over the StateDB.CreateAccount method that also +// stores the created account in the updatedAccounts map in case the StateDB is +// of TypeSynchronizer +func (tp *TxProcessor) createAccount(idx common.Idx, account *common.Account) (*merkletree.CircomProcessorProof, error) { + if tp.s.Type() == statedb.TypeSynchronizer { + account.Idx = idx + tp.updatedAccounts[idx] = account + } + return tp.s.CreateAccount(idx, account) +} + +// updateAccount is a wrapper over the StateDB.UpdateAccount method that also +// stores the updated account in the updatedAccounts map in case the StateDB is +// of TypeSynchronizer +func (tp *TxProcessor) updateAccount(idx common.Idx, account *common.Account) (*merkletree.CircomProcessorProof, error) { + if tp.s.Type() == statedb.TypeSynchronizer { + account.Idx = idx + tp.updatedAccounts[idx] = account + } + return tp.s.UpdateAccount(idx, account) +} + // applyDeposit updates the balance in the account of the depositer, if // andTransfer parameter is set to true, the method will also apply the // Transfer of the L1Tx/DepositTransfer @@ -806,7 +839,7 @@ func (tp *TxProcessor) applyDeposit(tx *common.L1Tx, transfer bool) error { } // update sender account in localStateDB - p, err := tp.s.UpdateAccount(tx.FromIdx, accSender) + p, err := tp.updateAccount(tx.FromIdx, accSender) if err != nil { return tracerr.Wrap(err) } @@ -843,7 +876,7 @@ func (tp *TxProcessor) applyDeposit(tx *common.L1Tx, transfer bool) error { accReceiver.Balance = new(big.Int).Add(accReceiver.Balance, tx.EffectiveAmount) // update receiver account in localStateDB - p, err := tp.s.UpdateAccount(tx.ToIdx, accReceiver) + p, err := tp.updateAccount(tx.ToIdx, accReceiver) if err != nil { return tracerr.Wrap(err) } @@ -926,7 +959,7 @@ func (tp *TxProcessor) applyTransfer(coordIdxsMap map[common.TokenID]common.Idx, } // update sender account in localStateDB - pSender, err := tp.s.UpdateAccount(tx.FromIdx, accSender) + pSender, err := tp.updateAccount(tx.FromIdx, accSender) if err != nil { log.Error(err) return tracerr.Wrap(err) @@ -965,7 +998,7 @@ func (tp *TxProcessor) applyTransfer(coordIdxsMap map[common.TokenID]common.Idx, accReceiver.Balance = new(big.Int).Add(accReceiver.Balance, tx.Amount) // update receiver account in localStateDB - pReceiver, err := tp.s.UpdateAccount(auxToIdx, accReceiver) + pReceiver, err := tp.updateAccount(auxToIdx, accReceiver) if err != nil { return tracerr.Wrap(err) } @@ -1008,7 +1041,7 @@ func (tp *TxProcessor) applyCreateAccountDepositTransfer(tx *common.L1Tx) error } // create Account of the Sender - p, err := tp.s.CreateAccount(common.Idx(tp.s.CurrentIdx()+1), accSender) + p, err := tp.createAccount(common.Idx(tp.s.CurrentIdx()+1), accSender) if err != nil { return tracerr.Wrap(err) } @@ -1056,7 +1089,7 @@ func (tp *TxProcessor) applyCreateAccountDepositTransfer(tx *common.L1Tx) error accReceiver.Balance = new(big.Int).Add(accReceiver.Balance, tx.EffectiveAmount) // update receiver account in localStateDB - p, err = tp.s.UpdateAccount(tx.ToIdx, accReceiver) + p, err = tp.updateAccount(tx.ToIdx, accReceiver) if err != nil { return tracerr.Wrap(err) } @@ -1130,7 +1163,7 @@ func (tp *TxProcessor) applyExit(coordIdxsMap map[common.TokenID]common.Idx, } } - p, err := tp.s.UpdateAccount(tx.FromIdx, acc) + p, err := tp.updateAccount(tx.FromIdx, acc) if err != nil { return nil, false, tracerr.Wrap(err) } diff --git a/txprocessor/txprocessor_test.go b/txprocessor/txprocessor_test.go index 7b39838..f8825cc 100644 --- a/txprocessor/txprocessor_test.go +++ b/txprocessor/txprocessor_test.go @@ -4,6 +4,7 @@ import ( "io/ioutil" "math/big" "os" + "sort" "testing" ethCommon "github.com/ethereum/go-ethereum/common" @@ -1006,3 +1007,103 @@ func TestExitOf0Amount(t *testing.T) { require.NoError(t, err) assert.Equal(t, "0", ptOut.ZKInputs.Metadata.NewExitRootRaw.BigInt().String()) } + +func TestUpdatedAccounts(t *testing.T) { + dir, err := ioutil.TempDir("", "tmpdb") + require.NoError(t, err) + defer assert.NoError(t, os.RemoveAll(dir)) + + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeSynchronizer, NLevels: 32}) + assert.NoError(t, err) + + set := ` +Type: Blockchain +AddToken(1) +CreateAccountCoordinator(0) Coord // 256 +CreateAccountCoordinator(1) Coord // 257 +> batch // 1 +CreateAccountDeposit(0) A: 50 // 258 +CreateAccountDeposit(0) B: 60 // 259 +CreateAccountDeposit(1) A: 70 // 260 +CreateAccountDeposit(1) B: 80 // 261 +> batchL1 // 2 +> batchL1 // 3 +Transfer(0) A-B: 5 (126) +> batch // 4 +Exit(1) B: 5 (126) +> batch // 5 +> block + ` + + chainID := uint16(0) + tc := til.NewContext(chainID, common.RollupConstMaxL1UserTx) + blocks, err := tc.GenerateBlocks(set) + require.NoError(t, err) + tilCfgExtra := til.ConfigExtra{ + BootCoordAddr: ethCommon.HexToAddress("0xE39fEc6224708f0772D2A74fd3f9055A90E0A9f2"), + CoordUser: "Coord", + } + err = tc.FillBlocksExtra(blocks, &tilCfgExtra) + require.NoError(t, err) + tc.FillBlocksL1UserTxsBatchNum(blocks) + err = tc.FillBlocksForgedL1UserTxs(blocks) + require.NoError(t, err) + + require.Equal(t, 5, len(blocks[0].Rollup.Batches)) + + config := Config{ + NLevels: 32, + MaxFeeTx: 64, + MaxTx: 512, + MaxL1Tx: 16, + ChainID: chainID, + } + tp := NewTxProcessor(sdb, config) + + sortedKeys := func(m map[common.Idx]*common.Account) []int { + keys := make([]int, 0) + for k := range m { + keys = append(keys, int(k)) + } + sort.Ints(keys) + return keys + } + + for _, batch := range blocks[0].Rollup.Batches { + l2Txs := common.L2TxsToPoolL2Txs(batch.L2Txs) + ptOut, err := tp.ProcessTxs(batch.Batch.FeeIdxsCoordinator, batch.L1UserTxs, + batch.L1CoordinatorTxs, l2Txs) + require.NoError(t, err) + switch batch.Batch.BatchNum { + case 1: + assert.Equal(t, 2, len(ptOut.UpdatedAccounts)) + assert.Equal(t, []int{256, 257}, sortedKeys(ptOut.UpdatedAccounts)) + case 2: + assert.Equal(t, 0, len(ptOut.UpdatedAccounts)) + assert.Equal(t, []int{}, sortedKeys(ptOut.UpdatedAccounts)) + case 3: + assert.Equal(t, 4, len(ptOut.UpdatedAccounts)) + assert.Equal(t, []int{258, 259, 260, 261}, sortedKeys(ptOut.UpdatedAccounts)) + case 4: + assert.Equal(t, 2+1, len(ptOut.UpdatedAccounts)) + assert.Equal(t, []int{256, 258, 259}, sortedKeys(ptOut.UpdatedAccounts)) + case 5: + assert.Equal(t, 1+1, len(ptOut.UpdatedAccounts)) + assert.Equal(t, []int{257, 261}, sortedKeys(ptOut.UpdatedAccounts)) + } + for idx, updAcc := range ptOut.UpdatedAccounts { + acc, err := sdb.GetAccount(idx) + require.NoError(t, err) + // If acc.Balance is 0, set it to 0 with big.NewInt so + // that the comparison succeeds. Without this, the + // comparison will not succeed because acc.Balance is + // set from a slice, and thus the internal big.Int + // buffer is not nil (big.Int.abs) + if acc.Balance.BitLen() == 0 { + acc.Balance = big.NewInt(0) + } + assert.Equal(t, acc, updAcc) + } + } +}