diff --git a/manager/server.go b/manager/server.go index 8a38cfc..94f40ad 100644 --- a/manager/server.go +++ b/manager/server.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "fmt" "net/http" + "time" "github.com/gin-gonic/gin" @@ -153,6 +154,7 @@ func (s *Manager) listWorkers(c *gin.Context) { func (s *Manager) registerWorker(c *gin.Context) { var _worker WorkerStatus c.BindJSON(&_worker) + _worker.LastOnline = time.Now() newWorker, err := s.adapter.CreateWorker(_worker) if err != nil { err := fmt.Errorf("failed to register worker: %s", @@ -230,7 +232,7 @@ func (s *Manager) handleClientCmd(c *gin.Context) { } // post command to worker - _, err = postJSON(workerURL, workerCmd) + _, err = PostJSON(workerURL, workerCmd, s.tlsConfig) if err != nil { err := fmt.Errorf("post command to worker %s(%s) fail: %s", workerID, workerURL, err.Error()) c.Error(err) diff --git a/manager/server_test.go b/manager/server_test.go index 11909ca..1cc9259 100644 --- a/manager/server_test.go +++ b/manager/server_test.go @@ -65,7 +65,7 @@ func TestHTTPServer(t *testing.T) { w := WorkerStatus{ ID: "test_worker1", } - resp, err := postJSON(baseURL+"/workers", w) + resp, err := PostJSON(baseURL+"/workers", w, nil) So(err, ShouldBeNil) So(resp.StatusCode, ShouldEqual, http.StatusOK) @@ -90,7 +90,7 @@ func TestHTTPServer(t *testing.T) { Upstream: "mirrors.tuna.tsinghua.edu.cn", Size: "3GB", } - resp, err := postJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status) + resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil) defer resp.Body.Close() So(err, ShouldBeNil) So(resp.StatusCode, ShouldEqual, http.StatusOK) @@ -136,8 +136,8 @@ func TestHTTPServer(t *testing.T) { Upstream: "mirrors.tuna.tsinghua.edu.cn", Size: "4GB", } - resp, err := postJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", - baseURL, status.Worker, status.Name), status) + resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", + baseURL, status.Worker, status.Name), status, nil) So(err, ShouldBeNil) So(resp.StatusCode, ShouldEqual, http.StatusBadRequest) defer resp.Body.Close() @@ -156,7 +156,7 @@ func TestHTTPServer(t *testing.T) { ID: "test_worker_cmd", URL: workerBaseURL + "/cmd", } - resp, err := postJSON(baseURL+"/workers", w) + resp, err := PostJSON(baseURL+"/workers", w, nil) So(err, ShouldBeNil) So(resp.StatusCode, ShouldEqual, http.StatusOK) @@ -177,7 +177,7 @@ func TestHTTPServer(t *testing.T) { MirrorID: "ubuntu-sync", WorkerID: "not_exist_worker", } - resp, err := postJSON(baseURL+"/cmd", clientCmd) + resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil) defer resp.Body.Close() So(err, ShouldBeNil) So(resp.StatusCode, ShouldEqual, http.StatusBadRequest) @@ -190,7 +190,7 @@ func TestHTTPServer(t *testing.T) { WorkerID: w.ID, } - resp, err := postJSON(baseURL+"/cmd", clientCmd) + resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil) defer resp.Body.Close() So(err, ShouldBeNil) diff --git a/manager/util.go b/manager/util.go deleted file mode 100644 index 4174eea..0000000 --- a/manager/util.go +++ /dev/null @@ -1,13 +0,0 @@ -package manager - -import ( - "bytes" - "encoding/json" - "net/http" -) - -func postJSON(url string, obj interface{}) (*http.Response, error) { - b := new(bytes.Buffer) - json.NewEncoder(b).Encode(obj) - return http.Post(url, "application/json; charset=utf-8", b) -} diff --git a/worker/runner.go b/worker/runner.go index aa8519f..49e27bd 100644 --- a/worker/runner.go +++ b/worker/runner.go @@ -37,6 +37,14 @@ func newCmdJob(cmdAndArgs []string, workingDir string, env map[string]string) *c panic("Command length should be at least 1!") } + logger.Debug("Executing command %s at %s", cmdAndArgs[0], workingDir) + if _, err := os.Stat(workingDir); os.IsNotExist(err) { + logger.Debug("Making dir %s", workingDir) + if err = os.MkdirAll(workingDir, 0755); err != nil { + logger.Error("Error making dir %s", workingDir) + } + } + cmd.Dir = workingDir cmd.Env = newEnviron(env, true) diff --git a/worker/worker.go b/worker/worker.go index 02ed1f2..56bffa6 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -49,6 +49,16 @@ func GetTUNASyncWorker(cfg *Config) *Worker { schedule: newScheduleQueue(), mirrorStatus: make(map[string]SyncStatus), } + + if cfg.Manager.CACert != "" { + tlsConfig, err := GetTLSConfig(cfg.Manager.CACert) + if err != nil { + logger.Error("Failed to init TLS config: %s", err.Error()) + return nil + } + w.tlsConfig = tlsConfig + } + w.initJobs() w.makeHTTPServer() tunasyncWorker = w @@ -75,7 +85,9 @@ func (w *Worker) initProviders() { logDir = c.Global.LogDir } if mirrorDir == "" { - mirrorDir = c.Global.MirrorDir + mirrorDir = filepath.Join( + c.Global.MirrorDir, mirror.Name, + ) } logDir = formatLogDir(logDir, mirror) @@ -87,7 +99,7 @@ func (w *Worker) initProviders() { name: mirror.Name, upstreamURL: mirror.Upstream, command: mirror.Command, - workingDir: filepath.Join(mirrorDir, mirror.Name), + workingDir: mirrorDir, logDir: logDir, logFile: filepath.Join(logDir, "latest.log"), interval: time.Duration(mirror.Interval) * time.Minute, @@ -102,9 +114,10 @@ func (w *Worker) initProviders() { rc := rsyncConfig{ name: mirror.Name, upstreamURL: mirror.Upstream, + rsyncCmd: mirror.Command, password: mirror.Password, excludeFile: mirror.ExcludeFile, - workingDir: filepath.Join(mirrorDir, mirror.Name), + workingDir: mirrorDir, logDir: logDir, logFile: filepath.Join(logDir, "latest.log"), useIPv6: mirror.UseIPv6, @@ -120,9 +133,10 @@ func (w *Worker) initProviders() { name: mirror.Name, stage1Profile: mirror.Stage1Profile, upstreamURL: mirror.Upstream, + rsyncCmd: mirror.Command, password: mirror.Password, excludeFile: mirror.ExcludeFile, - workingDir: filepath.Join(mirrorDir, mirror.Name), + workingDir: mirrorDir, logDir: logDir, logFile: filepath.Join(logDir, "latest.log"), useIPv6: mirror.UseIPv6, @@ -303,7 +317,7 @@ func (w *Worker) registorWorker() { func (w *Worker) updateStatus(jobMsg jobMessage) { url := fmt.Sprintf( - "%s/%s/jobs/%s", + "%s/workers/%s/jobs/%s", w.cfg.Manager.APIBase, w.Name(), jobMsg.name, @@ -329,7 +343,7 @@ func (w *Worker) fetchJobStatus() []MirrorStatus { var mirrorList []MirrorStatus url := fmt.Sprintf( - "%s/%s/jobs", + "%s/workers/%s/jobs", w.cfg.Manager.APIBase, w.Name(), )