diff --git a/tunasync/__init__.py b/tunasync/__init__.py deleted file mode 100644 index c869f43..0000000 --- a/tunasync/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -#!/usr/bin/env python2 -# -*- coding:utf-8 -*- -from .tunasync import TUNASync -# vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasync/btrfs_snapshot.py b/tunasync/btrfs_snapshot.py deleted file mode 100644 index 163e0c7..0000000 --- a/tunasync/btrfs_snapshot.py +++ /dev/null @@ -1,62 +0,0 @@ -#!/usr/bin/env python2 -# -*- coding:utf-8 -*- -import sh -import os -from datetime import datetime -from .hook import JobHook - - -class BtrfsVolumeError(Exception): - pass - - -class BtrfsHook(JobHook): - - def __init__(self, service_dir, working_dir, gc_dir): - self.service_dir = service_dir - self.working_dir = working_dir - self.gc_dir = gc_dir - - def before_job(self, ctx={}, *args, **kwargs): - self._create_working_snapshot() - ctx['current_dir'] = self.working_dir - - def after_job(self, status=None, ctx={}, *args, **kwargs): - if status == "success": - self._commit_changes() - ctx['current_dir'] = self.service_dir - - def _ensure_subvolume(self): - # print(self.service_dir) - try: - ret = sh.btrfs("subvolume", "show", self.service_dir) - except Exception, e: - print(e) - raise BtrfsVolumeError("Invalid subvolume") - - if ret.stderr != '': - raise BtrfsVolumeError("Invalid subvolume") - - def _create_working_snapshot(self): - self._ensure_subvolume() - if os.path.exists(self.working_dir): - print("Warning: working dir existed, are you sure no rsync job is running?") - else: - # print("btrfs subvolume snapshot {} {}".format(self.service_dir, self.working_dir)) - sh.btrfs("subvolume", "snapshot", self.service_dir, self.working_dir) - - def _commit_changes(self): - self._ensure_subvolume() - self._ensure_subvolume() - gc_dir = self.gc_dir.format(timestamp=datetime.now().strftime("%s")) - - out = sh.mv(self.service_dir, gc_dir) - assert out.exit_code == 0 and out.stderr == "" - out = sh.mv(self.working_dir, self.service_dir) - assert out.exit_code == 0 and out.stderr == "" - # print("btrfs subvolume delete {}".format(self.tmp_dir)) - # sh.sleep(3) - # out = sh.btrfs("subvolume", "delete", self.tmp_dir) - # assert out.exit_code == 0 and out.stderr == "" - -# vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasync/clt_server.py b/tunasync/clt_server.py deleted file mode 100644 index 7a815f8..0000000 --- a/tunasync/clt_server.py +++ /dev/null @@ -1,57 +0,0 @@ -#!/usr/bin/env python2 -# -*- coding:utf-8 -*- -import socket -import os -import json -import struct - - -class ControlServer(object): - - valid_commands = set(( - "start", "stop", "restart", "status", "log", - )) - - def __init__(self, address, mgr_chan, cld_chan): - self.address = address - self.mgr_chan = mgr_chan - self.cld_chan = cld_chan - try: - os.unlink(self.address) - except OSError: - if os.path.exists(self.address): - raise Exception("file exists: {}".format(self.address)) - self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.sock.bind(self.address) - os.chmod(address, 0o700) - - print("Control Server listening on: {}".format(self.address)) - self.sock.listen(1) - - def serve_forever(self): - while 1: - conn, _ = self.sock.accept() - - try: - length = struct.unpack('!H', conn.recv(2))[0] - content = conn.recv(length) - cmd = json.loads(content) - if cmd['cmd'] not in self.valid_commands: - raise Exception("Invalid Command") - self.mgr_chan.put(("CMD", (cmd['cmd'], cmd['target'], cmd["kwargs"]))) - except Exception as e: - print(e) - res = "Invalid Command" - else: - res = self.cld_chan.get() - - conn.sendall(struct.pack('!H', len(res))) - conn.sendall(res) - conn.close() - - -def run_control_server(address, mgr_chan, cld_chan): - cs = ControlServer(address, mgr_chan, cld_chan) - cs.serve_forever() - -# vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasync/exec_pre_post.py b/tunasync/exec_pre_post.py deleted file mode 100644 index 1ade5ad..0000000 --- a/tunasync/exec_pre_post.py +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/bin/env python2 -# -*- coding:utf-8 -*- -import os -import sh -import shlex -from .hook import JobHook - - -class CmdExecHook(JobHook): - POST_SYNC = "post_sync" - PRE_SYNC = "pre_sync" - - def __init__(self, command, exec_at=POST_SYNC): - self.command = shlex.split(command) - if exec_at == self.POST_SYNC: - self.before_job = self._keep_calm - self.after_job = self._exec - elif exec_at == self.PRE_SYNC: - self.before_job = self._exec - self.after_job = self._keep_calm - - def _keep_calm(self, ctx={}, **kwargs): - pass - - def _exec(self, ctx={}, **kwargs): - new_env = os.environ.copy() - new_env["TUNASYNC_MIRROR_NAME"] = ctx["mirror_name"] - new_env["TUNASYNC_WORKING_DIR"] = ctx["current_dir"] - new_env["TUNASYNC_JOB_EXIT_STATUS"] = kwargs.get("status", "") - - _cmd = self.command[0] - _args = [] if len(self.command) == 1 else self.command[1:] - cmd = sh.Command(_cmd) - cmd(*_args, _env=new_env) - -# vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasync/hook.py b/tunasync/hook.py deleted file mode 100644 index 3f31c30..0000000 --- a/tunasync/hook.py +++ /dev/null @@ -1,19 +0,0 @@ -#!/usr/bin/env python2 -# -*- coding:utf-8 -*- - - -class JobHook(object): - - def before_job(self, *args, **kwargs): - raise NotImplementedError("") - - def after_job(self, *args, **kwargs): - raise NotImplementedError("") - - def before_exec(self, *args, **kwargs): - pass - - def after_exec(self, *args, **kwargs): - pass - -# vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasync/jobs.py b/tunasync/jobs.py deleted file mode 100644 index e45d041..0000000 --- a/tunasync/jobs.py +++ /dev/null @@ -1,135 +0,0 @@ -#!/usr/bin/env python2 -# -*- coding:utf-8 -*- -import sh -import sys -from setproctitle import setproctitle -import signal -import Queue -import traceback - - -def run_job(sema, child_q, manager_q, provider, **settings): - aquired = False - setproctitle("tunasync-{}".format(provider.name)) - - def before_quit(*args): - provider.terminate() - if aquired: - print("{} release semaphore".format(provider.name)) - sema.release() - sys.exit(0) - - def sleep_wait(timeout): - try: - msg = child_q.get(timeout=timeout) - if msg == "terminate": - manager_q.put(("CONFIG_ACK", (provider.name, "QUIT"))) - return True - except Queue.Empty: - return False - - signal.signal(signal.SIGTERM, before_quit) - - if provider.delay > 0: - if sleep_wait(provider.delay): - return - - max_retry = settings.get("max_retry", 1) - - def _real_run(idx=0, stage="job_hook", ctx=None): - """\ - 4 stages: - 0 -> job_hook, 1 -> set_retry, 2 -> exec_hook, 3 -> exec - """ - - assert(ctx is not None) - - if stage == "exec": - # exec_job - try: - provider.run(ctx=ctx) - provider.wait() - except sh.ErrorReturnCode: - status = "fail" - else: - status = "success" - return status - - elif stage == "set_retry": - # enter stage 3 with retry - for retry in range(max_retry): - status = "syncing" - manager_q.put(("UPDATE", (provider.name, status, ctx))) - print("start syncing {}, retry: {}".format(provider.name, retry)) - status = _real_run(idx=0, stage="exec_hook", ctx=ctx) - if status == "success": - break - return status - - # job_hooks - elif stage == "job_hook": - if idx == len(provider.hooks): - return _real_run(idx=idx, stage="set_retry", ctx=ctx) - hook = provider.hooks[idx] - hook_before, hook_after = hook.before_job, hook.after_job - status = "pre-syncing" - - elif stage == "exec_hook": - if idx == len(provider.hooks): - return _real_run(idx=idx, stage="exec", ctx=ctx) - hook = provider.hooks[idx] - hook_before, hook_after = hook.before_exec, hook.after_exec - status = "syncing" - - try: - # print("%s run before_%s, %d" % (provider.name, stage, idx)) - hook_before(provider=provider, ctx=ctx) - status = _real_run(idx=idx+1, stage=stage, ctx=ctx) - except Exception: - traceback.print_exc() - status = "fail" - finally: - # print("%s run after_%s, %d" % (provider.name, stage, idx)) - # job may break when syncing - if status != "success": - status = "fail" - try: - hook_after(provider=provider, status=status, ctx=ctx) - except Exception: - traceback.print_exc() - - return status - - while 1: - try: - sema.acquire(True) - except: - break - aquired = True - - ctx = {} # put context info in it - ctx['current_dir'] = provider.local_dir - ctx['mirror_name'] = provider.name - status = "pre-syncing" - manager_q.put(("UPDATE", (provider.name, status, ctx))) - - try: - status = _real_run(idx=0, stage="job_hook", ctx=ctx) - except Exception: - traceback.print_exc() - status = "fail" - finally: - sema.release() - aquired = False - - print("syncing {} finished, sleep {} minutes for the next turn".format( - provider.name, provider.interval - )) - - manager_q.put(("UPDATE", (provider.name, status, ctx))) - - if sleep_wait(timeout=provider.interval * 60): - break - - -# vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasync/loglimit.py b/tunasync/loglimit.py deleted file mode 100644 index 053c63a..0000000 --- a/tunasync/loglimit.py +++ /dev/null @@ -1,88 +0,0 @@ -#!/usr/bin/env python2 -# -*- coding:utf-8 -*- -import sh -import os -from .hook import JobHook -from datetime import datetime - - -class LogLimitHook(JobHook): - - def __init__(self, limit=10): - self.limit = limit - - def before_job(self, *args, **kwargs): - pass - - def after_job(self, *args, **kwargs): - pass - - def before_exec(self, provider, ctx={}, *args, **kwargs): - log_dir = provider.log_dir - self.ensure_log_dir(log_dir) - log_file = provider.log_file.format( - date=datetime.now().strftime("%Y-%m-%d_%H-%M")) - ctx['log_file'] = log_file - if log_file == "/dev/null": - return - - log_link = os.path.join(log_dir, "latest") - ctx['log_link'] = log_link - - lfiles = [os.path.join(log_dir, lfile) - for lfile in os.listdir(log_dir) - if lfile.startswith(provider.name)] - - lfiles_set = set(lfiles) - # sort to get the newest 10 files - lfiles_ts = sorted( - [(os.path.getmtime(lfile), lfile) for lfile in lfiles], - key=lambda x: x[0], - reverse=True) - lfiles_keep = set([x[1] for x in lfiles_ts[:self.limit]]) - lfiles_rm = lfiles_set - lfiles_keep - # remove old files - for lfile in lfiles_rm: - try: - sh.rm(lfile) - except: - pass - - # create a soft link - self.create_link(log_link, log_file) - - def after_exec(self, status=None, ctx={}, *args, **kwargs): - log_file = ctx.get('log_file', None) - log_link = ctx.get('log_link', None) - if log_file == "/dev/null": - return - if status == "fail": - log_file_save = log_file + ".fail" - try: - sh.mv(log_file, log_file_save) - except: - pass - self.create_link(log_link, log_file_save) - - def ensure_log_dir(self, log_dir): - if not os.path.exists(log_dir): - sh.mkdir("-p", log_dir) - - def create_link(self, log_link, log_file): - if log_link == log_file: - return - if not (log_link and log_file): - return - - if os.path.lexists(log_link): - try: - sh.rm(log_link) - except: - return - try: - sh.ln('-s', log_file, log_link) - except: - return - - -# vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasync/mirror_config.py b/tunasync/mirror_config.py deleted file mode 100644 index 8f57198..0000000 --- a/tunasync/mirror_config.py +++ /dev/null @@ -1,156 +0,0 @@ -#!/usr/bin/env python2 -# -*- coding:utf-8 -*- -import os -from datetime import datetime -from .mirror_provider import RsyncProvider, TwoStageRsyncProvider, ShellProvider -from .btrfs_snapshot import BtrfsHook -from .loglimit import LogLimitHook -from .exec_pre_post import CmdExecHook - - -class MirrorConfig(object): - - _valid_providers = set(("rsync", "two-stage-rsync", "shell", )) - - def __init__(self, parent, options): - self._parent = parent - self._popt = self._parent._settings - self.options = dict(options.items()) # copy - self._validate() - - def _validate(self): - provider = self.options.get("provider", None) - assert provider in self._valid_providers - - if provider == "rsync": - assert "upstream" in self.options - - elif provider == "shell": - assert "command" in self.options - - local_dir_tmpl = self.options.get( - "local_dir", self._popt["global"]["local_dir"]) - - self.options["local_dir"] = local_dir_tmpl.format( - mirror_root=self._popt["global"]["mirror_root"], - mirror_name=self.name, - ) - - if "interval" not in self.options: - self.options["interval"] = self._popt["global"]["interval"] - - assert isinstance(self.options["interval"], int) - - log_dir = self.options.get( - "log_dir", self._popt["global"]["log_dir"]) - if "log_file" not in self.options: - self.options["log_file"] = os.path.join( - log_dir, self.name, self.name + "_{date}.log") - - self.log_dir = os.path.dirname(self.log_file) - - if "use_btrfs" not in self.options: - self.options["use_btrfs"] = self._parent.use_btrfs - assert self.options["use_btrfs"] in (True, False) - - if "env" in self.options: - assert isinstance(self.options["env"], dict) - - def __getattr__(self, key): - if key in self.__dict__: - return self.__dict__[key] - else: - return self.__dict__["options"].get(key, None) - - def to_provider(self, hooks=[], no_delay=False): - - kwargs = { - 'name': self.name, - 'upstream_url': self.upstream, - 'local_dir': self.local_dir, - 'log_dir': self.log_dir, - 'log_file': self.log_file, - 'interval': self.interval, - 'env': self.env, - 'hooks': hooks, - } - - if self.provider == "rsync": - kwargs.update({ - 'useIPv6': self.use_ipv6, - 'password': self.password, - 'exclude_file': self.exclude_file, - }) - provider = RsyncProvider(**kwargs) - - elif self.provider == "two-stage-rsync": - kwargs.update({ - 'useIPv6': self.use_ipv6, - 'password': self.password, - 'exclude_file': self.exclude_file, - }) - provider = TwoStageRsyncProvider(**kwargs) - provider.set_stage1_profile(self.stage1_profile) - - elif self.options["provider"] == "shell": - kwargs.update({ - 'command': self.command, - 'log_stdout': self.options.get("log_stdout", True), - }) - - provider = ShellProvider(**kwargs) - - if not no_delay: - sm = self._parent.status_manager - last_update = sm.get_info(self.name, 'last_update') - if last_update not in (None, '-'): - last_update = datetime.strptime( - last_update, '%Y-%m-%d %H:%M:%S') - delay = int(last_update.strftime("%s")) \ - + self.interval * 60 - int(datetime.now().strftime("%s")) - if delay < 0: - delay = 0 - provider.set_delay(delay) - - return provider - - def compare(self, other): - assert self.name == other.name - - for key, val in self.options.iteritems(): - if other.options.get(key, None) != val: - return False - - return True - - def hooks(self): - hooks = [] - parent = self._parent - if self.options["use_btrfs"]: - working_dir = parent.btrfs_working_dir_tmpl.format( - mirror_root=parent.mirror_root, - mirror_name=self.name - ) - service_dir = parent.btrfs_service_dir_tmpl.format( - mirror_root=parent.mirror_root, - mirror_name=self.name - ) - gc_dir = parent.btrfs_gc_dir_tmpl.format( - mirror_root=parent.mirror_root, - mirror_name=self.name - ) - hooks.append(BtrfsHook(service_dir, working_dir, gc_dir)) - - hooks.append(LogLimitHook()) - - if self.exec_pre_sync: - hooks.append( - CmdExecHook(self.exec_pre_sync, CmdExecHook.PRE_SYNC)) - - if self.exec_post_sync: - hooks.append( - CmdExecHook(self.exec_post_sync, CmdExecHook.POST_SYNC)) - - return hooks - -# vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasync/mirror_provider.py b/tunasync/mirror_provider.py deleted file mode 100644 index ec093be..0000000 --- a/tunasync/mirror_provider.py +++ /dev/null @@ -1,226 +0,0 @@ -#!/usr/bin/env python2 -# -*- coding:utf-8 -*- -import sh -import os -import shlex -from datetime import datetime - - -class MirrorProvider(object): - ''' - Mirror method class, can be `rsync', `debmirror', etc. - ''' - - def __init__(self, name, local_dir, log_dir, log_file="/dev/null", - interval=120, hooks=[]): - self.name = name - self.local_dir = local_dir - self.log_file = log_file - self.log_dir = log_dir - self.interval = interval - self.hooks = hooks - self.p = None - self.delay = 0 - - # deprecated - def ensure_log_dir(self): - log_dir = os.path.dirname(self.log_file) - if not os.path.exists(log_dir): - sh.mkdir("-p", log_dir) - - def get_log_file(self, ctx={}): - if 'log_file' in ctx: - log_file = ctx['log_file'] - else: - now = datetime.now().strftime("%Y-%m-%d_%H") - log_file = self.log_file.format(date=now) - ctx['log_file'] = log_file - return log_file - - def set_delay(self, sec): - ''' Set start delay ''' - self.delay = sec - - def run(self, ctx={}): - raise NotImplementedError("run method should be implemented") - - def terminate(self): - if self.p is not None: - self.p.process.terminate() - print("{} terminated".format(self.name)) - self.p = None - - def wait(self): - if self.p is not None: - self.p.wait() - self.p = None - - -class RsyncProvider(MirrorProvider): - - _default_options = ['-aHvh', '--no-o', '--no-g', '--stats', - '--exclude', '.~tmp~/', - '--delete', '--delete-after', '--delay-updates', - '--safe-links', '--timeout=120', '--contimeout=120'] - - def __init__(self, name, upstream_url, local_dir, log_dir, - useIPv6=True, password=None, exclude_file=None, - log_file="/dev/null", interval=120, env=None, hooks=[]): - super(RsyncProvider, self).__init__(name, local_dir, log_dir, log_file, - interval, hooks) - - self.upstream_url = upstream_url - self.useIPv6 = useIPv6 - self.exclude_file = exclude_file - self.password = password - self.env = env - - @property - def options(self): - - _options = [o for o in self._default_options] # copy - - if self.useIPv6: - _options.append("-6") - - if self.exclude_file: - _options.append("--exclude-from") - _options.append(self.exclude_file) - - return _options - - def run(self, ctx={}): - _args = self.options - _args.append(self.upstream_url) - - working_dir = ctx.get("current_dir", self.local_dir) - _args.append(working_dir) - - log_file = self.get_log_file(ctx) - new_env = os.environ.copy() - if self.password is not None: - new_env["RSYNC_PASSWORD"] = self.password - if self.env is not None and isinstance(self.env, dict): - for k, v in self.env.items(): - new_env[k] = v - - self.p = sh.rsync(*_args, _env=new_env, _out=log_file, - _err_to_out=True, _out_bufsize=1, _bg=True) - - -class TwoStageRsyncProvider(RsyncProvider): - - _stage1_options = ['-aHvh', '--no-o', '--no-g', - '--exclude', '.~tmp~/', - '--safe-links', '--timeout=120', '--contimeout=120'] - - _stage2_options = ['-aHvh', '--no-o', '--no-g', '--stats', - '--exclude', '.~tmp~/', - '--delete', '--delete-after', '--delay-updates', - '--safe-links', '--timeout=120', '--contimeout=120'] - - _stage1_profiles = { - "debian": [ - 'dists/', - ], - "debian-oldstyle": [ - 'Packages*', 'Sources*', 'Release*', - 'InRelease', 'i18n/*', 'ls-lR*', 'dep11/*', - ] - } - - def set_stage1_profile(self, profile): - if profile not in self._stage1_profiles: - raise Exception("Profile Undefined: %s, %s" % (profile, self.name)) - - self._stage1_excludes = self._stage1_profiles[profile] - - def options(self, stage): - _default_options = self._stage1_options \ - if stage == 1 else self._stage2_options - _options = [o for o in _default_options] # copy - - if stage == 1: - for _exc in self._stage1_excludes: - _options.append("--exclude") - _options.append(_exc) - - if self.useIPv6: - _options.append("-6") - - if self.exclude_file: - _options.append("--exclude-from") - _options.append(self.exclude_file) - - return _options - - def run(self, ctx={}): - working_dir = ctx.get("current_dir", self.local_dir) - log_file = self.get_log_file(ctx) - new_env = os.environ.copy() - if self.password is not None: - new_env["RSYNC_PASSWORD"] = self.password - if self.env is not None and isinstance(self.env, dict): - for k, v in self.env.items(): - new_env[k] = v - - with open(log_file, 'w', buffering=1) as f: - def log_output(line): - f.write(line) - - for stage in (1, 2): - - _args = self.options(stage) - _args.append(self.upstream_url) - _args.append(working_dir) - f.write("==== Stage {} Begins ====\n\n".format(stage)) - - self.p = sh.rsync( - *_args, _env=new_env, _out=log_output, - _err_to_out=True, _out_bufsize=1, _bg=False - ) - self.p.wait() - - -class ShellProvider(MirrorProvider): - - def __init__(self, name, command, upstream_url, local_dir, log_dir, - log_file="/dev/null", log_stdout=True, interval=120, env=None, - hooks=[]): - - super(ShellProvider, self).__init__(name, local_dir, log_dir, log_file, - interval, hooks) - self.upstream_url = str(upstream_url) - self.command = shlex.split(command) - self.log_stdout = log_stdout - self.env = env - - def run(self, ctx={}): - - log_file = self.get_log_file(ctx) - - new_env = os.environ.copy() - new_env["TUNASYNC_MIRROR_NAME"] = self.name - new_env["TUNASYNC_LOCAL_DIR"] = self.local_dir - new_env["TUNASYNC_WORKING_DIR"] = ctx.get("current_dir", self.local_dir) - new_env["TUNASYNC_UPSTREAM_URL"] = self.upstream_url - new_env["TUNASYNC_LOG_FILE"] = log_file - - if self.env is not None and isinstance(self.env, dict): - for k, v in self.env.items(): - new_env[k] = v - - _cmd = self.command[0] - _args = [] if len(self.command) == 1 else self.command[1:] - - cmd = sh.Command(_cmd) - - if self.log_stdout: - self.p = cmd(*_args, _env=new_env, _out=log_file, - _err_to_out=True, _out_bufsize=1, _bg=True) - else: - self.p = cmd(*_args, _env=new_env, _out='/dev/null', - _err='/dev/null', _out_bufsize=1, _bg=True) - - -# vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasync/status_manager.py b/tunasync/status_manager.py deleted file mode 100644 index 46bad12..0000000 --- a/tunasync/status_manager.py +++ /dev/null @@ -1,123 +0,0 @@ -#!/usr/bin/env python2 -# -*- coding:utf-8 -*- -import json -from datetime import datetime - - -class StatusManager(object): - - def __init__(self, parent, dbfile): - self.parent = parent - self.dbfile = dbfile - self.init_mirrors() - - def init_mirrors(self): - mirrors = {} - for name, cfg in self.parent.mirrors.iteritems(): - mirrors[name] = { - 'name': name, - 'last_update': '-', - 'status': 'unknown', - 'upstream': cfg.upstream or '-', - } - - try: - with open(self.dbfile) as f: - _mirrors = json.load(f) - for m in _mirrors: - name = m["name"] - mirrors[name]["last_update"] = m["last_update"] - mirrors[name]["status"] = m["status"] - except: - pass - - self.mirrors = mirrors - self.mirrors_ctx = {key: {} for key in self.mirrors} - - def get_info(self, name, key): - if key == "ctx": - return self.mirrors_ctx.get(name, {}) - _m = self.mirrors.get(name, {}) - return _m.get(key, None) - - def refresh_mirror(self, name): - cfg = self.parent.mirrors.get(name, None) - if cfg is None: - return - _m = self.mirrors.get(name, { - 'name': name, - 'last_update': '-', - 'status': '-', - }) - _m['upstream'] = cfg.upstream or '-' - self.mirrors[name] = dict(_m.items()) - self.commit_db() - - def update_status(self, name, status, ctx={}): - - _m = self.mirrors.get(name, { - 'name': name, - 'last_update': '-', - 'status': '-', - }) - - if status in ("syncing", "fail", "pre-syncing"): - update_time = _m["last_update"] - elif status == "success": - update_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - else: - print("Invalid status: {}, from {}".format(status, name)) - - _m['last_update'] = update_time - _m['status'] = status - self.mirrors[name] = dict(_m.items()) - self.mirrors_ctx[name] = ctx - - self.commit_db() - print("Updated status file, {}:{}".format(name, status)) - - def list_status(self, _format=False): - _mirrors = sorted( - [m for _, m in self.mirrors.items()], - key=lambda x: x['name'] - ) - if not _format: - return _mirrors - - name_len = max([len(_m['name']) for _m in _mirrors]) - update_len = max([len(_m['last_update']) for _m in _mirrors]) - status_len = max([len(_m['status']) for _m in _mirrors]) - heading = ' '.join([ - 'name'.ljust(name_len), - 'last update'.ljust(update_len), - 'status'.ljust(status_len) - ]) - line = ' '.join(['-'*name_len, '-'*update_len, '-'*status_len]) - tabular = '\n'.join( - [ - ' '.join( - (_m['name'].ljust(name_len), - _m['last_update'].ljust(update_len), - _m['status'].ljust(status_len)) - ) for _m in _mirrors - ] - ) - return '\n'.join((heading, line, tabular)) - - def get_status(self, name, _format=False): - if name not in self.mirrors: - return None - - mir = self.mirrors[name] - if not _format: - return mir - - tmpl = "{name} last_update: {last_update} status: {status}" - return tmpl.format(**mir) - - def commit_db(self): - with open(self.dbfile, 'wb') as f: - _mirrors = self.list_status() - json.dump(_mirrors, f, indent=2, separators=(',', ':')) - -# vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasync/tunasync.py b/tunasync/tunasync.py deleted file mode 100644 index 5078cb3..0000000 --- a/tunasync/tunasync.py +++ /dev/null @@ -1,279 +0,0 @@ -#!/usr/bin/env python2 -# -*- coding:utf-8 -*- -import signal -import sys -import toml - -from multiprocessing import Process, Semaphore, Queue -from . import jobs -from .hook import JobHook -from .mirror_config import MirrorConfig -from .status_manager import StatusManager -from .clt_server import run_control_server - - -class TUNASync(object): - - _instance = None - _settings = None - _inited = False - - def __new__(cls, *args, **kwargs): - if not cls._instance: - cls._instance = super(TUNASync, cls).__new__(cls, *args, **kwargs) - - return cls._instance - - def read_config(self, config_file): - self._config_file = config_file - with open(self._config_file) as f: - self._settings = toml.loads(f.read()) - - self._inited = True - self._mirrors = {} - self._providers = {} - self.processes = {} - self.semaphore = Semaphore(self._settings["global"]["concurrent"]) - self.channel = Queue() - self._hooks = [] - - self.mirror_root = self._settings["global"]["mirror_root"] - - self.use_btrfs = self._settings["global"]["use_btrfs"] - self.btrfs_service_dir_tmpl = self._settings["btrfs"]["service_dir"] - self.btrfs_working_dir_tmpl = self._settings["btrfs"]["working_dir"] - self.btrfs_gc_dir_tmpl = self._settings["btrfs"]["gc_dir"] - - self.status_file = self._settings["global"]["status_file"] - self.status_manager = StatusManager(self, self.status_file) - - self.ctrl_addr = self._settings["global"]["ctrl_addr"] - self.ctrl_channel = Queue() - p = Process( - target=run_control_server, - args=(self.ctrl_addr, self.channel, self.ctrl_channel), - ) - p.start() - self.processes["CTRL_SERVER"] = (self.ctrl_channel, p) - - def add_hook(self, h): - assert isinstance(h, JobHook) - self._hooks.append(h) - - def hooks(self): - return self._hooks - - @property - def mirrors(self): - if self._mirrors: - return self._mirrors - - for mirror_opt in self._settings["mirrors"]: - name = mirror_opt["name"] - self._mirrors[name] = \ - MirrorConfig(self, mirror_opt) - - return self._mirrors - - @property - def providers(self): - if self._providers: - return self._providers - - for name, mirror in self.mirrors.iteritems(): - hooks = mirror.hooks() + self.hooks() - provider = mirror.to_provider(hooks, no_delay=mirror.no_delay) - self._providers[name] = provider - - return self._providers - - def run_jobs(self): - for name in self.providers: - self.run_provider(name) - - def sig_handler(*args): - print("terminate subprocesses") - for _, np in self.processes.iteritems(): - _, p = np - p.terminate() - print("Good Bye") - sys.exit(0) - - signal.signal(signal.SIGINT, sig_handler) - signal.signal(signal.SIGTERM, sig_handler) - signal.signal(signal.SIGUSR1, self.reload_mirrors) - signal.signal(signal.SIGUSR2, self.reload_mirrors_force) - - self.run_forever() - - def run_provider(self, name): - if name not in self.providers: - print("{} doesnot exist".format(name)) - return - - provider = self.providers[name] - child_queue = Queue() - p = Process( - target=jobs.run_job, - args=(self.semaphore, child_queue, self.channel, provider, ), - kwargs={ - 'max_retry': self._settings['global']['max_retry']} - ) - p.start() - provider.set_delay(0) # clear delay after first start - self.processes[name] = (child_queue, p) - - def reload_mirrors(self, signum, frame): - try: - return self._reload_mirrors(signum, frame, force=False) - except Exception as e: - print(e) - - def reload_mirrors_force(self, signum, frame): - try: - return self._reload_mirrors(signum, frame, force=True) - except Exception as e: - print(e) - - def _reload_mirrors(self, signum, frame, force=False): - print("reload mirror configs, force restart: {}".format(force)) - - with open(self._config_file) as f: - self._settings = toml.loads(f.read()) - - for mirror_opt in self._settings["mirrors"]: - name = mirror_opt["name"] - newMirCfg = MirrorConfig(self, mirror_opt) - - if name in self._mirrors: - if newMirCfg.compare(self._mirrors[name]): - continue - - self._mirrors[name] = newMirCfg - - hooks = newMirCfg.hooks() + self.hooks() - newProvider = newMirCfg.to_provider(hooks, no_delay=True) - self._providers[name] = newProvider - - if name in self.processes: - q, p = self.processes[name] - - if force: - p.terminate() - print("Terminated Job: {}".format(name)) - self.run_provider(name) - else: - q.put("terminate") - print("New configuration queued to {}".format(name)) - else: - print("New mirror: {}".format(name)) - self.run_provider(name) - - self.status_manager.refresh_mirror(name) - - def run_forever(self): - while 1: - try: - msg_hdr, msg_body = self.channel.get() - except IOError: - continue - - if msg_hdr == "UPDATE": - mirror_name, status, ctx = msg_body - try: - self.status_manager.update_status( - mirror_name, status, dict(ctx.items())) - except Exception as e: - print(e) - - elif msg_hdr == "CONFIG_ACK": - mirror_name, status = msg_body - if status == "QUIT": - print("New configuration applied to {}".format(mirror_name)) - self.run_provider(mirror_name) - - elif msg_hdr == "CMD": - cmd, mirror_name, kwargs = msg_body - if (mirror_name not in self.mirrors) and (mirror_name != "__ALL__"): - self.ctrl_channel.put("Invalid target") - continue - res = self.handle_cmd(cmd, mirror_name, kwargs) - self.ctrl_channel.put(res) - - def handle_cmd(self, cmd, mirror_name, kwargs): - if cmd == "restart": - if mirror_name not in self.providers: - res = "Invalid job: {}".format(mirror_name) - return res - - if mirror_name in self.processes: - _, p = self.processes[mirror_name] - p.terminate() - self.providers[mirror_name].set_delay(0) - self.run_provider(mirror_name) - res = "Restarted Job: {}".format(mirror_name) - - elif cmd == "stop": - if mirror_name not in self.processes: - res = "{} not running".format(mirror_name) - return res - - _, p = self.processes.pop(mirror_name) - p.terminate() - res = "Stopped Job: {}".format(mirror_name) - - elif cmd == "start": - if mirror_name in self.processes: - res = "{} already running".format(mirror_name) - return res - - self.run_provider(mirror_name) - res = "Started Job: {}".format(mirror_name) - - elif cmd == "status": - if mirror_name == "__ALL__": - res = self.status_manager.list_status(_format=True) - else: - res = self.status_manager.get_status(mirror_name, _format=True) - - elif cmd == "log": - job_ctx = self.status_manager.get_info(mirror_name, "ctx") - n = kwargs.get("n", 0) - if n == 0: - res = job_ctx.get( - "log_link", - job_ctx.get("log_file", "/dev/null"), - ) - else: - import os - log_file = job_ctx.get("log_file", None) - if log_file is None: - return "/dev/null" - - log_dir = os.path.dirname(log_file) - lfiles = [ - os.path.join(log_dir, lfile) - for lfile in os.listdir(log_dir) - if lfile.startswith(mirror_name) and lfile != "latest" - ] - - if len(lfiles) <= n: - res = "Only {} log files available".format(len(lfiles)) - return res - - lfiles_set = set(lfiles) - # sort to get the newest 10 files - lfiles_ts = sorted( - [(os.path.getmtime(lfile), lfile) for lfile in lfiles_set], - key=lambda x: x[0], - reverse=True, - ) - return lfiles_ts[n][1] - - else: - res = "Invalid command" - - return res - - -# vim: ts=4 sw=4 sts=4 expandtab