From 1ce19c88b13c59b0caa52fbdcdc42049992069c2 Mon Sep 17 00:00:00 2001 From: bigeagle Date: Fri, 17 Oct 2014 23:30:57 +0800 Subject: [PATCH] btrfs support --- README.md | 2 +- examples/tunasync.ini | 38 +++++++++++++++----------- tunasync/btrfs_snapshot.py | 50 ++++++++++++++++++++++++++++++++++ tunasync/jobs.py | 8 ++++++ tunasync/mirror_provider.py | 32 +++++++++++++--------- tunasync/tunasync.py | 54 +++++++++++++++++++++++++++++++------ 6 files changed, 147 insertions(+), 37 deletions(-) create mode 100644 tunasync/btrfs_snapshot.py diff --git a/README.md b/README.md index b816f43..2f165bb 100644 --- a/README.md +++ b/README.md @@ -4,5 +4,5 @@ tunasync ## TODO - [ ] status file -- [ ] btrfs backend (create snapshot before syncing) +- [x] btrfs backend (create snapshot before syncing) - [ ] debmirror provider diff --git a/examples/tunasync.ini b/examples/tunasync.ini index a23f6df..3ff9659 100644 --- a/examples/tunasync.ini +++ b/examples/tunasync.ini @@ -1,36 +1,42 @@ [global] log_dir = /var/log/tunasync -local_dir = /srv/mirror -storage_backend = btrfs +; mirror_root = /srv/mirror_disk +mirror_root = /mnt/sdb1/mirror +use_btrfs = yes +local_dir = {mirror_root}/{mirror_name}/_working ; maximum numbers of running jobs -concurrent = 2 +concurrent = 3 ; interval in minutes -interval = 1 +interval = 120 + +[btrfs] +service_dir = {mirror_root}/{mirror_name}/_current +working_dir = {mirror_root}/{mirror_name}/_working +tmp_dir = {mirror_root}/{mirror_name}/_tmp -# [mirror:archlinux] -# provider = rsync -# upstream = rsync://mirrors6.ustc.edu.cn/archlinux/ -# local_dir = /mnt/sdb1/mirror/archlinux/current/ -# log_file = /tmp/archlinux-{date}.log -# use_ipv6 = yes [mirror:archlinux] -provider = shell -command = sleep 10 -local_dir = /mnt/sdb1/mirror/archlinux/current/ +provider = rsync +upstream = rsync://mirror.us.leaseweb.net/archlinux/ log_file = /tmp/archlinux-{date}.log +use_ipv6 = yes + +# [mirror:archlinux] +# provider = shell +# command = sleep 10 +# local_dir = /mnt/sdb1/mirror/archlinux/current/ +# log_file = /tmp/archlinux-{date}.log [mirror:arch2] provider = shell command = sleep 5 -local_dir = /mnt/sdb1/mirror/archlinux/current/ log_file = /tmp/arch2-{date}.log +use_btrfs = no [mirror:arch3] provider = shell command = ./shell_provider.sh -local_dir = /mnt/sdb1/mirror/archlinux/current/ log_file = /tmp/arch3-{date}.log - +use_btrfs = no diff --git a/tunasync/btrfs_snapshot.py b/tunasync/btrfs_snapshot.py new file mode 100644 index 0000000..cc027d2 --- /dev/null +++ b/tunasync/btrfs_snapshot.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python2 +# -*- coding:utf-8 -*- +import sh + + +class BtrfsVolumeError(Exception): + pass + + +class BtrfsHook(object): + + def __init__(self, service_dir, working_dir, tmp_dir): + self.service_dir = service_dir + self.working_dir = working_dir + self.tmp_dir = tmp_dir + + def before_job(self): + self._create_working_snapshot() + + def after_job(self): + self._commit_changes() + + def _ensure_subvolume(self): + # print(self.service_dir) + try: + ret = sh.btrfs("subvolume", "show", self.service_dir) + except Exception, e: + print(e) + raise BtrfsVolumeError("Invalid subvolume") + + if ret.stderr != '': + raise BtrfsVolumeError("Invalid subvolume") + + def _create_working_snapshot(self): + self._ensure_subvolume() + # print("btrfs subvolume snapshot {} {}".format(self.service_dir, self.working_dir)) + sh.btrfs("subvolume", "snapshot", self.service_dir, self.working_dir) + + def _commit_changes(self): + self._ensure_subvolume() + self._ensure_subvolume() + out = sh.mv(self.service_dir, self.tmp_dir) + assert out.exit_code == 0 and out.stderr == "" + out = sh.mv(self.working_dir, self.service_dir) + assert out.exit_code == 0 and out.stderr == "" + # print("btrfs subvolume delete {}".format(self.tmp_dir)) + out = sh.btrfs("subvolume", "delete", self.tmp_dir) + assert out.exit_code == 0 and out.stderr == "" + +# vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasync/jobs.py b/tunasync/jobs.py index d5d3298..735ac6e 100644 --- a/tunasync/jobs.py +++ b/tunasync/jobs.py @@ -7,7 +7,15 @@ def run_job(sema, provider): while 1: sema.acquire(True) print("start syncing {}".format(provider.name)) + + for hook in provider.hooks: + hook.before_job() + provider.run() + + for hook in provider.hooks[::-1]: + hook.after_job() + sema.release() print("syncing {} finished, sleep {} minutes for the next turn".format( provider.name, provider.interval diff --git a/tunasync/mirror_provider.py b/tunasync/mirror_provider.py index cc1c0d2..d8dba92 100644 --- a/tunasync/mirror_provider.py +++ b/tunasync/mirror_provider.py @@ -10,6 +10,14 @@ class MirrorProvider(object): Mirror method class, can be `rsync', `debmirror', etc. ''' + def __init__(self, name, local_dir, log_file="/dev/null", + interval=120, hooks=[]): + self.name = name + self.local_dir = local_dir + self.log_file = log_file + self.interval = interval + self.hooks = hooks + def run(self): raise NotImplementedError("run method should be implemented") @@ -19,15 +27,14 @@ class RsyncProvider(MirrorProvider): _default_options = "-av --delete-after" def __init__(self, name, upstream_url, local_dir, useIPv6=True, - exclude_file=None, log_file="/dev/null", interval=120): + exclude_file=None, log_file="/dev/null", interval=120, + hooks=[]): + super(RsyncProvider, self).__init__(name, local_dir, log_file, + interval, hooks) - self.name = name self.upstream_url = upstream_url - self.local_dir = local_dir self.useIPv6 = useIPv6 self.exclude_file = exclude_file - self.log_file = log_file - self.interval = interval @property def options(self): @@ -46,26 +53,27 @@ class RsyncProvider(MirrorProvider): return _options def run(self): + _args = self.options _args.append(self.upstream_url) _args.append(self.local_dir) now = datetime.now().strftime("%Y-%m-%d_%H") log_file = self.log_file.format(date=now) - sh.rsync(*_args, _out=log_file, _err=log_file) + sh.rsync(*_args, _out=log_file, _err=log_file, _out_bufsize=1) class ShellProvider(MirrorProvider): def __init__(self, name, command, local_dir, - log_file="/dev/null", interval=120): - self.name = name + log_file="/dev/null", interval=120, hooks=[]): + + super(ShellProvider, self).__init__(name, local_dir, log_file, + interval, hooks) self.command = command.split() - self.local_dir = local_dir - self.log_file = log_file - self.interval = interval def run(self): + now = datetime.now().strftime("%Y-%m-%d_%H") log_file = self.log_file.format(date=now) @@ -77,7 +85,7 @@ class ShellProvider(MirrorProvider): _args = [] if len(self.command) == 1 else self.command[1:] cmd = sh.Command(_cmd) - cmd(*_args, _env=new_env, _out=log_file, _err=log_file) + cmd(*_args, _env=new_env, _out=log_file, _err=log_file, _out_bufsize=1) # vim: ts=4 sw=4 sts=4 expandtab diff --git a/tunasync/tunasync.py b/tunasync/tunasync.py index c2e9331..3e66f97 100644 --- a/tunasync/tunasync.py +++ b/tunasync/tunasync.py @@ -7,13 +7,15 @@ import signal from multiprocessing import Process, Semaphore from . import jobs from .mirror_provider import RsyncProvider, ShellProvider +from .btrfs_snapshot import BtrfsHook class MirrorConfig(object): _valid_providers = set(("rsync", "debmirror", "shell", )) - def __init__(self, name, cfgParser, section): + def __init__(self, parent, name, cfgParser, section): + self._parent = parent self._cp = cfgParser self._sec = section @@ -34,10 +36,13 @@ class MirrorConfig(object): elif provider == "shell": assert "command" in self.options - if "local_dir" not in self.options: - self.options["local_dir"] = os.path.join( - self._cp.get("global", "local_dir"), - self.name) + local_dir_tmpl = self.options.get( + "local_dir", self._cp.get("global", "local_dir")) + + self.options["local_dir"] = local_dir_tmpl.format( + mirror_root=self._cp.get("global", "mirror_root"), + mirror_name=self.name, + ) self.options["interval"] = int( self.options.get("interval", @@ -50,6 +55,12 @@ class MirrorConfig(object): os.path.join(log_dir, self.name, "{date}.log") ) + try: + self.options["use_btrfs"] = self._cp.getboolean( + self._sec, "use_btrfs") + except ConfigParser.NoOptionError: + self.options["use_btrfs"] = self._parent.use_btrfs + class TUNASync(object): @@ -73,6 +84,15 @@ class TUNASync(object): self.processes = [] self.semaphore = Semaphore(self._settings.getint("global", "concurrent")) + self.mirror_root = self._settings.get("global", "mirror_root") + self.use_btrfs = self._settings.getboolean("global", "use_btrfs") + self.btrfs_service_dir_tmpl = self._settings.get( + "btrfs", "service_dir") + self.btrfs_working_dir_tmpl = self._settings.get( + "btrfs", "working_dir") + self.btrfs_tmp_dir_tmpl = self._settings.get( + "btrfs", "tmp_dir") + @property def mirrors(self): if self._mirrors: @@ -83,7 +103,7 @@ class TUNASync(object): _, name = section.split(":") self._mirrors.append( - MirrorConfig(name, self._settings, section)) + MirrorConfig(self, name, self._settings, section)) return self._mirrors @property @@ -92,6 +112,22 @@ class TUNASync(object): return self._providers for mirror in self.mirrors: + hooks = [] + if mirror.options["use_btrfs"]: + working_dir = self.btrfs_working_dir_tmpl.format( + mirror_root=self.mirror_root, + mirror_name=mirror.name + ) + service_dir = self.btrfs_service_dir_tmpl.format( + mirror_root=self.mirror_root, + mirror_name=mirror.name + ) + tmp_dir = self.btrfs_tmp_dir_tmpl.format( + mirror_root=self.mirror_root, + mirror_name=mirror.name + ) + hooks.append(BtrfsHook(service_dir, working_dir, tmp_dir)) + if mirror.options["provider"] == "rsync": self._providers.append( RsyncProvider( @@ -101,7 +137,8 @@ class TUNASync(object): mirror.options["use_ipv6"], mirror.options.get("exclude_file", None), mirror.options["log_file"], - mirror.options["interval"] + mirror.options["interval"], + hooks, ) ) elif mirror.options["provider"] == "shell": @@ -111,7 +148,8 @@ class TUNASync(object): mirror.options["command"], mirror.options["local_dir"], mirror.options["log_file"], - mirror.options["interval"] + mirror.options["interval"], + hooks, ) )