Merge branch 'main' into starknet-db

pull/936/head
kompotkot 2023-10-24 14:18:55 +00:00
commit a460813296
6 zmienionych plików z 145 dodań i 118 usunięć

Wyświetl plik

@ -158,12 +158,13 @@ def _autoscale_crawl_events(
all_events = []
for job in jobs:
raw_events, batch_size = moonworm_autoscale_crawl_events(
web3,
job.event_abi,
from_block,
to_block,
batch_size,
job.contracts[0],
web3=web3,
event_abi=job.event_abi,
from_block=from_block,
to_block=to_block,
batch_size=batch_size,
contract_address=job.contracts[0],
max_blocks_batch=3000,
)
for raw_event in raw_events:
raw_event["blockTimestamp"] = get_block_timestamp(

Wyświetl plik

@ -14,7 +14,7 @@ from bugout.data import (
BugoutSearchResult,
)
from bugout.exceptions import BugoutResponseException
from fastapi import APIRouter, Body, Path, Request
from fastapi import APIRouter, Body, Path, Request, Query
from moonstreamdb.blockchain import AvailableBlockchainType
from sqlalchemy import text
@ -36,6 +36,7 @@ from ..settings import (
MOONSTREAM_QUERY_TEMPLATE_CONTEXT_TYPE,
MOONSTREAM_S3_QUERIES_BUCKET,
MOONSTREAM_S3_QUERIES_BUCKET_PREFIX,
BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
from ..settings import bugout_client as bc
@ -48,6 +49,10 @@ router = APIRouter(
@router.get("/list", tags=["queries"])
async def get_list_of_queries_handler(request: Request) -> List[Dict[str, Any]]:
"""
Return list of queries which user own
"""
token = request.state.token
# Check already existed queries
@ -73,7 +78,7 @@ async def create_query_handler(
request: Request, query_applied: data.PreapprovedQuery = Body(...)
) -> BugoutJournalEntry:
"""
Create query in bugout journal
Create query in bugout journal with preapprove status required approval from moonstream team
"""
token = request.state.token
@ -117,6 +122,7 @@ async def create_query_handler(
title=f"Query:{query_name}",
tags=["type:query"],
content=query_applied.query,
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS * 2,
)
except BugoutResponseException as e:
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
@ -161,10 +167,15 @@ async def create_query_handler(
@router.get("/templates", tags=["queries"])
def get_suggested_queries(
supported_interfaces: Optional[List[str]] = None,
address: Optional[str] = None,
title: Optional[str] = None,
limit: int = 10,
supported_interfaces: Optional[List[str]] = Query(
None, description="Supported interfaces in format: d73f4e3a erc1155"
),
address: Optional[str] = Query(
None,
description="Query address for search if template applied to particular address",
),
title: Optional[str] = Query(None, description="Query title for search"),
limit: int = Query(10),
) -> data.SuggestedQueriesResponse:
"""
Return set of suggested queries for user
@ -191,9 +202,10 @@ def get_suggested_queries(
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
query=query,
limit=limit,
timeout=5,
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
except BugoutResponseException as e:
logger.error(f"Error in get suggested queries templates: {str(e)}")
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
raise MoonstreamHTTPException(status_code=500, internal_error=e)
@ -222,7 +234,7 @@ def get_suggested_queries(
@router.get("/{query_name}/query", tags=["queries"])
async def get_query_handler(
request: Request, query_name: str
request: Request, query_name: str = Path(..., description="Query name")
) -> data.QueryInfoResponse:
token = request.state.token
@ -248,7 +260,7 @@ async def get_query_handler(
limit=1,
)
except BugoutResponseException as e:
logger.error(f"Error in get query: {str(e)}")
logger.error(f"Error in search template: {str(e)}")
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
raise MoonstreamHTTPException(status_code=500, internal_error=e)
@ -263,12 +275,10 @@ async def get_query_handler(
)
try:
entries = bc.search(
entry = bc.get_entry(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
query=f"tag:approved tag:query_id:{query_id} !tag:preapprove",
limit=1,
timeout=5,
entry_id=query_id,
)
except BugoutResponseException as e:
logger.error(f"Error in get query: {str(e)}")
@ -276,23 +286,23 @@ async def get_query_handler(
except Exception as e:
raise MoonstreamHTTPException(status_code=500, internal_error=e)
if len(entries.results) == 0:
raise MoonstreamHTTPException(
status_code=403, detail="Query not approved yet."
)
else:
entries_results = cast(List[BugoutSearchResult], entries.results)
query_id = entries_results[0].entry_url.split("/")[-1]
entry = entries_results[0]
entries_results = cast(List[BugoutSearchResult], entries.results)
entry = entries_results[0]
content = entry.content
tags = entry.tags
created_at = entry.created_at
updated_at = entry.updated_at
if content is None:
raise MoonstreamHTTPException(
status_code=403, detail=f"Query is empty. Please update it."
)
try:
if entry.content is None:
raise MoonstreamHTTPException(
status_code=403, detail=f"Query is empty. Please update it."
)
query = text(entry.content)
query = text(content)
except Exception as e:
raise MoonstreamHTTPException(
status_code=500, internal_error=e, detail="Error in query parsing"
@ -301,8 +311,7 @@ async def get_query_handler(
query_parameters_names = list(query._bindparams.keys())
tags_dict = {
tag.split(":")[0]: (tag.split(":")[1] if ":" in tag else True)
for tag in entry.tags
tag.split(":")[0]: (tag.split(":")[1] if ":" in tag else True) for tag in tags
}
query_parameters: Dict[str, Any] = {}
@ -313,23 +322,21 @@ async def get_query_handler(
else:
query_parameters[param] = None
print(type(entry.created_at))
return data.QueryInfoResponse(
query=entry.content,
query=content,
query_id=str(query_id),
preapprove="preapprove" in tags_dict,
approved="approved" in tags_dict,
parameters=query_parameters,
created_at=entry.created_at, # type: ignore
updated_at=entry.updated_at, # type: ignore
created_at=created_at, # type: ignore
updated_at=updated_at, # type: ignore
)
@router.put("/{query_name}", tags=["queries"])
async def update_query_handler(
request: Request,
query_name: str,
query_name: str = Path(..., description="Query name"),
request_update: data.UpdateQueryRequest = Body(...),
) -> BugoutJournalEntryContent:
token = request.state.token
@ -367,9 +374,9 @@ async def update_query_handler(
)
async def update_query_data_handler(
request: Request,
query_name: str,
query_name: str = Path(..., description="Query name"),
request_update: data.UpdateDataRequest = Body(...),
) -> Optional[data.QueryPresignUrl]:
) -> data.QueryPresignUrl:
"""
Request update data on S3 bucket
"""
@ -407,7 +414,7 @@ async def update_query_data_handler(
limit=1,
)
except BugoutResponseException as e:
logger.error(f"Error in get query: {str(e)}")
logger.error(f"Error in search template: {str(e)}")
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
raise MoonstreamHTTPException(status_code=500, internal_error=e)
@ -422,52 +429,55 @@ async def update_query_data_handler(
)
try:
entries = bc.search(
entry = bc.get_entry(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
query=f"tag:approved tag:query_id:{query_id} !tag:preapprove",
limit=1,
timeout=5,
entry_id=query_id,
)
except BugoutResponseException as e:
logger.error(f"Error in get query: {str(e)}")
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
raise MoonstreamHTTPException(status_code=500, internal_error=e)
if len(entries.results) == 0:
### check tags
if "preapprove" in entry.tags or "approved" not in entry.tags:
raise MoonstreamHTTPException(
status_code=403, detail="Query not approved yet."
)
content = entry.content
tags = entry.tags
else:
entries_results = cast(List[BugoutSearchResult], entries.results)
query_id = entries_results[0].entry_url.split("/")[-1]
s3_response = None
entries_results = cast(List[BugoutSearchResult], entries.results)
if entries_results[0].content:
content = entries_results[0].content
tags = entries_results[0].tags
if content:
file_type = "json"
if "ext:csv" in tags:
file_type = "csv"
responce = requests.post(
f"{MOONSTREAM_CRAWLERS_SERVER_URL}:{MOONSTREAM_CRAWLERS_SERVER_PORT}/jobs/{query_id}/query_update",
json={
"query": content,
"params": request_update.params,
"file_type": file_type,
"blockchain": request_update.blockchain
if request_update.blockchain
else None,
},
timeout=5,
)
try:
responce = requests.post(
f"{MOONSTREAM_CRAWLERS_SERVER_URL}:{MOONSTREAM_CRAWLERS_SERVER_PORT}/jobs/{query_id}/query_update",
json={
"query": content,
"params": request_update.params,
"file_type": file_type,
"blockchain": request_update.blockchain
if request_update.blockchain
else None,
},
timeout=5,
)
except Exception as e:
logger.error(f"Error interaction with crawlers: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
if responce.status_code != 200:
raise MoonstreamHTTPException(
@ -476,6 +486,10 @@ async def update_query_data_handler(
)
s3_response = data.QueryPresignUrl(**responce.json())
else:
raise MoonstreamHTTPException(
status_code=403, detail=f"Query is empty. Please update it."
)
return s3_response
@ -483,7 +497,7 @@ async def update_query_data_handler(
@router.post("/{query_name}", tags=["queries"])
async def get_access_link_handler(
request: Request,
query_name: str,
query_name: str = Path(..., description="Query name"),
request_update: data.UpdateDataRequest = Body(...),
) -> Optional[data.QueryPresignUrl]:
"""
@ -513,7 +527,7 @@ async def get_access_link_handler(
limit=1,
)
except BugoutResponseException as e:
logger.error(f"Error in get query: {str(e)}")
logger.error(f"Error in search template: {str(e)}")
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
raise MoonstreamHTTPException(status_code=500, internal_error=e)
@ -528,12 +542,10 @@ async def get_access_link_handler(
)
try:
entries = bc.search(
entry = bc.get_entry(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
query=f"tag:approved tag:query_id:{query_id} !tag:preapprove",
limit=1,
timeout=5,
entry_id=query_id,
)
except BugoutResponseException as e:
logger.error(f"Error in get query: {str(e)}")
@ -541,38 +553,37 @@ async def get_access_link_handler(
except Exception as e:
raise MoonstreamHTTPException(status_code=500, internal_error=e)
if len(entries.results) == 0:
raise MoonstreamHTTPException(
status_code=403, detail="Query not approved yet."
)
else:
entry = cast(BugoutJournalEntry, entries.results[0])
entries_results = cast(List[BugoutSearchResult], entries.results)
content = entry.content
tags = entry.tags
if not content:
raise MoonstreamHTTPException(
status_code=403, detail=f"Query is empty. Please update it."
)
try:
s3_response = None
passed_params = dict(request_update.params)
if entries_results[0].content:
passed_params = dict(request_update.params)
file_type = "json"
tags = entries_results[0].tags
if "ext:csv" in tags:
file_type = "csv"
file_type = "json"
params_hash = query_parameter_hash(passed_params)
if "ext:csv" in tags:
file_type = "csv"
bucket = MOONSTREAM_S3_QUERIES_BUCKET
key = f"{MOONSTREAM_S3_QUERIES_BUCKET_PREFIX}/queries/{query_id}/{params_hash}/data.{file_type}"
params_hash = query_parameter_hash(passed_params)
bucket = MOONSTREAM_S3_QUERIES_BUCKET
key = f"{MOONSTREAM_S3_QUERIES_BUCKET_PREFIX}/queries/{query_id}/{params_hash}/data.{file_type}"
stats_presigned_url = generate_s3_access_links(
method_name="get_object",
bucket=bucket,
key=key,
expiration=300000,
http_method="GET",
)
s3_response = data.QueryPresignUrl(url=stats_presigned_url)
stats_presigned_url = generate_s3_access_links(
method_name="get_object",
bucket=bucket,
key=key,
expiration=300000,
http_method="GET",
)
s3_response = data.QueryPresignUrl(url=stats_presigned_url)
except Exception as e:
logger.error(f"Error in get access link: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
@ -582,8 +593,7 @@ async def get_access_link_handler(
@router.delete("/{query_name}", tags=["queries"])
async def remove_query_handler(
request: Request,
query_name: str,
request: Request, query_name: str = Path(..., description="Query name")
) -> BugoutJournalEntry:
"""
Request delete query from journal

Wyświetl plik

@ -2,4 +2,4 @@
Moonstream library and API version.
"""
MOONSTREAMAPI_VERSION = "0.3.0"
MOONSTREAMAPI_VERSION = "0.3.1"

Wyświetl plik

@ -38,25 +38,24 @@ var (
"web3_clientVersion": true,
// zksync methods
"zks_estimateFee": true,
"zks_estimateGasL1ToL2": true,
"zks_getAllAccountBalances": true,
"zks_getBlockDetails": true,
"zks_getBridgeContracts": true,
"zks_getBytecodeByHash": true,
"zks_getConfirmedTokens": true,
"zks_getL1BatchBlockRange": true,
"zks_getL1BatchDetails": true,
"zks_getL2ToL1LogProof": true,
"zks_getL2ToL1MsgProof": true,
"zks_getMainContract": true,
"zks_estimateFee": true,
"zks_estimateGasL1ToL2": true,
"zks_getAllAccountBalances": true,
"zks_getBlockDetails": true,
"zks_getBridgeContracts": true,
"zks_getBytecodeByHash": true,
"zks_getConfirmedTokens": true,
"zks_getL1BatchBlockRange": true,
"zks_getL1BatchDetails": true,
"zks_getL2ToL1LogProof": true,
"zks_getL2ToL1MsgProof": true,
"zks_getMainContract": true,
"zks_getRawBlockTransactions": true,
"zks_getTestnetPaymaster": true,
"zks_getTokenPrice": true,
"zks_getTransactionDetails": true,
"zks_L1BatchNumber": true,
"zks_L1ChainId": true,
"zks_getTestnetPaymaster": true,
"zks_getTokenPrice": true,
"zks_getTransactionDetails": true,
"zks_L1BatchNumber": true,
"zks_L1ChainId": true,
}
)
@ -64,7 +63,7 @@ type JSONRPCRequest struct {
Jsonrpc string `json:"jsonrpc"`
Method string `json:"method"`
Params []interface{} `json:"params"`
ID uint64 `json:"id"`
ID interface{} `json:"id"` // According to the JSON-RPC specification, the id can be a string, number, or null
}
type BlockchainConfig struct {

Wyświetl plik

@ -390,6 +390,7 @@ func jsonrpcRequestParser(body []byte) ([]JSONRPCRequest, error) {
var jsonrpcRequest []JSONRPCRequest
firstByte := bytes.TrimLeft(body, " \t\r\n")
switch {
case len(firstByte) > 0 && firstByte[0] == '[':
err := json.Unmarshal(body, &jsonrpcRequest)
@ -407,6 +408,17 @@ func jsonrpcRequestParser(body []byte) ([]JSONRPCRequest, error) {
return nil, fmt.Errorf("incorrect first byte in JSON RPC request")
}
for _, req := range jsonrpcRequest {
switch v := req.ID.(type) {
case float64:
req.ID = uint64(v)
case string:
case nil:
default:
return nil, fmt.Errorf("unexpected type for id: %T", v)
}
}
return jsonrpcRequest, nil
}

Wyświetl plik

@ -201,6 +201,11 @@ func Server() {
r.URL.RawQuery = ""
r.Header.Del(strings.Title(NB_ACCESS_ID_HEADER))
r.Header.Del(strings.Title(NB_DATA_SOURCE_HEADER))
r.URL.Scheme = endpoint.Scheme
r.URL.Host = endpoint.Host
r.URL.Path = endpoint.Path
// Change r.Host from nodebalancer's to end host so TLS check will be passed
r.Host = r.URL.Host
}