Add streams by timestemp.

pull/97/head
Andrey Dolgolev 2021-08-06 17:03:24 +03:00
rodzic 93c6fccf9b
commit 1a43528ae9
7 zmienionych plików z 262 dodań i 159 usunięć

Wyświetl plik

@ -0,0 +1,205 @@
from datetime import datetime
import logging
from typing import Dict, Any, List, Optional, Union
from moonstreamdb.models import (
EthereumBlock,
EthereumTransaction,
EthereumPendingTransaction,
)
from sqlalchemy import or_, and_, text
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import desc, false
from . import data
from .settings import DEFAULT_PAGE_SIZE
logger = logging.getLogger(__name__)
async def get_transaction_in_blocks(
db_session: Session,
query: str,
user_subscriptions_resources_by_address: Dict[str, Any],
start_time: Optional[int] = 0,
end_time: Optional[int] = 0,
) -> List[data.EthereumTransactionItem]:
subscriptions_addresses = list(user_subscriptions_resources_by_address.keys())
if start_time < 1438215988: # first block
start_time = False
if end_time < 1438215988: # first block
end_time = False
if query == "" or query == " ":
filters = [
or_(
EthereumTransaction.to_address == address,
EthereumTransaction.from_address == address,
)
for address in subscriptions_addresses
]
filters = or_(*filters)
else:
filters = database_search_query(
query, allowed_addresses=subscriptions_addresses
)
if not filters:
return [], None, None
filters = and_(*filters)
# Get start point
if start_time is False and end_time is False:
ethereum_transaction_start_point = (
db_session.query(
EthereumTransaction.hash,
EthereumTransaction.block_number,
EthereumTransaction.from_address,
EthereumTransaction.to_address,
EthereumTransaction.gas,
EthereumTransaction.gas_price,
EthereumTransaction.input,
EthereumTransaction.nonce,
EthereumTransaction.value,
EthereumBlock.timestamp.label("timestamp"),
)
.join(EthereumBlock)
.filter(filters)
.order_by(text("timestamp desc"))
.limit(1)
).one_or_none()
start_time = False
print(ethereum_transaction_start_point)
end_time = ethereum_transaction_start_point[-1]
ethereum_transactions = (
db_session.query(
EthereumTransaction.hash,
EthereumTransaction.block_number,
EthereumTransaction.from_address,
EthereumTransaction.to_address,
EthereumTransaction.gas,
EthereumTransaction.gas_price,
EthereumTransaction.input,
EthereumTransaction.nonce,
EthereumTransaction.value,
EthereumBlock.timestamp.label("timestamp"),
)
.join(EthereumBlock)
.filter(filters)
)
print(f"last record: {end_time}")
if start_time and end_time:
if start_time < end_time:
start_time, end_time = end_time, start_time
if start_time:
ethereum_transactions = ethereum_transactions.filter(
EthereumBlock.timestamp <= start_time
)
if end_time:
ethereum_transactions = ethereum_transactions.filter(
EthereumBlock.timestamp >= end_time
)
print(f"count: {ethereum_transactions.count()}")
response = []
for row_index, (
hash,
block_number,
from_address,
to_address,
gas,
gas_price,
input,
nonce,
value,
timestamp,
) in enumerate(ethereum_transactions):
subscription_type_id = None
from_label = None
to_label = None
color = None
if from_address in subscriptions_addresses:
from_label = user_subscriptions_resources_by_address[from_address]["label"]
subscription_type_id = user_subscriptions_resources_by_address[
from_address
]["subscription_type_id"]
color = user_subscriptions_resources_by_address[from_address]["color"]
if to_address in subscriptions_addresses:
subscription_type_id = user_subscriptions_resources_by_address[to_address][
"subscription_type_id"
]
to_label = user_subscriptions_resources_by_address[to_address]["label"]
color = user_subscriptions_resources_by_address[to_address]["color"]
response.append(
data.EthereumTransactionItem(
color=color,
from_label=from_label,
to_label=to_label,
block_number=block_number,
gas=gas,
gasPrice=gas_price,
value=value,
from_address=from_address,
to_address=to_address,
hash=hash,
input=input,
nonce=nonce,
timestamp=timestamp,
subscription_type_id=subscription_type_id,
)
)
return response, start_time, end_time
def database_search_query(q: str, allowed_addresses: List[str]):
filters = q.split("+")
constructed_filters = []
for filter_item in filters:
if filter_item == "":
logger.warning("Skipping empty filter item")
continue
# Try Google style search filters
components = filter_item.split(":")
if len(components) == 2:
filter_type = components[0]
filter_value = components[1]
else:
continue
if filter_type == "to" and filter_value:
constructed_filters.append(EthereumTransaction.to_address == filter_value)
if filter_type == "from" and filter_value:
if filter_value not in allowed_addresses:
continue
constructed_filters.append(EthereumTransaction.from_address == filter_value)
if filter_type == "address" and filter_value:
constructed_filters.append(
or_(
EthereumTransaction.to_address == filter_value,
EthereumTransaction.from_address == filter_value,
)
)
return constructed_filters

Wyświetl plik

@ -114,6 +114,7 @@ class EthereumTransactionItem(BaseModel):
color: Optional[str]
from_label: Optional[str] = None
to_label: Optional[str] = None
block_number: Optional[int] = None
gas: int
gasPrice: int
value: int
@ -128,6 +129,8 @@ class EthereumTransactionItem(BaseModel):
class EthereumTransactionResponse(BaseModel):
stream: List[EthereumTransactionItem]
start_time: int
end_time: int
class TxinfoEthereumBlockchainRequest(BaseModel):

Wyświetl plik

@ -5,28 +5,22 @@ import logging
from typing import Any, cast, Dict, List, Optional, Set, Union
from pydantic.utils import to_camel
from sqlalchemy.engine.base import Transaction
from datetime import datetime, timedelta
from bugout.data import BugoutResource, BugoutResources
from bugout.data import BugoutResources
from bugout.exceptions import BugoutResponseException
from fastapi import Body, FastAPI, HTTPException, Request, Form, Query, Depends
from fastapi import FastAPI, HTTPException, Request, Form, Query, Depends
from fastapi.middleware.cors import CORSMiddleware
from moonstreamdb.models import (
EthereumBlock,
EthereumTransaction,
EthereumPendingTransaction,
ESDFunctionSignature,
ESDEventSignature,
)
from moonstreamdb import db
from sqlalchemy.orm import Session
from sqlalchemy import or_, and_
from .. import actions
from .. import data
from ..middleware import BroodAuthMiddleware
from ..settings import (
MOONSTREAM_APPLICATION_ID,
DEFAULT_PAGE_SIZE,
DOCS_TARGET_PATH,
ORIGINS,
DOCS_PATHS,
@ -67,9 +61,8 @@ app.add_middleware(BroodAuthMiddleware, whitelist=whitelist_paths)
async def search_transactions(
request: Request,
q: str = Query(""),
filters: Optional[List[str]] = Query(None),
limit: int = Query(10),
offset: int = Query(0),
start_time: Optional[int] = Query(0), # Optional[int] = Query(0), #
end_time: Optional[int] = Query(0), # Optional[int] = Query(0), #
db_session: Session = Depends(db.yield_db_session),
):
@ -88,136 +81,29 @@ async def search_transactions(
except Exception as e:
raise HTTPException(status_code=500)
subscriptions_addresses = [
resource.resource_data["address"]
for resource in user_subscriptions_resources.resources
]
if q == "" or q == " ":
filters = [
or_(
EthereumTransaction.to_address == address,
EthereumTransaction.from_address == address,
)
for address in subscriptions_addresses
]
filters = or_(*filters)
else:
filters = database_search_query(q, allowed_addresses=subscriptions_addresses)
if not filters:
return data.EthereumTransactionResponse(stream=[])
filters = and_(*filters)
address_to_subscriptions = {
resource.resource_data["address"]: resource.resource_data
for resource in user_subscriptions_resources.resources
}
ethereum_transactions = (
db_session.query(
EthereumTransaction.hash,
EthereumTransaction.block_number,
EthereumTransaction.from_address,
EthereumTransaction.to_address,
EthereumTransaction.gas,
EthereumTransaction.gas_price,
EthereumTransaction.input,
EthereumTransaction.nonce,
EthereumTransaction.value,
EthereumBlock.timestamp,
transactions: List[Any] = []
if address_to_subscriptions:
print("address_to_subscriptions")
(
transactions_in_blocks,
first_item_time,
last_item_time,
) = await actions.get_transaction_in_blocks(
db_session=db_session,
query=q,
user_subscriptions_resources_by_address=address_to_subscriptions,
start_time=start_time,
end_time=end_time,
)
.join(EthereumBlock)
.filter(filters)
.limit(25)
transactions.extend(transactions_in_blocks)
return data.EthereumTransactionResponse(
stream=transactions, start_time=first_item_time, end_time=last_item_time
)
response = []
for (
hash,
block_number,
from_address,
to_address,
gas,
gas_price,
input,
nonce,
value,
timestamp,
) in ethereum_transactions:
subscription_type_id = None
from_label = None
to_label = None
color = None
if from_address in subscriptions_addresses:
from_label = address_to_subscriptions[from_address]["label"]
subscription_type_id = address_to_subscriptions[from_address][
"subscription_type_id"
]
color = address_to_subscriptions[from_address]["color"]
if to_address in subscriptions_addresses:
subscription_type_id = address_to_subscriptions[to_address][
"subscription_type_id"
]
to_label = address_to_subscriptions[to_address]["label"]
color = address_to_subscriptions[to_address]["color"]
response.append(
data.EthereumTransactionItem(
color=color,
from_label=from_label,
to_label=to_label,
gas=gas,
gasPrice=gas_price,
value=value,
from_address=from_address,
to_address=to_address,
hash=hash,
input=input,
nonce=nonce,
timestamp=timestamp,
subscription_type_id="1",
)
)
return data.EthereumTransactionResponse(stream=response)
def database_search_query(q: str, allowed_addresses: List[str]):
filters = q.split("+")
constructed_filters = []
for filter_item in filters:
if filter_item == "":
logger.warning("Skipping empty filter item")
continue
# Try Google style search filters
components = filter_item.split(":")
if len(components) == 2:
filter_type = components[0]
filter_value = components[1]
else:
continue
if filter_type == "to" and filter_value:
constructed_filters.append(EthereumTransaction.to_address == filter_value)
if filter_type == "from" and filter_value:
if filter_value not in allowed_addresses:
continue
constructed_filters.append(EthereumTransaction.from_address == filter_value)
if filter_type == "address" and filter_value:
constructed_filters.append(
or_(
EthereumTransaction.to_address == filter_value,
EthereumTransaction.from_address == filter_value,
)
)
return constructed_filters

Wyświetl plik

@ -39,3 +39,5 @@ DOCS_PATHS = {}
for path in MOONSTREAM_OPENAPI_LIST:
DOCS_PATHS[f"/{path}/{DOCS_TARGET_PATH}"] = "GET"
DOCS_PATHS[f"/{path}/{DOCS_TARGET_PATH}/openapi.json"] = "GET"
DEFAULT_PAGE_SIZE = 10

Wyświetl plik

@ -42,7 +42,6 @@ const StreamEntry = ({ entry, filterCallback, filterConstants }) => {
};
const [showFullView] = useMediaQuery(["(min-width: 420px)"]);
console.log(entry);
return (
<Flex

Wyświetl plik

@ -1,6 +1,7 @@
import { useInfiniteQuery } from "react-query";
import { queryCacheProps } from "./hookCommon";
import { SubscriptionsService } from "../services";
import moment from "moment";
const useJournalEntries = ({
refreshRate,
@ -9,31 +10,29 @@ const useJournalEntries = ({
searchQuery,
enabled,
}) => {
const limit = pageSize ? pageSize : 25;
//const limit = pageSize ? pageSize : 25;
const getStream =
(searchTerm) =>
async ({ pageParam = 0 }) => {
if (!pageParam) {
pageParam = 0;
}
async ({ pageParam = { start_time: 0, end_time: 0 } }) => {
console.log("pageParam", pageParam);
const response = await SubscriptionsService.getStream({
searchTerm,
isContent,
limit,
offset: pageParam,
searchTerm: searchTerm,
start_time: pageParam.start_time,
end_time: pageParam.end_time,
});
const newEntryList = response.data.stream.map((entry) => ({
...entry,
}));
console.log("response.data", response.data);
return {
data: [...newEntryList],
pageParams: {
pageParam: pageParam + 1,
next_offset: response.data.next_offset,
total_results: response.data.total_results,
offset: response.data.offset,
start_time: response.data.start_time,
end_time: response.data.end_time,
},
};
};
@ -42,6 +41,9 @@ const useJournalEntries = ({
data: EntriesPages,
isFetchingMore,
isLoading,
fetchNextPage,
fetchPreviousPage,
hasNextPage,
canFetchMore,
fetchMore,
refetch,
@ -49,7 +51,14 @@ const useJournalEntries = ({
refetchInterval: refreshRate,
...queryCacheProps,
getNextPageParam: (lastGroup) => {
return lastGroup.next_offset === null ? false : lastGroup.next_offset;
console.log("lastGroup", lastGroup);
console.log("canFetchMore", canFetchMore);
console.log("fetchMore", fetchMore);
console.log("fetchNextPage", fetchNextPage);
console.log("fetchPreviousPage", fetchPreviousPage);
console.log("hasNextPage", hasNextPage);
return 1;
},
onSuccess: () => {},
enabled: !!enabled,

Wyświetl plik

@ -3,15 +3,14 @@ import { http } from "../utils";
const API = process.env.NEXT_PUBLIC_MOONSTREAM_API_URL;
export const getStream = ({ searchTerm, limit, offset, isContent }) =>
export const getStream = ({ searchTerm, start_time, end_time }) =>
http({
method: "GET",
url: `${API}/streams/`,
params: {
q: searchTerm,
limit: encodeURIComponent(limit),
offset: encodeURIComponent(offset),
content: encodeURIComponent(isContent),
start_time: encodeURIComponent(start_time),
end_time: encodeURIComponent(end_time),
},
});