Compare commits
4 Commits
1d16d30cb6
...
trunk
Author | SHA1 | Date | |
---|---|---|---|
8bd09a4983 | |||
6c38363694 | |||
9b6a90fbce | |||
b9c1b9e320 |
21
README.md
21
README.md
@ -78,3 +78,24 @@ Both the manager and the agent will have similar setups:
|
||||
* All reads go to a `readProcessor()` which in turn parses the message, and in turn does the appropriate logic (read, calling go routine/functions)
|
||||
|
||||
This does however presents me with a problem. Since all communication is reactive, state-less, and all over the same connection; so tracking things is a bit hard. And this is something for maybe the next phase. So for now, reactive content only. This said, I need figure out how on earth I can prevent an agent from taking two jobs at the same time. I guess I will need to build in some agent state tracking for this.
|
||||
|
||||
#### Server setup
|
||||
_(This piece assumes that the network is super fast, and turbo stable.)_
|
||||
|
||||
The current setup of the server is only the thing that its connected to. There is a writer that spams something, a reader that prints everything it receives to the console. And that is it. Pretty much split-brain and it currently lacks the ability to manage workloads on agents.
|
||||
|
||||
For this exercise it needs to be able to handle multiple agents, but start at one. There needs to be an interface that should tell me how much CPU/MEMs an agent has. And finally tell the agent to solve something and accept any answers it returns.
|
||||
|
||||
Another issue is that it needs a manager interfaces to keep track of Agents, agents metrics, status. But also allow users to issue commands. NFC how I am going to do that. But something tells me it will require me to find a way to communicate all this with the user, and I can do this with a simple REST API, or create a terminal interface, or do web things.
|
||||
|
||||
|
||||
1. Incoming connection gets caught by `handleConnections()`
|
||||
2. `handleConnections()` verifies an agent registers
|
||||
3. After registeration it puts this into a map, along with the `conn` and stuffs this into a slice of maps
|
||||
4. The manager pics up the new agent, and offers info to anyone who wants to see it.
|
||||
5. A client can ask the manager to solve a puzzle, manager calculates which agents to use, sends of the work.
|
||||
6. Solution is returned and stored.
|
||||
|
||||
So from there.
|
||||
|
||||
I think the biggest issue now is find a way to have a terminal or web interface with the user.
|
||||
|
@ -11,6 +11,14 @@ tasks:
|
||||
cmds:
|
||||
- go run . --help
|
||||
silent: true
|
||||
server:
|
||||
cmds:
|
||||
- go run . -role scheduler
|
||||
silent: true
|
||||
agent:
|
||||
cmds:
|
||||
- go run . -role agent
|
||||
silent: true
|
||||
precommit:
|
||||
cmds:
|
||||
- pre-commit autoupdate
|
||||
|
2
main.go
2
main.go
@ -26,7 +26,7 @@ func main() {
|
||||
// Switch on the role
|
||||
switch vars.Role {
|
||||
case "scheduler":
|
||||
server := server.Server{ListenAddress: vars.Address, ListenPort: vars.Port}
|
||||
server := server.Server{ListenAddress: vars.Address, ListenPort: vars.Port, Agents: make(map[string]*server.Agent)}
|
||||
vars.Operator = &server
|
||||
case "agent":
|
||||
client := client.Client{ServerAddress: vars.Address, ServerPort: vars.Port}
|
||||
|
@ -4,63 +4,21 @@ import (
|
||||
"log"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
// Start the HTTP and WebSocket server
|
||||
func (server *Server) Start() {
|
||||
// Start the server
|
||||
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
|
||||
handleConnections(w, r)
|
||||
})
|
||||
// Declare ourselves up and running to the console.
|
||||
log.Println("Started sudoku-funpark server")
|
||||
|
||||
// Run the agent manager
|
||||
server.agentManager()
|
||||
|
||||
// Start the http server
|
||||
http.HandleFunc("GET /json", server.jsonDumper)
|
||||
http.HandleFunc("POST /addTask", server.addTask)
|
||||
|
||||
// Start the websocket server
|
||||
http.HandleFunc("/ws", server.handleConnections)
|
||||
log.Fatal(http.ListenAndServe(server.ListenAddress+":"+strconv.Itoa(server.ListenPort), nil))
|
||||
|
||||
// return nil
|
||||
}
|
||||
|
||||
var upgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 1024,
|
||||
}
|
||||
|
||||
func handleConnections(w http.ResponseWriter, r *http.Request) {
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
log.Println("Starting server")
|
||||
go readMessages(conn)
|
||||
writeMessages(conn)
|
||||
|
||||
}
|
||||
|
||||
func readMessages(conn *websocket.Conn) {
|
||||
for {
|
||||
messageType, message, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
log.Println("Error reading message:", err)
|
||||
break
|
||||
}
|
||||
log.Printf("Received(%d): %s\n", messageType, message)
|
||||
|
||||
// This sets the time-out for any incoming messages.
|
||||
// conn.SetReadDeadline(time.Now().Add(10 * time.Second))
|
||||
}
|
||||
}
|
||||
|
||||
func writeMessages(conn *websocket.Conn) {
|
||||
for {
|
||||
err := conn.WriteMessage(websocket.TextMessage, []byte("keep alive - staying alive"))
|
||||
if err != nil {
|
||||
log.Println("Error writing message:", err)
|
||||
break
|
||||
}
|
||||
// This sets the time-out for any outgoing messages.
|
||||
// conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||
time.Sleep(time.Minute)
|
||||
}
|
||||
}
|
||||
|
61
server/addTask.go
Normal file
61
server/addTask.go
Normal file
@ -0,0 +1,61 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
func (server *Server) addTask(w http.ResponseWriter, r *http.Request) {
|
||||
var valid bool = true
|
||||
|
||||
if r.Method != http.MethodPost {
|
||||
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
if err := r.ParseForm(); err != nil {
|
||||
fmt.Fprintf(w, "ERROR: Failed ParseForm() err: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
//fmt.Println(r.PostForm)
|
||||
// Validate r.PostForm["rows"]
|
||||
// Look at flags.validateRow()
|
||||
// 1. Make sure the rows is 9 in length
|
||||
if len(r.PostForm["rows"]) != 9 {
|
||||
fmt.Fprintf(w, "ERROR: There aren't 9 rows")
|
||||
return
|
||||
}
|
||||
|
||||
// 2. Validate the row
|
||||
for _, value := range r.PostForm["rows"] {
|
||||
if !server.validateRow(value) {
|
||||
valid = false
|
||||
}
|
||||
}
|
||||
|
||||
// Go/No-Go moment
|
||||
if !valid {
|
||||
w.Write([]byte("ERROR found"))
|
||||
return
|
||||
}
|
||||
|
||||
// Add the task
|
||||
var puzzle [9]string
|
||||
puzzle[0] = r.PostForm["rows"][0]
|
||||
puzzle[1] = r.PostForm["rows"][1]
|
||||
puzzle[2] = r.PostForm["rows"][2]
|
||||
puzzle[3] = r.PostForm["rows"][3]
|
||||
puzzle[4] = r.PostForm["rows"][4]
|
||||
puzzle[5] = r.PostForm["rows"][5]
|
||||
puzzle[6] = r.PostForm["rows"][6]
|
||||
puzzle[7] = r.PostForm["rows"][7]
|
||||
puzzle[8] = r.PostForm["rows"][8]
|
||||
|
||||
// Create task and chuck it in the server struct
|
||||
task := Task{Puzzle: puzzle}
|
||||
server.Tasks = append(server.Tasks, &task)
|
||||
|
||||
// Calling it
|
||||
w.Write([]byte("Ok"))
|
||||
|
||||
}
|
23
server/agentManager.go
Normal file
23
server/agentManager.go
Normal file
@ -0,0 +1,23 @@
|
||||
package server
|
||||
|
||||
import "time"
|
||||
|
||||
func (server *Server) agentManager() {
|
||||
// Start the great for-loop
|
||||
for {
|
||||
// Count the number of agents without a runnig task
|
||||
|
||||
// Check if there are any new Tasks up for grabs
|
||||
|
||||
// See if the task can be divided by the number of agents
|
||||
|
||||
// If so, split
|
||||
// if not, split-1
|
||||
// Try again
|
||||
|
||||
// Give agents task, maybe even look for the ones with the most CPUs
|
||||
|
||||
// 😴
|
||||
time.Sleep(10 * time.Second)
|
||||
}
|
||||
}
|
80
server/agents.html
Normal file
80
server/agents.html
Normal file
@ -0,0 +1,80 @@
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<title>JSON Data Display</title>
|
||||
<style>
|
||||
body {
|
||||
font-family: Arial, sans-serif;
|
||||
margin: 20px;
|
||||
}
|
||||
#data-container {
|
||||
margin-top: 20px;
|
||||
}
|
||||
.data-item {
|
||||
margin-bottom: 10px;
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<h1>JSON Data Display</h1>
|
||||
<div id="data-container">
|
||||
<!-- Data will be inserted here -->
|
||||
</div>
|
||||
|
||||
<script>
|
||||
const url = 'http://localhost:8080/json'; // Replace with your actual URL
|
||||
const dataContainer = document.getElementById('data-container');
|
||||
|
||||
async function fetchData() {
|
||||
try {
|
||||
const response = await fetch(url);
|
||||
if (!response.ok) {
|
||||
throw new Error('Network response was not ok');
|
||||
}
|
||||
const data = await response.json();
|
||||
updateData(data);
|
||||
} catch (error) {
|
||||
console.error('There was a problem with the fetch operation:', error);
|
||||
}
|
||||
}
|
||||
|
||||
function updateData(data) {
|
||||
const agent = data.Agents.blackbox;
|
||||
dataContainer.innerHTML = `
|
||||
<div class="data-item"><strong>Name:</strong> ${agent.Name}</div>
|
||||
<div class="data-item"><strong>Registration Time:</strong> ${agent.Reg}</div>
|
||||
<div class="data-item"><strong>Cores:</strong> ${agent.Cores}</div>
|
||||
<div class="data-item"><strong>CPU Usage:</strong> ${agent.Cpu.join(', ')}</div>
|
||||
<div class="data-item"><strong>Max Memory:</strong> ${agent.Mem_max}</div>
|
||||
<div class="data-item"><strong>Memory Usage:</strong> ${agent.Mem_usg.join(', ')}</div>
|
||||
<div class="data-item"><strong>Task ID:</strong> ${agent.TaskId}</div>
|
||||
`;
|
||||
}
|
||||
|
||||
// Fetch data initially
|
||||
fetchData();
|
||||
|
||||
// Fetch data every second
|
||||
setInterval(fetchData, 1000);
|
||||
</script>
|
||||
<hr/>
|
||||
<form action="http://localhost:8080/addTask" method="post">
|
||||
<table border="0">
|
||||
<tr><td>Row1</td><td>:</td><td><input type="text" value="000000000" name="rows"></td></tr>
|
||||
<tr><td>Row2</td><td>:</td><td><input type="text" value="000000000" name="rows"></td></tr>
|
||||
<tr><td>Row3</td><td>:</td><td><input type="text" value="000000000" name="rows"></td></tr>
|
||||
<tr><td>Row4</td><td>:</td><td><input type="text" value="000000000" name="rows"></td></tr>
|
||||
<tr><td>Row5</td><td>:</td><td><input type="text" value="000000000" name="rows"></td></tr>
|
||||
<tr><td>Row6</td><td>:</td><td><input type="text" value="000000000" name="rows"></td></tr>
|
||||
<tr><td>Row7</td><td>:</td><td><input type="text" value="000000000" name="rows"></td></tr>
|
||||
<tr><td>Row8</td><td>:</td><td><input type="text" value="000000000" name="rows"></td></tr>
|
||||
<tr><td>Row9</td><td>:</td><td><input type="text" value="000000000" name="rows"></td></tr>
|
||||
<tr><td colspan="3" align="right"><input type="submit"></td></tr>
|
||||
|
||||
</table>
|
||||
</form>
|
||||
<hr/>
|
||||
</body>
|
||||
</html>
|
6
server/closeConnection.go
Normal file
6
server/closeConnection.go
Normal file
@ -0,0 +1,6 @@
|
||||
package server
|
||||
|
||||
func (server *Server) closeConnection(agentName string) {
|
||||
server.Agents[agentName].conn.Close()
|
||||
delete(server.Agents, agentName)
|
||||
}
|
66
server/handleConnections.go
Normal file
66
server/handleConnections.go
Normal file
@ -0,0 +1,66 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"log"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Handle incoming websocket connections
|
||||
func (server *Server) handleConnections(w http.ResponseWriter, r *http.Request) {
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return
|
||||
}
|
||||
// defer conn.Close()
|
||||
|
||||
log.Println("Incoming connection")
|
||||
|
||||
// Check for register
|
||||
var registered bool
|
||||
var msgs []string
|
||||
for !registered {
|
||||
_, message, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
log.Println("Error reading message:", err)
|
||||
break
|
||||
}
|
||||
|
||||
msgs = server.parseMessage(message)
|
||||
if string(msgs[0]) == "register" {
|
||||
registered = true
|
||||
}
|
||||
}
|
||||
log.Println("Connected to agent")
|
||||
|
||||
// Create var of type Agent
|
||||
cores_int, err := strconv.Atoi(msgs[2])
|
||||
if err != nil {
|
||||
log.Fatal("ERROR: failed strconv.Atoi(cpu cores):", err)
|
||||
}
|
||||
mem_max_string := strings.ReplaceAll(msgs[3], "GB", "")
|
||||
mem_max_float, err := strconv.ParseFloat(mem_max_string, 64)
|
||||
if err != nil {
|
||||
log.Fatal("ERROR: failed strconv.ParseFloat():", err)
|
||||
}
|
||||
agent := Agent{
|
||||
Name: msgs[1],
|
||||
Reg: time.Now(),
|
||||
Cores: cores_int,
|
||||
Mem_max: mem_max_float,
|
||||
TaskId: 0,
|
||||
conn: conn,
|
||||
}
|
||||
|
||||
// Dump it into server.Agents
|
||||
server.Agents[msgs[1]] = &agent
|
||||
|
||||
defer server.closeConnection(msgs[1])
|
||||
|
||||
go server.readMessages(agent)
|
||||
server.writeMessages(conn)
|
||||
|
||||
}
|
20
server/jsonDumper.go
Normal file
20
server/jsonDumper.go
Normal file
@ -0,0 +1,20 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
func (server *Server) jsonDumper(w http.ResponseWriter, r *http.Request) {
|
||||
json, err := json.Marshal(server)
|
||||
if err != nil {
|
||||
fmt.Println("JSON Marshaling error:", err)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
w.Write([]byte("This is my Website"))
|
||||
}
|
||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write(json)
|
||||
}
|
7
server/parseMessage.go
Normal file
7
server/parseMessage.go
Normal file
@ -0,0 +1,7 @@
|
||||
package server
|
||||
|
||||
import "strings"
|
||||
|
||||
func (server *Server) parseMessage(message []byte) []string {
|
||||
return strings.Split(string(message), ";")
|
||||
}
|
29
server/pruneSlice.go
Normal file
29
server/pruneSlice.go
Normal file
@ -0,0 +1,29 @@
|
||||
package server
|
||||
|
||||
import "slices"
|
||||
|
||||
func (server *Server) pruneIntSlice(mySlice []int) []int {
|
||||
if len(mySlice) > 10 {
|
||||
for key := range mySlice {
|
||||
if key == 10 {
|
||||
mySlice = slices.Delete(mySlice, 9, 10)
|
||||
} else {
|
||||
mySlice[key] = mySlice[key+1]
|
||||
}
|
||||
}
|
||||
}
|
||||
return mySlice
|
||||
}
|
||||
|
||||
func (server *Server) pruneFloat64Slice(mySlice []float64) []float64 {
|
||||
if len(mySlice) > 10 {
|
||||
for key := range mySlice {
|
||||
if key == 10 {
|
||||
mySlice = slices.Delete(mySlice, 9, 10)
|
||||
} else {
|
||||
mySlice[key] = mySlice[key+1]
|
||||
}
|
||||
}
|
||||
}
|
||||
return mySlice
|
||||
}
|
52
server/readMessages.go
Normal file
52
server/readMessages.go
Normal file
@ -0,0 +1,52 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Rewrite to become a general input handler
|
||||
func (server *Server) readMessages(agent Agent) {
|
||||
for {
|
||||
_, message, err := agent.conn.ReadMessage()
|
||||
if err != nil {
|
||||
log.Println("Error reading message:", err)
|
||||
break
|
||||
}
|
||||
// log.Printf("Received(%d): %s\n", messageType, message)
|
||||
|
||||
msgs := server.parseMessage(message)
|
||||
|
||||
if msgs[0] == "update" {
|
||||
cpu_usg, err := strconv.Atoi(msgs[1])
|
||||
if err != nil {
|
||||
fmt.Println("ERROR: converting string to int", err)
|
||||
}
|
||||
server.Agents[agent.Name].Cpu = append(server.Agents[agent.Name].Cpu, cpu_usg)
|
||||
server.Agents[agent.Name].Cpu = server.pruneIntSlice(server.Agents[agent.Name].Cpu)
|
||||
|
||||
mem_usg_string := strings.ReplaceAll(msgs[2], "GB", "")
|
||||
mem_usg_float, err := strconv.ParseFloat(mem_usg_string, 64)
|
||||
if err != nil {
|
||||
log.Fatal("ERROR: failed strconv.ParseFloat():", err)
|
||||
}
|
||||
server.Agents[agent.Name].Mem_usg = append(server.Agents[agent.Name].Mem_usg, mem_usg_float)
|
||||
server.Agents[agent.Name].Mem_usg = server.pruneFloat64Slice(server.Agents[agent.Name].Mem_usg)
|
||||
|
||||
taskId, err := strconv.Atoi(msgs[3])
|
||||
if err != nil {
|
||||
log.Println("ERROR: cannot convert taskId:", err)
|
||||
}
|
||||
server.Agents[agent.Name].TaskId = taskId
|
||||
}
|
||||
|
||||
if msgs[0] == "deregister" {
|
||||
server.closeConnection(agent.Name)
|
||||
}
|
||||
|
||||
// This sets the time-out for any incoming messages.
|
||||
// conn.SetReadDeadline(time.Now().Add(10 * time.Second))
|
||||
}
|
||||
}
|
@ -1,6 +1,35 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
type Agent struct {
|
||||
Name string
|
||||
Reg time.Time
|
||||
Cores int
|
||||
Cpu []int
|
||||
Mem_max float64
|
||||
Mem_usg []float64
|
||||
TaskId int
|
||||
conn *websocket.Conn
|
||||
}
|
||||
|
||||
type Task struct {
|
||||
Puzzle [9]string
|
||||
Solutions [][]string
|
||||
}
|
||||
|
||||
type Server struct {
|
||||
ListenAddress string
|
||||
ListenPort int
|
||||
Agents map[string]*Agent
|
||||
Tasks []*Task
|
||||
}
|
||||
|
||||
var upgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 1024,
|
||||
}
|
||||
|
13
server/validChar.go
Normal file
13
server/validChar.go
Normal file
@ -0,0 +1,13 @@
|
||||
package server
|
||||
|
||||
func (server *Server) validChar(char rune) (valid bool) {
|
||||
decvals := [10]int{48, 49, 50, 51, 52, 53, 54, 55, 56, 57}
|
||||
|
||||
for _, value := range decvals {
|
||||
if char == rune(value) {
|
||||
valid = true
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
40
server/validateRow.go
Normal file
40
server/validateRow.go
Normal file
@ -0,0 +1,40 @@
|
||||
package server
|
||||
|
||||
func (server *Server) validateRow(row string) (valid bool) {
|
||||
// Declarations baby!
|
||||
valid = true
|
||||
var found bool
|
||||
var double bool
|
||||
count := make(map[rune]int)
|
||||
|
||||
// 1. Make sure row is 9 in length
|
||||
if len(row) != 9 {
|
||||
valid = false
|
||||
}
|
||||
|
||||
// 2. Ensure all digits are numbers
|
||||
for _, value := range row {
|
||||
found = server.validChar(value)
|
||||
}
|
||||
|
||||
if !found {
|
||||
valid = false
|
||||
}
|
||||
|
||||
// 3. Ensure all digits (except zero) are only present once
|
||||
for _, digits := range row {
|
||||
count[digits] = count[digits] + 1
|
||||
}
|
||||
|
||||
for key, value := range count {
|
||||
if value > 1 && key != 48 {
|
||||
double = true
|
||||
}
|
||||
}
|
||||
|
||||
if double {
|
||||
valid = false
|
||||
}
|
||||
|
||||
return
|
||||
}
|
19
server/writeMessages.go
Normal file
19
server/writeMessages.go
Normal file
@ -0,0 +1,19 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"log"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
func (server *Server) writeMessages(conn *websocket.Conn) {
|
||||
for {
|
||||
err := conn.WriteMessage(websocket.TextMessage, []byte("keep alive - staying alive"))
|
||||
if err != nil {
|
||||
log.Println("Error writing message:", err)
|
||||
break
|
||||
}
|
||||
// This sets the time-out for any outgoing messages.
|
||||
// conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user