kopia lustrzana https://github.com/bugout-dev/moonstream
Working nodebalancer with authorization via resources
rodzic
7bd293d11e
commit
e3a08730cc
|
@ -1,16 +1,66 @@
|
|||
# Node Balancer application
|
||||
|
||||
## Installation
|
||||
# Installation
|
||||
|
||||
- Prepare environment variables
|
||||
- Build application
|
||||
- Prepare environment variables
|
||||
- Build application
|
||||
|
||||
```bash
|
||||
go build -o nodebalancer
|
||||
```
|
||||
|
||||
- Run with following parameters:
|
||||
# Work with nodebalancer
|
||||
|
||||
## clients
|
||||
|
||||
```bash
|
||||
nodebalancer -host 0.0.0.0 -port 8544 -healthcheck
|
||||
nodebalancer clients | jq .
|
||||
```
|
||||
|
||||
This command will return a list of bugout resources of registered users to access node balancer with their `crawlers/app/project` (in our project we will call it `crawlers`).
|
||||
|
||||
```json
|
||||
[
|
||||
{
|
||||
"user_id": "<user_id_from_any_bugout_application>",
|
||||
"access_id": "<access_uuid_which_provided_with_query>",
|
||||
"name": "<short_description_of_purpose_of_this_crawler>",
|
||||
"description": "<long_description>",
|
||||
"blockchain_access": true,
|
||||
"extended_methods": false
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
`access_id` - token which allow access to nodebalancer, could be specified in both ways:
|
||||
|
||||
- as a header `x-moonstream-access-id` with value `access_id`
|
||||
- as query parameter `access_id=access_id`
|
||||
|
||||
`blockchain_access` - boolean which allow you or not to have access to blockchain node, otherwise you will be redirected to database
|
||||
|
||||
`extended_methods` - boolean which allow you to call not whitelisted method to blockchain node, by default for new user this is equal to `false`
|
||||
|
||||
## server
|
||||
|
||||
```bash
|
||||
nodebalancer server -host 0.0.0.0 -port 8544 -healthcheck
|
||||
```
|
||||
|
||||
Flag `--healthcheck` will execute background process to ping-pong available nodes to keep their status and current block number.
|
||||
Flag `--debug` will extend output of each request to server and healthchecks summary.
|
||||
|
||||
# Work with node
|
||||
|
||||
Common request to fetch block number
|
||||
|
||||
```bash
|
||||
curl --request GET 'http://127.0.0.1:8544/nb/ethereum/jsonrpc?access_id=<access_id>&data_source=<blockchain/database>' \
|
||||
--header 'Content-Type: application/json' \
|
||||
--data-raw '{
|
||||
"jsonrpc":"2.0",
|
||||
"method":"eth_getBlockByNumber",
|
||||
"params":["0xb71b64", false],
|
||||
"id":1
|
||||
}'
|
||||
```
|
||||
|
|
|
@ -9,7 +9,9 @@ import (
|
|||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
|
||||
|
@ -19,6 +21,38 @@ import (
|
|||
// for each blockchain we work during session.
|
||||
var blockchainPool BlockchainPool
|
||||
|
||||
// Node structure with
|
||||
// StatusURL for status server at node endpoint
|
||||
// GethURL for geth/bor/etc node http.server endpoint
|
||||
type Node struct {
|
||||
StatusURL *url.URL
|
||||
GethURL *url.URL
|
||||
|
||||
Alive bool
|
||||
CurrentBlock uint64
|
||||
|
||||
mux sync.RWMutex
|
||||
|
||||
StatusReverseProxy *httputil.ReverseProxy
|
||||
GethReverseProxy *httputil.ReverseProxy
|
||||
}
|
||||
|
||||
type NodePool struct {
|
||||
Blockchain string
|
||||
Nodes []*Node
|
||||
|
||||
// Counter to observe all nodes
|
||||
Current uint64
|
||||
}
|
||||
|
||||
type BlockchainPool struct {
|
||||
Blockchains []*NodePool
|
||||
}
|
||||
|
||||
type NodeStatusResponse struct {
|
||||
CurrentBlock uint64 `json:"current_block"`
|
||||
}
|
||||
|
||||
// AddNode to the nodes pool
|
||||
func (bpool *BlockchainPool) AddNode(node *Node, blockchain string) {
|
||||
var nodePool *NodePool
|
||||
|
|
|
@ -0,0 +1,61 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"log"
|
||||
"strconv"
|
||||
|
||||
configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
|
||||
)
|
||||
|
||||
var (
|
||||
nodeConfigs NodeConfigs
|
||||
)
|
||||
|
||||
// Node conf
|
||||
type BlockchainConfig struct {
|
||||
Blockchain string
|
||||
IPs []string
|
||||
Port string
|
||||
}
|
||||
|
||||
type NodeConfig struct {
|
||||
Blockchain string
|
||||
Addr string
|
||||
Port uint16
|
||||
}
|
||||
|
||||
type NodeConfigs struct {
|
||||
NodeConfigs []NodeConfig
|
||||
}
|
||||
|
||||
// Return list of NodeConfig structures
|
||||
func (nc *NodeConfigs) InitNodeConfiguration() {
|
||||
// Define available blockchain nodes
|
||||
blockchainConfigList := make([]BlockchainConfig, 0, 2)
|
||||
blockchainConfigList = append(blockchainConfigList, BlockchainConfig{
|
||||
Blockchain: "ethereum",
|
||||
IPs: []string{configs.MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR, configs.MOONSTREAM_NODE_ETHEREUM_B_IPC_ADDR},
|
||||
Port: "8545",
|
||||
})
|
||||
blockchainConfigList = append(blockchainConfigList, BlockchainConfig{
|
||||
Blockchain: "polygon",
|
||||
IPs: []string{configs.MOONSTREAM_NODE_POLYGON_A_IPC_ADDR, configs.MOONSTREAM_NODE_POLYGON_B_IPC_ADDR},
|
||||
Port: "8545",
|
||||
})
|
||||
|
||||
// Parse node addr, ip and blockchain
|
||||
for _, b := range blockchainConfigList {
|
||||
for _, nodeIP := range b.IPs {
|
||||
port, err := strconv.ParseInt(b.Port, 0, 16)
|
||||
if err != nil {
|
||||
log.Printf("Unable to parse port number: %s", b.Port)
|
||||
continue
|
||||
}
|
||||
nc.NodeConfigs = append(nc.NodeConfigs, NodeConfig{
|
||||
Blockchain: b.Blockchain,
|
||||
Addr: nodeIP,
|
||||
Port: uint16(port),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
|
@ -18,6 +18,36 @@ type BugoutClient struct {
|
|||
AuthURL string
|
||||
}
|
||||
|
||||
type PingResponse struct {
|
||||
Status string `json:"status"`
|
||||
}
|
||||
|
||||
// Bugout responses
|
||||
type BugoutUserResponse struct {
|
||||
ID string `json:"user_id"`
|
||||
ApplicationID string `json:"application_id"`
|
||||
}
|
||||
|
||||
type UserAccess struct {
|
||||
UserID string `json:"user_id"`
|
||||
AccessID string `json:"access_id"`
|
||||
Name string `json:"name"`
|
||||
Description string `json:"description"`
|
||||
BlockchainAccess bool `json:"blockchain_access"`
|
||||
ExtendedMethods bool `json:"extended_methods"`
|
||||
|
||||
dataSource string
|
||||
}
|
||||
|
||||
type BugoutResourceResponse struct {
|
||||
ID string `json:"id"`
|
||||
ResourceData UserAccess `json:"resource_data"`
|
||||
}
|
||||
|
||||
type BugoutResourcesResponse struct {
|
||||
Resources []BugoutResourceResponse `json:"resources"`
|
||||
}
|
||||
|
||||
// Initialize Bugout http client
|
||||
func InitBugoutClient() {
|
||||
client := http.Client{Timeout: configs.BUGOUT_AUTH_CALL_TIMEOUT}
|
||||
|
@ -58,35 +88,44 @@ func (bc *BugoutClient) GetUser(token string) (BugoutUserResponse, error) {
|
|||
return userResponse, nil
|
||||
}
|
||||
|
||||
// Get Bugout resources
|
||||
func (bc *BugoutClient) GetResources(token string, userID string) (BugoutResourcesResponse, error) {
|
||||
url := fmt.Sprintf("%s/resources?application_id=%s", configs.BUGOUT_AUTH_URL, configs.BUGOUT_NODE_BALANCER_APPLICATION_ID)
|
||||
// Get user accesses from Bugout resources
|
||||
func (bc *BugoutClient) GetUserAccesses(token, userID, accessID string) ([]UserAccess, error) {
|
||||
var userAccesses []UserAccess
|
||||
|
||||
url := fmt.Sprintf("%s/resources?application_id=%s", configs.BUGOUT_AUTH_URL, configs.NB_APPLICATION_ID)
|
||||
if userID != "" {
|
||||
url += fmt.Sprintf("&user_id=%s", userID)
|
||||
}
|
||||
if accessID != "" {
|
||||
url += fmt.Sprintf("&access_id=%s", accessID)
|
||||
}
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
return BugoutResourcesResponse{}, err
|
||||
return userAccesses, err
|
||||
}
|
||||
req.Header = http.Header{
|
||||
"Authorization": []string{fmt.Sprintf("Bearer %s", token)},
|
||||
}
|
||||
resp, err := bc.Client.Do(req)
|
||||
if err != nil {
|
||||
return BugoutResourcesResponse{}, err
|
||||
return userAccesses, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// Parse response
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return BugoutResourcesResponse{}, err
|
||||
return userAccesses, err
|
||||
}
|
||||
var resourcesResponse BugoutResourcesResponse
|
||||
err = json.Unmarshal(body, &resourcesResponse)
|
||||
if err != nil {
|
||||
return BugoutResourcesResponse{}, err
|
||||
return userAccesses, err
|
||||
}
|
||||
|
||||
return resourcesResponse, nil
|
||||
for _, resourceData := range resourcesResponse.Resources {
|
||||
userAccesses = append(userAccesses, resourceData.ResourceData)
|
||||
}
|
||||
|
||||
return userAccesses, nil
|
||||
}
|
||||
|
|
|
@ -15,8 +15,8 @@ var (
|
|||
|
||||
// Command Line Interface state
|
||||
type StateCLI struct {
|
||||
serverCmd *flag.FlagSet
|
||||
clientsCmd *flag.FlagSet
|
||||
serverCmd *flag.FlagSet
|
||||
usersCmd *flag.FlagSet
|
||||
|
||||
// Common flags
|
||||
showVersion bool
|
||||
|
@ -31,7 +31,7 @@ type StateCLI struct {
|
|||
func (s *StateCLI) populateCLI() {
|
||||
// Subcommands setup
|
||||
s.serverCmd = flag.NewFlagSet("server", flag.ExitOnError)
|
||||
s.clientsCmd = flag.NewFlagSet("clients", flag.ExitOnError)
|
||||
s.usersCmd = flag.NewFlagSet("users", flag.ExitOnError)
|
||||
|
||||
// Server subcommand flag pointers
|
||||
s.serverCmd.StringVar(&s.listeningAddr, "host", "127.0.0.1", "Server listening address")
|
||||
|
@ -41,6 +41,8 @@ func (s *StateCLI) populateCLI() {
|
|||
}
|
||||
|
||||
func init() {
|
||||
configs.VerifyEnvironments()
|
||||
|
||||
InitBugoutClient()
|
||||
}
|
||||
|
||||
|
@ -56,21 +58,21 @@ func CLI() {
|
|||
case "server":
|
||||
stateCLI.serverCmd.Parse(os.Args[2:])
|
||||
Server()
|
||||
case "clients":
|
||||
stateCLI.clientsCmd.Parse(os.Args[2:])
|
||||
resources, err := bugoutClient.GetResources(configs.BUGOUT_NODE_BALANCER_CONTROLLER_TOKEN, "")
|
||||
case "users":
|
||||
stateCLI.usersCmd.Parse(os.Args[2:])
|
||||
userAccesses, err := bugoutClient.GetUserAccesses(configs.NB_CONTROLLER_TOKEN, "", "")
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to get resources %v", err)
|
||||
return
|
||||
}
|
||||
resourcesJson, err := json.Marshal(resources)
|
||||
userAccessesJson, err := json.Marshal(userAccesses)
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to marshal resources %v", err)
|
||||
return
|
||||
}
|
||||
fmt.Println(string(resourcesJson))
|
||||
fmt.Println(string(userAccessesJson))
|
||||
case "version":
|
||||
fmt.Printf("v%s\n", configs.NODE_BALANCER_VERSION)
|
||||
fmt.Printf("v%s\n", configs.NB_VERSION)
|
||||
default:
|
||||
flag.PrintDefaults()
|
||||
os.Exit(1)
|
||||
|
|
|
@ -3,13 +3,29 @@ package cmd
|
|||
import (
|
||||
"errors"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
|
||||
)
|
||||
|
||||
var ethereumClientPool ClientPool
|
||||
var polygonClientPool ClientPool
|
||||
var (
|
||||
ethereumClientPool ClientPool
|
||||
polygonClientPool ClientPool
|
||||
)
|
||||
|
||||
// Node - which one node client worked with
|
||||
// LastCallTs - timestamp from last call
|
||||
type Client struct {
|
||||
Node *Node
|
||||
LastCallTs int64
|
||||
|
||||
mux sync.RWMutex
|
||||
}
|
||||
|
||||
type ClientPool struct {
|
||||
Client map[string]*Client
|
||||
}
|
||||
|
||||
// Generate client pools for different blockchains
|
||||
func CreateClientPools() {
|
||||
|
|
|
@ -1,79 +0,0 @@
|
|||
/*
|
||||
Data structure.
|
||||
*/
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type PingResponse struct {
|
||||
Status string `json:"status"`
|
||||
}
|
||||
|
||||
type NodeStatusResponse struct {
|
||||
CurrentBlock uint64 `json:"current_block"`
|
||||
}
|
||||
|
||||
// Bugout responses
|
||||
type BugoutUserResponse struct {
|
||||
ID string `json:"user_id"`
|
||||
ApplicationID string `json:"application_id"`
|
||||
}
|
||||
|
||||
type BugoutResourceDataResponse struct {
|
||||
UserID string `json:"user_id"`
|
||||
BlockchainAccess bool `json:"blockchain_access"`
|
||||
}
|
||||
|
||||
type BugoutResourceResponse struct {
|
||||
ID string `json:"id"`
|
||||
ResourceData BugoutResourceDataResponse `json:"resource_data"`
|
||||
}
|
||||
|
||||
type BugoutResourcesResponse struct {
|
||||
Resources []BugoutResourceResponse `json:"resources"`
|
||||
}
|
||||
|
||||
// Node - which one node client worked with
|
||||
// LastCallTs - timestamp from last call
|
||||
type Client struct {
|
||||
Node *Node
|
||||
LastCallTs int64
|
||||
|
||||
mux sync.RWMutex
|
||||
}
|
||||
|
||||
type ClientPool struct {
|
||||
Client map[string]*Client
|
||||
}
|
||||
|
||||
// Node structure with
|
||||
// StatusURL for status server at node endpoint
|
||||
// GethURL for geth/bor/etc node http.server endpoint
|
||||
type Node struct {
|
||||
StatusURL *url.URL
|
||||
GethURL *url.URL
|
||||
|
||||
Alive bool
|
||||
CurrentBlock uint64
|
||||
|
||||
mux sync.RWMutex
|
||||
|
||||
StatusReverseProxy *httputil.ReverseProxy
|
||||
GethReverseProxy *httputil.ReverseProxy
|
||||
}
|
||||
|
||||
type NodePool struct {
|
||||
Blockchain string
|
||||
Nodes []*Node
|
||||
|
||||
// Counter to observe all nodes
|
||||
Current uint64
|
||||
}
|
||||
|
||||
type BlockchainPool struct {
|
||||
Blockchains []*NodePool
|
||||
}
|
|
@ -5,13 +5,9 @@ package cmd
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
|
||||
|
||||
|
@ -48,69 +44,62 @@ func logMiddleware(next http.Handler) http.Handler {
|
|||
})
|
||||
}
|
||||
|
||||
// Bugout authentication
|
||||
func authMiddleware(next http.Handler) http.Handler {
|
||||
// Check access id was provided correctly and save user access configuration to request context
|
||||
func accessMiddleware(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
authHeaders := r.Header["Authorization"]
|
||||
authHeadersLen := len(authHeaders)
|
||||
if authHeadersLen == 0 {
|
||||
http.Error(w, "Authorization header not found", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
if authHeadersLen > 1 {
|
||||
http.Error(w, "Too many authorization headers provided", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
authHeader := authHeaders[0]
|
||||
var currentUserAccess UserAccess
|
||||
|
||||
// Extract Bearer token
|
||||
headerSlice := strings.Split(authHeader, " ")
|
||||
if len(headerSlice) != 2 {
|
||||
http.Error(w, "Unacceptable token format provided", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
if headerSlice[0] != "Bearer" {
|
||||
http.Error(w, "Unacceptable token format provided", http.StatusBadRequest)
|
||||
return
|
||||
var accessID string
|
||||
accessIDHeaders := r.Header[configs.NB_ACCESS_ID_HEADER]
|
||||
for _, h := range accessIDHeaders {
|
||||
accessID = h
|
||||
}
|
||||
|
||||
// Check token is active
|
||||
client := http.Client{Timeout: configs.BUGOUT_AUTH_CALL_TIMEOUT}
|
||||
authReq, err := http.NewRequest("GET", fmt.Sprintf("%s/user", configs.BUGOUT_AUTH_URL), nil)
|
||||
if err != nil {
|
||||
http.Error(w, "Unable to construct authorization request", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
authReq.Header.Set("Authorization", authHeader)
|
||||
resp, err := client.Do(authReq)
|
||||
if err != nil {
|
||||
http.Error(w, "Unable to reach authorization server", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// Parse response from authorization server
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
http.Error(w, "Unable to read respose from authorization server", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
var userResponse BugoutUserResponse
|
||||
err = json.Unmarshal(body, &userResponse)
|
||||
if err != nil {
|
||||
http.Error(w, "Unable to parse respose from authorization server", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
if userResponse.ID == "" {
|
||||
http.Error(w, "Wrong authorization header", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
if userResponse.ApplicationID != configs.BUGOUT_NODE_BALANCER_APPLICATION_ID {
|
||||
http.Error(w, "Wrong authorization header", http.StatusForbidden)
|
||||
return
|
||||
dataSource := "database"
|
||||
dataSources := r.Header[configs.NB_DATA_SOURCE_HEADER]
|
||||
for _, h := range dataSources {
|
||||
dataSource = h
|
||||
}
|
||||
|
||||
ctxUser := context.WithValue(r.Context(), "user", userResponse)
|
||||
queries := r.URL.Query()
|
||||
for k, v := range queries {
|
||||
if k == "access_id" {
|
||||
accessID = v[0]
|
||||
}
|
||||
if k == "data_source" {
|
||||
dataSource = v[0]
|
||||
}
|
||||
}
|
||||
|
||||
// If access id does not belong to controller, then find it in Bugout resources
|
||||
if accessID == configs.NB_CONTROLLER_ACCESS_ID {
|
||||
currentUserAccess = controllerUserAccess
|
||||
currentUserAccess.dataSource = dataSource
|
||||
} else {
|
||||
userAccesses, err := bugoutClient.GetUserAccesses(configs.NB_CONTROLLER_TOKEN, "", accessID)
|
||||
if err != nil {
|
||||
http.Error(w, "Unable to get user with provided access identifier", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
if len(userAccesses) == 0 {
|
||||
http.Error(w, "User with provided access identifier not found", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
userAccess := userAccesses[0]
|
||||
|
||||
currentUserAccess = UserAccess{
|
||||
UserID: userAccess.UserID,
|
||||
AccessID: userAccess.AccessID,
|
||||
Name: userAccess.Name,
|
||||
Description: userAccess.Description,
|
||||
BlockchainAccess: userAccess.BlockchainAccess,
|
||||
ExtendedMethods: userAccess.ExtendedMethods,
|
||||
|
||||
dataSource: dataSource,
|
||||
}
|
||||
}
|
||||
|
||||
ctxUser := context.WithValue(r.Context(), "currentUserAccess", currentUserAccess)
|
||||
|
||||
next.ServeHTTP(w, r.WithContext(ctxUser))
|
||||
})
|
||||
|
|
|
@ -24,13 +24,15 @@ func pingRoute(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
// lbHandler load balances the incoming requests to nodes
|
||||
func lbHandler(w http.ResponseWriter, r *http.Request) {
|
||||
userRaw := r.Context().Value("user")
|
||||
user, ok := userRaw.(BugoutUserResponse)
|
||||
currentUserAccessRaw := r.Context().Value("currentUserAccess")
|
||||
currentUserAccess, ok := currentUserAccessRaw.(UserAccess)
|
||||
if !ok {
|
||||
http.Error(w, "Internal server error", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Println(currentUserAccess)
|
||||
|
||||
attempts := GetAttemptsFromContext(r)
|
||||
if attempts > configs.NB_CONNECTION_RETRIES {
|
||||
log.Printf("Max attempts reached from %s %s, terminating\n", r.RemoteAddr, r.URL.Path)
|
||||
|
@ -56,14 +58,14 @@ func lbHandler(w http.ResponseWriter, r *http.Request) {
|
|||
http.Error(w, fmt.Sprintf("Unacceptable blockchain provided %s", blockchain), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
node = cpool.GetClientNode(user.ID)
|
||||
node = cpool.GetClientNode(currentUserAccess.AccessID)
|
||||
if node == nil {
|
||||
node = blockchainPool.GetNextNode(blockchain)
|
||||
if node == nil {
|
||||
http.Error(w, "There are no nodes available", http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
cpool.AddClientNode(user.ID, node)
|
||||
cpool.AddClientNode(currentUserAccess.AccessID, node)
|
||||
}
|
||||
|
||||
// Save origin path, to use in proxyErrorHandler if node will not response
|
||||
|
@ -75,7 +77,7 @@ func lbHandler(w http.ResponseWriter, r *http.Request) {
|
|||
node.StatusReverseProxy.ServeHTTP(w, r)
|
||||
return
|
||||
case strings.HasPrefix(r.URL.Path, fmt.Sprintf("/nb/%s/jsonrpc", blockchain)):
|
||||
lbJSONRPCHandler(w, r, blockchain, node, user)
|
||||
lbJSONRPCHandler(w, r, blockchain, node, currentUserAccess)
|
||||
return
|
||||
default:
|
||||
http.Error(w, fmt.Sprintf("Unacceptable path for %s blockchain %s", blockchain, r.URL.Path), http.StatusBadRequest)
|
||||
|
@ -83,51 +85,64 @@ func lbHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string, node *Node, user BugoutUserResponse) {
|
||||
var dataSource string
|
||||
dataSources := r.Header[configs.MOONSTREAM_DATA_SOURCE_HEADER]
|
||||
// TODO(kompotkot): Re-write it, to be able to work without database
|
||||
if len(dataSources) == 0 {
|
||||
dataSource = "database"
|
||||
} else {
|
||||
dataSource = dataSources[0]
|
||||
}
|
||||
|
||||
func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string, node *Node, currentUserAccess UserAccess) {
|
||||
switch {
|
||||
case dataSource == "blockchain":
|
||||
if user.ID != controllerUserID {
|
||||
resources, err := bugoutClient.GetResources(configs.BUGOUT_NODE_BALANCER_CONTROLLER_TOKEN, user.ID)
|
||||
case currentUserAccess.dataSource == "blockchain":
|
||||
if currentUserAccess.BlockchainAccess == false {
|
||||
http.Error(w, "Access to blockchain node not allowed with provided access id", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
if currentUserAccess.ExtendedMethods == false {
|
||||
jsonrpcRequest, err := parseJSONRPCRequest(r)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("not allowed %s", dataSource), http.StatusBadRequest)
|
||||
http.Error(w, "Unable to parse JSON RPC request", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
blockchainAccess := false
|
||||
for _, resource := range resources.Resources {
|
||||
if resource.ResourceData.BlockchainAccess == true {
|
||||
blockchainAccess = true
|
||||
}
|
||||
}
|
||||
|
||||
if blockchainAccess == false {
|
||||
http.Error(w, fmt.Sprintf("not allowed %s", dataSource), http.StatusBadRequest)
|
||||
err = verifyMethodWhitelisted(jsonrpcRequest.Method)
|
||||
if err != nil {
|
||||
http.Error(w, "Method for provided access id not allowed", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Println("proxied to node")
|
||||
// r.URL.Path = "/"
|
||||
// node.GethReverseProxy.ServeHTTP(w, r)
|
||||
r.URL.Path = "/"
|
||||
node.GethReverseProxy.ServeHTTP(w, r)
|
||||
return
|
||||
case dataSource == "database":
|
||||
case currentUserAccess.dataSource == "database":
|
||||
lbDatabaseHandler(w, r, blockchain)
|
||||
return
|
||||
default:
|
||||
http.Error(w, fmt.Sprintf("Unacceptable data source %s", dataSource), http.StatusBadRequest)
|
||||
http.Error(w, fmt.Sprintf("Unacceptable data source %s", currentUserAccess.dataSource), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func parseJSONRPCRequest(r *http.Request) (JSONRPCRequest, error) {
|
||||
var jsonrpcRequest JSONRPCRequest
|
||||
|
||||
body, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return jsonrpcRequest, err
|
||||
}
|
||||
err = json.Unmarshal(body, &jsonrpcRequest)
|
||||
if err != nil {
|
||||
return jsonrpcRequest, err
|
||||
}
|
||||
|
||||
return jsonrpcRequest, nil
|
||||
}
|
||||
|
||||
var ALLOWED_METHODS = []string{"eth_getBlockByNumber"}
|
||||
|
||||
func verifyMethodWhitelisted(method string) error {
|
||||
for _, m := range ALLOWED_METHODS {
|
||||
if method == m {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("Method not allowed")
|
||||
}
|
||||
|
||||
type JSONRPCRequest struct {
|
||||
Jsonrpc string `json:"jsonrpc"`
|
||||
Method string `json:"method"`
|
||||
|
@ -135,18 +150,10 @@ type JSONRPCRequest struct {
|
|||
ID uint64 `json:"id"`
|
||||
}
|
||||
|
||||
// var ALLOWED_ETH_ENDPOINTS = []string{"eth_getBlockByNumber"}
|
||||
|
||||
func lbDatabaseHandler(w http.ResponseWriter, r *http.Request, blockchain string) {
|
||||
body, err := ioutil.ReadAll(r.Body)
|
||||
jsonrpcRequest, err := parseJSONRPCRequest(r)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
var jsonrpcRequest JSONRPCRequest
|
||||
err = json.Unmarshal(body, &jsonrpcRequest)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
http.Error(w, "Unable to parse JSON RPC request", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -18,8 +18,7 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
// User id to controll access to blockchain nodes
|
||||
controllerUserID string
|
||||
controllerUserAccess UserAccess
|
||||
|
||||
// Crash reporter
|
||||
reporter *humbug.HumbugReporter
|
||||
|
@ -105,18 +104,26 @@ func Server() {
|
|||
var err error
|
||||
sessionID := uuid.New().String()
|
||||
consent := humbug.CreateHumbugConsent(humbug.True)
|
||||
reporter, err = humbug.CreateHumbugReporter(consent, "moonstream-node-balancer", sessionID, configs.HUMBUG_REPORTER_NODE_BALANCER_TOKEN)
|
||||
reporter, err = humbug.CreateHumbugReporter(consent, "moonstream-node-balancer", sessionID, configs.HUMBUG_REPORTER_NB_TOKEN)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Invalid Humbug Crash configuration: %s", err.Error()))
|
||||
}
|
||||
// Record system information
|
||||
reporter.Publish(humbug.SystemReport())
|
||||
|
||||
user, err := bugoutClient.GetUser(configs.BUGOUT_NODE_BALANCER_CONTROLLER_TOKEN)
|
||||
userAccesses, err := bugoutClient.GetUserAccesses(configs.NB_CONTROLLER_TOKEN, "", configs.NB_CONTROLLER_ACCESS_ID)
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to access Bugout authentication server %v", err)
|
||||
}
|
||||
controllerUserID = user.ID
|
||||
userAccess := userAccesses[0]
|
||||
controllerUserAccess = UserAccess{
|
||||
UserID: userAccess.UserID,
|
||||
AccessID: userAccess.AccessID,
|
||||
Name: userAccess.Name,
|
||||
Description: userAccess.Description,
|
||||
BlockchainAccess: userAccess.BlockchainAccess,
|
||||
ExtendedMethods: userAccess.ExtendedMethods,
|
||||
}
|
||||
|
||||
err = InitDatabaseClient()
|
||||
if err != nil {
|
||||
|
@ -124,10 +131,10 @@ func Server() {
|
|||
}
|
||||
|
||||
// Fill NodeConfigList with initial nodes from environment variables
|
||||
configs.ConfigList.InitNodeConfigList()
|
||||
nodeConfigs.InitNodeConfiguration()
|
||||
|
||||
// Parse nodes and set list of proxies
|
||||
for i, nodeConfig := range configs.ConfigList.Configs {
|
||||
for i, nodeConfig := range nodeConfigs.NodeConfigs {
|
||||
gethUrl, err := url.Parse(fmt.Sprintf("http://%s:%d", nodeConfig.Addr, nodeConfig.Port))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
|
@ -156,7 +163,7 @@ func Server() {
|
|||
}
|
||||
|
||||
serveMux := http.NewServeMux()
|
||||
serveMux.Handle("/nb/", authMiddleware(http.HandlerFunc(lbHandler)))
|
||||
serveMux.Handle("/nb/", accessMiddleware(http.HandlerFunc(lbHandler)))
|
||||
serveMux.HandleFunc("/ping", pingRoute)
|
||||
|
||||
// Set common middlewares, from bottom to top
|
||||
|
|
|
@ -6,43 +6,48 @@ package configs
|
|||
import (
|
||||
"log"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Bugout config
|
||||
var BUGOUT_AUTH_URL = os.Getenv("BUGOUT_AUTH_URL")
|
||||
var BUGOUT_NODE_BALANCER_APPLICATION_ID = os.Getenv("BUGOUT_NODE_BALANCER_APPLICATION_ID")
|
||||
var BUGOUT_NODE_BALANCER_CONTROLLER_TOKEN = os.Getenv("BUGOUT_NODE_BALANCER_CONTROLLER_TOKEN")
|
||||
var BUGOUT_AUTH_CALL_TIMEOUT = time.Second * 5
|
||||
var (
|
||||
// Bugout and application configuration
|
||||
BUGOUT_AUTH_URL = os.Getenv("BUGOUT_AUTH_URL")
|
||||
BUGOUT_AUTH_CALL_TIMEOUT = time.Second * 5
|
||||
NB_APPLICATION_ID = os.Getenv("NB_APPLICATION_ID")
|
||||
NB_CONTROLLER_TOKEN = os.Getenv("NB_CONTROLLER_TOKEN")
|
||||
NB_CONTROLLER_ACCESS_ID = os.Getenv("NB_CONTROLLER_ACCESS_ID")
|
||||
|
||||
// Node config
|
||||
type BlockchainConfig struct {
|
||||
Blockchain string
|
||||
IPs []string
|
||||
Port string
|
||||
}
|
||||
NB_CONNECTION_RETRIES = 2
|
||||
NB_CONNECTION_RETRIES_INTERVAL = time.Millisecond * 10
|
||||
NB_HEALTH_CHECK_INTERVAL = time.Second * 5
|
||||
NB_HEALTH_CHECK_CALL_TIMEOUT = time.Second * 2
|
||||
|
||||
type NodeConfig struct {
|
||||
Blockchain string
|
||||
Addr string
|
||||
Port uint16
|
||||
}
|
||||
// Client configuration
|
||||
NB_CLIENT_NODE_KEEP_ALIVE = int64(5) // How long to store node in hot list for client in seconds
|
||||
|
||||
type NodeConfigList struct {
|
||||
Configs []NodeConfig
|
||||
}
|
||||
// Hardcoded node addresses
|
||||
// TODO(kompotkot): Write CLI to be able to add nodes
|
||||
MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR")
|
||||
MOONSTREAM_NODE_ETHEREUM_B_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_ETHEREUM_B_IPC_ADDR")
|
||||
MOONSTREAM_NODE_POLYGON_A_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_POLYGON_A_IPC_ADDR")
|
||||
MOONSTREAM_NODE_POLYGON_B_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_POLYGON_B_IPC_ADDR")
|
||||
|
||||
var ConfigList NodeConfigList
|
||||
MOONSTREAM_NODES_SERVER_PORT = os.Getenv("MOONSTREAM_NODES_SERVER_PORT")
|
||||
|
||||
var MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR")
|
||||
var MOONSTREAM_NODE_ETHEREUM_B_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_ETHEREUM_B_IPC_ADDR")
|
||||
var MOONSTREAM_NODE_POLYGON_A_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_POLYGON_A_IPC_ADDR")
|
||||
var MOONSTREAM_NODE_POLYGON_B_IPC_ADDR = os.Getenv("MOONSTREAM_NODE_POLYGON_B_IPC_ADDR")
|
||||
var MOONSTREAM_NODES_SERVER_PORT = os.Getenv("MOONSTREAM_NODES_SERVER_PORT")
|
||||
var MOONSTREAM_DATA_SOURCE_HEADER = os.Getenv("MOONSTREAM_DATA_SOURCE_HEADER")
|
||||
NB_ACCESS_ID_HEADER = os.Getenv("NB_ACCESS_ID_HEADER")
|
||||
NB_DATA_SOURCE_HEADER = os.Getenv("NB_DATA_SOURCE_HEADER")
|
||||
|
||||
func checkEnvVarSet() {
|
||||
// Humbug configuration
|
||||
HUMBUG_REPORTER_NB_TOKEN = os.Getenv("HUMBUG_REPORTER_NB_TOKEN")
|
||||
|
||||
// Database configuration
|
||||
MOONSTREAM_DB_URI_READ_ONLY = os.Getenv("MOONSTREAM_DB_URI_READ_ONLY")
|
||||
MOONSTREAM_DB_MAX_IDLE_CONNS int = 30
|
||||
MOONSTREAM_DB_CONN_MAX_LIFETIME = 30 * time.Minute
|
||||
)
|
||||
|
||||
// Verify required environment variables are set
|
||||
func VerifyEnvironments() {
|
||||
if MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR == "" {
|
||||
MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR = "a.ethereum.moonstream.internal"
|
||||
}
|
||||
|
@ -57,61 +62,15 @@ func checkEnvVarSet() {
|
|||
MOONSTREAM_NODE_POLYGON_B_IPC_ADDR = "b.polygon.moonstream.internal"
|
||||
}
|
||||
|
||||
if MOONSTREAM_DATA_SOURCE_HEADER == "" {
|
||||
MOONSTREAM_DATA_SOURCE_HEADER = "X-Moonstream-Data-Source"
|
||||
if NB_ACCESS_ID_HEADER == "" {
|
||||
NB_ACCESS_ID_HEADER = "X-Node-Balancer-Access-Id"
|
||||
}
|
||||
|
||||
if NB_DATA_SOURCE_HEADER == "" {
|
||||
NB_DATA_SOURCE_HEADER = "X-Node-Balancer-Data-Source"
|
||||
}
|
||||
|
||||
if MOONSTREAM_NODES_SERVER_PORT == "" {
|
||||
log.Fatal("MOONSTREAM_NODES_SERVER_PORT environment variable not set")
|
||||
}
|
||||
}
|
||||
|
||||
// Return list of NodeConfig structures
|
||||
func (nc *NodeConfigList) InitNodeConfigList() {
|
||||
checkEnvVarSet()
|
||||
|
||||
// Define available blockchain nodes
|
||||
blockchainConfigList := make([]BlockchainConfig, 0, 2)
|
||||
blockchainConfigList = append(blockchainConfigList, BlockchainConfig{
|
||||
Blockchain: "ethereum",
|
||||
IPs: []string{MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR, MOONSTREAM_NODE_ETHEREUM_B_IPC_ADDR},
|
||||
Port: "8545",
|
||||
})
|
||||
blockchainConfigList = append(blockchainConfigList, BlockchainConfig{
|
||||
Blockchain: "polygon",
|
||||
IPs: []string{MOONSTREAM_NODE_POLYGON_A_IPC_ADDR, MOONSTREAM_NODE_POLYGON_B_IPC_ADDR},
|
||||
Port: "8545",
|
||||
})
|
||||
|
||||
// Parse node addr, ip and blockchain
|
||||
for _, b := range blockchainConfigList {
|
||||
for _, nodeIP := range b.IPs {
|
||||
port, err := strconv.ParseInt(b.Port, 0, 16)
|
||||
if err != nil {
|
||||
log.Printf("Unable to parse port number: %s", b.Port)
|
||||
continue
|
||||
}
|
||||
nc.Configs = append(nc.Configs, NodeConfig{
|
||||
Blockchain: b.Blockchain,
|
||||
Addr: nodeIP,
|
||||
Port: uint16(port),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var NB_CONNECTION_RETRIES = 2
|
||||
var NB_CONNECTION_RETRIES_INTERVAL = time.Millisecond * 10
|
||||
var NB_HEALTH_CHECK_INTERVAL = time.Second * 5
|
||||
var NB_HEALTH_CHECK_CALL_TIMEOUT = time.Second * 2
|
||||
|
||||
// Client config
|
||||
var NB_CLIENT_NODE_KEEP_ALIVE = int64(5) // How long to store node in hot list for client in seconds
|
||||
|
||||
// Humbug config
|
||||
var HUMBUG_REPORTER_NODE_BALANCER_TOKEN = os.Getenv("HUMBUG_REPORTER_NODE_BALANCER_TOKEN")
|
||||
|
||||
// Database config
|
||||
var MOONSTREAM_DB_URI_READ_ONLY = os.Getenv("MOONSTREAM_DB_URI_READ_ONLY")
|
||||
var MOONSTREAM_DB_MAX_IDLE_CONNS int = 30
|
||||
var MOONSTREAM_DB_CONN_MAX_LIFETIME = 30 * time.Minute
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
package configs
|
||||
|
||||
var NODE_BALANCER_VERSION = "0.0.2"
|
||||
var NB_VERSION = "0.0.2"
|
||||
|
|
|
@ -1,17 +1,20 @@
|
|||
# Required environment variables for load balancer
|
||||
export BUGOUT_AUTH_URL="https://auth.bugout.dev"
|
||||
export BUGOUT_NODE_BALANCER_APPLICATION_ID="<application_id_to_controll_access>"
|
||||
export BUGOUT_INTERNAL_CRAWLERS_USER_ID="<application_user_id_of_moonstream_internal_crawlers>"
|
||||
export MOONSTREAM_NODES_SERVER_PORT="<node_status_server_port>"
|
||||
export HUMBUG_REPORTER_NODE_BALANCER_TOKEN="<bugout_humbug_token_for_crash_reports>"
|
||||
export NODE_BALANCER_APPLICATION_ID="<application_id_to_controll_access>"
|
||||
export NODE_BALANCER_CONTROLLER_TOKEN="<token_of_controller_user>"
|
||||
export NODE_BALANCER_CONTROLLER_ACCESS_ID="<controller_access_id_for_internal_crawlers>"
|
||||
|
||||
# Database variables
|
||||
export MOONSTREAM_DB_URI="postgresql://<username>:<password>@<db_host>:<db_port>/<db_name>"
|
||||
|
||||
# Nodes
|
||||
export MOONSTREAM_NODES_SERVER_PORT="<node_status_server_port>"
|
||||
# Ethereum nodes depends variables
|
||||
export MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR="<node_geth_http_ip_addr>"
|
||||
export MOONSTREAM_NODE_ETHEREUM_B_IPC_ADDR="<node_geth_http_ip_addr>"
|
||||
|
||||
export MOONSTREAM_NODE_ETHEREUM_A_IPC_ADDR="127.0.0.1"
|
||||
export MOONSTREAM_NODE_ETHEREUM_B_IPC_ADDR="127.0.0.2"
|
||||
# Polygon nodes depends variables
|
||||
export MOONSTREAM_NODE_POLYGON_A_IPC_ADDR="<node_bor_http_ip_addr>"
|
||||
export MOONSTREAM_NODE_POLYGON_B_IPC_ADDR="<node_bor_http_ip_addr>"
|
||||
export MOONSTREAM_NODE_POLYGON_A_IPC_ADDR="127.0.0.1"
|
||||
export MOONSTREAM_NODE_POLYGON_B_IPC_ADDR="127.0.0.2"
|
||||
|
||||
# Error humbug reporter
|
||||
export HUMBUG_REPORTER_NODE_BALANCER_TOKEN="<bugout_humbug_token_for_crash_reports>"
|
||||
|
|
Ładowanie…
Reference in New Issue