CLI client token arg for mooncrawl

pull/539/head
kompotkot 2022-01-24 14:00:49 +00:00
rodzic d62506d175
commit c8b458bf8b
8 zmienionych plików z 73 dodań i 19 usunięć

Wyświetl plik

@ -25,6 +25,7 @@ from .settings import (
MOONSTREAM_CRAWL_WORKERS,
MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI,
MOONSTREAM_POLYGON_WEB3_PROVIDER_URI,
MOONSTREAM_CLIENT_ID_HEADER,
)
logger = logging.getLogger(__name__)
@ -37,9 +38,17 @@ class BlockCrawlError(Exception):
"""
def connect(blockchain_type: AvailableBlockchainType, web3_uri: Optional[str] = None):
def connect(
blockchain_type: AvailableBlockchainType,
web3_uri: Optional[str] = None,
token: Optional[str] = None,
):
web3_provider: Union[IPCProvider, HTTPProvider] = Web3.IPCProvider()
request_kwargs: Any = None
if token is not None:
request_kwargs = {"headers": {MOONSTREAM_CLIENT_ID_HEADER: token}}
if web3_uri is None:
if blockchain_type == AvailableBlockchainType.ETHEREUM:
web3_uri = MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI
@ -49,7 +58,10 @@ def connect(blockchain_type: AvailableBlockchainType, web3_uri: Optional[str] =
raise Exception("Wrong blockchain type provided for web3 URI")
if web3_uri.startswith("http://") or web3_uri.startswith("https://"):
web3_provider = Web3.HTTPProvider(web3_uri)
web3_provider = Web3.HTTPProvider(
endpoint_uri=web3_uri,
request_kwargs=request_kwargs,
)
else:
web3_provider = Web3.IPCProvider(web3_uri)
web3_client = Web3(web3_provider)
@ -181,15 +193,18 @@ def add_block_transactions(
def get_latest_blocks(
blockchain_type: AvailableBlockchainType, confirmations: int = 0
blockchain_type: AvailableBlockchainType,
confirmations: int = 0,
token: Optional[str] = None,
) -> Tuple[Optional[int], int]:
"""
Retrieve the latest block from the connected node (connection is created by the connect(AvailableBlockchainType) method).
Retrieve the latest block from the connected node (connection is created by the
connect(AvailableBlockchainType, ClientTokenID) method).
If confirmations > 0, and the latest block on the node has block number N, this returns the block
with block_number (N - confirmations)
"""
web3_client = connect(blockchain_type)
web3_client = connect(blockchain_type, token=token)
latest_block_number: int = web3_client.eth.block_number
if confirmations > 0:
latest_block_number -= confirmations
@ -212,11 +227,12 @@ def crawl_blocks(
blockchain_type: AvailableBlockchainType,
blocks_numbers: List[int],
with_transactions: bool = False,
token: Optional[str] = None,
) -> None:
"""
Open database and geth sessions and fetch block data from blockchain.
"""
web3_client = connect(blockchain_type)
web3_client = connect(blockchain_type, token=token)
with yield_db_session_ctx() as db_session:
pbar = tqdm(total=len(blocks_numbers))
for block_number in blocks_numbers:
@ -256,6 +272,7 @@ def check_missing_blocks(
blockchain_type: AvailableBlockchainType,
blocks_numbers: List[int],
notransactions=False,
token: Optional[str] = None,
) -> List[int]:
"""
Query block from postgres. If block does not presented in database,
@ -294,7 +311,7 @@ def check_missing_blocks(
[block[0], block[1]] for block in blocks_exist_raw_query.all()
]
web3_client = connect(blockchain_type)
web3_client = connect(blockchain_type, token=token)
blocks_exist_len = len(blocks_exist)
pbar = tqdm(total=blocks_exist_len)
@ -336,6 +353,7 @@ def crawl_blocks_executor(
block_numbers_list: List[int],
with_transactions: bool = False,
num_processes: int = MOONSTREAM_CRAWL_WORKERS,
token: Optional[str] = None,
) -> None:
"""
Execute crawler in processes.
@ -363,14 +381,16 @@ def crawl_blocks_executor(
results: List[Future] = []
if num_processes == 1:
logger.warning("Executing block crawler in lazy mod")
return crawl_blocks(blockchain_type, block_numbers_list, with_transactions)
return crawl_blocks(
blockchain_type, block_numbers_list, with_transactions, token
)
else:
with ThreadPoolExecutor(max_workers=MOONSTREAM_CRAWL_WORKERS) as executor:
for worker in worker_indices:
block_chunk = worker_job_lists[worker]
logger.info(f"Spawned process for {len(block_chunk)} blocks")
result = executor.submit(
crawl_blocks, blockchain_type, block_chunk, with_transactions
crawl_blocks, blockchain_type, block_chunk, with_transactions, token
)
result.add_done_callback(record_error)
results.append(result)

Wyświetl plik

@ -117,7 +117,7 @@ def run_crawler_desc(
def handle_parser(args: argparse.Namespace):
with yield_db_session_ctx() as session:
w3 = connect(AvailableBlockchainType.ETHEREUM)
w3 = connect(AvailableBlockchainType.ETHEREUM, token=args.token)
if args.order == "asc":
run_crawler_asc(
w3=w3,
@ -153,6 +153,11 @@ def generate_parser():
"""
parser = argparse.ArgumentParser(description="Moonstream Deployment Crawler")
parser.add_argument(
"--token",
type=str,
help="Client token ID",
)
parser.add_argument(
"--start", "-s", type=int, default=None, help="block to start crawling from"
)

Wyświetl plik

@ -93,7 +93,7 @@ def crawler_blocks_sync_handler(args: argparse.Namespace) -> None:
"""
while True:
latest_stored_block_number, latest_block_number = get_latest_blocks(
AvailableBlockchainType(args.blockchain), args.confirmations
AvailableBlockchainType(args.blockchain), args.confirmations, args.token
)
if latest_stored_block_number is None:
latest_stored_block_number = 0
@ -137,6 +137,7 @@ def crawler_blocks_sync_handler(args: argparse.Namespace) -> None:
block_numbers_list=blocks_numbers_list,
with_transactions=True,
num_processes=args.jobs,
token=args.token,
)
logger.info(
f"Synchronized blocks from {latest_stored_block_number} to {latest_block_number}"
@ -155,6 +156,8 @@ def crawler_blocks_add_handler(args: argparse.Namespace) -> None:
blockchain_type=AvailableBlockchainType(args.blockchain),
block_numbers_list=blocks_numbers_list,
with_transactions=True,
num_processes=MOONSTREAM_CRAWL_WORKERS,
token=args.token,
)
logger.info(
@ -177,7 +180,7 @@ def crawler_blocks_missing_handler(args: argparse.Namespace) -> None:
confirmations = 150
shift = 2000
_, latest_block_number = get_latest_blocks(
AvailableBlockchainType(args.blockchain), confirmations
AvailableBlockchainType(args.blockchain), confirmations, args.token
)
block_range = f"{latest_block_number-shift}-{latest_block_number}"
@ -206,6 +209,7 @@ def crawler_blocks_missing_handler(args: argparse.Namespace) -> None:
block_numbers_list=missing_blocks_numbers_total,
with_transactions=True,
num_processes=1 if args.lazy else MOONSTREAM_CRAWL_WORKERS,
token=args.token,
)
logger.info(
f"Required {time.time() - startTime} with {MOONSTREAM_CRAWL_WORKERS} workers "
@ -246,6 +250,12 @@ def main() -> None:
time_now = datetime.now(timezone.utc)
parser.add_argument(
"--token",
type=str,
help="Client token ID",
)
# Blockchain blocks parser
parser_crawler_blocks = subcommands.add_parser(
"blocks", description="Blockchain blocks commands"

Wyświetl plik

@ -54,7 +54,9 @@ def handle_crawl(args: argparse.Namespace) -> None:
logger.info(
"No web3 provider URL provided, using default (blockchan.py: connect())"
)
web3 = _retry_connect_web3(blockchain_type)
web3 = _retry_connect_web3(
blockchain_type=blockchain_type, token=args.token
)
else:
logger.info(f"Using web3 provider URL: {args.web3}")
web3 = Web3(

Wyświetl plik

@ -85,6 +85,7 @@ def _retry_connect_web3(
blockchain_type: AvailableBlockchainType,
retry_count: int = 10,
sleep_time: float = 5,
token: Optional[str] = None,
) -> Web3:
"""
Retry connecting to the blockchain.
@ -92,7 +93,7 @@ def _retry_connect_web3(
while retry_count > 0:
retry_count -= 1
try:
web3 = connect(blockchain_type)
web3 = connect(blockchain_type, token=token)
web3.eth.block_number
logger.info(f"Connected to {blockchain_type}")
return web3

Wyświetl plik

@ -55,7 +55,9 @@ def web3_client_from_cli_or_env(args: argparse.Namespace) -> Web3:
raise ValueError(
"Could not find Web3 connection information in arguments or in MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI environment variable"
)
return connect(AvailableBlockchainType.ETHEREUM, web3_connection_string)
return connect(
AvailableBlockchainType.ETHEREUM, web3_connection_string, token=args.token
)
def get_latest_block_from_node(web3_client: Web3):
@ -258,6 +260,12 @@ def main() -> None:
parser.set_defaults(func=lambda _: parser.print_help())
subcommands = parser.add_subparsers(description="Subcommands")
parser.add_argument(
"--token",
type=str,
help="Client token ID",
)
parser_ethereum = subcommands.add_parser(
"ethereum",
description="Collect information about NFTs from Ethereum blockchains",

Wyświetl plik

@ -26,10 +26,13 @@ ORIGINS = RAW_ORIGINS.split(",")
# OpenAPI
DOCS_TARGET_PATH = "docs"
# Crawler label
CRAWLER_LABEL = "moonworm-alpha"
MOONSTREAM_CLIENT_ID_HEADER = os.environ.get(
"MOONSTREAM_CLIENT_ID_HEADER", "x-moonstream-client-id"
)
# Geth connection address
MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI = os.environ.get(
"MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI", ""

Wyświetl plik

@ -8,7 +8,7 @@ import logging
import time
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, Dict, List
from typing import Any, Callable, Dict, List, Optional
from uuid import UUID
import boto3 # type: ignore
@ -337,7 +337,9 @@ def generate_list_of_names(
def process_external(
abi_external_calls: List[Dict[str, Any]], blockchain: AvailableBlockchainType
abi_external_calls: List[Dict[str, Any]],
blockchain: AvailableBlockchainType,
token: Optional[str] = None,
):
"""
Request all required external data
@ -383,7 +385,7 @@ def process_external(
logger.error(f"Error processing external call: {e}")
if external_calls:
web3_client = connect(blockchain)
web3_client = connect(blockchain, token=token)
for extcall in external_calls:
try:
@ -434,6 +436,7 @@ def generate_web3_metrics(
address: str,
crawler_label: str,
abi_json: Any,
token: Optional[str] = None,
) -> List[Any]:
"""
Generate stats for cards components
@ -446,6 +449,7 @@ def generate_web3_metrics(
extention_data = process_external(
abi_external_calls=abi_external_calls,
blockchain=blockchain_type,
token=token,
)
extention_data.append(
@ -611,6 +615,7 @@ def stats_generate_handler(args: argparse.Namespace):
address=address,
crawler_label=crawler_label,
abi_json=abi_json,
token=args.token,
)
# Generate blocks state information