From 65fd5537615e06467dacbd49f3adcbe6c0b49b2c Mon Sep 17 00:00:00 2001 From: bigeagle Date: Fri, 17 Oct 2014 20:25:27 +0800 Subject: [PATCH] first working --- README.md | 6 ++ examples/shell_provider.sh | 4 + examples/tunasync.ini | 36 +++++++++ tunasync.py | 22 ++++++ tunasync/__init__.py | 4 + tunasync/jobs.py | 18 +++++ tunasync/mirror_provider.py | 83 +++++++++++++++++++++ tunasync/tunasync.py | 141 ++++++++++++++++++++++++++++++++++++ 8 files changed, 314 insertions(+) create mode 100755 examples/shell_provider.sh create mode 100644 examples/tunasync.ini create mode 100644 tunasync.py create mode 100644 tunasync/__init__.py create mode 100644 tunasync/jobs.py create mode 100644 tunasync/mirror_provider.py create mode 100644 tunasync/tunasync.py diff --git a/README.md b/README.md index 1514485..b816f43 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,8 @@ tunasync ======== + +## TODO + +- [ ] status file +- [ ] btrfs backend (create snapshot before syncing) +- [ ] debmirror provider diff --git a/examples/shell_provider.sh b/examples/shell_provider.sh new file mode 100755 index 0000000..cf328f1 --- /dev/null +++ b/examples/shell_provider.sh @@ -0,0 +1,4 @@ +#!/bin/bash +echo $TUNASYNC_LOCAL_DIR +echo $TUNASYNC_LOG_FILE +sleep 5 diff --git a/examples/tunasync.ini b/examples/tunasync.ini new file mode 100644 index 0000000..a23f6df --- /dev/null +++ b/examples/tunasync.ini @@ -0,0 +1,36 @@ +[global] +log_dir = /var/log/tunasync +local_dir = /srv/mirror +storage_backend = btrfs +; maximum numbers of running jobs +concurrent = 2 +; interval in minutes +interval = 1 + +# [mirror:archlinux] +# provider = rsync +# upstream = rsync://mirrors6.ustc.edu.cn/archlinux/ +# local_dir = /mnt/sdb1/mirror/archlinux/current/ +# log_file = /tmp/archlinux-{date}.log +# use_ipv6 = yes + +[mirror:archlinux] +provider = shell +command = sleep 10 +local_dir = /mnt/sdb1/mirror/archlinux/current/ +log_file = /tmp/archlinux-{date}.log + + +[mirror:arch2] +provider = shell +command = sleep 5 +local_dir = /mnt/sdb1/mirror/archlinux/current/ +log_file = /tmp/arch2-{date}.log + + +[mirror:arch3] +provider = shell +command = ./shell_provider.sh +local_dir = /mnt/sdb1/mirror/archlinux/current/ +log_file = /tmp/arch3-{date}.log + diff --git a/tunasync.py b/tunasync.py new file mode 100644 index 0000000..0be5e40 --- /dev/null +++ b/tunasync.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python2 +# -*- coding:utf-8 -*- +import os +import argparse +from tunasync import TUNASync + +if __name__ == "__main__": + parser = argparse.ArgumentParser(prog="tunasync") + parser.add_argument("-c", "--config", default="tunasync.ini", help="config file") + parser.add_argument("--pidfile", default="/var/run/tunasync.pid", help="pidfile") + + args = parser.parse_args() + + with open(args.pidfile, 'w') as f: + f.write("{}".format(os.getpid())) + + tunaSync = TUNASync() + tunaSync.read_config(args.config) + + tunaSync.run_jobs() + +# vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasync/__init__.py b/tunasync/__init__.py new file mode 100644 index 0000000..c869f43 --- /dev/null +++ b/tunasync/__init__.py @@ -0,0 +1,4 @@ +#!/usr/bin/env python2 +# -*- coding:utf-8 -*- +from .tunasync import TUNASync +# vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasync/jobs.py b/tunasync/jobs.py new file mode 100644 index 0000000..d5d3298 --- /dev/null +++ b/tunasync/jobs.py @@ -0,0 +1,18 @@ +#!/usr/bin/env python2 +# -*- coding:utf-8 -*- +import time + + +def run_job(sema, provider): + while 1: + sema.acquire(True) + print("start syncing {}".format(provider.name)) + provider.run() + sema.release() + print("syncing {} finished, sleep {} minutes for the next turn".format( + provider.name, provider.interval + )) + time.sleep(provider.interval * 60) + + +# vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasync/mirror_provider.py b/tunasync/mirror_provider.py new file mode 100644 index 0000000..cc1c0d2 --- /dev/null +++ b/tunasync/mirror_provider.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python2 +# -*- coding:utf-8 -*- +import sh +import os +from datetime import datetime + + +class MirrorProvider(object): + ''' + Mirror method class, can be `rsync', `debmirror', etc. + ''' + + def run(self): + raise NotImplementedError("run method should be implemented") + + +class RsyncProvider(MirrorProvider): + + _default_options = "-av --delete-after" + + def __init__(self, name, upstream_url, local_dir, useIPv6=True, + exclude_file=None, log_file="/dev/null", interval=120): + + self.name = name + self.upstream_url = upstream_url + self.local_dir = local_dir + self.useIPv6 = useIPv6 + self.exclude_file = exclude_file + self.log_file = log_file + self.interval = interval + + @property + def options(self): + + _options = self._default_options.split() + + if self.useIPv6: + _options.append("-6") + else: + _options.append("-4") + + if self.exclude_file: + _options.append("--exclude-from") + _options.append(self.exclude_file) + + return _options + + def run(self): + _args = self.options + _args.append(self.upstream_url) + _args.append(self.local_dir) + now = datetime.now().strftime("%Y-%m-%d_%H") + log_file = self.log_file.format(date=now) + + sh.rsync(*_args, _out=log_file, _err=log_file) + + +class ShellProvider(MirrorProvider): + + def __init__(self, name, command, local_dir, + log_file="/dev/null", interval=120): + self.name = name + self.command = command.split() + self.local_dir = local_dir + self.log_file = log_file + self.interval = interval + + def run(self): + now = datetime.now().strftime("%Y-%m-%d_%H") + log_file = self.log_file.format(date=now) + + new_env = os.environ.copy() + new_env["TUNASYNC_LOCAL_DIR"] = self.local_dir + new_env["TUNASYNC_LOG_FILE"] = log_file + + _cmd = self.command[0] + _args = [] if len(self.command) == 1 else self.command[1:] + + cmd = sh.Command(_cmd) + cmd(*_args, _env=new_env, _out=log_file, _err=log_file) + + +# vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasync/tunasync.py b/tunasync/tunasync.py new file mode 100644 index 0000000..c2e9331 --- /dev/null +++ b/tunasync/tunasync.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python2 +# -*- coding:utf-8 -*- +import ConfigParser +import os.path +import signal + +from multiprocessing import Process, Semaphore +from . import jobs +from .mirror_provider import RsyncProvider, ShellProvider + + +class MirrorConfig(object): + + _valid_providers = set(("rsync", "debmirror", "shell", )) + + def __init__(self, name, cfgParser, section): + self._cp = cfgParser + self._sec = section + + self.name = name + self.options = dict(self._cp.items(self._sec)) + self._validate() + + def _validate(self): + provider = self.options.get("provider", None) + assert provider in self._valid_providers + + if provider == "rsync": + assert "upstream" in self.options + if "use_ipv6" in self.options: + self.options["use_ipv6"] = self._cp.getboolean(self._sec, + "use_ipv6") + + elif provider == "shell": + assert "command" in self.options + + if "local_dir" not in self.options: + self.options["local_dir"] = os.path.join( + self._cp.get("global", "local_dir"), + self.name) + + self.options["interval"] = int( + self.options.get("interval", + self._cp.getint("global", "interval")) + ) + + log_dir = self._cp.get("global", "log_dir") + self.options["log_file"] = self.options.get( + "log_file", + os.path.join(log_dir, self.name, "{date}.log") + ) + + +class TUNASync(object): + + _instance = None + _settings = None + _inited = False + + def __new__(cls, *args, **kwargs): + if not cls._instance: + cls._instance = super(TUNASync, cls).__new__(cls, *args, **kwargs) + + return cls._instance + + def read_config(self, config_file): + self._settings = ConfigParser.ConfigParser() + self._settings.read(config_file) + + self._inited = True + self._mirrors = [] + self._providers = [] + self.processes = [] + self.semaphore = Semaphore(self._settings.getint("global", "concurrent")) + + @property + def mirrors(self): + if self._mirrors: + return self._mirrors + + for section in filter(lambda s: s.startswith("mirror:"), + self._settings.sections()): + + _, name = section.split(":") + self._mirrors.append( + MirrorConfig(name, self._settings, section)) + return self._mirrors + + @property + def providers(self): + if self._providers: + return self._providers + + for mirror in self.mirrors: + if mirror.options["provider"] == "rsync": + self._providers.append( + RsyncProvider( + mirror.name, + mirror.options["upstream"], + mirror.options["local_dir"], + mirror.options["use_ipv6"], + mirror.options.get("exclude_file", None), + mirror.options["log_file"], + mirror.options["interval"] + ) + ) + elif mirror.options["provider"] == "shell": + self._providers.append( + ShellProvider( + mirror.name, + mirror.options["command"], + mirror.options["local_dir"], + mirror.options["log_file"], + mirror.options["interval"] + ) + ) + + return self._providers + + def run_jobs(self): + for provider in self.providers: + p = Process(target=jobs.run_job, args=(self.semaphore, provider, )) + p.start() + self.processes.append(p) + + def sig_handler(*args): + print("terminate subprocesses") + for p in self.processes: + p.terminate() + print("Good Bye") + + signal.signal(signal.SIGINT, sig_handler) + signal.signal(signal.SIGTERM, sig_handler) + + # def config(self, option): + # if self._settings is None: + # raise TUNASyncException("Config not inited") + # + + +# vim: ts=4 sw=4 sts=4 expandtab