add status manager

This commit is contained in:
bigeagle 2014-10-25 19:19:04 +08:00
parent 54e6d5b34a
commit 5725113a91
6 changed files with 198 additions and 190 deletions

View File

@ -4,6 +4,7 @@ log_dir = "/var/log/tunasync"
mirror_root = "/mnt/sdb1/mirror"
use_btrfs = false
local_dir = "{mirror_root}/_working/{mirror_name}/"
status_file = "/tmp/tunasync.json"
# maximum numbers of running jobs
concurrent = 2
# interval in minutes
@ -16,19 +17,26 @@ working_dir = "{mirror_root}/_working/{mirror_name}"
gc_root = "{mirror_root}/_garbage/"
gc_dir = "{mirror_root}/_garbage/_gc_{mirror_name}_{{timestamp}}"
# [[mirrors]]
# name = "archlinux"
# provider = "rsync"
# upstream = "rsync://mirror.us.leaseweb.net/archlinux/"
# log_file = "/tmp/archlinux-{date}.log"
# use_ipv6 = true
[[mirrors]]
name = "archlinux"
provider = "rsync"
upstream = "rsync://mirror.us.leaseweb.net/archlinux/"
log_file = "/tmp/archlinux-{date}.log"
use_ipv6 = true
name = "arch1"
provider = "shell"
command = "sleep 10"
local_dir = "/mnt/sdb1/mirror/archlinux/current/"
log_file = "/dev/null"
[[mirrors]]
name = "arch2"
provider = "shell"
command = "sleep 20"
local_dir = "/mnt/sdb1/mirror/archlinux/current/"
log_file = "/tmp/arch2-{date}.log"
log_file = "/dev/null"
[[mirrors]]
name = "arch4"

View File

@ -2,79 +2,8 @@
# -*- coding:utf-8 -*-
import os
import argparse
import json
from datetime import datetime
from tunasync import TUNASync
from tunasync.hook import JobHook
class IndexPageHook(JobHook):
def __init__(self, parent, dbfile):
self.parent = parent
self.dbfile = dbfile
@property
def mirrors(self):
mirrors = {}
try:
with open(self.dbfile) as f:
_mirrors = json.load(f)
for m in _mirrors:
mirrors[m["name"]] = m
except:
for name, _ in self.parent.mirrors.iteritems():
mirrors[name] = {
'name': name,
'last_update': '-',
'status': 'unknown',
}
return mirrors
def before_job(self, name=None, *args, **kwargs):
if name is None:
return
mirrors = self.mirrors
_m = mirrors.get(name, {
'name': name,
'last_update': '-',
'status': '-',
})
mirrors[name] = {
'name': name,
'last_update': _m['last_update'],
'status': 'syncing'
}
with open(self.dbfile, 'wb') as f:
_mirrors = sorted(
[m for _, m in mirrors.items()],
key=lambda x: x['name']
)
json.dump(_mirrors, f)
def after_job(self, name=None, status="unknown", *args, **kwargs):
if name is None:
return
print("Updating tunasync.json")
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
mirrors = self.mirrors
mirrors[name] = {
'name': name,
'last_update': now,
'status': status
}
with open(self.dbfile, 'wb') as f:
_mirrors = sorted(
[m for _, m in mirrors.items()],
key=lambda x: x['name']
)
json.dump(_mirrors, f)
if __name__ == "__main__":
@ -83,9 +12,6 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser(prog="tunasync")
parser.add_argument("-c", "--config",
default="tunasync.ini", help="config file")
parser.add_argument("--dbfile",
default="tunasync.json",
help="mirror status db file")
parser.add_argument("--pidfile", default="/var/run/tunasync.pid",
help="pidfile")
@ -97,10 +23,6 @@ if __name__ == "__main__":
tunaSync = TUNASync()
tunaSync.read_config(args.config)
index_hook = IndexPageHook(tunaSync, args.dbfile)
tunaSync.add_hook(index_hook)
tunaSync.run_jobs()
# vim: ts=4 sw=4 sts=4 expandtab

View File

@ -28,7 +28,8 @@ def run_job(sema, child_q, manager_q, provider, **settings):
break
aquired = True
status = "unkown"
status = "syncing"
manager_q.put((provider.name, status))
try:
for hook in provider.hooks:
hook.before_job(name=provider.name)
@ -65,6 +66,8 @@ def run_job(sema, child_q, manager_q, provider, **settings):
provider.name, provider.interval
))
manager_q.put((provider.name, status))
try:
msg = child_q.get(timeout=provider.interval * 60)
if msg == "terminate":

109
tunasync/mirror_config.py Normal file
View File

@ -0,0 +1,109 @@
#!/usr/bin/env python2
# -*- coding:utf-8 -*-
import os
from .mirror_provider import RsyncProvider, ShellProvider
from .btrfs_snapshot import BtrfsHook
class MirrorConfig(object):
_valid_providers = set(("rsync", "debmirror", "shell", ))
def __init__(self, parent, options):
self._parent = parent
self._popt = self._parent._settings
self.options = dict(options.items()) # copy
self._validate()
def _validate(self):
provider = self.options.get("provider", None)
assert provider in self._valid_providers
if provider == "rsync":
assert "upstream" in self.options
elif provider == "shell":
assert "command" in self.options
local_dir_tmpl = self.options.get(
"local_dir", self._popt["global"]["local_dir"])
self.options["local_dir"] = local_dir_tmpl.format(
mirror_root=self._popt["global"]["mirror_root"],
mirror_name=self.name,
)
if "interval" not in self.options:
self.options["interval"] = self._popt["global"]["interval"]
assert isinstance(self.options["interval"], int)
log_dir = self._popt["global"]["log_dir"]
if "log_file" not in self.options:
self.options["log_file"] = os.path.join(
log_dir, self.name, "{date}.log")
if "use_btrfs" not in self.options:
self.options["use_btrfs"] = self._parent.use_btrfs
assert self.options["use_btrfs"] in (True, False)
def __getattr__(self, key):
if key in self.__dict__:
return self.__dict__[key]
else:
return self.__dict__["options"].get(key, None)
def to_provider(self, hooks=[]):
if self.provider == "rsync":
provider = RsyncProvider(
self.name,
self.upstream,
self.local_dir,
self.use_ipv6,
self.exclude_file,
self.log_file,
self.interval,
hooks,
)
elif self.options["provider"] == "shell":
provider = ShellProvider(
self.name,
self.command,
self.local_dir,
self.log_file,
self.interval,
hooks
)
return provider
def compare(self, other):
assert self.name == other.name
for key, val in self.options.iteritems():
if other.options.get(key, None) != val:
return False
return True
def hooks(self):
hooks = []
parent = self._parent
if self.options["use_btrfs"]:
working_dir = parent.btrfs_working_dir_tmpl.format(
mirror_root=parent.mirror_root,
mirror_name=self.name
)
service_dir = parent.btrfs_service_dir_tmpl.format(
mirror_root=parent.mirror_root,
mirror_name=self.name
)
gc_dir = parent.btrfs_gc_dir_tmpl.format(
mirror_root=parent.mirror_root,
mirror_name=self.name
)
hooks.append(BtrfsHook(service_dir, working_dir, gc_dir))
return hooks
# vim: ts=4 sw=4 sts=4 expandtab

View File

@ -0,0 +1,61 @@
#!/usr/bin/env python2
# -*- coding:utf-8 -*-
import json
from datetime import datetime
class StatusManager(object):
def __init__(self, parent, dbfile):
self.parent = parent
self.dbfile = dbfile
self.init_mirrors()
def init_mirrors(self):
mirrors = {}
try:
with open(self.dbfile) as f:
_mirrors = json.load(f)
for m in _mirrors:
mirrors[m["name"]] = m
except:
for name, _ in self.parent.mirrors.iteritems():
mirrors[name] = {
'name': name,
'last_update': '-',
'status': 'unknown',
}
self.mirrors = mirrors
def update_status(self, name, status):
_m = self.mirrors.get(name, {
'name': name,
'last_update': '-',
'status': '-',
})
if status in ("syncing", "fail"):
update_time = _m["last_update"]
elif status == "success":
update_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
else:
print("Invalid status: {}, from {}".format(status, name))
self.mirrors[name] = {
'name': name,
'last_update': update_time,
'status': status,
}
with open(self.dbfile, 'wb') as f:
_mirrors = sorted(
[m for _, m in self.mirrors.items()],
key=lambda x: x['name']
)
print("Updated status file, {}:{}".format(name, status))
json.dump(_mirrors, f)
# vim: ts=4 sw=4 sts=4 expandtab

View File

@ -1,117 +1,14 @@
#!/usr/bin/env python2
# -*- coding:utf-8 -*-
import os.path
import signal
import sys
import toml
from multiprocessing import Process, Semaphore, Queue
from . import jobs
from .mirror_provider import RsyncProvider, ShellProvider
from .btrfs_snapshot import BtrfsHook
from .hook import JobHook
class MirrorConfig(object):
_valid_providers = set(("rsync", "debmirror", "shell", ))
def __init__(self, parent, options):
self._parent = parent
self._popt = self._parent._settings
self.options = dict(options.items()) # copy
self._validate()
def _validate(self):
provider = self.options.get("provider", None)
assert provider in self._valid_providers
if provider == "rsync":
assert "upstream" in self.options
elif provider == "shell":
assert "command" in self.options
local_dir_tmpl = self.options.get(
"local_dir", self._popt["global"]["local_dir"])
self.options["local_dir"] = local_dir_tmpl.format(
mirror_root=self._popt["global"]["mirror_root"],
mirror_name=self.name,
)
if "interval" not in self.options:
self.options["interval"] = self._popt["global"]["interval"]
assert isinstance(self.options["interval"], int)
log_dir = self._popt["global"]["log_dir"]
if "log_file" not in self.options:
self.options["log_file"] = os.path.join(
log_dir, self.name, "{date}.log")
if "use_btrfs" not in self.options:
self.options["use_btrfs"] = self._parent.use_btrfs
assert self.options["use_btrfs"] in (True, False)
def __getattr__(self, key):
if key in self.__dict__:
return self.__dict__[key]
else:
return self.__dict__["options"].get(key, None)
def to_provider(self, hooks=[]):
if self.provider == "rsync":
provider = RsyncProvider(
self.name,
self.upstream,
self.local_dir,
self.use_ipv6,
self.exclude_file,
self.log_file,
self.interval,
hooks,
)
elif self.options["provider"] == "shell":
provider = ShellProvider(
self.name,
self.command,
self.local_dir,
self.log_file,
self.interval,
hooks
)
return provider
def compare(self, other):
assert self.name == other.name
for key, val in self.options.iteritems():
if other.options.get(key, None) != val:
return False
return True
def hooks(self):
hooks = []
parent = self._parent
if self.options["use_btrfs"]:
working_dir = parent.btrfs_working_dir_tmpl.format(
mirror_root=parent.mirror_root,
mirror_name=self.name
)
service_dir = parent.btrfs_service_dir_tmpl.format(
mirror_root=parent.mirror_root,
mirror_name=self.name
)
gc_dir = parent.btrfs_gc_dir_tmpl.format(
mirror_root=parent.mirror_root,
mirror_name=self.name
)
hooks.append(BtrfsHook(service_dir, working_dir, gc_dir))
return hooks
from .mirror_config import MirrorConfig
from .status_manager import StatusManager
class TUNASync(object):
@ -146,6 +43,9 @@ class TUNASync(object):
self.btrfs_working_dir_tmpl = self._settings["btrfs"]["working_dir"]
self.btrfs_gc_dir_tmpl = self._settings["btrfs"]["gc_dir"]
self.status_file = self._settings["global"]["status_file"]
self.status_manager = StatusManager(self, self.status_file)
def add_hook(self, h):
assert isinstance(h, JobHook)
self._hooks.append(h)
@ -203,6 +103,11 @@ class TUNASync(object):
if status == "QUIT":
print("New configuration applied to {}".format(name))
self.run_provider(name)
else:
try:
self.status_manager.update_status(name, status)
except Exception as e:
print(e)
def run_provider(self, name):
if name not in self.providers: