From 9b6a90fbcecb6a9d72492e2be6a65e0a298fc7b9 Mon Sep 17 00:00:00 2001 From: Sacha Ligthert Date: Sat, 8 Feb 2025 15:18:22 +0100 Subject: [PATCH] Initial work at making the server proper. --- README.md | 21 ++++++++++++ main.go | 2 +- server/Start.go | 64 +++++------------------------------ server/closeConnection.go | 6 ++++ server/handleConnections.go | 66 +++++++++++++++++++++++++++++++++++++ server/jsonDumper.go | 19 +++++++++++ server/parseMessage.go | 7 ++++ server/pruneSlice.go | 29 ++++++++++++++++ server/readMessages.go | 52 +++++++++++++++++++++++++++++ server/types.go | 29 ++++++++++++++++ server/writeMessages.go | 19 +++++++++++ 11 files changed, 258 insertions(+), 56 deletions(-) create mode 100644 server/closeConnection.go create mode 100644 server/handleConnections.go create mode 100644 server/jsonDumper.go create mode 100644 server/parseMessage.go create mode 100644 server/pruneSlice.go create mode 100644 server/readMessages.go create mode 100644 server/writeMessages.go diff --git a/README.md b/README.md index 07501d5..fe05a58 100644 --- a/README.md +++ b/README.md @@ -78,3 +78,24 @@ Both the manager and the agent will have similar setups: * All reads go to a `readProcessor()` which in turn parses the message, and in turn does the appropriate logic (read, calling go routine/functions) This does however presents me with a problem. Since all communication is reactive, state-less, and all over the same connection; so tracking things is a bit hard. And this is something for maybe the next phase. So for now, reactive content only. This said, I need figure out how on earth I can prevent an agent from taking two jobs at the same time. I guess I will need to build in some agent state tracking for this. + +#### Server setup +_(This piece assumes that the network is super fast, and turbo stable.)_ + +The current setup of the server is only the thing that its connected to. There is a writer that spams something, a reader that prints everything it receives to the console. And that is it. Pretty much split-brain and it currently lacks the ability to manage workloads on agents. + +For this exercise it needs to be able to handle multiple agents, but start at one. There needs to be an interface that should tell me how much CPU/MEMs an agent has. And finally tell the agent to solve something and accept any answers it returns. + +Another issue is that it needs a manager interfaces to keep track of Agents, agents metrics, status. But also allow users to issue commands. NFC how I am going to do that. But something tells me it will require me to find a way to communicate all this with the user, and I can do this with a simple REST API, or create a terminal interface, or do web things. + + +1. Incoming connection gets caught by `handleConnections()` +2. `handleConnections()` verifies an agent registers +3. After registeration it puts this into a map, along with the `conn` and stuffs this into a slice of maps +4. The manager pics up the new agent, and offers info to anyone who wants to see it. +5. A client can ask the manager to solve a puzzle, manager calculates which agents to use, sends of the work. +6. Solution is returned and stored. + +So from there. + +I think the biggest issue now is find a way to have a terminal or web interface with the user. diff --git a/main.go b/main.go index 0b7a838..92ffb2f 100644 --- a/main.go +++ b/main.go @@ -26,7 +26,7 @@ func main() { // Switch on the role switch vars.Role { case "scheduler": - server := server.Server{ListenAddress: vars.Address, ListenPort: vars.Port} + server := server.Server{ListenAddress: vars.Address, ListenPort: vars.Port, Agents: make(map[string]*server.Agent)} vars.Operator = &server case "agent": client := client.Client{ServerAddress: vars.Address, ServerPort: vars.Port} diff --git a/server/Start.go b/server/Start.go index ada28f9..42536be 100644 --- a/server/Start.go +++ b/server/Start.go @@ -4,63 +4,17 @@ import ( "log" "net/http" "strconv" - "time" - - "github.com/gorilla/websocket" ) +// Start the HTTP and WebSocket server func (server *Server) Start() { - // Start the server - http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { - handleConnections(w, r) - }) + // Declare ourselves up and running to the console. + log.Println("Started sudoku-funpark server") + + // Start the http server + http.HandleFunc("/json", server.jsonDumper) + + // Start the websocket server + http.HandleFunc("/ws", server.handleConnections) log.Fatal(http.ListenAndServe(server.ListenAddress+":"+strconv.Itoa(server.ListenPort), nil)) - - // return nil -} - -var upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, -} - -func handleConnections(w http.ResponseWriter, r *http.Request) { - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - log.Println(err) - return - } - defer conn.Close() - - log.Println("Starting server") - go readMessages(conn) - writeMessages(conn) - -} - -func readMessages(conn *websocket.Conn) { - for { - messageType, message, err := conn.ReadMessage() - if err != nil { - log.Println("Error reading message:", err) - break - } - log.Printf("Received(%d): %s\n", messageType, message) - - // This sets the time-out for any incoming messages. - // conn.SetReadDeadline(time.Now().Add(10 * time.Second)) - } -} - -func writeMessages(conn *websocket.Conn) { - for { - err := conn.WriteMessage(websocket.TextMessage, []byte("keep alive - staying alive")) - if err != nil { - log.Println("Error writing message:", err) - break - } - // This sets the time-out for any outgoing messages. - // conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) - time.Sleep(time.Minute) - } } diff --git a/server/closeConnection.go b/server/closeConnection.go new file mode 100644 index 0000000..98c8ffc --- /dev/null +++ b/server/closeConnection.go @@ -0,0 +1,6 @@ +package server + +func (server *Server) closeConnection(agentName string) { + server.Agents[agentName].conn.Close() + delete(server.Agents, agentName) +} diff --git a/server/handleConnections.go b/server/handleConnections.go new file mode 100644 index 0000000..c31c5cb --- /dev/null +++ b/server/handleConnections.go @@ -0,0 +1,66 @@ +package server + +import ( + "log" + "net/http" + "strconv" + "strings" + "time" +) + +// Handle incoming websocket connections +func (server *Server) handleConnections(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Println(err) + return + } + // defer conn.Close() + + log.Println("Incoming connection") + + // Check for register + var registered bool + var msgs []string + for !registered { + _, message, err := conn.ReadMessage() + if err != nil { + log.Println("Error reading message:", err) + break + } + + msgs = server.parseMessage(message) + if string(msgs[0]) == "register" { + registered = true + } + } + log.Println("Connected to agent") + + // Create var of type Agent + cores_int, err := strconv.Atoi(msgs[2]) + if err != nil { + log.Fatal("ERROR: failed strconv.Atoi(cpu cores):", err) + } + mem_max_string := strings.ReplaceAll(msgs[3], "GB", "") + mem_max_float, err := strconv.ParseFloat(mem_max_string, 64) + if err != nil { + log.Fatal("ERROR: failed strconv.ParseFloat():", err) + } + agent := Agent{ + Name: msgs[1], + Reg: time.Now(), + Cores: cores_int, + Mem_max: mem_max_float, + TaskId: 0, + conn: conn, + } + + // Dump it into server.Agents + server.Agents[msgs[1]] = &agent + + defer server.closeConnection(msgs[1]) + + go server.readMessages(agent) + server.writeMessages(conn) + +} diff --git a/server/jsonDumper.go b/server/jsonDumper.go new file mode 100644 index 0000000..3ea8abf --- /dev/null +++ b/server/jsonDumper.go @@ -0,0 +1,19 @@ +package server + +import ( + "encoding/json" + "fmt" + "net/http" +) + +func (server *Server) jsonDumper(w http.ResponseWriter, r *http.Request) { + json, err := json.Marshal(server) + if err != nil { + fmt.Println("JSON Marshaling error:", err) + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("This is my Website")) + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write(json) +} diff --git a/server/parseMessage.go b/server/parseMessage.go new file mode 100644 index 0000000..db45af2 --- /dev/null +++ b/server/parseMessage.go @@ -0,0 +1,7 @@ +package server + +import "strings" + +func (server *Server) parseMessage(message []byte) []string { + return strings.Split(string(message), ";") +} diff --git a/server/pruneSlice.go b/server/pruneSlice.go new file mode 100644 index 0000000..e5a854e --- /dev/null +++ b/server/pruneSlice.go @@ -0,0 +1,29 @@ +package server + +import "slices" + +func (server *Server) pruneIntSlice(mySlice []int) []int { + if len(mySlice) > 10 { + for key := range mySlice { + if key == 10 { + mySlice = slices.Delete(mySlice, 9, 10) + } else { + mySlice[key] = mySlice[key+1] + } + } + } + return mySlice +} + +func (server *Server) pruneFloat64Slice(mySlice []float64) []float64 { + if len(mySlice) > 10 { + for key := range mySlice { + if key == 10 { + mySlice = slices.Delete(mySlice, 9, 10) + } else { + mySlice[key] = mySlice[key+1] + } + } + } + return mySlice +} diff --git a/server/readMessages.go b/server/readMessages.go new file mode 100644 index 0000000..c1fdd77 --- /dev/null +++ b/server/readMessages.go @@ -0,0 +1,52 @@ +package server + +import ( + "fmt" + "log" + "strconv" + "strings" +) + +// Rewrite to become a general input handler +func (server *Server) readMessages(agent Agent) { + for { + _, message, err := agent.conn.ReadMessage() + if err != nil { + log.Println("Error reading message:", err) + break + } + // log.Printf("Received(%d): %s\n", messageType, message) + + msgs := server.parseMessage(message) + + if msgs[0] == "update" { + cpu_usg, err := strconv.Atoi(msgs[1]) + if err != nil { + fmt.Println("ERROR: converting string to int", err) + } + server.Agents[agent.Name].Cpu = append(server.Agents[agent.Name].Cpu, cpu_usg) + server.Agents[agent.Name].Cpu = server.pruneIntSlice(server.Agents[agent.Name].Cpu) + + mem_usg_string := strings.ReplaceAll(msgs[2], "GB", "") + mem_usg_float, err := strconv.ParseFloat(mem_usg_string, 64) + if err != nil { + log.Fatal("ERROR: failed strconv.ParseFloat():", err) + } + server.Agents[agent.Name].Mem_usg = append(server.Agents[agent.Name].Mem_usg, mem_usg_float) + server.Agents[agent.Name].Mem_usg = server.pruneFloat64Slice(server.Agents[agent.Name].Mem_usg) + + taskId, err := strconv.Atoi(msgs[3]) + if err != nil { + log.Println("ERROR: cannot convert taskId:", err) + } + server.Agents[agent.Name].TaskId = taskId + } + + if msgs[0] == "deregister" { + server.closeConnection(agent.Name) + } + + // This sets the time-out for any incoming messages. + // conn.SetReadDeadline(time.Now().Add(10 * time.Second)) + } +} diff --git a/server/types.go b/server/types.go index 3656900..f487942 100644 --- a/server/types.go +++ b/server/types.go @@ -1,6 +1,35 @@ package server +import ( + "time" + + "github.com/gorilla/websocket" +) + +type Agent struct { + Name string + Reg time.Time + Cores int + Cpu []int + Mem_max float64 + Mem_usg []float64 + TaskId int + conn *websocket.Conn +} + +type Task struct { + Puzzle [9]string + Solutions [][]string +} + type Server struct { ListenAddress string ListenPort int + Agents map[string]*Agent + Tasks []Task +} + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, } diff --git a/server/writeMessages.go b/server/writeMessages.go new file mode 100644 index 0000000..9e0f6af --- /dev/null +++ b/server/writeMessages.go @@ -0,0 +1,19 @@ +package server + +import ( + "log" + + "github.com/gorilla/websocket" +) + +func (server *Server) writeMessages(conn *websocket.Conn) { + for { + err := conn.WriteMessage(websocket.TextMessage, []byte("keep alive - staying alive")) + if err != nil { + log.Println("Error writing message:", err) + break + } + // This sets the time-out for any outgoing messages. + // conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + } +}