Fix merging duplicates.

pull/751/head
Andrey 2023-04-25 17:50:46 +03:00
rodzic 447763ca85
commit ad06bf947f
1 zmienionych plików z 140 dodań i 145 usunięć

Wyświetl plik

@ -731,7 +731,7 @@ def stats_generate_handler(args: argparse.Namespace):
abi_json = {}
else:
abi_json = abi
abi_json = json.loads(abi)
methods = generate_list_of_names(
type="function",
@ -841,174 +841,169 @@ def stats_generate_handler(args: argparse.Namespace):
merged_functions[address]["merged"].add(method)
except Exception as e:
traceback.print_exc()
logger.error(f"Error while merging subscriptions: {e}")
# Request contracts for external calls.
# result is a {call_hash: value} dictionary.
# Request contracts for external calls.
# result is a {call_hash: value} dictionary.
external_calls_results = process_external_merged(
external_calls=merged_external_calls["merged"],
blockchain=blockchain_type,
access_id=args.access_id,
external_calls_results = process_external_merged(
external_calls=merged_external_calls["merged"],
blockchain=blockchain_type,
access_id=args.access_id,
)
for address in address_dashboard_id_subscription_id_tree.keys():
current_blocks_state = get_blocks_state(
db_session=db_session, blockchain_type=blockchain_type
)
for address in address_dashboard_id_subscription_id_tree.keys():
current_blocks_state = get_blocks_state(
db_session=db_session, blockchain_type=blockchain_type
)
s3_data_object_for_contract: Dict[str, Any] = {}
s3_data_object_for_contract: Dict[str, Any] = {}
crawler_label = CRAWLER_LABEL
crawler_label = CRAWLER_LABEL
for timescale in [timescale.value for timescale in TimeScale]:
try:
start_date = (
datetime.utcnow() - timescales_delta[timescale]["timedelta"]
)
for timescale in [timescale.value for timescale in TimeScale]:
try:
start_date = (
datetime.utcnow() - timescales_delta[timescale]["timedelta"]
)
logger.info(f"Timescale: {timescale}")
logger.info(f"Timescale: {timescale}")
# Write state of blocks in database
s3_data_object_for_contract["blocks_state"] = current_blocks_state
# Write state of blocks in database
s3_data_object_for_contract[
"blocks_state"
] = current_blocks_state
# TODO(Andrey): Remove after https://github.com/bugout-dev/moonstream/issues/524
s3_data_object_for_contract["generic"] = {}
# TODO(Andrey): Remove after https://github.com/bugout-dev/moonstream/issues/524
s3_data_object_for_contract["generic"] = {}
# Generate functions call timeseries
functions_calls_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
functions=merged_functions[address]["merged"],
start=start_date,
metric_type="tx_call",
crawler_label=crawler_label,
)
s3_data_object_for_contract["methods"] = functions_calls_data
# Generate functions call timeseries
functions_calls_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
functions=merged_functions[address]["merged"],
start=start_date,
metric_type="tx_call",
crawler_label=crawler_label,
)
s3_data_object_for_contract["methods"] = functions_calls_data
# Generte events timeseries
events_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
functions=merged_events[address]["merged"],
start=start_date,
metric_type="event",
crawler_label=crawler_label,
)
s3_data_object_for_contract["events"] = events_data
# Generte events timeseries
events_data = generate_data(
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
timescale=timescale,
functions=merged_events[address]["merged"],
start=start_date,
metric_type="event",
crawler_label=crawler_label,
)
s3_data_object_for_contract["events"] = events_data
for dashboard_id in address_dashboard_id_subscription_id_tree[
address
]: # Dashboards loop for address
for (
subscription_id
) in address_dashboard_id_subscription_id_tree[address][
dashboard_id
]:
try:
extention_data = []
for dashboard_id in address_dashboard_id_subscription_id_tree[
address
]: # Dashboards loop for address
for (
subscription_id
) in address_dashboard_id_subscription_id_tree[address][
dashboard_id
]:
try:
extention_data = []
s3_subscription_data_object: Dict[str, Any] = {}
s3_subscription_data_object: Dict[str, Any] = {}
s3_subscription_data_object[
"blocks_state"
] = s3_data_object_for_contract["blocks_state"]
s3_subscription_data_object[
"blocks_state"
] = s3_data_object_for_contract["blocks_state"]
if dashboard_id in merged_external_calls:
for (
external_call_hash,
display_name,
) in merged_external_calls[dashboard_id][
subscription_id
].items():
if (
external_call_hash
in external_calls_results
):
extention_data.append(
{
"display_name": display_name,
"value": external_calls_results[
external_call_hash
],
}
)
s3_subscription_data_object[
"web3_metric"
] = extention_data
# list of user defined events
events_list = merged_events[address][dashboard_id][
if dashboard_id in merged_external_calls:
for (
external_call_hash,
display_name,
) in merged_external_calls[dashboard_id][
subscription_id
]
].items():
if external_call_hash in external_calls_results:
extention_data.append(
{
"display_name": display_name,
"value": external_calls_results[
external_call_hash
],
}
)
s3_subscription_data_object["events"] = {}
s3_subscription_data_object[
"web3_metric"
] = extention_data
for event in events_list:
if event in events_data:
s3_subscription_data_object["events"][
event
] = events_data[event]
# list of user defined events
# list of user defined functions
events_list = merged_events[address][dashboard_id][
subscription_id
]
functions_list = merged_functions[address][
dashboard_id
][subscription_id]
s3_subscription_data_object["events"] = {}
s3_subscription_data_object["methods"] = {}
for event in events_list:
if event in events_data:
s3_subscription_data_object["events"][
event
] = events_data[event]
for function in functions_list:
if function in functions_calls_data:
s3_subscription_data_object["methods"][
function
] = functions_calls_data[function]
# list of user defined functions
# Push data to S3 bucket
push_statistics(
statistics_data=s3_subscription_data_object,
subscription_type_id=subscription_id_by_blockchain[
args.blockchain
],
address=address,
timescale=timescale,
bucket=MOONSTREAM_S3_DASHBOARDS_DATA_BUCKET, # type: ignore
dashboard_id=dashboard_id,
)
except Exception as err:
db_session.rollback()
reporter.error_report(
err,
[
"dashboard",
"statistics",
f"blockchain:{args.blockchain}"
f"subscriptions:{subscription_id}",
f"dashboard:{dashboard}",
],
)
logger.error(err)
except Exception as err:
db_session.rollback()
reporter.error_report(
err,
[
"dashboard",
"statistics",
f"blockchain:{args.blockchain}"
f"timescale:{timescale}",
f"data_generation_failed",
],
)
logger.error(err)
functions_list = merged_functions[address][
dashboard_id
][subscription_id]
s3_subscription_data_object["methods"] = {}
for function in functions_list:
if function in functions_calls_data:
s3_subscription_data_object["methods"][
function
] = functions_calls_data[function]
# Push data to S3 bucket
push_statistics(
statistics_data=s3_subscription_data_object,
subscription_type_id=subscription_id_by_blockchain[
args.blockchain
],
address=address,
timescale=timescale,
bucket=MOONSTREAM_S3_DASHBOARDS_DATA_BUCKET, # type: ignore
dashboard_id=dashboard_id,
)
except Exception as err:
db_session.rollback()
reporter.error_report(
err,
[
"dashboard",
"statistics",
f"blockchain:{args.blockchain}"
f"subscriptions:{subscription_id}",
f"dashboard:{dashboard}",
],
)
logger.error(err)
except Exception as err:
db_session.rollback()
reporter.error_report(
err,
[
"dashboard",
"statistics",
f"blockchain:{args.blockchain}" f"timescale:{timescale}",
f"data_generation_failed",
],
)
logger.error(err)
reporter.custom_report(
title=f"Dashboard stats generated.",