diff --git a/nodes/node_balancer/cmd/nodebalancer/balancer.go b/nodes/node_balancer/cmd/nodebalancer/balancer.go index a79f838c..267af421 100644 --- a/nodes/node_balancer/cmd/nodebalancer/balancer.go +++ b/nodes/node_balancer/cmd/nodebalancer/balancer.go @@ -1,5 +1,5 @@ /* -Load balancer, based on https://github.com/kasvith/simplelb/ +Load balancer logic. */ package main @@ -19,10 +19,9 @@ import ( // Main variable of pool of blockchains which contains pool of nodes // for each blockchain we work during session. -var blockchainPool BlockchainPool +var blockchainPools map[string]*NodePool // Node structure with -// StatusURL for status server at node endpoint // Endpoint for geth/bor/etc node http.server endpoint type Node struct { Endpoint *url.URL @@ -36,16 +35,16 @@ type Node struct { GethReverseProxy *httputil.ReverseProxy } -type NodePool struct { - Blockchain string - Nodes []*Node - - // Counter to observe all nodes - Current uint64 +type TopNodeBlock struct { + Block uint64 + Node *Node } -type BlockchainPool struct { - Blockchains []*NodePool +type NodePool struct { + NodesMap map[string][]*Node + NodesSet []*Node + + TopNode TopNodeBlock } // Node status response struct for HealthCheck @@ -58,24 +57,25 @@ type NodeStatusResponse struct { } // AddNode to the nodes pool -func (bpool *BlockchainPool) AddNode(node *Node, blockchain string) { - var nodePool *NodePool - for _, b := range bpool.Blockchains { - if b.Blockchain == blockchain { - nodePool = b - } +func AddNode(blockchain string, tags []string, node *Node) { + if blockchainPools == nil { + blockchainPools = make(map[string]*NodePool) + } + if blockchainPools[blockchain] == nil { + blockchainPools[blockchain] = &NodePool{} + } + if blockchainPools[blockchain].NodesMap == nil { + blockchainPools[blockchain].NodesMap = make(map[string][]*Node) + } + blockchainPools[blockchain].NodesSet = append(blockchainPools[blockchain].NodesSet, node) + + for _, tag := range tags { + blockchainPools[blockchain].NodesMap[tag] = append( + blockchainPools[blockchain].NodesMap[tag], + node, + ) } - // Check if blockchain not yet in pool - if nodePool == nil { - nodePool = &NodePool{ - Blockchain: blockchain, - } - nodePool.Nodes = append(nodePool.Nodes, node) - bpool.Blockchains = append(bpool.Blockchains, nodePool) - } else { - nodePool.Nodes = append(nodePool.Nodes, node) - } } // SetAlive with mutex for exact node @@ -117,59 +117,86 @@ func (node *Node) IncreaseCallCounter() { node.mux.Unlock() } -// GetNextNode returns next active peer to take a connection -// Loop through entire nodes to find out an alive one -func (bpool *BlockchainPool) GetNextNode(blockchain string) *Node { - highestBlock := uint64(0) +func containsGeneric[T comparable](b []T, e T) bool { + for _, v := range b { + if v == e { + return true + } + } + return false +} - // Get NodePool with correct blockchain - var np *NodePool - for _, b := range bpool.Blockchains { - if b.Blockchain == blockchain { - np = b - for _, n := range b.Nodes { - if n.CurrentBlock > highestBlock { - highestBlock = n.CurrentBlock - } +func (npool *NodePool) FilterTagsNodes(tags []string) ([]*Node, TopNodeBlock) { + nodesMap := npool.NodesMap + nodesSet := npool.NodesSet + + tagSet := make(map[string]map[*Node]bool) + + for tag, nodes := range nodesMap { + if tagSet[tag] == nil { + tagSet[tag] = make(map[*Node]bool) + } + for _, node := range nodes { + tagSet[tag][node] = true + } + } + + topNode := TopNodeBlock{} + + var filteredNodes []*Node + for _, node := range nodesSet { + accept := true + for _, tag := range tags { + if tagSet[tag][node] != true { + accept = false + break + } + } + if accept { + filteredNodes = append(filteredNodes, node) + currentBlock := node.CurrentBlock + if currentBlock >= npool.TopNode.Block { + topNode.Block = currentBlock + topNode.Node = node } } } - // Increase Current value with 1 - currentInc := atomic.AddUint64(&np.Current, uint64(1)) + return filteredNodes, topNode +} - // next is an Atomic incrementer, value always in range from 0 to slice length, - // it returns an index of slice - next := int(currentInc % uint64(len(np.Nodes))) +// GetNextNode returns next active peer to take a connection +// Loop through entire nodes to find out an alive one and chose one with small CallCounter +func GetNextNode(nodes []*Node, topNode TopNodeBlock) *Node { + nextNode := topNode.Node - // Start from next one and move full cycle - l := len(np.Nodes) + next - - for i := next; i < l; i++ { - // Take an index by modding with length - idx := i % len(np.Nodes) - // If we have an alive one, use it and store if its not the original one - if np.Nodes[idx].IsAlive() { - if i != next { - // Mark the current one - atomic.StoreUint64(&np.Current, uint64(idx)) - } - // Pass nodes with low blocks - // TODO(kompotkot): Re-write to not rotate through not highest blocks - if np.Nodes[idx].CurrentBlock < highestBlock { + for _, node := range nodes { + if node.IsAlive() { + currentBlock := node.CurrentBlock + if currentBlock < topNode.Block-NB_HIGHEST_BLOCK_SHIFT { + // Bypass outdated nodes continue } - - return np.Nodes[idx] + if node.CallCounter < nextNode.CallCounter { + nextNode = node + } } } - return nil + + if nextNode == nil { + return nil + } + + // Increase CallCounter value with 1 + atomic.AddUint64(&nextNode.CallCounter, uint64(1)) + + return nextNode } // SetNodeStatus modify status of the node -func (bpool *BlockchainPool) SetNodeStatus(url *url.URL, alive bool) { - for _, b := range bpool.Blockchains { - for _, n := range b.Nodes { +func SetNodeStatus(url *url.URL, alive bool) { + for _, nodes := range blockchainPools { + for _, n := range nodes.NodesSet { if n.Endpoint.String() == url.String() { n.SetAlive(alive) break @@ -180,21 +207,21 @@ func (bpool *BlockchainPool) SetNodeStatus(url *url.URL, alive bool) { // StatusLog logs node status // TODO(kompotkot): Print list of alive and dead nodes -func (bpool *BlockchainPool) StatusLog() { - for _, b := range bpool.Blockchains { - for _, n := range b.Nodes { +func StatusLog() { + for blockchain, nodes := range blockchainPools { + for _, n := range nodes.NodesSet { log.Printf( - "Blockchain %s node %s is alive %t. Blockchain called %d times", - b.Blockchain, n.Endpoint.Host, n.Alive, b.Current, + "Blockchain %s node %s is alive %t", + blockchain, n.Endpoint.Host, n.Alive, ) } } } // HealthCheck fetch the node latest block -func (bpool *BlockchainPool) HealthCheck() { - for _, b := range bpool.Blockchains { - for _, n := range b.Nodes { +func HealthCheck() { + for blockchain, nodes := range blockchainPools { + for _, n := range nodes.NodesSet { alive := false httpClient := http.Client{Timeout: NB_HEALTH_CHECK_CALL_TIMEOUT} @@ -239,8 +266,13 @@ func (bpool *BlockchainPool) HealthCheck() { } callCounter := n.UpdateNodeState(blockNumber, alive) + if blockNumber > nodes.TopNode.Block { + nodes.TopNode.Block = blockNumber + nodes.TopNode.Node = n + } + log.Printf( - "Node %s is alive: %t with current block: %d called: %d times", n.Endpoint.Host, alive, blockNumber, callCounter, + "In blockchain %s node %s is alive: %t with current block: %d called: %d times", blockchain, n.Endpoint.Host, alive, blockNumber, callCounter, ) } } diff --git a/nodes/node_balancer/cmd/nodebalancer/configs.go b/nodes/node_balancer/cmd/nodebalancer/configs.go index 18d31220..feff2c23 100644 --- a/nodes/node_balancer/cmd/nodebalancer/configs.go +++ b/nodes/node_balancer/cmd/nodebalancer/configs.go @@ -34,7 +34,8 @@ var ( NB_MAX_COUNTER_NUMBER = uint64(10000000) // Client configuration - NB_CLIENT_NODE_KEEP_ALIVE = int64(5) // How long to store node in hot list for client in seconds + NB_CLIENT_NODE_KEEP_ALIVE = int64(1) // How long to store node in hot list for client in seconds + NB_HIGHEST_BLOCK_SHIFT = uint64(50) // Allowed shift to prefer node with most highest block NB_ACCESS_ID_HEADER = os.Getenv("NB_ACCESS_ID_HEADER") NB_DATA_SOURCE_HEADER = os.Getenv("NB_DATA_SOURCE_HEADER") diff --git a/nodes/node_balancer/cmd/nodebalancer/middleware.go b/nodes/node_balancer/cmd/nodebalancer/middleware.go index 81811a5e..42f4f26f 100644 --- a/nodes/node_balancer/cmd/nodebalancer/middleware.go +++ b/nodes/node_balancer/cmd/nodebalancer/middleware.go @@ -98,13 +98,13 @@ func (ac *AccessCache) Cleanup() (int64, int64) { return removedAccessIds, totalAccessIds } -func initCacheCleaning(debug bool) { +func initCacheCleaning() { t := time.NewTicker(NB_CACHE_CLEANING_INTERVAL) for { select { case <-t.C: removedAccessIds, totalAccessIds := accessIdCache.Cleanup() - if debug { + if stateCLI.enableDebugFlag { log.Printf("Removed %d elements from access id cache", removedAccessIds) } log.Printf("Elements in access id cache: %d", totalAccessIds) diff --git a/nodes/node_balancer/cmd/nodebalancer/routes.go b/nodes/node_balancer/cmd/nodebalancer/routes.go index b9929369..586d9f00 100644 --- a/nodes/node_balancer/cmd/nodebalancer/routes.go +++ b/nodes/node_balancer/cmd/nodebalancer/routes.go @@ -53,25 +53,12 @@ func lbHandler(w http.ResponseWriter, r *http.Request) { return } - // Chose one node - var node *Node - cpool := GetClientPool(blockchain) - node = cpool.GetClientNode(currentClientAccess.AccessID) - if node == nil { - node = blockchainPool.GetNextNode(blockchain) - if node == nil { - http.Error(w, "There are no nodes available", http.StatusServiceUnavailable) - return - } - cpool.AddClientNode(currentClientAccess.AccessID, node) - } - // Save origin path, to use in proxyErrorHandler if node will not response r.Header.Add("X-Origin-Path", r.URL.Path) switch { case strings.HasPrefix(r.URL.Path, fmt.Sprintf("/nb/%s/jsonrpc", blockchain)): - lbJSONRPCHandler(w, r, blockchain, node, currentClientAccess) + lbJSONRPCHandler(w, r, blockchain, currentClientAccess) return default: http.Error(w, fmt.Sprintf("Unacceptable path for %s blockchain %s", blockchain, r.URL.Path), http.StatusBadRequest) @@ -79,7 +66,7 @@ func lbHandler(w http.ResponseWriter, r *http.Request) { } } -func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string, node *Node, currentClientAccess ClientResourceData) { +func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string, currentClientAccess ClientResourceData) { body, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, "Unable to read body", http.StatusBadRequest) @@ -94,6 +81,39 @@ func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string, return } + // Get tags from request params, sort and generate from it identifier + var tags []string + queries := r.URL.Query() + for k, v := range queries { + if k == "tag" { + for _, tag := range v { + tags = append(tags, tag) + } + } + } + + // Chose one node + var node *Node + cpool := GetClientPool(blockchain) + node = cpool.GetClientNode(currentClientAccess.AccessID) + if node == nil { + npool := blockchainPools[blockchain] + var nodes []*Node + var topNode TopNodeBlock + if len(tags) != 0 { + nodes, topNode = npool.FilterTagsNodes(tags) + } else { + topNode = npool.TopNode + nodes = npool.NodesSet + } + node = GetNextNode(nodes, topNode) + if node == nil { + http.Error(w, "There are no nodes available", http.StatusServiceUnavailable) + return + } + cpool.AddClientNode(currentClientAccess.AccessID, node) + } + switch { case currentClientAccess.dataSource == "blockchain": if currentClientAccess.BlockchainAccess == false { diff --git a/nodes/node_balancer/cmd/nodebalancer/server.go b/nodes/node_balancer/cmd/nodebalancer/server.go index 185381a0..d3d693e0 100644 --- a/nodes/node_balancer/cmd/nodebalancer/server.go +++ b/nodes/node_balancer/cmd/nodebalancer/server.go @@ -5,7 +5,6 @@ package main import ( "context" - // "encoding/json" "fmt" "log" "net/http" @@ -29,12 +28,12 @@ var ( ) // initHealthCheck runs a routine for check status of the nodes every 5 seconds -func initHealthCheck(debug bool) { +func initHealthCheck() { t := time.NewTicker(NB_HEALTH_CHECK_INTERVAL) for { select { case <-t.C: - blockchainPool.HealthCheck() + HealthCheck() logStr := "Client pool healthcheck." for b := range configBlockchains { cp := clientPool[b] @@ -42,8 +41,8 @@ func initHealthCheck(debug bool) { logStr += fmt.Sprintf(" Active %s clients: %d.", b, clients) } log.Println(logStr) - if debug { - blockchainPool.StatusLog() + if stateCLI.enableDebugFlag { + StatusLog() } } } @@ -89,7 +88,7 @@ func proxyErrorHandler(proxy *httputil.ReverseProxy, url *url.URL) { } // After 3 retries, mark this backend as down - blockchainPool.SetNodeStatus(url, false) + SetNodeStatus(url, false) // Set modified path back // TODO(kompotkot): Try r.RequestURI instead of header @@ -189,11 +188,13 @@ func Server() { } proxyErrorHandler(proxyToEndpoint, endpoint) - blockchainPool.AddNode(&Node{ + newNode := &Node{ Endpoint: endpoint, Alive: true, GethReverseProxy: proxyToEndpoint, - }, nodeConfig.Blockchain) + } + AddNode(nodeConfig.Blockchain, nodeConfig.Tags, newNode) + log.Printf( "Added new %s proxy blockchain under index %d from config file with geth url: %s://%s", nodeConfig.Blockchain, i, endpoint.Scheme, endpoint.Host) @@ -202,6 +203,12 @@ func Server() { // Generate map of clients CreateClientPools() + // Start node health checking and current block fetching + HealthCheck() + if stateCLI.enableHealthCheckFlag { + go initHealthCheck() + } + serveMux := http.NewServeMux() serveMux.Handle("/nb/", accessMiddleware(http.HandlerFunc(lbHandler))) log.Println("Authentication middleware enabled") @@ -218,14 +225,8 @@ func Server() { WriteTimeout: 40 * time.Second, } - // Start node health checking and current block fetching - blockchainPool.HealthCheck() - if stateCLI.enableHealthCheckFlag { - go initHealthCheck(stateCLI.enableDebugFlag) - } - // Start access id cache cleaning - go initCacheCleaning(stateCLI.enableDebugFlag) + go initCacheCleaning() log.Printf("Starting node load balancer HTTP server at %s:%s", stateCLI.listeningAddrFlag, stateCLI.listeningPortFlag) err = server.ListenAndServe() diff --git a/nodes/node_balancer/go.mod b/nodes/node_balancer/go.mod index a2e51e1f..73b2432c 100644 --- a/nodes/node_balancer/go.mod +++ b/nodes/node_balancer/go.mod @@ -1,6 +1,6 @@ module github.com/bugout-dev/moonstream/nodes/node_balancer -go 1.17 +go 1.18 require ( github.com/bugout-dev/bugout-go v0.3.4