Multithread for adding blocks with txs

ldb-add-txs
kompotkot 2022-03-14 19:26:59 +00:00
rodzic ea91a4deb6
commit 7c90baebf1
3 zmienionych plików z 92 dodań i 27 usunięć

Wyświetl plik

@ -27,25 +27,76 @@ func (cb *CorruptBlocks) registerCorruptBlock(number uint64, source, description
} }
// Add new blocks with transactions to database // Add new blocks with transactions to database
func add(blockchain string, blockNumbers []uint64) error { func add(blockchain string, blockNumbers []uint64, workers int) error {
for _, bn := range blockNumbers { jobsCh := make(chan Job, workers)
block, err := localConnections.getChainBlock(bn) resultCh := make(chan Result, len(blockNumbers))
if err != nil { doneCh := make(chan struct{}, workers)
description := fmt.Sprintf("Unable to get block: %d from chain, err %v", bn, err)
fmt.Println(description) // Add jobs
corruptBlocks.registerCorruptBlock(bn, "blockchain", description) go func() {
continue for _, bn := range blockNumbers {
jobsCh <- Job{
BlockNumber: bn,
Results: resultCh,
}
} }
td := localConnections.Chain.GetTd(block.Hash(), block.NumberU64()) close(jobsCh)
}()
chainTxs := localConnections.getChainTxs(block.Hash(), bn) for i := 0; i < workers; i++ {
// Do jobs
go func() {
for job := range jobsCh {
block, err := localConnections.getChainBlock(job.BlockNumber)
if err != nil {
job.Results <- Result{
ErrorOutput: fmt.Sprintf("Unable to get block: %d from chain, err %v", job.BlockNumber, err),
ErrorSource: "blockchain",
Number: job.BlockNumber,
}
continue
}
err = localConnections.writeDatabaseBlockTxs(blockchain, block, chainTxs, td) td := localConnections.Chain.GetTd(block.Hash(), block.NumberU64())
if err != nil {
fmt.Printf("Error occurred due saving block %d with transactions in database: %v", bn, err) chainTxs := localConnections.getChainTxs(block.Hash(), block.NumberU64())
err = localConnections.writeDatabaseBlockTxs(blockchain, block, chainTxs, td)
if err != nil {
job.Results <- Result{
ErrorOutput: fmt.Sprintf("Unable to write block %d with txs in database, err %v", job.BlockNumber, err),
ErrorSource: "database",
Number: job.BlockNumber,
}
continue
}
job.Results <- Result{
Output: fmt.Sprintf("Processed block number: %d", job.BlockNumber),
}
}
doneCh <- struct{}{}
}()
}
// Await completion
go func() {
for i := 0; i < workers; i++ {
<-doneCh
} }
close(resultCh)
}()
fmt.Printf("Processed block number: %d\r", bn) for result := range resultCh {
if result.ErrorOutput != "" {
fmt.Println(result.ErrorOutput)
}
if result.Output != "" {
fmt.Println(result.Output)
}
if result.Output != "" && result.ErrorOutput != "" {
fmt.Printf("Unprocessable result with error: %s and output: %s", result.ErrorOutput, result.Output)
}
} }
return nil return nil

Wyświetl plik

@ -105,6 +105,10 @@ func processAddCommand(ctx *cli.Context) error {
if blockchain != "ethereum" && blockchain != "polygon" { if blockchain != "ethereum" && blockchain != "polygon" {
return fmt.Errorf("Unsupported blockchain provided") return fmt.Errorf("Unsupported blockchain provided")
} }
threads := ctx.GlobalInt(ThreadsFlag.Name)
if threads <= 0 {
threads = 1
}
start, end, err := startEndBlock(ctx) start, end, err := startEndBlock(ctx)
if err != nil { if err != nil {
@ -124,7 +128,7 @@ func processAddCommand(ctx *cli.Context) error {
} }
for blocks := range BlockYield(start, end, BlockNumberStep) { for blocks := range BlockYield(start, end, BlockNumberStep) {
err = add(blockchain, blocks) err = add(blockchain, blocks, threads)
if err != nil { if err != nil {
return fmt.Errorf("Error occurred due add acction: %v", err) return fmt.Errorf("Error occurred due add acction: %v", err)
} }
@ -239,6 +243,7 @@ func LDBCLI() {
BlockchainFlag, BlockchainFlag,
DataDirFlag, DataDirFlag,
GCModeFlag, GCModeFlag,
ThreadsFlag,
}, },
}, },
{ {

Wyświetl plik

@ -227,6 +227,12 @@ func prepareTxsQuery(blockchain string, block *types.Block, txs []*types.Transac
) )
for i, tx := range txs { for i, tx := range txs {
var maxFeePerGas interface{}
maxFeePerGas = "NULL"
var maxPriorityFeePerGas interface{}
maxPriorityFeePerGas = "NULL"
m, err := tx.AsMessage(signer, block.Number()) m, err := tx.AsMessage(signer, block.Number())
if err != nil { if err != nil {
return "", fmt.Errorf("Transaction to message transformation failed: %v", err) return "", fmt.Errorf("Transaction to message transformation failed: %v", err)
@ -235,15 +241,15 @@ func prepareTxsQuery(blockchain string, block *types.Block, txs []*types.Transac
txsQuery += "," txsQuery += ","
} }
txsQuery += fmt.Sprintf( txsQuery += fmt.Sprintf(
`('%s', %d, '%s', '%s', %d, %d, %d, %d, '0x%x', '0x%x', %d, %d, %d)`, `('%s', %d, '%s', '%s', %d, %d, %v, %v, '0x%x', %d, %d, %d, %d)`,
tx.Hash(), tx.Hash(),
block.Number(), block.Number(),
m.From(), m.From(),
tx.To(), tx.To(),
tx.Gas(), tx.Gas(),
tx.GasPrice(), tx.GasPrice(),
0, //"max_fee", maxFeePerGas,
0, //"max_prior", maxPriorityFeePerGas,
tx.Data(), tx.Data(),
tx.Nonce(), tx.Nonce(),
i, i,
@ -274,15 +280,18 @@ func (lc *LocalConnections) writeDatabaseBlockTxs(
genesisHash := common.HexToHash("0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3") genesisHash := common.HexToHash("0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3")
chainConfig := rawdb.ReadChainConfig(lc.ChainDB, genesisHash) chainConfig := rawdb.ReadChainConfig(lc.ChainDB, genesisHash)
signer := types.MakeSigner(chainConfig, block.Number()) signer := types.MakeSigner(chainConfig, block.Number())
txsQuery, err := prepareTxsQuery(blockchain, block, txs, signer)
if err != nil { if len(txs) > 0 {
dbTx.Rollback() txsQuery, err := prepareTxsQuery(blockchain, block, txs, signer)
return err if err != nil {
} dbTx.Rollback()
_, err = dbTx.Exec(txsQuery) return err
if err != nil { }
dbTx.Rollback() _, err = dbTx.Exec(txsQuery)
return fmt.Errorf("An error occurred during sql operation: %v", err) if err != nil {
dbTx.Rollback()
return fmt.Errorf("An error occurred during sql operation: %v", err)
}
} }
err = dbTx.Commit() err = dbTx.Commit()