Initial work at making the server proper.

This commit is contained in:
Sacha Ligthert 2025-02-08 15:18:22 +01:00
parent b9c1b9e320
commit 9b6a90fbce
11 changed files with 258 additions and 56 deletions

View File

@ -78,3 +78,24 @@ Both the manager and the agent will have similar setups:
* All reads go to a `readProcessor()` which in turn parses the message, and in turn does the appropriate logic (read, calling go routine/functions)
This does however presents me with a problem. Since all communication is reactive, state-less, and all over the same connection; so tracking things is a bit hard. And this is something for maybe the next phase. So for now, reactive content only. This said, I need figure out how on earth I can prevent an agent from taking two jobs at the same time. I guess I will need to build in some agent state tracking for this.
#### Server setup
_(This piece assumes that the network is super fast, and turbo stable.)_
The current setup of the server is only the thing that its connected to. There is a writer that spams something, a reader that prints everything it receives to the console. And that is it. Pretty much split-brain and it currently lacks the ability to manage workloads on agents.
For this exercise it needs to be able to handle multiple agents, but start at one. There needs to be an interface that should tell me how much CPU/MEMs an agent has. And finally tell the agent to solve something and accept any answers it returns.
Another issue is that it needs a manager interfaces to keep track of Agents, agents metrics, status. But also allow users to issue commands. NFC how I am going to do that. But something tells me it will require me to find a way to communicate all this with the user, and I can do this with a simple REST API, or create a terminal interface, or do web things.
1. Incoming connection gets caught by `handleConnections()`
2. `handleConnections()` verifies an agent registers
3. After registeration it puts this into a map, along with the `conn` and stuffs this into a slice of maps
4. The manager pics up the new agent, and offers info to anyone who wants to see it.
5. A client can ask the manager to solve a puzzle, manager calculates which agents to use, sends of the work.
6. Solution is returned and stored.
So from there.
I think the biggest issue now is find a way to have a terminal or web interface with the user.

View File

@ -26,7 +26,7 @@ func main() {
// Switch on the role
switch vars.Role {
case "scheduler":
server := server.Server{ListenAddress: vars.Address, ListenPort: vars.Port}
server := server.Server{ListenAddress: vars.Address, ListenPort: vars.Port, Agents: make(map[string]*server.Agent)}
vars.Operator = &server
case "agent":
client := client.Client{ServerAddress: vars.Address, ServerPort: vars.Port}

View File

@ -4,63 +4,17 @@ import (
"log"
"net/http"
"strconv"
"time"
"github.com/gorilla/websocket"
)
// Start the HTTP and WebSocket server
func (server *Server) Start() {
// Start the server
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
handleConnections(w, r)
})
// Declare ourselves up and running to the console.
log.Println("Started sudoku-funpark server")
// Start the http server
http.HandleFunc("/json", server.jsonDumper)
// Start the websocket server
http.HandleFunc("/ws", server.handleConnections)
log.Fatal(http.ListenAndServe(server.ListenAddress+":"+strconv.Itoa(server.ListenPort), nil))
// return nil
}
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
func handleConnections(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
defer conn.Close()
log.Println("Starting server")
go readMessages(conn)
writeMessages(conn)
}
func readMessages(conn *websocket.Conn) {
for {
messageType, message, err := conn.ReadMessage()
if err != nil {
log.Println("Error reading message:", err)
break
}
log.Printf("Received(%d): %s\n", messageType, message)
// This sets the time-out for any incoming messages.
// conn.SetReadDeadline(time.Now().Add(10 * time.Second))
}
}
func writeMessages(conn *websocket.Conn) {
for {
err := conn.WriteMessage(websocket.TextMessage, []byte("keep alive - staying alive"))
if err != nil {
log.Println("Error writing message:", err)
break
}
// This sets the time-out for any outgoing messages.
// conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
time.Sleep(time.Minute)
}
}

View File

@ -0,0 +1,6 @@
package server
func (server *Server) closeConnection(agentName string) {
server.Agents[agentName].conn.Close()
delete(server.Agents, agentName)
}

View File

@ -0,0 +1,66 @@
package server
import (
"log"
"net/http"
"strconv"
"strings"
"time"
)
// Handle incoming websocket connections
func (server *Server) handleConnections(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
// defer conn.Close()
log.Println("Incoming connection")
// Check for register
var registered bool
var msgs []string
for !registered {
_, message, err := conn.ReadMessage()
if err != nil {
log.Println("Error reading message:", err)
break
}
msgs = server.parseMessage(message)
if string(msgs[0]) == "register" {
registered = true
}
}
log.Println("Connected to agent")
// Create var of type Agent
cores_int, err := strconv.Atoi(msgs[2])
if err != nil {
log.Fatal("ERROR: failed strconv.Atoi(cpu cores):", err)
}
mem_max_string := strings.ReplaceAll(msgs[3], "GB", "")
mem_max_float, err := strconv.ParseFloat(mem_max_string, 64)
if err != nil {
log.Fatal("ERROR: failed strconv.ParseFloat():", err)
}
agent := Agent{
Name: msgs[1],
Reg: time.Now(),
Cores: cores_int,
Mem_max: mem_max_float,
TaskId: 0,
conn: conn,
}
// Dump it into server.Agents
server.Agents[msgs[1]] = &agent
defer server.closeConnection(msgs[1])
go server.readMessages(agent)
server.writeMessages(conn)
}

19
server/jsonDumper.go Normal file
View File

@ -0,0 +1,19 @@
package server
import (
"encoding/json"
"fmt"
"net/http"
)
func (server *Server) jsonDumper(w http.ResponseWriter, r *http.Request) {
json, err := json.Marshal(server)
if err != nil {
fmt.Println("JSON Marshaling error:", err)
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("This is my Website"))
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(json)
}

7
server/parseMessage.go Normal file
View File

@ -0,0 +1,7 @@
package server
import "strings"
func (server *Server) parseMessage(message []byte) []string {
return strings.Split(string(message), ";")
}

29
server/pruneSlice.go Normal file
View File

@ -0,0 +1,29 @@
package server
import "slices"
func (server *Server) pruneIntSlice(mySlice []int) []int {
if len(mySlice) > 10 {
for key := range mySlice {
if key == 10 {
mySlice = slices.Delete(mySlice, 9, 10)
} else {
mySlice[key] = mySlice[key+1]
}
}
}
return mySlice
}
func (server *Server) pruneFloat64Slice(mySlice []float64) []float64 {
if len(mySlice) > 10 {
for key := range mySlice {
if key == 10 {
mySlice = slices.Delete(mySlice, 9, 10)
} else {
mySlice[key] = mySlice[key+1]
}
}
}
return mySlice
}

52
server/readMessages.go Normal file
View File

@ -0,0 +1,52 @@
package server
import (
"fmt"
"log"
"strconv"
"strings"
)
// Rewrite to become a general input handler
func (server *Server) readMessages(agent Agent) {
for {
_, message, err := agent.conn.ReadMessage()
if err != nil {
log.Println("Error reading message:", err)
break
}
// log.Printf("Received(%d): %s\n", messageType, message)
msgs := server.parseMessage(message)
if msgs[0] == "update" {
cpu_usg, err := strconv.Atoi(msgs[1])
if err != nil {
fmt.Println("ERROR: converting string to int", err)
}
server.Agents[agent.Name].Cpu = append(server.Agents[agent.Name].Cpu, cpu_usg)
server.Agents[agent.Name].Cpu = server.pruneIntSlice(server.Agents[agent.Name].Cpu)
mem_usg_string := strings.ReplaceAll(msgs[2], "GB", "")
mem_usg_float, err := strconv.ParseFloat(mem_usg_string, 64)
if err != nil {
log.Fatal("ERROR: failed strconv.ParseFloat():", err)
}
server.Agents[agent.Name].Mem_usg = append(server.Agents[agent.Name].Mem_usg, mem_usg_float)
server.Agents[agent.Name].Mem_usg = server.pruneFloat64Slice(server.Agents[agent.Name].Mem_usg)
taskId, err := strconv.Atoi(msgs[3])
if err != nil {
log.Println("ERROR: cannot convert taskId:", err)
}
server.Agents[agent.Name].TaskId = taskId
}
if msgs[0] == "deregister" {
server.closeConnection(agent.Name)
}
// This sets the time-out for any incoming messages.
// conn.SetReadDeadline(time.Now().Add(10 * time.Second))
}
}

View File

@ -1,6 +1,35 @@
package server
import (
"time"
"github.com/gorilla/websocket"
)
type Agent struct {
Name string
Reg time.Time
Cores int
Cpu []int
Mem_max float64
Mem_usg []float64
TaskId int
conn *websocket.Conn
}
type Task struct {
Puzzle [9]string
Solutions [][]string
}
type Server struct {
ListenAddress string
ListenPort int
Agents map[string]*Agent
Tasks []Task
}
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}

19
server/writeMessages.go Normal file
View File

@ -0,0 +1,19 @@
package server
import (
"log"
"github.com/gorilla/websocket"
)
func (server *Server) writeMessages(conn *websocket.Conn) {
for {
err := conn.WriteMessage(websocket.TextMessage, []byte("keep alive - staying alive"))
if err != nil {
log.Println("Error writing message:", err)
break
}
// This sets the time-out for any outgoing messages.
// conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
}
}