kopia lustrzana https://github.com/bugout-dev/moonstream
Remove and test generation.
rodzic
0cc9a30b69
commit
e0b5b00eab
|
@ -104,163 +104,6 @@ def push_statistics(
|
|||
print(f"Statistics push to bucket: s3://{bucket}/{result_key}")
|
||||
|
||||
|
||||
def generate_metrics(
|
||||
db_session: Session,
|
||||
blockchain_type: AvailableBlockchainType,
|
||||
address: str,
|
||||
timescale: str,
|
||||
metrics: List[str],
|
||||
start: Any,
|
||||
):
|
||||
"""
|
||||
Generage metrics
|
||||
"""
|
||||
block_model = get_block_model(blockchain_type)
|
||||
transaction_model = get_transaction_model(blockchain_type)
|
||||
|
||||
start = start
|
||||
end = datetime.utcnow()
|
||||
|
||||
start_timestamp = int(start.timestamp())
|
||||
end_timestamp = int(end.timestamp())
|
||||
|
||||
results: Dict[str, Any] = {}
|
||||
|
||||
time_step = timescales_params[timescale]["timestep"]
|
||||
|
||||
time_format = timescales_params[timescale]["timeformat"]
|
||||
|
||||
def make_query(
|
||||
db_session: Session,
|
||||
identifying_column: Column,
|
||||
statistic_column: Column,
|
||||
aggregate_func: Callable,
|
||||
) -> Query:
|
||||
|
||||
unformated_time_series_subquery = db_session.query(
|
||||
func.generate_series(
|
||||
start,
|
||||
end,
|
||||
time_step,
|
||||
).label("timeseries_points")
|
||||
).subquery(name="unformated_time_series_subquery")
|
||||
|
||||
time_series_formated = db_session.query(
|
||||
func.to_char(
|
||||
unformated_time_series_subquery.c.timeseries_points, time_format
|
||||
).label("timeseries_points")
|
||||
)
|
||||
|
||||
time_series_formated_subquery = time_series_formated.subquery(
|
||||
name="time_series_subquery"
|
||||
)
|
||||
|
||||
metric_count_subquery = (
|
||||
db_session.query(
|
||||
aggregate_func(statistic_column).label("count"),
|
||||
func.to_char(
|
||||
func.to_timestamp(block_model.timestamp), time_format
|
||||
).label("timeseries_points"),
|
||||
)
|
||||
.join(
|
||||
block_model,
|
||||
transaction_model.block_number == block_model.block_number,
|
||||
)
|
||||
.filter(identifying_column == address)
|
||||
.filter(block_model.timestamp >= start_timestamp)
|
||||
.filter(block_model.timestamp <= end_timestamp)
|
||||
.group_by(text("timeseries_points"))
|
||||
).subquery(name="metric_counts")
|
||||
|
||||
metrics_time_series = (
|
||||
db_session.query(
|
||||
time_series_formated_subquery.c.timeseries_points.label(
|
||||
"timeseries_points"
|
||||
),
|
||||
func.coalesce(metric_count_subquery.c.count.label("count"), 0),
|
||||
)
|
||||
.join(
|
||||
metric_count_subquery,
|
||||
time_series_formated_subquery.c.timeseries_points
|
||||
== metric_count_subquery.c.timeseries_points,
|
||||
isouter=True,
|
||||
)
|
||||
.order_by(text("timeseries_points DESC"))
|
||||
)
|
||||
|
||||
response_metric: List[Any] = []
|
||||
|
||||
for created_date, count in metrics_time_series:
|
||||
|
||||
if not isinstance(count, int):
|
||||
count = int(count)
|
||||
response_metric.append({"date": created_date, "count": count})
|
||||
|
||||
return response_metric
|
||||
|
||||
try:
|
||||
|
||||
if "transactions_out" in metrics:
|
||||
start_time = time.time()
|
||||
results["transactions_out"] = make_query(
|
||||
db_session,
|
||||
transaction_model.from_address,
|
||||
transaction_model.hash,
|
||||
func.count,
|
||||
)
|
||||
|
||||
print("--- transactions_out %s seconds ---" % (time.time() - start_time))
|
||||
|
||||
if "transactions_in" in metrics:
|
||||
start_time = time.time()
|
||||
results["transactions_in"] = make_query(
|
||||
db_session,
|
||||
transaction_model.to_address,
|
||||
transaction_model.hash,
|
||||
func.count,
|
||||
)
|
||||
|
||||
print("--- transactions_in %s seconds ---" % (time.time() - start_time))
|
||||
|
||||
if "value_out" in metrics:
|
||||
start_time = time.time()
|
||||
results["value_out"] = make_query(
|
||||
db_session,
|
||||
transaction_model.from_address,
|
||||
transaction_model.value,
|
||||
func.sum,
|
||||
)
|
||||
print("--- value_out %s seconds ---" % (time.time() - start_time))
|
||||
|
||||
if "value_in" in metrics:
|
||||
start_time = time.time()
|
||||
results["value_in"] = make_query(
|
||||
db_session,
|
||||
transaction_model.to_address,
|
||||
transaction_model.value,
|
||||
func.sum,
|
||||
)
|
||||
|
||||
print("--- value_in %s seconds ---" % (time.time() - start_time))
|
||||
|
||||
except Exception as err:
|
||||
logger.error(err)
|
||||
db_session.rollback()
|
||||
reporter.error_report(
|
||||
err,
|
||||
[
|
||||
"dashboard",
|
||||
"metrics",
|
||||
"statistics",
|
||||
f"metrics:{','.join(metrics)}"
|
||||
f"blockchain:{blockchain_type}"
|
||||
f"address:{address}",
|
||||
],
|
||||
)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def generate_data(
|
||||
db_session: Session,
|
||||
blockchain_type: AvailableBlockchainType,
|
||||
|
@ -663,10 +506,6 @@ def stats_generate_handler(args: argparse.Namespace):
|
|||
):
|
||||
crawler_label = "moonworm"
|
||||
|
||||
generic = dashboard_subscription_filters["generic"]
|
||||
|
||||
generic_metrics_names = [item["name"] for item in generic]
|
||||
|
||||
if not subscription_by_id[subscription_id].resource_data["abi"]:
|
||||
|
||||
methods = []
|
||||
|
@ -802,14 +641,7 @@ def stats_generate_handler(args: argparse.Namespace):
|
|||
|
||||
s3_data_object["events"] = events_data
|
||||
|
||||
s3_data_object["generic"] = generate_metrics(
|
||||
db_session=db_session,
|
||||
blockchain_type=blockchain_type,
|
||||
address=address,
|
||||
timescale=timescale,
|
||||
metrics=generic_metrics_names,
|
||||
start=start_date,
|
||||
)
|
||||
s3_data_object["generic"] = {}
|
||||
|
||||
push_statistics(
|
||||
statistics_data=s3_data_object,
|
||||
|
|
Ładowanie…
Reference in New Issue