diff --git a/examples/tunasync.conf b/examples/tunasync.conf index 191a7ce..cf94012 100644 --- a/examples/tunasync.conf +++ b/examples/tunasync.conf @@ -10,6 +10,7 @@ concurrent = 2 # interval in minutes interval = 1 max_retry = 2 +ctrl_addr = "/tmp/tunasync.sock" [btrfs] service_dir = "{mirror_root}/_current/{mirror_name}" diff --git a/tunasync/clt_server.py b/tunasync/clt_server.py new file mode 100644 index 0000000..6f95380 --- /dev/null +++ b/tunasync/clt_server.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python2 +# -*- coding:utf-8 -*- +import socket +import os +import json + + +class ControlServer(object): + + def __init__(self, address, mgr_chan, cld_chan): + self.address = address + self.mgr_chan = mgr_chan + self.cld_chan = cld_chan + try: + os.unlink(self.address) + except OSError: + if os.path.exists(self.address): + raise Exception("file exists: {}".format(self.address)) + + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.sock.bind(self.address) + + print("Control Server listening on: {}".format(self.address)) + self.sock.listen(1) + + def serve_forever(self): + while 1: + conn, _ = self.sock.accept() + + try: + length = ord(conn.recv(1)) + content = conn.recv(length) + cmd = json.loads(content) + self.mgr_chan.put(("CMD", (cmd['cmd'], cmd['target']))) + except Exception as e: + print(e) + res = "Invalid Command" + else: + res = self.cld_chan.get() + + conn.sendall(chr(len(res)) + res) + conn.close() + + +def run_control_server(address, mgr_chan, cld_chan): + cs = ControlServer(address, mgr_chan, cld_chan) + cs.serve_forever() + +# vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasync/jobs.py b/tunasync/jobs.py index 92daf3d..88b51ae 100644 --- a/tunasync/jobs.py +++ b/tunasync/jobs.py @@ -29,7 +29,7 @@ def run_job(sema, child_q, manager_q, provider, **settings): aquired = True status = "syncing" - manager_q.put((provider.name, status)) + manager_q.put(("UPDATE", (provider.name, status))) try: for hook in provider.hooks: hook.before_job(name=provider.name) @@ -66,12 +66,12 @@ def run_job(sema, child_q, manager_q, provider, **settings): provider.name, provider.interval )) - manager_q.put((provider.name, status)) + manager_q.put(("UPDATE", (provider.name, status))) try: msg = child_q.get(timeout=provider.interval * 60) if msg == "terminate": - manager_q.put((provider.name, "QUIT")) + manager_q.put(("CONFIG_ACK", (provider.name, "QUIT"))) break except Queue.Empty: pass diff --git a/tunasync/tunasync.py b/tunasync/tunasync.py index 4916ebf..6a3bc37 100644 --- a/tunasync/tunasync.py +++ b/tunasync/tunasync.py @@ -9,6 +9,7 @@ from . import jobs from .hook import JobHook from .mirror_config import MirrorConfig from .status_manager import StatusManager +from .clt_server import run_control_server class TUNASync(object): @@ -46,6 +47,15 @@ class TUNASync(object): self.status_file = self._settings["global"]["status_file"] self.status_manager = StatusManager(self, self.status_file) + self.ctrl_addr = self._settings["global"]["ctrl_addr"] + self.ctrl_channel = Queue() + p = Process( + target=run_control_server, + args=(self.ctrl_addr, self.channel, self.ctrl_channel), + ) + p.start() + self.processes["CTRL_SERVER"] = (self.ctrl_channel, p) + def add_hook(self, h): assert isinstance(h, JobHook) self._hooks.append(h) @@ -94,20 +104,7 @@ class TUNASync(object): signal.signal(signal.SIGUSR1, self.reload_mirrors) signal.signal(signal.SIGUSR2, self.reload_mirrors_force) - while 1: - try: - name, status = self.channel.get() - except IOError: - continue - - if status == "QUIT": - print("New configuration applied to {}".format(name)) - self.run_provider(name) - else: - try: - self.status_manager.update_status(name, status) - except Exception as e: - print(e) + self.run_forever() def run_provider(self, name): if name not in self.providers: @@ -170,5 +167,59 @@ class TUNASync(object): print("New mirror: {}".format(name)) self.run_provider(name) + def run_forever(self): + while 1: + try: + msg_hdr, msg_body = self.channel.get() + except IOError: + continue + + if msg_hdr == "UPDATE": + name, status = msg_body + try: + self.status_manager.update_status(name, status) + except Exception as e: + print(e) + + elif msg_hdr == "CONFIG_ACK": + name, status = msg_body + if status == "QUIT": + print("New configuration applied to {}".format(name)) + self.run_provider(name) + + elif msg_hdr == "CMD": + cmd, name = msg_body + if name not in self.mirrors: + self.ctrl_channel.put("Invalid target") + continue + + if cmd == "restart": + _, p = self.processes[name] + p.terminate() + self.run_provider(name) + res = "Restarted Job: {}".format(name) + + elif cmd == "stop": + if name not in self.processes: + res = "{} not running".format(name) + self.ctrl_channel.put(res) + continue + + _, p = self.processes.pop(name) + p.terminate() + res = "Stopped Job: {}".format(name) + + elif cmd == "start": + if name in self.processes: + res = "{} already running".format(name) + self.ctrl_channel.put(res) + continue + + self.run_provider(name) + res = "Started Job: {}".format(name) + else: + res = "Invalid command" + + self.ctrl_channel.put(res) # vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasynctl.py b/tunasynctl.py new file mode 100755 index 0000000..352f119 --- /dev/null +++ b/tunasynctl.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python2 +# -*- coding:utf-8 -*- +import sys +import socket +import argparse +import json + +if __name__ == "__main__": + parser = argparse.ArgumentParser(prog="tunasynctl") + parser.add_argument("-s", "--socket", + default="/var/run/tunasync.sock", help="socket file") + parser.add_argument("command", help="command") + parser.add_argument("target", help="target") + + args = parser.parse_args() + + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + + try: + sock.connect(args.socket) + except socket.error as msg: + print(msg) + sys.exit(1) + + pack = json.dumps({ + 'cmd': args.command, + 'target': args.target, + }) + + try: + sock.sendall(chr(len(pack)) + pack) + length = ord(sock.recv(1)) + print(sock.recv(length)) + + except Exception as e: + print(e) + finally: + sock.close() + +# vim: ts=4 sw=4 sts=4 expandtab