diff --git a/README.md b/README.md index 3907e28..9e62aa8 100644 --- a/README.md +++ b/README.md @@ -5,5 +5,5 @@ tunasync - [ ] status file - [x] btrfs backend (create snapshot before syncing) -- [ ] add mirror job online +- [x] add mirror job online - [ ] debmirror provider diff --git a/examples/tunasync.ini b/examples/tunasync.ini index 3ff9659..90a77d4 100644 --- a/examples/tunasync.ini +++ b/examples/tunasync.ini @@ -2,12 +2,13 @@ log_dir = /var/log/tunasync ; mirror_root = /srv/mirror_disk mirror_root = /mnt/sdb1/mirror -use_btrfs = yes +use_btrfs = no local_dir = {mirror_root}/{mirror_name}/_working ; maximum numbers of running jobs -concurrent = 3 +concurrent = 2 ; interval in minutes -interval = 120 +interval = 1 +max_retry = 2 [btrfs] service_dir = {mirror_root}/{mirror_name}/_current @@ -15,17 +16,17 @@ working_dir = {mirror_root}/{mirror_name}/_working tmp_dir = {mirror_root}/{mirror_name}/_tmp -[mirror:archlinux] -provider = rsync -upstream = rsync://mirror.us.leaseweb.net/archlinux/ -log_file = /tmp/archlinux-{date}.log -use_ipv6 = yes - -# [mirror:archlinux] -# provider = shell -# command = sleep 10 -# local_dir = /mnt/sdb1/mirror/archlinux/current/ +# rmirror:archlinux] +# provider = rsync +# upstream = rsync://mirror.us.leaseweb.net/archlinux/ # log_file = /tmp/archlinux-{date}.log +# use_ipv6 = yes + +[mirror:archlinux] +provider = shell +command = sleep 20 +local_dir = /mnt/sdb1/mirror/archlinux/current/ +log_file = /tmp/archlinux-{date}.log [mirror:arch2] @@ -40,3 +41,9 @@ provider = shell command = ./shell_provider.sh log_file = /tmp/arch3-{date}.log use_btrfs = no + +[mirror:arch4] +provider = shell +command = ./shell_provider.sh +log_file = /tmp/arch4-{date}.log +use_btrfs = no diff --git a/tunasync/btrfs_snapshot.py b/tunasync/btrfs_snapshot.py index 59e3ff6..a4a9787 100644 --- a/tunasync/btrfs_snapshot.py +++ b/tunasync/btrfs_snapshot.py @@ -2,13 +2,14 @@ # -*- coding:utf-8 -*- import sh import os +from .hook import JobHook class BtrfsVolumeError(Exception): pass -class BtrfsHook(object): +class BtrfsHook(JobHook): def __init__(self, service_dir, working_dir, tmp_dir): self.service_dir = service_dir diff --git a/tunasync/hook.py b/tunasync/hook.py new file mode 100644 index 0000000..adf6092 --- /dev/null +++ b/tunasync/hook.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python2 +# -*- coding:utf-8 -*- + + +class JobHook(object): + + def before_job(self): + raise NotImplementedError("") + + def after_job(self): + raise NotImplementedError("") + +# vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasync/jobs.py b/tunasync/jobs.py index 735ac6e..6668611 100644 --- a/tunasync/jobs.py +++ b/tunasync/jobs.py @@ -1,22 +1,49 @@ #!/usr/bin/env python2 # -*- coding:utf-8 -*- +import sys import time +import signal -def run_job(sema, provider): +def run_job(sema, child_q, manager_q, provider): + aquired = False + + def before_quit(*args): + provider.terminate() + if aquired: + print("{} release semaphore".format(provider.name)) + sema.release() + sys.exit(0) + + signal.signal(signal.SIGTERM, before_quit) + while 1: - sema.acquire(True) + try: + sema.acquire(True) + except: + break + aquired = True print("start syncing {}".format(provider.name)) for hook in provider.hooks: hook.before_job() provider.run() + provider.wait() for hook in provider.hooks[::-1]: hook.after_job() sema.release() + aquired = False + try: + msg = child_q.get(timeout=1) + if msg == "terminate": + manager_q.put((provider.name, "QUIT")) + break + except: + pass + print("syncing {} finished, sleep {} minutes for the next turn".format( provider.name, provider.interval )) diff --git a/tunasync/mirror_provider.py b/tunasync/mirror_provider.py index d8dba92..26be83d 100644 --- a/tunasync/mirror_provider.py +++ b/tunasync/mirror_provider.py @@ -17,10 +17,22 @@ class MirrorProvider(object): self.log_file = log_file self.interval = interval self.hooks = hooks + self.p = None def run(self): raise NotImplementedError("run method should be implemented") + def terminate(self): + if self.p is not None: + self.p.process.terminate() + print("{} terminated".format(self.name)) + self.p = None + + def wait(self): + if self.p is not None: + self.p.wait() + self.p = None + class RsyncProvider(MirrorProvider): @@ -60,7 +72,8 @@ class RsyncProvider(MirrorProvider): now = datetime.now().strftime("%Y-%m-%d_%H") log_file = self.log_file.format(date=now) - sh.rsync(*_args, _out=log_file, _err=log_file, _out_bufsize=1) + self.p = sh.rsync(*_args, _out=log_file, _err=log_file, + _out_bufsize=1, _bg=True) class ShellProvider(MirrorProvider): @@ -78,6 +91,7 @@ class ShellProvider(MirrorProvider): log_file = self.log_file.format(date=now) new_env = os.environ.copy() + new_env["TUNASYNC_MIRROR_NAME"] = self.name new_env["TUNASYNC_LOCAL_DIR"] = self.local_dir new_env["TUNASYNC_LOG_FILE"] = log_file @@ -85,7 +99,8 @@ class ShellProvider(MirrorProvider): _args = [] if len(self.command) == 1 else self.command[1:] cmd = sh.Command(_cmd) - cmd(*_args, _env=new_env, _out=log_file, _err=log_file, _out_bufsize=1) + self.p = cmd(*_args, _env=new_env, _out=log_file, + _err=log_file, _out_bufsize=1, _bg=True) # vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasync/tunasync.py b/tunasync/tunasync.py index 3e66f97..fbae449 100644 --- a/tunasync/tunasync.py +++ b/tunasync/tunasync.py @@ -3,8 +3,9 @@ import ConfigParser import os.path import signal +import sys -from multiprocessing import Process, Semaphore +from multiprocessing import Process, Semaphore, Queue from . import jobs from .mirror_provider import RsyncProvider, ShellProvider from .btrfs_snapshot import BtrfsHook @@ -61,6 +62,65 @@ class MirrorConfig(object): except ConfigParser.NoOptionError: self.options["use_btrfs"] = self._parent.use_btrfs + def __getattr__(self, key): + if key in self.__dict__: + return self.__dict__[key] + else: + return self.__dict__["options"].get(key, None) + + def to_provider(self, hooks=[]): + if self.provider == "rsync": + provider = RsyncProvider( + self.name, + self.upstream, + self.local_dir, + self.use_ipv6, + self.exclude_file, + self.log_file, + self.interval, + hooks, + ) + elif self.options["provider"] == "shell": + provider = ShellProvider( + self.name, + self.command, + self.local_dir, + self.log_file, + self.interval, + hooks + ) + + return provider + + def compare(self, other): + assert self.name == other.name + + for key, val in self.options.iteritems(): + if other.options.get(key, None) != val: + return False + + return True + + def hooks(self): + hooks = [] + parent = self._parent + if self.options["use_btrfs"]: + working_dir = parent.btrfs_working_dir_tmpl.format( + mirror_root=parent.mirror_root, + mirror_name=self.name + ) + service_dir = parent.btrfs_service_dir_tmpl.format( + mirror_root=parent.mirror_root, + mirror_name=self.name + ) + tmp_dir = parent.btrfs_tmp_dir_tmpl.format( + mirror_root=parent.mirror_root, + mirror_name=self.name + ) + hooks.append(BtrfsHook(service_dir, working_dir, tmp_dir)) + + return hooks + class TUNASync(object): @@ -75,14 +135,16 @@ class TUNASync(object): return cls._instance def read_config(self, config_file): + self._config_file = config_file self._settings = ConfigParser.ConfigParser() self._settings.read(config_file) self._inited = True - self._mirrors = [] - self._providers = [] - self.processes = [] + self._mirrors = {} + self._providers = {} + self.processes = {} self.semaphore = Semaphore(self._settings.getint("global", "concurrent")) + self.channel = Queue() self.mirror_root = self._settings.get("global", "mirror_root") self.use_btrfs = self._settings.getboolean("global", "use_btrfs") @@ -93,6 +155,9 @@ class TUNASync(object): self.btrfs_tmp_dir_tmpl = self._settings.get( "btrfs", "tmp_dir") + def hooks(self): + return [] + @property def mirrors(self): if self._mirrors: @@ -102,8 +167,8 @@ class TUNASync(object): self._settings.sections()): _, name = section.split(":") - self._mirrors.append( - MirrorConfig(self, name, self._settings, section)) + self._mirrors[name] = \ + MirrorConfig(self, name, self._settings, section) return self._mirrors @property @@ -111,64 +176,95 @@ class TUNASync(object): if self._providers: return self._providers - for mirror in self.mirrors: - hooks = [] - if mirror.options["use_btrfs"]: - working_dir = self.btrfs_working_dir_tmpl.format( - mirror_root=self.mirror_root, - mirror_name=mirror.name - ) - service_dir = self.btrfs_service_dir_tmpl.format( - mirror_root=self.mirror_root, - mirror_name=mirror.name - ) - tmp_dir = self.btrfs_tmp_dir_tmpl.format( - mirror_root=self.mirror_root, - mirror_name=mirror.name - ) - hooks.append(BtrfsHook(service_dir, working_dir, tmp_dir)) - - if mirror.options["provider"] == "rsync": - self._providers.append( - RsyncProvider( - mirror.name, - mirror.options["upstream"], - mirror.options["local_dir"], - mirror.options["use_ipv6"], - mirror.options.get("exclude_file", None), - mirror.options["log_file"], - mirror.options["interval"], - hooks, - ) - ) - elif mirror.options["provider"] == "shell": - self._providers.append( - ShellProvider( - mirror.name, - mirror.options["command"], - mirror.options["local_dir"], - mirror.options["log_file"], - mirror.options["interval"], - hooks, - ) - ) + for name, mirror in self.mirrors.iteritems(): + hooks = mirror.hooks() + self.hooks() + provider = mirror.to_provider(hooks) + self._providers[name] = provider return self._providers def run_jobs(self): - for provider in self.providers: - p = Process(target=jobs.run_job, args=(self.semaphore, provider, )) - p.start() - self.processes.append(p) + for name in self.providers: + self.run_provider(name) def sig_handler(*args): print("terminate subprocesses") - for p in self.processes: + for _, np in self.processes.iteritems(): + _, p = np p.terminate() print("Good Bye") + sys.exit(0) signal.signal(signal.SIGINT, sig_handler) signal.signal(signal.SIGTERM, sig_handler) + signal.signal(signal.SIGUSR1, self.reload_mirrors) + signal.signal(signal.SIGUSR2, self.reload_mirrors_force) + + while 1: + name, status = self.channel.get() + if status == "QUIT": + print("New configuration applied to {}".format(name)) + self.run_provider(name) + + def run_provider(self, name): + if name not in self.providers: + print("{} doesnot exist".format(name)) + return + + provider = self.providers[name] + child_queue = Queue() + p = Process( + target=jobs.run_job, + args=(self.semaphore, child_queue, self.channel, provider, ) + ) + p.start() + self.processes[name] = (child_queue, p) + + def reload_mirrors(self, signum, frame): + try: + return self._reload_mirrors(signum, frame, force=False) + except Exception, e: + print(e) + + def reload_mirrors_force(self, signum, frame): + try: + return self._reload_mirrors(signum, frame, force=True) + except Exception, e: + print(e) + + def _reload_mirrors(self, signum, frame, force=False): + print("reload mirror configs, force restart: {}".format(force)) + self._settings.read(self._config_file) + + for section in filter(lambda s: s.startswith("mirror:"), + self._settings.sections()): + + _, name = section.split(":") + newMirCfg = MirrorConfig(self, name, self._settings, section) + + if name in self._mirrors: + if newMirCfg.compare(self._mirrors[name]): + continue + + self._mirrors[name] = newMirCfg + + hooks = newMirCfg.hooks() + self.hooks() + newProvider = newMirCfg.to_provider(hooks) + self._providers[name] = newProvider + + if name in self.processes: + q, p = self.processes[name] + + if force: + p.terminate() + print("Terminated Job: {}".format(name)) + self.run_provider(name) + else: + q.put("terminate") + print("New configuration queued to {}".format(name)) + else: + print("New mirror: {}".format(name)) + self.run_provider(name) # def config(self, option): # if self._settings is None: