diff --git a/tunasync/clt_server.py b/tunasync/clt_server.py index 4a6ed3d..7a815f8 100644 --- a/tunasync/clt_server.py +++ b/tunasync/clt_server.py @@ -8,6 +8,10 @@ import struct class ControlServer(object): + valid_commands = set(( + "start", "stop", "restart", "status", "log", + )) + def __init__(self, address, mgr_chan, cld_chan): self.address = address self.mgr_chan = mgr_chan @@ -19,7 +23,7 @@ class ControlServer(object): raise Exception("file exists: {}".format(self.address)) self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self.sock.bind(self.address) - os.chmod(address, 0700) + os.chmod(address, 0o700) print("Control Server listening on: {}".format(self.address)) self.sock.listen(1) @@ -32,7 +36,9 @@ class ControlServer(object): length = struct.unpack('!H', conn.recv(2))[0] content = conn.recv(length) cmd = json.loads(content) - self.mgr_chan.put(("CMD", (cmd['cmd'], cmd['target']))) + if cmd['cmd'] not in self.valid_commands: + raise Exception("Invalid Command") + self.mgr_chan.put(("CMD", (cmd['cmd'], cmd['target'], cmd["kwargs"]))) except Exception as e: print(e) res = "Invalid Command" diff --git a/tunasync/jobs.py b/tunasync/jobs.py index 8656fb3..fd48d72 100644 --- a/tunasync/jobs.py +++ b/tunasync/jobs.py @@ -36,11 +36,11 @@ def run_job(sema, child_q, manager_q, provider, **settings): break aquired = True - status = "syncing" - manager_q.put(("UPDATE", (provider.name, status))) ctx = {} # put context info in it ctx['current_dir'] = provider.local_dir ctx['mirror_name'] = provider.name + status = "pre-syncing" + manager_q.put(("UPDATE", (provider.name, status, ctx))) try: for hook in provider.hooks: @@ -50,7 +50,9 @@ def run_job(sema, child_q, manager_q, provider, **settings): traceback.print_exc() status = "fail" else: + status = "syncing" for retry in range(max_retry): + manager_q.put(("UPDATE", (provider.name, status, ctx))) print("start syncing {}, retry: {}".format(provider.name, retry)) provider.run(ctx=ctx) @@ -78,7 +80,7 @@ def run_job(sema, child_q, manager_q, provider, **settings): provider.name, provider.interval )) - manager_q.put(("UPDATE", (provider.name, status))) + manager_q.put(("UPDATE", (provider.name, status, ctx))) try: msg = child_q.get(timeout=provider.interval * 60) diff --git a/tunasync/status_manager.py b/tunasync/status_manager.py index 62c09e8..4479141 100644 --- a/tunasync/status_manager.py +++ b/tunasync/status_manager.py @@ -32,8 +32,11 @@ class StatusManager(object): pass self.mirrors = mirrors + self.mirrors_ctx = {key: {} for key in self.mirrors} def get_info(self, name, key): + if key == "ctx": + return self.mirrors_ctx.get(name, {}) _m = self.mirrors.get(name, {}) return _m.get(key, None) @@ -50,7 +53,7 @@ class StatusManager(object): self.mirrors[name] = dict(_m.items()) self.commit_db() - def update_status(self, name, status): + def update_status(self, name, status, ctx={}): _m = self.mirrors.get(name, { 'name': name, @@ -68,6 +71,7 @@ class StatusManager(object): _m['last_update'] = update_time _m['status'] = status self.mirrors[name] = dict(_m.items()) + self.mirrors_ctx[name] = ctx self.commit_db() print("Updated status file, {}:{}".format(name, status)) diff --git a/tunasync/tunasync.py b/tunasync/tunasync.py index 13f40c3..93615fc 100644 --- a/tunasync/tunasync.py +++ b/tunasync/tunasync.py @@ -173,63 +173,99 @@ class TUNASync(object): def run_forever(self): while 1: - try: msg_hdr, msg_body = self.channel.get() except IOError: continue if msg_hdr == "UPDATE": - name, status = msg_body + mirror_name, status, ctx = msg_body try: - self.status_manager.update_status(name, status) + self.status_manager.update_status( + mirror_name, status, dict(ctx.items())) except Exception as e: print(e) elif msg_hdr == "CONFIG_ACK": - name, status = msg_body + mirror_name, status = msg_body if status == "QUIT": - print("New configuration applied to {}".format(name)) - self.run_provider(name) + print("New configuration applied to {}".format(mirror_name)) + self.run_provider(mirror_name) elif msg_hdr == "CMD": - cmd, name = msg_body - if (name not in self.mirrors) and (name != "__ALL__"): + cmd, mirror_name, kwargs = msg_body + if (mirror_name not in self.mirrors) and (mirror_name != "__ALL__"): self.ctrl_channel.put("Invalid target") continue - - if cmd == "restart": - _, p = self.processes[name] - p.terminate() - self.providers[name].set_delay(0) - self.run_provider(name) - res = "Restarted Job: {}".format(name) - - elif cmd == "stop": - if name not in self.processes: - res = "{} not running".format(name) - self.ctrl_channel.put(res) - continue - - _, p = self.processes.pop(name) - p.terminate() - res = "Stopped Job: {}".format(name) - - elif cmd == "start": - if name in self.processes: - res = "{} already running".format(name) - self.ctrl_channel.put(res) - continue - - self.run_provider(name) - res = "Started Job: {}".format(name) - elif cmd == "status": - if name == "__ALL__": - res = self.status_manager.list_status(_format=True) - else: - res = self.status_manager.get_status(name, _format=True) - else: - res = "Invalid command" - + res = self.handle_cmd(cmd, mirror_name, kwargs) self.ctrl_channel.put(res) + + def handle_cmd(self, cmd, mirror_name, kwargs): + if cmd == "restart": + _, p = self.processes[mirror_name] + p.terminate() + self.providers[mirror_name].set_delay(0) + self.run_provider(mirror_name) + res = "Restarted Job: {}".format(mirror_name) + + elif cmd == "stop": + if mirror_name not in self.processes: + res = "{} not running".format(mirror_name) + return res + + _, p = self.processes.pop(mirror_name) + p.terminate() + res = "Stopped Job: {}".format(mirror_name) + + elif cmd == "start": + if mirror_name in self.processes: + res = "{} already running".format(mirror_name) + return res + + self.run_provider(mirror_name) + res = "Started Job: {}".format(mirror_name) + + elif cmd == "status": + if mirror_name == "__ALL__": + res = self.status_manager.list_status(_format=True) + else: + res = self.status_manager.get_status(mirror_name, _format=True) + + elif cmd == "log": + job_ctx = self.status_manager.get_info(mirror_name, "ctx") + n = kwargs.get("n", 0) + if n == 0: + res = job_ctx.get("log_file", "/dev/null") + else: + import os + log_file = job_ctx.get("log_file", None) + if log_file is None: + return "/dev/null" + + log_dir = os.path.dirname(log_file) + lfiles = [ + os.path.join(log_dir, lfile) + for lfile in os.listdir(log_dir) + if lfile.startswith(mirror_name) and lfile != "latest" + ] + + if len(lfiles) <= n: + res = "Only {} log files available".format(len(lfiles)) + return res + + lfiles_set = set(lfiles) + # sort to get the newest 10 files + lfiles_ts = sorted( + [(os.path.getmtime(lfile), lfile) for lfile in lfiles], + key=lambda x: x[0], + reverse=True, + ) + return lfiles_ts[n][1] + + else: + res = "Invalid command" + + return res + + # vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasynctl.py b/tunasynctl.py index 4efe53f..faf44cc 100755 --- a/tunasynctl.py +++ b/tunasynctl.py @@ -10,22 +10,45 @@ if __name__ == "__main__": parser = argparse.ArgumentParser(prog="tunasynctl") parser.add_argument("-s", "--socket", default="/var/run/tunasync.sock", help="socket file") - parser.add_argument("command", help="command") - parser.add_argument("target", nargs="?", default="__ALL__", help="target") - args = parser.parse_args() + subparsers = parser.add_subparsers(dest="command", help='sub-command help') + + sp = subparsers.add_parser('start', help="start job") + sp.add_argument("target", help="mirror job name") + + sp = subparsers.add_parser('stop', help="stop job") + sp.add_argument("target", help="mirror job name") + + sp = subparsers.add_parser('restart', help="restart job") + sp.add_argument("target", help="mirror job name") + + sp = subparsers.add_parser('status', help="show mirror status") + sp.add_argument("target", nargs="?", default="__ALL__", help="mirror job name") + + sp = subparsers.add_parser('log', help="return log file path") + sp.add_argument("-n", type=int, default=0, help="last n-th log, default 0 (latest)") + sp.add_argument("target", help="mirror job name") + + sp = subparsers.add_parser('help', help="show help message") + + args = vars(parser.parse_args()) + + if args['command'] == "help": + parser.print_help() + sys.exit(0) sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) try: - sock.connect(args.socket) + sock.connect(args.pop("socket")) except socket.error as msg: print(msg) sys.exit(1) pack = json.dumps({ - 'cmd': args.command, - 'target': args.target, + "cmd": args.pop("command"), + "target": args.pop("target"), + "kwargs": args, }) try: