Removed status url, config is json, instead of status call latest

pull/637/head
kompotkot 2022-07-12 18:48:44 +00:00
rodzic cc0b25bf03
commit 1b2f60d97c
14 zmienionych plików z 210 dodań i 216 usunięć

Wyświetl plik

@ -4,17 +4,17 @@ Load balancer, based on https://github.com/kasvith/simplelb/
package main package main
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt"
"io/ioutil" "io/ioutil"
"log" "log"
"net/http" "net/http"
"net/http/httputil" "net/http/httputil"
"net/url" "net/url"
"strconv"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
) )
// Main variable of pool of blockchains which contains pool of nodes // Main variable of pool of blockchains which contains pool of nodes
@ -23,10 +23,9 @@ var blockchainPool BlockchainPool
// Node structure with // Node structure with
// StatusURL for status server at node endpoint // StatusURL for status server at node endpoint
// GethURL for geth/bor/etc node http.server endpoint // Endpoint for geth/bor/etc node http.server endpoint
type Node struct { type Node struct {
StatusURL *url.URL Endpoint *url.URL
GethURL *url.URL
Alive bool Alive bool
CurrentBlock uint64 CurrentBlock uint64
@ -49,8 +48,13 @@ type BlockchainPool struct {
Blockchains []*NodePool Blockchains []*NodePool
} }
// Node status response struct for HealthCheck
type NodeStatusResultResponse struct {
Number string `json:"number"`
}
type NodeStatusResponse struct { type NodeStatusResponse struct {
CurrentBlock uint64 `json:"current_block"` Result NodeStatusResultResponse `json:"result"`
} }
// AddNode to the nodes pool // AddNode to the nodes pool
@ -153,11 +157,11 @@ func (bpool *BlockchainPool) GetNextNode(blockchain string) *Node {
return nil return nil
} }
// SetNodeStatus changes a status of a node by StatusURL or GethURL // SetNodeStatus modify status of the node
func (bpool *BlockchainPool) SetNodeStatus(url *url.URL, alive bool) { func (bpool *BlockchainPool) SetNodeStatus(url *url.URL, alive bool) {
for _, b := range bpool.Blockchains { for _, b := range bpool.Blockchains {
for _, n := range b.Nodes { for _, n := range b.Nodes {
if n.StatusURL.String() == url.String() || n.GethURL.String() == url.String() { if n.Endpoint.String() == url.String() {
n.SetAlive(alive) n.SetAlive(alive)
break break
} }
@ -165,55 +169,77 @@ func (bpool *BlockchainPool) SetNodeStatus(url *url.URL, alive bool) {
} }
} }
// StatusLog logs nodes statuses // StatusLog logs node status
// TODO(kompotkot): Print list of alive and dead nodes // TODO(kompotkot): Print list of alive and dead nodes
func (bpool *BlockchainPool) StatusLog() { func (bpool *BlockchainPool) StatusLog() {
for _, b := range bpool.Blockchains { for _, b := range bpool.Blockchains {
for _, n := range b.Nodes { for _, n := range b.Nodes {
log.Printf( log.Printf(
"Blockchain %s node %s is alive %t. Blockchain called %d times", "Blockchain %s node %s is alive %t. Blockchain called %d times",
b.Blockchain, n.StatusURL, n.Alive, b.Current, b.Blockchain, n.Endpoint.Host, n.Alive, b.Current,
) )
} }
} }
} }
// HealthCheck fetch the node status and current block server // HealthCheck fetch the node latest block
func (bpool *BlockchainPool) HealthCheck() { func (bpool *BlockchainPool) HealthCheck() {
for _, b := range bpool.Blockchains { for _, b := range bpool.Blockchains {
for _, n := range b.Nodes { for _, n := range b.Nodes {
n.SetAlive(false) httpClient := http.Client{Timeout: NB_HEALTH_CHECK_CALL_TIMEOUT}
n.SetCurrentBlock(0) resp, err := httpClient.Post(
n.Endpoint.String(),
// Get response from node /ping endpoint "application/json",
httpClient := http.Client{Timeout: configs.NB_HEALTH_CHECK_CALL_TIMEOUT} bytes.NewBuffer([]byte(`{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", false],"id":1}`)),
resp, err := httpClient.Get(fmt.Sprintf("%s/status", n.StatusURL)) )
if err != nil { if err != nil {
log.Printf("Unable to reach node: %s\n", n.StatusURL) n.SetAlive(false)
n.SetCurrentBlock(0)
log.Printf("Unable to reach node: %s", n.Endpoint.Host)
continue continue
} }
defer resp.Body.Close() defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body) body, err := ioutil.ReadAll(resp.Body)
if err != nil { if err != nil {
log.Printf("Unable to parse response from node: %s\n", n.StatusURL) n.SetAlive(false)
n.SetCurrentBlock(0)
log.Printf("Unable to parse response from %s node, err %v", n.Endpoint.Host, err)
continue continue
} }
var statusResponse NodeStatusResponse var statusResponse NodeStatusResponse
err = json.Unmarshal(body, &statusResponse) err = json.Unmarshal(body, &statusResponse)
if err != nil { if err != nil {
log.Printf("Unable to read json response from node: %s\n", n.StatusURL) n.SetAlive(false)
n.SetCurrentBlock(0)
log.Printf("Unable to read json response from %s node, err: %v", n.Endpoint.Host, err)
continue
}
blockNumberHex := strings.Replace(statusResponse.Result.Number, "0x", "", -1)
blockNumber, err := strconv.ParseUint(blockNumberHex, 16, 64)
if err != nil {
n.SetAlive(false)
n.SetCurrentBlock(0)
log.Printf("Unable to parse block number from hex to string, err: %v", err)
continue continue
} }
// Mark node in list of nodes as alive or not and update current block // Mark node in list of nodes as alive or not and update current block
n.SetAlive(true) var alive bool
if statusResponse.CurrentBlock != 0 { if blockNumber != 0 {
n.SetCurrentBlock(statusResponse.CurrentBlock) alive = true
} else {
alive = false
} }
n.SetAlive(alive)
n.SetCurrentBlock(blockNumber)
log.Printf("Node %s is alive: %t with current block: %d blockchain called: %d times\n", n.StatusURL, true, statusResponse.CurrentBlock, b.Current) log.Printf(
"Node %s is alive: %t with current block: %d blockchain called: %d times",
n.Endpoint.Host, alive, blockNumber, b.Current,
)
} }
} }
} }

Wyświetl plik

@ -1,17 +1,6 @@
package main package main
import (
"io/ioutil"
"log"
"strconv"
"strings"
configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
)
var ( var (
nodeConfigs NodeConfigs
ALLOWED_METHODS = map[string]bool{ ALLOWED_METHODS = map[string]bool{
"eth_blockNumber": true, "eth_blockNumber": true,
"eth_call": true, "eth_call": true,
@ -57,49 +46,8 @@ type JSONRPCRequest struct {
ID uint64 `json:"id"` ID uint64 `json:"id"`
} }
// Node conf
type BlockchainConfig struct { type BlockchainConfig struct {
Blockchain string Blockchain string
IPs []string IPs []string
Port string Port string
} }
type NodeConfig struct {
Blockchain string
Addr string
Port uint16
}
type NodeConfigs struct {
NodeConfigs []NodeConfig
}
// Return list of NodeConfig structures
func (nc *NodeConfigs) InitNodeConfigList(configPath string) {
configs.CheckEnvVarSet()
rawBytes, err := ioutil.ReadFile(configPath)
if err != nil {
log.Fatalf("Unable to read config file, %v", err)
}
text := string(rawBytes)
lines := strings.Split(text, "\n")
// Define available blockchain nodes
for _, line := range lines {
fields := strings.Split(line, ",")
if len(fields) == 3 {
port, err := strconv.ParseInt(fields[2], 0, 16)
if err != nil {
log.Printf("Unable to parse port number, %v", err)
continue
}
nc.NodeConfigs = append(nc.NodeConfigs, NodeConfig{
Blockchain: fields[0],
Addr: fields[1],
Port: uint16(port),
})
}
}
}

Wyświetl plik

@ -4,13 +4,12 @@ import (
"encoding/json" "encoding/json"
"flag" "flag"
"fmt" "fmt"
"log"
"os" "os"
"strings" "strings"
bugout "github.com/bugout-dev/bugout-go/pkg" bugout "github.com/bugout-dev/bugout-go/pkg"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
) )
var ( var (
@ -138,9 +137,20 @@ func (s *StateCLI) checkRequirements() {
} }
} }
config := configs.GetConfigPath(s.configPathFlag) // Load configuration
if !configs.CheckPathExists(config.ConfigPath) { config, err := GetConfigPath(s.configPathFlag)
configs.GenerateDefaultConfig(config) if err != nil {
fmt.Println(err)
os.Exit(1)
}
if !config.ConfigExists {
if err := GenerateDefaultConfig(config); err != nil {
fmt.Println(err)
os.Exit(1)
}
} else {
log.Printf("Loaded configuration from %s", config.ConfigPath)
} }
s.configPathFlag = config.ConfigPath s.configPathFlag = config.ConfigPath
} }
@ -183,7 +193,7 @@ func (s *StateCLI) populateCLI() {
s.usersCmd.IntVar(&s.offsetFlag, "offset", 0, "Result output offset") s.usersCmd.IntVar(&s.offsetFlag, "offset", 0, "Result output offset")
} }
func CLI() { func cli() {
stateCLI.populateCLI() stateCLI.populateCLI()
if len(os.Args) < 2 { if len(os.Args) < 2 {
stateCLI.usage() stateCLI.usage()
@ -193,7 +203,7 @@ func CLI() {
// Init bugout client // Init bugout client
bc, err := bugout.ClientFromEnv() bc, err := bugout.ClientFromEnv()
if err != nil { if err != nil {
fmt.Printf("Unable to initialize bugout client %v", err) fmt.Printf("Unable to initialize bugout client, err: %v\n", err)
os.Exit(1) os.Exit(1)
} }
bugoutClient = bc bugoutClient = bc
@ -213,24 +223,24 @@ func CLI() {
ExtendedMethods: stateCLI.extendedMethodsFlag, ExtendedMethods: stateCLI.extendedMethodsFlag,
} }
_, err := bugoutClient.Brood.FindUser( _, err := bugoutClient.Brood.FindUser(
configs.NB_CONTROLLER_TOKEN, NB_CONTROLLER_TOKEN,
map[string]string{ map[string]string{
"user_id": proposedUserAccess.UserID, "user_id": proposedUserAccess.UserID,
"application_id": configs.NB_APPLICATION_ID, "application_id": NB_APPLICATION_ID,
}, },
) )
if err != nil { if err != nil {
fmt.Printf("User does not exists %v\n", err) fmt.Printf("User does not exists, err: %v\n", err)
os.Exit(1) os.Exit(1)
} }
resource, err := bugoutClient.Brood.CreateResource(configs.NB_CONTROLLER_TOKEN, configs.NB_APPLICATION_ID, proposedUserAccess) resource, err := bugoutClient.Brood.CreateResource(NB_CONTROLLER_TOKEN, NB_APPLICATION_ID, proposedUserAccess)
if err != nil { if err != nil {
fmt.Printf("Unable to create user access %v\n", err) fmt.Printf("Unable to create user access, err: %v\n", err)
os.Exit(1) os.Exit(1)
} }
resource_data, err := json.Marshal(resource.ResourceData) resource_data, err := json.Marshal(resource.ResourceData)
if err != nil { if err != nil {
fmt.Printf("Unable to encode resource %s data interface to json %v", resource.Id, err) fmt.Printf("Unable to encode resource %s data interface to json, err: %v", resource.Id, err)
os.Exit(1) os.Exit(1)
} }
fmt.Println(string(resource_data)) fmt.Println(string(resource_data))
@ -251,31 +261,31 @@ func CLI() {
queryParameters["access_id"] = stateCLI.accessIDFlag queryParameters["access_id"] = stateCLI.accessIDFlag
} }
resources, err := bugoutClient.Brood.GetResources( resources, err := bugoutClient.Brood.GetResources(
configs.NB_CONTROLLER_TOKEN, NB_CONTROLLER_TOKEN,
configs.NB_APPLICATION_ID, NB_APPLICATION_ID,
queryParameters, queryParameters,
) )
if err != nil { if err != nil {
fmt.Printf("Unable to get Bugout resources %v\n", err) fmt.Printf("Unable to get Bugout resources, err: %v\n", err)
os.Exit(1) os.Exit(1)
} }
var userAccesses []ClientResourceData var userAccesses []ClientResourceData
for _, resource := range resources.Resources { for _, resource := range resources.Resources {
deletedResource, err := bugoutClient.Brood.DeleteResource(configs.NB_CONTROLLER_TOKEN, resource.Id) deletedResource, err := bugoutClient.Brood.DeleteResource(NB_CONTROLLER_TOKEN, resource.Id)
if err != nil { if err != nil {
fmt.Printf("Unable to delete resource %s %v\n", resource.Id, err) fmt.Printf("Unable to delete resource %s, err: %v\n", resource.Id, err)
continue continue
} }
resource_data, err := json.Marshal(deletedResource.ResourceData) resource_data, err := json.Marshal(deletedResource.ResourceData)
if err != nil { if err != nil {
fmt.Printf("Unable to encode resource %s data interface to json %v", resource.Id, err) fmt.Printf("Unable to encode resource %s data interface to json, err: %v\n", resource.Id, err)
continue continue
} }
var userAccess ClientResourceData var userAccess ClientResourceData
err = json.Unmarshal(resource_data, &userAccess) err = json.Unmarshal(resource_data, &userAccess)
if err != nil { if err != nil {
fmt.Printf("Unable to decode resource %s data json to structure %v", resource.Id, err) fmt.Printf("Unable to decode resource %s data json to structure, err: %v\n", resource.Id, err)
continue continue
} }
userAccesses = append(userAccesses, userAccess) userAccesses = append(userAccesses, userAccess)
@ -283,7 +293,7 @@ func CLI() {
userAccessesJson, err := json.Marshal(userAccesses) userAccessesJson, err := json.Marshal(userAccesses)
if err != nil { if err != nil {
fmt.Printf("Unable to marshal user access struct %v\n", err) fmt.Printf("Unable to marshal user access struct, err: %v\n", err)
os.Exit(1) os.Exit(1)
} }
fmt.Println(string(userAccessesJson)) fmt.Println(string(userAccessesJson))
@ -292,7 +302,7 @@ func CLI() {
stateCLI.serverCmd.Parse(os.Args[2:]) stateCLI.serverCmd.Parse(os.Args[2:])
stateCLI.checkRequirements() stateCLI.checkRequirements()
configs.CheckEnvVarSet() CheckEnvVarSet()
Server() Server()
@ -308,12 +318,12 @@ func CLI() {
queryParameters["access_id"] = stateCLI.accessIDFlag queryParameters["access_id"] = stateCLI.accessIDFlag
} }
resources, err := bugoutClient.Brood.GetResources( resources, err := bugoutClient.Brood.GetResources(
configs.NB_CONTROLLER_TOKEN, NB_CONTROLLER_TOKEN,
configs.NB_APPLICATION_ID, NB_APPLICATION_ID,
queryParameters, queryParameters,
) )
if err != nil { if err != nil {
fmt.Printf("Unable to get Bugout resources %v\n", err) fmt.Printf("Unable to get Bugout resources, err: %v\n", err)
os.Exit(1) os.Exit(1)
} }
@ -331,20 +341,20 @@ func CLI() {
for _, resource := range resources.Resources[offset:limit] { for _, resource := range resources.Resources[offset:limit] {
resource_data, err := json.Marshal(resource.ResourceData) resource_data, err := json.Marshal(resource.ResourceData)
if err != nil { if err != nil {
fmt.Printf("Unable to encode resource %s data interface to json %v", resource.Id, err) fmt.Printf("Unable to encode resource %s data interface to json, err: %v\n", resource.Id, err)
continue continue
} }
var userAccess ClientResourceData var userAccess ClientResourceData
err = json.Unmarshal(resource_data, &userAccess) err = json.Unmarshal(resource_data, &userAccess)
if err != nil { if err != nil {
fmt.Printf("Unable to decode resource %s data json to structure %v", resource.Id, err) fmt.Printf("Unable to decode resource %s data json to structure, err: %v\n", resource.Id, err)
continue continue
} }
userAccesses = append(userAccesses, userAccess) userAccesses = append(userAccesses, userAccess)
} }
userAccessesJson, err := json.Marshal(userAccesses) userAccessesJson, err := json.Marshal(userAccesses)
if err != nil { if err != nil {
fmt.Printf("Unable to marshal user accesses struct %v\n", err) fmt.Printf("Unable to marshal user accesses struct, err: %v\n", err)
os.Exit(1) os.Exit(1)
} }
fmt.Println(string(userAccessesJson)) fmt.Println(string(userAccessesJson))
@ -353,7 +363,7 @@ func CLI() {
stateCLI.versionCmd.Parse(os.Args[2:]) stateCLI.versionCmd.Parse(os.Args[2:])
stateCLI.checkRequirements() stateCLI.checkRequirements()
fmt.Printf("v%s\n", configs.NB_VERSION) fmt.Printf("v%s\n", NB_VERSION)
default: default:
stateCLI.usage() stateCLI.usage()

Wyświetl plik

@ -5,8 +5,6 @@ import (
"reflect" "reflect"
"sync" "sync"
"time" "time"
configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
) )
var ( var (
@ -104,7 +102,7 @@ func (cpool *ClientPool) AddClientNode(id string, node *Node) {
func (cpool *ClientPool) GetClientNode(id string) *Node { func (cpool *ClientPool) GetClientNode(id string) *Node {
if cpool.Client[id] != nil { if cpool.Client[id] != nil {
lastCallTs := cpool.Client[id].GetClientLastCallDiff() lastCallTs := cpool.Client[id].GetClientLastCallDiff()
if lastCallTs < configs.NB_CLIENT_NODE_KEEP_ALIVE { if lastCallTs < NB_CLIENT_NODE_KEEP_ALIVE {
cpool.Client[id].UpdateClientLastCall() cpool.Client[id].UpdateClientLastCall()
return cpool.Client[id].Node return cpool.Client[id].Node
} }
@ -119,7 +117,7 @@ func (cpool *ClientPool) CleanInactiveClientNodes() int {
cnt := 0 cnt := 0
for id, client := range cpool.Client { for id, client := range cpool.Client {
lastCallTs := client.GetClientLastCallDiff() lastCallTs := client.GetClientLastCallDiff()
if lastCallTs >= configs.NB_CLIENT_NODE_KEEP_ALIVE { if lastCallTs >= NB_CLIENT_NODE_KEEP_ALIVE {
delete(cpool.Client, id) delete(cpool.Client, id)
} else { } else {
cnt += 1 cnt += 1

Wyświetl plik

@ -4,8 +4,6 @@ import (
"reflect" "reflect"
"testing" "testing"
"time" "time"
configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
) )
func TestAddClientNode(t *testing.T) { func TestAddClientNode(t *testing.T) {
@ -40,7 +38,7 @@ func TestGetClientNode(t *testing.T) {
{map[string]*Client{}, "1", nil}, {map[string]*Client{}, "1", nil},
{map[string]*Client{"1": {LastCallTs: ts, Node: &Node{Alive: true}}}, "1", &Node{Alive: true}}, {map[string]*Client{"1": {LastCallTs: ts, Node: &Node{Alive: true}}}, "1", &Node{Alive: true}},
{map[string]*Client{"2": {LastCallTs: ts, Node: &Node{Alive: true}}}, "1", nil}, {map[string]*Client{"2": {LastCallTs: ts, Node: &Node{Alive: true}}}, "1", nil},
{map[string]*Client{"1": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE, Node: &Node{Alive: true}}}, "1", nil}, {map[string]*Client{"1": {LastCallTs: ts - NB_CLIENT_NODE_KEEP_ALIVE, Node: &Node{Alive: true}}}, "1", nil},
} }
for _, c := range cases { for _, c := range cases {
CreateClientPools() CreateClientPools()
@ -63,11 +61,11 @@ func TestCleanInactiveClientNodes(t *testing.T) {
clients map[string]*Client clients map[string]*Client
expected string expected string
}{ }{
{map[string]*Client{"1": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE}}, ""}, {map[string]*Client{"1": {LastCallTs: ts - NB_CLIENT_NODE_KEEP_ALIVE}}, ""},
{map[string]*Client{"1": {LastCallTs: ts}}, "1"}, {map[string]*Client{"1": {LastCallTs: ts}}, "1"},
{map[string]*Client{ {map[string]*Client{
"1": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE}, "1": {LastCallTs: ts - NB_CLIENT_NODE_KEEP_ALIVE},
"2": {LastCallTs: ts - configs.NB_CLIENT_NODE_KEEP_ALIVE - 10}, "2": {LastCallTs: ts - NB_CLIENT_NODE_KEEP_ALIVE - 10},
"3": {LastCallTs: ts}, "3": {LastCallTs: ts},
}, "3"}, }, "3"},
} }

Wyświetl plik

@ -1,10 +1,12 @@
/* /*
Configurations for load balancer server. Configurations for load balancer server.
*/ */
package configs package main
import ( import (
"encoding/json"
"fmt" "fmt"
"io/ioutil"
"log" "log"
"os" "os"
"path/filepath" "path/filepath"
@ -43,8 +45,6 @@ var (
MOONSTREAM_DB_CONN_MAX_LIFETIME = 30 * time.Minute MOONSTREAM_DB_CONN_MAX_LIFETIME = 30 * time.Minute
) )
var MOONSTREAM_NODES_SERVER_PORT = os.Getenv("MOONSTREAM_NODES_SERVER_PORT")
func CheckEnvVarSet() { func CheckEnvVarSet() {
if NB_ACCESS_ID_HEADER == "" { if NB_ACCESS_ID_HEADER == "" {
NB_ACCESS_ID_HEADER = "x-node-balancer-access-id" NB_ACCESS_ID_HEADER = "x-node-balancer-access-id"
@ -52,14 +52,31 @@ func CheckEnvVarSet() {
if NB_DATA_SOURCE_HEADER == "" { if NB_DATA_SOURCE_HEADER == "" {
NB_DATA_SOURCE_HEADER = "x-node-balancer-data-source" NB_DATA_SOURCE_HEADER = "x-node-balancer-data-source"
} }
if MOONSTREAM_NODES_SERVER_PORT == "" {
fmt.Println("Environment variable MOONSTREAM_NODES_SERVER_PORT not set")
os.Exit(1)
}
} }
type Config struct { // Nodes configuration
type NodeConfig struct {
Blockchain string `json:"blockchain"`
Endpoint string `json:"endpoint"`
Internal bool `json:"internal"`
}
func LoadConfig(configPath string) (*[]NodeConfig, error) {
rawBytes, err := ioutil.ReadFile(configPath)
if err != nil {
return nil, err
}
nodeConfigs := &[]NodeConfig{}
err = json.Unmarshal(rawBytes, nodeConfigs)
if err != nil {
return nil, err
}
return nodeConfigs, nil
}
type ConfigPlacement struct {
ConfigDirPath string ConfigDirPath string
ConfigDirExists bool ConfigDirExists bool
@ -67,28 +84,26 @@ type Config struct {
ConfigExists bool ConfigExists bool
} }
func CheckPathExists(path string) bool { func CheckPathExists(path string) (bool, error) {
var exists = true var exists = true
_, err := os.Stat(path) _, err := os.Stat(path)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
exists = false exists = false
} else { } else {
fmt.Println(err) return exists, fmt.Errorf("Error due checking file path exists, err: %v", err)
os.Exit(1)
} }
} }
return exists return exists, nil
} }
func GetConfigPath(providedPath string) *Config { func GetConfigPath(providedPath string) (*ConfigPlacement, error) {
var configDirPath, configPath string var configDirPath, configPath string
if providedPath == "" { if providedPath == "" {
homeDir, err := os.UserHomeDir() homeDir, err := os.UserHomeDir()
if err != nil { if err != nil {
fmt.Printf("Unable to find user home directory, %v", err) return nil, fmt.Errorf("Unable to find user home directory, %v", err)
os.Exit(1)
} }
configDirPath = fmt.Sprintf("%s/.nodebalancer", homeDir) configDirPath = fmt.Sprintf("%s/.nodebalancer", homeDir)
configPath = fmt.Sprintf("%s/config.txt", configDirPath) configPath = fmt.Sprintf("%s/config.txt", configDirPath)
@ -97,35 +112,48 @@ func GetConfigPath(providedPath string) *Config {
configDirPath = filepath.Dir(configPath) configDirPath = filepath.Dir(configPath)
} }
defaultConfig := &Config{ configDirPathExists, err := CheckPathExists(configDirPath)
ConfigDirPath: configDirPath, if err != nil {
ConfigDirExists: CheckPathExists(configDirPath), return nil, err
}
ConfigPath: configPath, configPathExists, err := CheckPathExists(configPath)
ConfigExists: CheckPathExists(configPath), if err != nil {
return nil, err
} }
return defaultConfig config := &ConfigPlacement{
ConfigDirPath: configDirPath,
ConfigDirExists: configDirPathExists,
ConfigPath: configPath,
ConfigExists: configPathExists,
}
return config, nil
} }
func GenerateDefaultConfig(config *Config) string { func GenerateDefaultConfig(config *ConfigPlacement) error {
if !config.ConfigDirExists { if !config.ConfigDirExists {
if err := os.MkdirAll(config.ConfigDirPath, os.ModePerm); err != nil { if err := os.MkdirAll(config.ConfigDirPath, os.ModePerm); err != nil {
fmt.Printf("Unable to create directory, %v", err) return fmt.Errorf("Unable to create directory, %v", err)
os.Exit(1)
} }
log.Printf("Config directory created at: %s", config.ConfigDirPath) log.Printf("Config directory created at: %s", config.ConfigDirPath)
} }
if !config.ConfigExists { if !config.ConfigExists {
tempConfigB := []byte("ethereum,127.0.0.1,8545") tempConfig := []NodeConfig{
err := os.WriteFile(config.ConfigPath, tempConfigB, 0644) {Blockchain: "ethereum", Endpoint: "http://127.0.0.1:8545", Internal: true},
}
tempConfigJson, err := json.Marshal(tempConfig)
if err != nil { if err != nil {
fmt.Printf("Unable to create temp config file, %v", err) return fmt.Errorf("Unable to marshal configuration data, err: %v", err)
os.Exit(1) }
err = ioutil.WriteFile(config.ConfigPath, tempConfigJson, os.ModePerm)
if err != nil {
return fmt.Errorf("Unable to write default config to file %s, err: %v", config.ConfigPath, err)
} }
log.Printf("Created default configuration at %s", config.ConfigPath) log.Printf("Created default configuration at %s", config.ConfigPath)
} }
return config.ConfigPath return nil
} }

Wyświetl plik

@ -4,8 +4,6 @@ import (
"database/sql" "database/sql"
"fmt" "fmt"
configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
_ "github.com/lib/pq" _ "github.com/lib/pq"
) )
@ -19,18 +17,18 @@ type DatabaseClient struct {
// Establish connection with database // Establish connection with database
func InitDatabaseClient() error { func InitDatabaseClient() error {
db, err := sql.Open("postgres", configs.MOONSTREAM_DB_URI_READ_ONLY) db, err := sql.Open("postgres", MOONSTREAM_DB_URI_READ_ONLY)
if err != nil { if err != nil {
return fmt.Errorf("DSN parse error or another database initialization error: %v", err) return fmt.Errorf("DSN parse error or another database initialization error: %v", err)
} }
// Set the maximum number of concurrently idle connections, // Set the maximum number of concurrently idle connections,
// by default sql.DB allows a maximum of 2 idle connections. // by default sql.DB allows a maximum of 2 idle connections.
db.SetMaxIdleConns(configs.MOONSTREAM_DB_MAX_IDLE_CONNS) db.SetMaxIdleConns(MOONSTREAM_DB_MAX_IDLE_CONNS)
// Set the maximum lifetime of a connection. // Set the maximum lifetime of a connection.
// Longer lifetime increase memory usage. // Longer lifetime increase memory usage.
db.SetConnMaxLifetime(configs.MOONSTREAM_DB_CONN_MAX_LIFETIME) db.SetConnMaxLifetime(MOONSTREAM_DB_CONN_MAX_LIFETIME)
databaseClient = DatabaseClient{ databaseClient = DatabaseClient{
Client: db, Client: db,

Wyświetl plik

@ -1,5 +1,5 @@
package main package main
func main() { func main() {
CLI() cli()
} }

Wyświetl plik

@ -16,8 +16,6 @@ import (
"sync" "sync"
"time" "time"
"github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
humbug "github.com/bugout-dev/humbug/go/pkg" humbug "github.com/bugout-dev/humbug/go/pkg"
) )
@ -89,7 +87,7 @@ func (ac *AccessCache) Cleanup() (int64, int64) {
tsNow := time.Now().Unix() tsNow := time.Now().Unix()
ac.mux.Lock() ac.mux.Lock()
for aId, aData := range ac.accessIds { for aId, aData := range ac.accessIds {
if tsNow-aData.LastAccessTs > configs.NB_CACHE_ACCESS_ID_LIFETIME { if tsNow-aData.LastAccessTs > NB_CACHE_ACCESS_ID_LIFETIME {
delete(ac.accessIds, aId) delete(ac.accessIds, aId)
removedAccessIds++ removedAccessIds++
} else { } else {
@ -101,7 +99,7 @@ func (ac *AccessCache) Cleanup() (int64, int64) {
} }
func initCacheCleaning(debug bool) { func initCacheCleaning(debug bool) {
t := time.NewTicker(configs.NB_CACHE_CLEANING_INTERVAL) t := time.NewTicker(NB_CACHE_CLEANING_INTERVAL)
for { for {
select { select {
case <-t.C: case <-t.C:
@ -118,7 +116,7 @@ func initCacheCleaning(debug bool) {
func extractAccessID(r *http.Request) string { func extractAccessID(r *http.Request) string {
var accessID string var accessID string
accessIDHeaders := r.Header[strings.Title(configs.NB_ACCESS_ID_HEADER)] accessIDHeaders := r.Header[strings.Title(NB_ACCESS_ID_HEADER)]
for _, h := range accessIDHeaders { for _, h := range accessIDHeaders {
accessID = h accessID = h
} }
@ -137,7 +135,7 @@ func extractAccessID(r *http.Request) string {
func extractDataSource(r *http.Request) string { func extractDataSource(r *http.Request) string {
dataSource := "database" dataSource := "database"
dataSources := r.Header[strings.Title(configs.NB_DATA_SOURCE_HEADER)] dataSources := r.Header[strings.Title(NB_DATA_SOURCE_HEADER)]
for _, h := range dataSources { for _, h := range dataSources {
dataSource = h dataSource = h
} }
@ -203,7 +201,7 @@ func logMiddleware(next http.Handler) http.Handler {
var jsonrpcRequest JSONRPCRequest var jsonrpcRequest JSONRPCRequest
err = json.Unmarshal(body, &jsonrpcRequest) err = json.Unmarshal(body, &jsonrpcRequest)
if err != nil { if err != nil {
log.Printf("Unable to parse body %v", err) log.Printf("Unable to parse body, err: %v", err)
} }
logStr += fmt.Sprintf(" %s", jsonrpcRequest.Method) logStr += fmt.Sprintf(" %s", jsonrpcRequest.Method)
} }
@ -236,7 +234,7 @@ func accessMiddleware(next http.Handler) http.Handler {
} }
// If access id does not belong to internal crawlers, then check cache or find it in Bugout resources // If access id does not belong to internal crawlers, then check cache or find it in Bugout resources
if accessID == configs.NB_CONTROLLER_ACCESS_ID { if accessID == NB_CONTROLLER_ACCESS_ID {
if stateCLI.enableDebugFlag { if stateCLI.enableDebugFlag {
log.Printf("Access id belongs to internal crawlers") log.Printf("Access id belongs to internal crawlers")
} }
@ -254,8 +252,8 @@ func accessMiddleware(next http.Handler) http.Handler {
log.Printf("New access id, looking at Brood resources") log.Printf("New access id, looking at Brood resources")
} }
resources, err := bugoutClient.Brood.GetResources( resources, err := bugoutClient.Brood.GetResources(
configs.NB_CONTROLLER_TOKEN, NB_CONTROLLER_TOKEN,
configs.NB_APPLICATION_ID, NB_APPLICATION_ID,
map[string]string{"access_id": accessID}, map[string]string{"access_id": accessID},
) )
if err != nil { if err != nil {

Wyświetl plik

@ -12,8 +12,6 @@ import (
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
) )
type PingResponse struct { type PingResponse struct {
@ -42,8 +40,8 @@ func lbHandler(w http.ResponseWriter, r *http.Request) {
} }
attempts := GetAttemptsFromContext(r) attempts := GetAttemptsFromContext(r)
if attempts > configs.NB_CONNECTION_RETRIES { if attempts > NB_CONNECTION_RETRIES {
log.Printf("Max attempts reached from %s %s, terminating\n", r.RemoteAddr, r.URL.Path) log.Printf("Max attempts reached from %s %s, terminating", r.RemoteAddr, r.URL.Path)
http.Error(w, "Service not available", http.StatusServiceUnavailable) http.Error(w, "Service not available", http.StatusServiceUnavailable)
return return
} }
@ -147,7 +145,7 @@ func lbDatabaseHandler(w http.ResponseWriter, r *http.Request, blockchain string
block, err := databaseClient.GetBlock(blockchain, blockNumber) block, err := databaseClient.GetBlock(blockchain, blockNumber)
if err != nil { if err != nil {
fmt.Printf("Unable to get block from database %v", err) log.Printf("Unable to get block from database, err: %v", err)
http.Error(w, fmt.Sprintf("no such block %v", blockNumber), http.StatusBadRequest) http.Error(w, fmt.Sprintf("no such block %v", blockNumber), http.StatusBadRequest)
return return
} }

Wyświetl plik

@ -14,8 +14,6 @@ import (
"os" "os"
"time" "time"
configs "github.com/bugout-dev/moonstream/nodes/node_balancer/configs"
humbug "github.com/bugout-dev/humbug/go/pkg" humbug "github.com/bugout-dev/humbug/go/pkg"
"github.com/google/uuid" "github.com/google/uuid"
) )
@ -29,7 +27,7 @@ var (
// initHealthCheck runs a routine for check status of the nodes every 5 seconds // initHealthCheck runs a routine for check status of the nodes every 5 seconds
func initHealthCheck(debug bool) { func initHealthCheck(debug bool) {
t := time.NewTicker(configs.NB_HEALTH_CHECK_INTERVAL) t := time.NewTicker(NB_HEALTH_CHECK_INTERVAL)
for { for {
select { select {
case <-t.C: case <-t.C:
@ -37,7 +35,7 @@ func initHealthCheck(debug bool) {
ethereumClients := ethereumClientPool.CleanInactiveClientNodes() ethereumClients := ethereumClientPool.CleanInactiveClientNodes()
polygonClients := polygonClientPool.CleanInactiveClientNodes() polygonClients := polygonClientPool.CleanInactiveClientNodes()
xdaiClients := xdaiClientPool.CleanInactiveClientNodes() xdaiClients := xdaiClientPool.CleanInactiveClientNodes()
log.Printf("Active etehereum clients: %d, polygon clients: %d, xdai clients: %d\n", ethereumClients, polygonClients, xdaiClients) log.Printf("Active ethereum clients: %d, polygon clients: %d, xdai clients: %d", ethereumClients, polygonClients, xdaiClients)
if debug { if debug {
blockchainPool.StatusLog() blockchainPool.StatusLog()
} }
@ -71,13 +69,13 @@ func GetRetryFromContext(r *http.Request) int {
func proxyErrorHandler(proxy *httputil.ReverseProxy, url *url.URL) { func proxyErrorHandler(proxy *httputil.ReverseProxy, url *url.URL) {
proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, e error) { proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, e error) {
retries := GetRetryFromContext(r) retries := GetRetryFromContext(r)
if retries < configs.NB_CONNECTION_RETRIES { if retries < NB_CONNECTION_RETRIES {
log.Printf( log.Printf(
"An error occurred while proxying to %s, number of retries: %d/%d. Error: %s\n", "An error occurred while proxying to %s, number of retries: %d/%d, err: %v",
url, retries+1, configs.NB_CONNECTION_RETRIES, e.Error(), url, retries+1, NB_CONNECTION_RETRIES, e.Error(),
) )
select { select {
case <-time.After(configs.NB_CONNECTION_RETRIES_INTERVAL): case <-time.After(NB_CONNECTION_RETRIES_INTERVAL):
ctx := context.WithValue(r.Context(), Retry, retries+1) ctx := context.WithValue(r.Context(), Retry, retries+1)
proxy.ServeHTTP(w, r.WithContext(ctx)) proxy.ServeHTTP(w, r.WithContext(ctx))
} }
@ -94,7 +92,7 @@ func proxyErrorHandler(proxy *httputil.ReverseProxy, url *url.URL) {
// If the same request routing for few attempts with different nodes, increase the count // If the same request routing for few attempts with different nodes, increase the count
// of attempts and send request to next peer // of attempts and send request to next peer
attempts := GetAttemptsFromContext(r) attempts := GetAttemptsFromContext(r)
log.Printf("Attempting number: %d to fetch node %s\n", attempts, url) log.Printf("Attempting number: %d to fetch node %s", attempts, url)
ctx := context.WithValue(r.Context(), Attempts, attempts+1) ctx := context.WithValue(r.Context(), Attempts, attempts+1)
lbHandler(w, r.WithContext(ctx)) lbHandler(w, r.WithContext(ctx))
} }
@ -111,36 +109,36 @@ func Server() {
var err error var err error
sessionID := uuid.New().String() sessionID := uuid.New().String()
consent := humbug.CreateHumbugConsent(humbug.True) consent := humbug.CreateHumbugConsent(humbug.True)
reporter, err = humbug.CreateHumbugReporter(consent, "moonstream-node-balancer", sessionID, configs.HUMBUG_REPORTER_NB_TOKEN) reporter, err = humbug.CreateHumbugReporter(consent, "moonstream-node-balancer", sessionID, HUMBUG_REPORTER_NB_TOKEN)
if err != nil { if err != nil {
fmt.Printf("Invalid Humbug Crash configuration: %v", err) fmt.Printf("Invalid Humbug Crash configuration, err: %v\n", err)
os.Exit(1) os.Exit(1)
} }
// Record system information // Record system information
reporter.Publish(humbug.SystemReport()) reporter.Publish(humbug.SystemReport())
resources, err := bugoutClient.Brood.GetResources( resources, err := bugoutClient.Brood.GetResources(
configs.NB_CONTROLLER_TOKEN, NB_CONTROLLER_TOKEN,
configs.NB_APPLICATION_ID, NB_APPLICATION_ID,
map[string]string{"access_id": configs.NB_CONTROLLER_ACCESS_ID}, map[string]string{"access_id": NB_CONTROLLER_ACCESS_ID},
) )
if err != nil { if err != nil {
fmt.Printf("Unable to get user with provided access identifier %v", err) fmt.Printf("Unable to get user with provided access identifier, err: %v\n", err)
os.Exit(1) os.Exit(1)
} }
if len(resources.Resources) != 1 { if len(resources.Resources) != 1 {
fmt.Printf("User with provided access identifier has wrong number of resources %v", err) fmt.Printf("User with provided access identifier has wrong number of resources, err: %v\n", err)
os.Exit(1) os.Exit(1)
} }
resource_data, err := json.Marshal(resources.Resources[0].ResourceData) resource_data, err := json.Marshal(resources.Resources[0].ResourceData)
if err != nil { if err != nil {
fmt.Printf("Unable to encode resource data interface to json %v", err) fmt.Printf("Unable to encode resource data interface to json, err: %v\n", err)
os.Exit(1) os.Exit(1)
} }
var clientAccess ClientResourceData var clientAccess ClientResourceData
err = json.Unmarshal(resource_data, &clientAccess) err = json.Unmarshal(resource_data, &clientAccess)
if err != nil { if err != nil {
fmt.Printf("Unable to decode resource data json to structure %v", err) fmt.Printf("Unable to decode resource data json to structure, err: %v\n", err)
os.Exit(1) os.Exit(1)
} }
internalCrawlersAccess = ClientResourceData{ internalCrawlersAccess = ClientResourceData{
@ -158,43 +156,38 @@ func Server() {
err = InitDatabaseClient() err = InitDatabaseClient()
if err != nil { if err != nil {
log.Printf("Unable to initialize database connection %v\n", err) log.Printf("Unable to initialize database connection, err: %v", err)
} else { } else {
log.Printf("Connection with database established\n") log.Printf("Connection with database established")
} }
// Fill NodeConfigList with initial nodes from environment variables // Fill NodeConfigList with initial nodes from environment variables
nodeConfigs.InitNodeConfigList(stateCLI.configPathFlag) nodeConfig, err := LoadConfig(stateCLI.configPathFlag)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
// Parse nodes and set list of proxies // Parse nodes and set list of proxies
for i, nodeConfig := range nodeConfigs.NodeConfigs { for i, nodeConfig := range *nodeConfig {
gethUrl, err := url.Parse(fmt.Sprintf("http://%s:%d", nodeConfig.Addr, nodeConfig.Port))
endpoint, err := url.Parse(nodeConfig.Endpoint)
if err != nil { if err != nil {
fmt.Printf("Unable to parse gethUrl with addr: %s and port: %d\n", nodeConfig.Addr, nodeConfig.Port) fmt.Println(err)
continue os.Exit(1)
}
statusUrl, err := url.Parse(fmt.Sprintf("http://%s:%s", nodeConfig.Addr, configs.MOONSTREAM_NODES_SERVER_PORT))
if err != nil {
fmt.Printf("Unable to parse statusUrl with addr: %s and port: %s\n", nodeConfig.Addr, configs.MOONSTREAM_NODES_SERVER_PORT)
continue
} }
proxyToStatus := httputil.NewSingleHostReverseProxy(statusUrl) proxyToEndpoint := httputil.NewSingleHostReverseProxy(endpoint)
proxyToGeth := httputil.NewSingleHostReverseProxy(gethUrl) proxyErrorHandler(proxyToEndpoint, endpoint)
proxyErrorHandler(proxyToStatus, statusUrl)
proxyErrorHandler(proxyToGeth, gethUrl)
blockchainPool.AddNode(&Node{ blockchainPool.AddNode(&Node{
StatusURL: statusUrl, Endpoint: endpoint,
GethURL: gethUrl, Alive: true,
Alive: true, GethReverseProxy: proxyToEndpoint,
StatusReverseProxy: proxyToStatus,
GethReverseProxy: proxyToGeth,
}, nodeConfig.Blockchain) }, nodeConfig.Blockchain)
log.Printf( log.Printf(
"Added new %s proxy blockchain under index %d from config file with geth url: %s and status url: %s\n", "Added new %s proxy blockchain under index %d from config file with geth url: %s://%s",
nodeConfig.Blockchain, i, gethUrl, statusUrl) nodeConfig.Blockchain, i, endpoint.Scheme, endpoint.Host)
} }
serveMux := http.NewServeMux() serveMux := http.NewServeMux()
@ -225,10 +218,10 @@ func Server() {
// Start access id cache cleaning // Start access id cache cleaning
go initCacheCleaning(stateCLI.enableDebugFlag) go initCacheCleaning(stateCLI.enableDebugFlag)
log.Printf("Starting node load balancer HTTP server at %s:%s\n", stateCLI.listeningAddrFlag, stateCLI.listeningPortFlag) log.Printf("Starting node load balancer HTTP server at %s:%s", stateCLI.listeningAddrFlag, stateCLI.listeningPortFlag)
err = server.ListenAndServe() err = server.ListenAndServe()
if err != nil { if err != nil {
fmt.Printf("Failed to start server listener %v", err) fmt.Printf("Failed to start server listener, err: %v\n", err)
os.Exit(1) os.Exit(1)
} }
} }

Wyświetl plik

@ -0,0 +1,3 @@
package main
var NB_VERSION = "0.1.1"

Wyświetl plik

@ -1,3 +0,0 @@
package configs
var NB_VERSION = "0.1.0"

Wyświetl plik

@ -4,7 +4,6 @@ export NB_APPLICATION_ID="<application_id_to_controll_access>"
export NB_CONTROLLER_TOKEN="<token_of_controller_user>" export NB_CONTROLLER_TOKEN="<token_of_controller_user>"
export NB_CONTROLLER_ACCESS_ID="<controller_access_id_for_internal_crawlers>" export NB_CONTROLLER_ACCESS_ID="<controller_access_id_for_internal_crawlers>"
export MOONSTREAM_DB_URI="postgresql://<username>:<password>@<db_host>:<db_port>/<db_name>" export MOONSTREAM_DB_URI="postgresql://<username>:<password>@<db_host>:<db_port>/<db_name>"
export MOONSTREAM_NODES_SERVER_PORT="<node_status_server_port>"
# Error humbug reporter # Error humbug reporter
export HUMBUG_REPORTER_NODE_BALANCER_TOKEN="<bugout_humbug_token_for_crash_reports>" export HUMBUG_REPORTER_NODE_BALANCER_TOKEN="<bugout_humbug_token_for_crash_reports>"