refactor(tunasync):

1. refactored manager and worker to support TLS transport
2. if mirror_dir is specified from a mirror config, don't add the mirror name
This commit is contained in:
bigeagle 2016-04-28 21:02:39 +08:00
parent 9865f28259
commit 9fbb8ab155
No known key found for this signature in database
GPG Key ID: 9171A4571C27920A
5 changed files with 38 additions and 27 deletions

View File

@ -4,6 +4,7 @@ import (
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"net/http" "net/http"
"time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@ -153,6 +154,7 @@ func (s *Manager) listWorkers(c *gin.Context) {
func (s *Manager) registerWorker(c *gin.Context) { func (s *Manager) registerWorker(c *gin.Context) {
var _worker WorkerStatus var _worker WorkerStatus
c.BindJSON(&_worker) c.BindJSON(&_worker)
_worker.LastOnline = time.Now()
newWorker, err := s.adapter.CreateWorker(_worker) newWorker, err := s.adapter.CreateWorker(_worker)
if err != nil { if err != nil {
err := fmt.Errorf("failed to register worker: %s", err := fmt.Errorf("failed to register worker: %s",
@ -230,7 +232,7 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
} }
// post command to worker // post command to worker
_, err = postJSON(workerURL, workerCmd) _, err = PostJSON(workerURL, workerCmd, s.tlsConfig)
if err != nil { if err != nil {
err := fmt.Errorf("post command to worker %s(%s) fail: %s", workerID, workerURL, err.Error()) err := fmt.Errorf("post command to worker %s(%s) fail: %s", workerID, workerURL, err.Error())
c.Error(err) c.Error(err)

View File

@ -65,7 +65,7 @@ func TestHTTPServer(t *testing.T) {
w := WorkerStatus{ w := WorkerStatus{
ID: "test_worker1", ID: "test_worker1",
} }
resp, err := postJSON(baseURL+"/workers", w) resp, err := PostJSON(baseURL+"/workers", w, nil)
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK) So(resp.StatusCode, ShouldEqual, http.StatusOK)
@ -90,7 +90,7 @@ func TestHTTPServer(t *testing.T) {
Upstream: "mirrors.tuna.tsinghua.edu.cn", Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "3GB", Size: "3GB",
} }
resp, err := postJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status) resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
defer resp.Body.Close() defer resp.Body.Close()
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK) So(resp.StatusCode, ShouldEqual, http.StatusOK)
@ -136,8 +136,8 @@ func TestHTTPServer(t *testing.T) {
Upstream: "mirrors.tuna.tsinghua.edu.cn", Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "4GB", Size: "4GB",
} }
resp, err := postJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s",
baseURL, status.Worker, status.Name), status) baseURL, status.Worker, status.Name), status, nil)
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusBadRequest) So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
defer resp.Body.Close() defer resp.Body.Close()
@ -156,7 +156,7 @@ func TestHTTPServer(t *testing.T) {
ID: "test_worker_cmd", ID: "test_worker_cmd",
URL: workerBaseURL + "/cmd", URL: workerBaseURL + "/cmd",
} }
resp, err := postJSON(baseURL+"/workers", w) resp, err := PostJSON(baseURL+"/workers", w, nil)
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK) So(resp.StatusCode, ShouldEqual, http.StatusOK)
@ -177,7 +177,7 @@ func TestHTTPServer(t *testing.T) {
MirrorID: "ubuntu-sync", MirrorID: "ubuntu-sync",
WorkerID: "not_exist_worker", WorkerID: "not_exist_worker",
} }
resp, err := postJSON(baseURL+"/cmd", clientCmd) resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
defer resp.Body.Close() defer resp.Body.Close()
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusBadRequest) So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
@ -190,7 +190,7 @@ func TestHTTPServer(t *testing.T) {
WorkerID: w.ID, WorkerID: w.ID,
} }
resp, err := postJSON(baseURL+"/cmd", clientCmd) resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
defer resp.Body.Close() defer resp.Body.Close()
So(err, ShouldBeNil) So(err, ShouldBeNil)

View File

@ -1,13 +0,0 @@
package manager
import (
"bytes"
"encoding/json"
"net/http"
)
func postJSON(url string, obj interface{}) (*http.Response, error) {
b := new(bytes.Buffer)
json.NewEncoder(b).Encode(obj)
return http.Post(url, "application/json; charset=utf-8", b)
}

View File

@ -37,6 +37,14 @@ func newCmdJob(cmdAndArgs []string, workingDir string, env map[string]string) *c
panic("Command length should be at least 1!") panic("Command length should be at least 1!")
} }
logger.Debug("Executing command %s at %s", cmdAndArgs[0], workingDir)
if _, err := os.Stat(workingDir); os.IsNotExist(err) {
logger.Debug("Making dir %s", workingDir)
if err = os.MkdirAll(workingDir, 0755); err != nil {
logger.Error("Error making dir %s", workingDir)
}
}
cmd.Dir = workingDir cmd.Dir = workingDir
cmd.Env = newEnviron(env, true) cmd.Env = newEnviron(env, true)

View File

@ -49,6 +49,16 @@ func GetTUNASyncWorker(cfg *Config) *Worker {
schedule: newScheduleQueue(), schedule: newScheduleQueue(),
mirrorStatus: make(map[string]SyncStatus), mirrorStatus: make(map[string]SyncStatus),
} }
if cfg.Manager.CACert != "" {
tlsConfig, err := GetTLSConfig(cfg.Manager.CACert)
if err != nil {
logger.Error("Failed to init TLS config: %s", err.Error())
return nil
}
w.tlsConfig = tlsConfig
}
w.initJobs() w.initJobs()
w.makeHTTPServer() w.makeHTTPServer()
tunasyncWorker = w tunasyncWorker = w
@ -75,7 +85,9 @@ func (w *Worker) initProviders() {
logDir = c.Global.LogDir logDir = c.Global.LogDir
} }
if mirrorDir == "" { if mirrorDir == "" {
mirrorDir = c.Global.MirrorDir mirrorDir = filepath.Join(
c.Global.MirrorDir, mirror.Name,
)
} }
logDir = formatLogDir(logDir, mirror) logDir = formatLogDir(logDir, mirror)
@ -87,7 +99,7 @@ func (w *Worker) initProviders() {
name: mirror.Name, name: mirror.Name,
upstreamURL: mirror.Upstream, upstreamURL: mirror.Upstream,
command: mirror.Command, command: mirror.Command,
workingDir: filepath.Join(mirrorDir, mirror.Name), workingDir: mirrorDir,
logDir: logDir, logDir: logDir,
logFile: filepath.Join(logDir, "latest.log"), logFile: filepath.Join(logDir, "latest.log"),
interval: time.Duration(mirror.Interval) * time.Minute, interval: time.Duration(mirror.Interval) * time.Minute,
@ -102,9 +114,10 @@ func (w *Worker) initProviders() {
rc := rsyncConfig{ rc := rsyncConfig{
name: mirror.Name, name: mirror.Name,
upstreamURL: mirror.Upstream, upstreamURL: mirror.Upstream,
rsyncCmd: mirror.Command,
password: mirror.Password, password: mirror.Password,
excludeFile: mirror.ExcludeFile, excludeFile: mirror.ExcludeFile,
workingDir: filepath.Join(mirrorDir, mirror.Name), workingDir: mirrorDir,
logDir: logDir, logDir: logDir,
logFile: filepath.Join(logDir, "latest.log"), logFile: filepath.Join(logDir, "latest.log"),
useIPv6: mirror.UseIPv6, useIPv6: mirror.UseIPv6,
@ -120,9 +133,10 @@ func (w *Worker) initProviders() {
name: mirror.Name, name: mirror.Name,
stage1Profile: mirror.Stage1Profile, stage1Profile: mirror.Stage1Profile,
upstreamURL: mirror.Upstream, upstreamURL: mirror.Upstream,
rsyncCmd: mirror.Command,
password: mirror.Password, password: mirror.Password,
excludeFile: mirror.ExcludeFile, excludeFile: mirror.ExcludeFile,
workingDir: filepath.Join(mirrorDir, mirror.Name), workingDir: mirrorDir,
logDir: logDir, logDir: logDir,
logFile: filepath.Join(logDir, "latest.log"), logFile: filepath.Join(logDir, "latest.log"),
useIPv6: mirror.UseIPv6, useIPv6: mirror.UseIPv6,
@ -303,7 +317,7 @@ func (w *Worker) registorWorker() {
func (w *Worker) updateStatus(jobMsg jobMessage) { func (w *Worker) updateStatus(jobMsg jobMessage) {
url := fmt.Sprintf( url := fmt.Sprintf(
"%s/%s/jobs/%s", "%s/workers/%s/jobs/%s",
w.cfg.Manager.APIBase, w.cfg.Manager.APIBase,
w.Name(), w.Name(),
jobMsg.name, jobMsg.name,
@ -329,7 +343,7 @@ func (w *Worker) fetchJobStatus() []MirrorStatus {
var mirrorList []MirrorStatus var mirrorList []MirrorStatus
url := fmt.Sprintf( url := fmt.Sprintf(
"%s/%s/jobs", "%s/workers/%s/jobs",
w.cfg.Manager.APIBase, w.cfg.Manager.APIBase,
w.Name(), w.Name(),
) )