From 0b4c5b9cb97d4549f699e11dcad20574b53bd86c Mon Sep 17 00:00:00 2001 From: bigeagle Date: Tue, 9 Dec 2014 13:27:56 +0800 Subject: [PATCH 1/5] add and options --- examples/tunasync.conf | 1 + tunasync/exec_pre_post.py | 35 +++++++++++++++++++++++++++++++++++ tunasync/jobs.py | 1 + tunasync/mirror_config.py | 10 ++++++++++ tunasync/mirror_provider.py | 3 ++- 5 files changed, 49 insertions(+), 1 deletion(-) create mode 100644 tunasync/exec_pre_post.py diff --git a/examples/tunasync.conf b/examples/tunasync.conf index cf949a1..61d114e 100644 --- a/examples/tunasync.conf +++ b/examples/tunasync.conf @@ -31,6 +31,7 @@ provider = "shell" command = "sleep 10" local_dir = "/mnt/sdb1/mirror/archlinux/current/" # log_file = "/dev/null" +exec_post_sync = "/bin/bash -c 'date --utc \"+%s\" > ${TUNASYNC_WORKING_DIR}/.timestamp'" [[mirrors]] name = "arch2" diff --git a/tunasync/exec_pre_post.py b/tunasync/exec_pre_post.py new file mode 100644 index 0000000..892b673 --- /dev/null +++ b/tunasync/exec_pre_post.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python2 +# -*- coding:utf-8 -*- +import os +import sh +import shlex +from .hook import JobHook + + +class CmdExecHook(JobHook): + POST_SYNC = "post_sync" + PRE_SYNC = "pre_sync" + + def __init__(self, command, exec_at=POST_SYNC): + self.command = shlex.split(command) + if exec_at == self.POST_SYNC: + self.before_job = self._keep_calm + self.after_job = self._exec + elif exec_at == self.PRE_SYNC: + self.before_job = self._exec + self.after_job = self._keep_calm + + def _keep_calm(self, ctx={}, **kwargs): + pass + + def _exec(self, ctx={}, **kwargs): + new_env = os.environ.copy() + new_env["TUNASYNC_MIRROR_NAME"] = ctx["mirror_name"] + new_env["TUNASYNC_WORKING_DIR"] = ctx["current_dir"] + + _cmd = self.command[0] + _args = [] if len(self.command) == 1 else self.command[1:] + cmd = sh.Command(_cmd) + cmd(*_args, _env=new_env) + +# vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasync/jobs.py b/tunasync/jobs.py index 280c4f1..8656fb3 100644 --- a/tunasync/jobs.py +++ b/tunasync/jobs.py @@ -40,6 +40,7 @@ def run_job(sema, child_q, manager_q, provider, **settings): manager_q.put(("UPDATE", (provider.name, status))) ctx = {} # put context info in it ctx['current_dir'] = provider.local_dir + ctx['mirror_name'] = provider.name try: for hook in provider.hooks: diff --git a/tunasync/mirror_config.py b/tunasync/mirror_config.py index 3015f48..d294c04 100644 --- a/tunasync/mirror_config.py +++ b/tunasync/mirror_config.py @@ -5,6 +5,7 @@ from datetime import datetime from .mirror_provider import RsyncProvider, ShellProvider from .btrfs_snapshot import BtrfsHook from .loglimit import LogLimitHook +from .exec_pre_post import CmdExecHook class MirrorConfig(object): @@ -126,6 +127,15 @@ class MirrorConfig(object): hooks.append(BtrfsHook(service_dir, working_dir, gc_dir)) hooks.append(LogLimitHook()) + + if self.exec_pre_sync: + hooks.append( + CmdExecHook(self.exec_pre_sync, CmdExecHook.PRE_SYNC)) + + if self.exec_post_sync: + hooks.append( + CmdExecHook(self.exec_post_sync, CmdExecHook.POST_SYNC)) + return hooks # vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasync/mirror_provider.py b/tunasync/mirror_provider.py index 38cb843..2291ed5 100644 --- a/tunasync/mirror_provider.py +++ b/tunasync/mirror_provider.py @@ -2,6 +2,7 @@ # -*- coding:utf-8 -*- import sh import os +import shlex from datetime import datetime @@ -110,7 +111,7 @@ class ShellProvider(MirrorProvider): super(ShellProvider, self).__init__(name, local_dir, log_dir, log_file, interval, hooks) self.upstream_url = str(upstream_url) - self.command = command.split() + self.command = shlex.split(command) def run(self, ctx={}): From caac9b495083be23f453782e33c581fbb8f1c4d8 Mon Sep 17 00:00:00 2001 From: bigeagle Date: Fri, 19 Dec 2014 14:28:01 +0800 Subject: [PATCH 2/5] add option to disable stdout/stderr redirection for shell provider --- tunasync/mirror_config.py | 1 + tunasync/mirror_provider.py | 12 +++++++++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/tunasync/mirror_config.py b/tunasync/mirror_config.py index d294c04..df2c365 100644 --- a/tunasync/mirror_config.py +++ b/tunasync/mirror_config.py @@ -81,6 +81,7 @@ class MirrorConfig(object): local_dir=self.local_dir, log_dir=self.log_dir, log_file=self.log_file, + log_stdout=self.options.get("log_stdout", True), interval=self.interval, hooks=hooks ) diff --git a/tunasync/mirror_provider.py b/tunasync/mirror_provider.py index 2291ed5..78a3ec4 100644 --- a/tunasync/mirror_provider.py +++ b/tunasync/mirror_provider.py @@ -106,12 +106,13 @@ class RsyncProvider(MirrorProvider): class ShellProvider(MirrorProvider): def __init__(self, name, command, upstream_url, local_dir, log_dir, - log_file="/dev/null", interval=120, hooks=[]): + log_file="/dev/null", log_stdout=True, interval=120, hooks=[]): super(ShellProvider, self).__init__(name, local_dir, log_dir, log_file, interval, hooks) self.upstream_url = str(upstream_url) self.command = shlex.split(command) + self.log_stdout = log_stdout def run(self, ctx={}): @@ -128,8 +129,13 @@ class ShellProvider(MirrorProvider): _args = [] if len(self.command) == 1 else self.command[1:] cmd = sh.Command(_cmd) - self.p = cmd(*_args, _env=new_env, _out=log_file, - _err=log_file, _out_bufsize=1, _bg=True) + + if self.log_stdout: + self.p = cmd(*_args, _env=new_env, _out=log_file, + _err=log_file, _out_bufsize=1, _bg=True) + else: + self.p = cmd(*_args, _env=new_env, _out='/dev/null', + _err='/dev/null', _out_bufsize=1, _bg=True) # vim: ts=4 sw=4 sts=4 expandtab From a99ce6575f2236d777ae2fa231f7dac538ef7c2d Mon Sep 17 00:00:00 2001 From: bigeagle Date: Sat, 16 May 2015 14:09:57 +0800 Subject: [PATCH 3/5] rewrite tunasynctl, add command --- tunasync/clt_server.py | 10 +++- tunasync/jobs.py | 8 ++- tunasync/status_manager.py | 6 +- tunasync/tunasync.py | 120 ++++++++++++++++++++++++------------- tunasynctl.py | 35 +++++++++-- 5 files changed, 125 insertions(+), 54 deletions(-) diff --git a/tunasync/clt_server.py b/tunasync/clt_server.py index 4a6ed3d..7a815f8 100644 --- a/tunasync/clt_server.py +++ b/tunasync/clt_server.py @@ -8,6 +8,10 @@ import struct class ControlServer(object): + valid_commands = set(( + "start", "stop", "restart", "status", "log", + )) + def __init__(self, address, mgr_chan, cld_chan): self.address = address self.mgr_chan = mgr_chan @@ -19,7 +23,7 @@ class ControlServer(object): raise Exception("file exists: {}".format(self.address)) self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self.sock.bind(self.address) - os.chmod(address, 0700) + os.chmod(address, 0o700) print("Control Server listening on: {}".format(self.address)) self.sock.listen(1) @@ -32,7 +36,9 @@ class ControlServer(object): length = struct.unpack('!H', conn.recv(2))[0] content = conn.recv(length) cmd = json.loads(content) - self.mgr_chan.put(("CMD", (cmd['cmd'], cmd['target']))) + if cmd['cmd'] not in self.valid_commands: + raise Exception("Invalid Command") + self.mgr_chan.put(("CMD", (cmd['cmd'], cmd['target'], cmd["kwargs"]))) except Exception as e: print(e) res = "Invalid Command" diff --git a/tunasync/jobs.py b/tunasync/jobs.py index 8656fb3..fd48d72 100644 --- a/tunasync/jobs.py +++ b/tunasync/jobs.py @@ -36,11 +36,11 @@ def run_job(sema, child_q, manager_q, provider, **settings): break aquired = True - status = "syncing" - manager_q.put(("UPDATE", (provider.name, status))) ctx = {} # put context info in it ctx['current_dir'] = provider.local_dir ctx['mirror_name'] = provider.name + status = "pre-syncing" + manager_q.put(("UPDATE", (provider.name, status, ctx))) try: for hook in provider.hooks: @@ -50,7 +50,9 @@ def run_job(sema, child_q, manager_q, provider, **settings): traceback.print_exc() status = "fail" else: + status = "syncing" for retry in range(max_retry): + manager_q.put(("UPDATE", (provider.name, status, ctx))) print("start syncing {}, retry: {}".format(provider.name, retry)) provider.run(ctx=ctx) @@ -78,7 +80,7 @@ def run_job(sema, child_q, manager_q, provider, **settings): provider.name, provider.interval )) - manager_q.put(("UPDATE", (provider.name, status))) + manager_q.put(("UPDATE", (provider.name, status, ctx))) try: msg = child_q.get(timeout=provider.interval * 60) diff --git a/tunasync/status_manager.py b/tunasync/status_manager.py index 62c09e8..4479141 100644 --- a/tunasync/status_manager.py +++ b/tunasync/status_manager.py @@ -32,8 +32,11 @@ class StatusManager(object): pass self.mirrors = mirrors + self.mirrors_ctx = {key: {} for key in self.mirrors} def get_info(self, name, key): + if key == "ctx": + return self.mirrors_ctx.get(name, {}) _m = self.mirrors.get(name, {}) return _m.get(key, None) @@ -50,7 +53,7 @@ class StatusManager(object): self.mirrors[name] = dict(_m.items()) self.commit_db() - def update_status(self, name, status): + def update_status(self, name, status, ctx={}): _m = self.mirrors.get(name, { 'name': name, @@ -68,6 +71,7 @@ class StatusManager(object): _m['last_update'] = update_time _m['status'] = status self.mirrors[name] = dict(_m.items()) + self.mirrors_ctx[name] = ctx self.commit_db() print("Updated status file, {}:{}".format(name, status)) diff --git a/tunasync/tunasync.py b/tunasync/tunasync.py index 13f40c3..93615fc 100644 --- a/tunasync/tunasync.py +++ b/tunasync/tunasync.py @@ -173,63 +173,99 @@ class TUNASync(object): def run_forever(self): while 1: - try: msg_hdr, msg_body = self.channel.get() except IOError: continue if msg_hdr == "UPDATE": - name, status = msg_body + mirror_name, status, ctx = msg_body try: - self.status_manager.update_status(name, status) + self.status_manager.update_status( + mirror_name, status, dict(ctx.items())) except Exception as e: print(e) elif msg_hdr == "CONFIG_ACK": - name, status = msg_body + mirror_name, status = msg_body if status == "QUIT": - print("New configuration applied to {}".format(name)) - self.run_provider(name) + print("New configuration applied to {}".format(mirror_name)) + self.run_provider(mirror_name) elif msg_hdr == "CMD": - cmd, name = msg_body - if (name not in self.mirrors) and (name != "__ALL__"): + cmd, mirror_name, kwargs = msg_body + if (mirror_name not in self.mirrors) and (mirror_name != "__ALL__"): self.ctrl_channel.put("Invalid target") continue - - if cmd == "restart": - _, p = self.processes[name] - p.terminate() - self.providers[name].set_delay(0) - self.run_provider(name) - res = "Restarted Job: {}".format(name) - - elif cmd == "stop": - if name not in self.processes: - res = "{} not running".format(name) - self.ctrl_channel.put(res) - continue - - _, p = self.processes.pop(name) - p.terminate() - res = "Stopped Job: {}".format(name) - - elif cmd == "start": - if name in self.processes: - res = "{} already running".format(name) - self.ctrl_channel.put(res) - continue - - self.run_provider(name) - res = "Started Job: {}".format(name) - elif cmd == "status": - if name == "__ALL__": - res = self.status_manager.list_status(_format=True) - else: - res = self.status_manager.get_status(name, _format=True) - else: - res = "Invalid command" - + res = self.handle_cmd(cmd, mirror_name, kwargs) self.ctrl_channel.put(res) + + def handle_cmd(self, cmd, mirror_name, kwargs): + if cmd == "restart": + _, p = self.processes[mirror_name] + p.terminate() + self.providers[mirror_name].set_delay(0) + self.run_provider(mirror_name) + res = "Restarted Job: {}".format(mirror_name) + + elif cmd == "stop": + if mirror_name not in self.processes: + res = "{} not running".format(mirror_name) + return res + + _, p = self.processes.pop(mirror_name) + p.terminate() + res = "Stopped Job: {}".format(mirror_name) + + elif cmd == "start": + if mirror_name in self.processes: + res = "{} already running".format(mirror_name) + return res + + self.run_provider(mirror_name) + res = "Started Job: {}".format(mirror_name) + + elif cmd == "status": + if mirror_name == "__ALL__": + res = self.status_manager.list_status(_format=True) + else: + res = self.status_manager.get_status(mirror_name, _format=True) + + elif cmd == "log": + job_ctx = self.status_manager.get_info(mirror_name, "ctx") + n = kwargs.get("n", 0) + if n == 0: + res = job_ctx.get("log_file", "/dev/null") + else: + import os + log_file = job_ctx.get("log_file", None) + if log_file is None: + return "/dev/null" + + log_dir = os.path.dirname(log_file) + lfiles = [ + os.path.join(log_dir, lfile) + for lfile in os.listdir(log_dir) + if lfile.startswith(mirror_name) and lfile != "latest" + ] + + if len(lfiles) <= n: + res = "Only {} log files available".format(len(lfiles)) + return res + + lfiles_set = set(lfiles) + # sort to get the newest 10 files + lfiles_ts = sorted( + [(os.path.getmtime(lfile), lfile) for lfile in lfiles], + key=lambda x: x[0], + reverse=True, + ) + return lfiles_ts[n][1] + + else: + res = "Invalid command" + + return res + + # vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasynctl.py b/tunasynctl.py index 4efe53f..faf44cc 100755 --- a/tunasynctl.py +++ b/tunasynctl.py @@ -10,22 +10,45 @@ if __name__ == "__main__": parser = argparse.ArgumentParser(prog="tunasynctl") parser.add_argument("-s", "--socket", default="/var/run/tunasync.sock", help="socket file") - parser.add_argument("command", help="command") - parser.add_argument("target", nargs="?", default="__ALL__", help="target") - args = parser.parse_args() + subparsers = parser.add_subparsers(dest="command", help='sub-command help') + + sp = subparsers.add_parser('start', help="start job") + sp.add_argument("target", help="mirror job name") + + sp = subparsers.add_parser('stop', help="stop job") + sp.add_argument("target", help="mirror job name") + + sp = subparsers.add_parser('restart', help="restart job") + sp.add_argument("target", help="mirror job name") + + sp = subparsers.add_parser('status', help="show mirror status") + sp.add_argument("target", nargs="?", default="__ALL__", help="mirror job name") + + sp = subparsers.add_parser('log', help="return log file path") + sp.add_argument("-n", type=int, default=0, help="last n-th log, default 0 (latest)") + sp.add_argument("target", help="mirror job name") + + sp = subparsers.add_parser('help', help="show help message") + + args = vars(parser.parse_args()) + + if args['command'] == "help": + parser.print_help() + sys.exit(0) sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) try: - sock.connect(args.socket) + sock.connect(args.pop("socket")) except socket.error as msg: print(msg) sys.exit(1) pack = json.dumps({ - 'cmd': args.command, - 'target': args.target, + "cmd": args.pop("command"), + "target": args.pop("target"), + "kwargs": args, }) try: From 2626102a5f615067cbe9b19b54611bae4398a4e9 Mon Sep 17 00:00:00 2001 From: bigeagle Date: Sat, 16 May 2015 14:11:27 +0800 Subject: [PATCH 4/5] todo --- README.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 4238fd1..5fa4d70 100644 --- a/README.md +++ b/README.md @@ -3,10 +3,12 @@ tunasync ## TODO -- [ ] implement `tunasynctl tail` and `tunasynctl log` or equivalent feature +- [ ] use context manager to handle job contexts +- [ ] Hooks need "pre_try", "post_try" +- [x] implement `tunasynctl tail` and `tunasynctl log` or equivalent feature - [x] status file - [ ] mirror size - - [ ] upstream + - [x] upstream - [x] btrfs backend (create snapshot before syncing) - [x] add mirror job online - [x] use toml as configuration From 7469cc7bf1beb8a8f154cf92644d27f54355d632 Mon Sep 17 00:00:00 2001 From: bigeagle Date: Sat, 16 May 2015 14:31:50 +0800 Subject: [PATCH 5/5] status bug fix --- tunasync/status_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tunasync/status_manager.py b/tunasync/status_manager.py index 4479141..46bad12 100644 --- a/tunasync/status_manager.py +++ b/tunasync/status_manager.py @@ -61,7 +61,7 @@ class StatusManager(object): 'status': '-', }) - if status in ("syncing", "fail"): + if status in ("syncing", "fail", "pre-syncing"): update_time = _m["last_update"] elif status == "success": update_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")