Migrate state-crawler to moonworm.

pull/1065/head
Andrey 2024-05-10 23:36:38 +03:00
parent e2fb5b10e5
commit 577df99a74
4 changed files with 224 additions and 10 deletions

View file

@@ -5,11 +5,11 @@ import time
 import uuid
 from collections import OrderedDict
 from datetime import datetime
-from typing import Any, Dict, Optional, Union
+from typing import Any, Dict, Optional, Union, List
 import boto3  # type: ignore
 import requests  # type: ignore
-from bugout.data import BugoutResources
+from bugout.data import BugoutResources, BugoutSearchResult
 from bugout.exceptions import BugoutResponseException
 from moonstream.client import (  # type: ignore
     ENDPOINT_QUERIES,
@@ -170,3 +170,37 @@ def recive_S3_data_from_query(
             logger.info("Too many retries")
             break
     return data_response.json()
+
+
+def get_all_entries_from_search(
+    journal_id: str, search_query: str, limit: int, token: str, content: bool = False
+) -> List[BugoutSearchResult]:
+    """
+    Get all required entries from a journal via the search interface.
+    """
+    offset = 0
+    results: List[BugoutSearchResult] = []
+
+    existing_methods = bc.search(
+        token=token,
+        journal_id=journal_id,
+        query=search_query,
+        content=content,
+        timeout=10.0,
+        limit=limit,
+        offset=offset,
+    )
+    results.extend(existing_methods.results)  # type: ignore
+
+    if len(results) != existing_methods.total_results:
+        for offset in range(limit, existing_methods.total_results, limit):
+            existing_methods = bc.search(
+                token=token,
+                journal_id=journal_id,
+                query=search_query,
+                content=content,
+                timeout=10.0,
+                limit=limit,
+                offset=offset,
+            )
+            results.extend(existing_methods.results)  # type: ignore
+
+    return results

View file

@@ -58,3 +58,12 @@ class TokenURIs(BaseModel):
     block_number: str
     block_timestamp: str
     address: str
+
+
+class ViewTasks(BaseModel):
+    type: str
+    stateMutability: str
+    inputs: Any
+    name: str
+    outputs: List[Dict[str, Any]]
+    address: str
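
For illustration, ViewTasks accepts a standard ABI view-function item extended with the target contract address. A hypothetical ERC20 `totalSupply` job (the address is a placeholder, not real contract data):

# Hypothetical payload; a malformed job raises pydantic's ValidationError.
job = {
    "type": "function",
    "stateMutability": "view",
    "inputs": [],
    "name": "totalSupply",
    "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}],
    "address": "0x0000000000000000000000000000000000000001",
}
task = ViewTasks(**job)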

View file

@@ -393,3 +393,14 @@ if MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID == "":
 MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE = 12000
 MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS = 60
+
+# state crawler
+MOONSTREAM_STATE_CRAWLER_JOURNAL_ID = os.environ.get(
+    "MOONSTREAM_STATE_CRAWLER_JOURNAL_ID", ""
+)
+if MOONSTREAM_STATE_CRAWLER_JOURNAL_ID == "":
+    raise ValueError(
+        "MOONSTREAM_STATE_CRAWLER_JOURNAL_ID environment variable must be set"
+    )
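
Because this check runs at import time, the variable has to be present in the environment before anything imports mooncrawl.settings. A sketch for a local run (the UUID is a placeholder):

import os

# Placeholder journal UUID; must be set before mooncrawl.settings is imported,
# otherwise the ValueError above fires immediately.
os.environ["MOONSTREAM_STATE_CRAWLER_JOURNAL_ID"] = "11111111-1111-1111-1111-111111111111"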

View file

@@ -12,14 +12,20 @@ from uuid import UUID
 from moonstream.client import Moonstream  # type: ignore
 from moonstreamdb.blockchain import AvailableBlockchainType
 from web3.middleware import geth_poa_middleware
 from mooncrawl.moonworm_crawler.crawler import _retry_connect_web3
-from ..actions import recive_S3_data_from_query
+from ..actions import recive_S3_data_from_query, get_all_entries_from_search
 from ..blockchain import connect
+from ..data import ViewTasks
 from ..db import PrePing_SessionLocal
-from ..settings import INFURA_PROJECT_ID, infura_networks, multicall_contracts
+from ..settings import (
+    bugout_client as bc,
+    INFURA_PROJECT_ID,
+    infura_networks,
+    multicall_contracts,
+    MOONSTREAM_ADMIN_ACCESS_TOKEN,
+    MOONSTREAM_STATE_CRAWLER_JOURNAL_ID,
+)
 from .db import clean_labels, commit_session, view_call_to_label
 from .Multicall2_interface import Contract as Multicall2
 from .web3_util import FunctionSignature
@@ -509,11 +515,49 @@ def handle_crawl(args: argparse.Namespace) -> None:
     Read all view methods of the contracts and crawl
     """
-    with open(args.jobs_file, "r") as f:
-        jobs = json.load(f)
     blockchain_type = AvailableBlockchainType(args.blockchain)

+    if args.jobs_file is not None:
+        with open(args.jobs_file, "r") as f:
+            jobs = json.load(f)
+    else:
+        jobs = []
+
+        # Bugout
+        query = f"#state_job #blockchain:{blockchain_type.value}"
+        print(f"Query: {query}")
+        existing_jobs = get_all_entries_from_search(
+            journal_id=MOONSTREAM_STATE_CRAWLER_JOURNAL_ID,
+            search_query=query,
+            limit=1000,
+            token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
+            content=True,
+        )
+
+        if len(existing_jobs) == 0:
+            logger.info("No jobs found in the journal")
+            return
+
+        for job in existing_jobs:
+            try:
+                if job.content is None:
+                    logger.error(f"Job content is None for entry {job.entry_url}")
+                    continue
+
+                ### parse json
+                job_content = json.loads(job.content)
+
+                ### validate via ViewTasks
+                ViewTasks(**job_content)
+
+                jobs.append(job_content)
+            except Exception as e:
+                logger.error(f"Job validation of entry {job.entry_url} failed: {e}")
+                continue
+
     custom_web3_provider = args.web3_uri

     if args.infura and INFURA_PROJECT_ID is not None:
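
Worth noting: handle_crawl only discovers journal entries whose tags match this query exactly, so every job entry needs both the state_job tag and a blockchain:<chain> tag, which is the scheme migrate_state_tasks_handler below writes. A tiny illustrative check of the pairing (the chain value is hypothetical):

# Illustrative only: the crawl query and an entry's tags must line up.
chain = "polygon"  # any AvailableBlockchainType value
query = f"#state_job #blockchain:{chain}"          # what handle_crawl searches for
entry_tags = ["state_job", f"blockchain:{chain}"]  # what migrate-jobs writes

assert all(f"#{tag}" in query.split() for tag in entry_tags)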
@@ -573,6 +617,100 @@ def clean_labels_handler(args: argparse.Namespace) -> None:
         db_session.close()


+def migrate_state_tasks_handler(args: argparse.Namespace) -> None:
+    ### Get all tasks from the jobs file
+    # example file: jobs/ethereum-jobs.json
+    with open(args.jobs_file, "r") as f:
+        jobs = json.load(f)
+
+    blockchain_type = AvailableBlockchainType(args.blockchain)
+    migrated_blockchain = blockchain_type.value
+
+    ### Get all tasks from the journal
+    query = f"#state_job #blockchain:{migrated_blockchain}"
+    existing_jobs = get_all_entries_from_search(
+        journal_id=MOONSTREAM_STATE_CRAWLER_JOURNAL_ID,
+        search_query=query,
+        limit=1000,
+        token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
+        content=True,
+    )
+
+    existing_state_tasks_list = []
+
+    print(f"Existing jobs: {len(existing_jobs)}")
+    print(f"New jobs: {jobs}")
+
+    ### validate existing jobs
+    for job in existing_jobs:
+        try:
+            if job.content is None:
+                logger.error(f"Job content is None for entry {job.entry_url}")
+                continue
+
+            ### parse json
+            job_content = json.loads(job.content)
+
+            ### validate via ViewTasks
+            ViewTasks(**job_content)
+        except Exception as e:
+            logger.error(f"Job validation of entry {job.entry_url} failed: {e}")
+            continue
+
+        ### from tags get blockchain, name and address
+        for tag in job.tags:
+            if tag.startswith("blockchain"):
+                blockchain = tag.split(":")[1]
+            if tag.startswith("name"):
+                name = tag.split(":")[1]
+            if tag.startswith("address"):
+                address = tag.split(":")[1]
+
+        existing_state_tasks_list.append(f"{blockchain}:{name}:{address}")
+
+    ### Deduplicate against the journal and push new tasks from the file
+    for job in jobs:
+        name = job["name"]
+        address = job["address"]
+
+        if f"{migrated_blockchain}:{name}:{address}" not in existing_state_tasks_list:
+            ### create a new task
+            json_str = json.dumps(job, indent=4)
+
+            ### add tabs to the json string for better readability in the journal
+            json_str_with_tabs = "\n".join(
+                "\t" + line for line in json_str.splitlines()
+            )
+
+            try:
+                bc.create_entry(
+                    title=f"{name}:{address}",
+                    token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
+                    journal_id=MOONSTREAM_STATE_CRAWLER_JOURNAL_ID,
+                    content=json_str_with_tabs,
+                    tags=[
+                        "state_job",
+                        f"blockchain:{migrated_blockchain}",
+                        f"name:{name}",
+                        f"address:{address}",
+                    ],
+                )
+            except Exception as e:
+                logger.error(f"Error creating entry: {e}")
+                continue
+
+
 def main() -> None:
     parser = argparse.ArgumentParser()
     parser.set_defaults(func=lambda _: parser.print_help())
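
The tab-prefixing step in migrate_state_tasks_handler is purely cosmetic for the journal UI; leading tabs are insignificant JSON whitespace, so the stored content still parses back to the same job. A quick round-trip sketch with an illustrative job (the name and address are placeholders):

import json

job = {
    "type": "function",
    "stateMutability": "view",
    "inputs": [],
    "name": "symbol",
    "outputs": [{"internalType": "string", "name": "", "type": "string"}],
    "address": "0x0000000000000000000000000000000000000002",
}

json_str = json.dumps(job, indent=4)
json_str_with_tabs = "\n".join("\t" + line for line in json_str.splitlines())

# json.loads skips the leading tabs, so the entry content round-trips cleanly.
assert json.loads(json_str_with_tabs) == job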
@@ -615,7 +753,7 @@ def main() -> None:
         "-j",
         type=str,
         help="Path to json file with jobs",
-        required=True,
+        required=False,
     )
     view_state_crawler_parser.add_argument(
         "--batch-size",
@@ -626,6 +764,28 @@ def main() -> None:
     )
     view_state_crawler_parser.set_defaults(func=handle_crawl)

+    view_state_migration_parser = subparsers.add_parser(
+        "migrate-jobs",
+        help="Migrate jobs from a file into the Bugout journal",
+    )
+    view_state_migration_parser.add_argument(
+        "--jobs-file",
+        "-j",
+        type=str,
+        help="Path to json file with jobs",
+        required=True,
+    )
+    view_state_migration_parser.add_argument(
+        "--blockchain",
+        "-b",
+        type=str,
+        help="Type of blockchain which is written to the database",
+        required=True,
+    )
+    view_state_migration_parser.set_defaults(func=migrate_state_tasks_handler)
+
     view_state_cleaner = subparsers.add_parser(
         "clean-state-labels",
         help="Clean labels from database",