refactor: goodbye, python2.7

This commit is contained in:
bigeagle 2016-04-29 21:48:46 +08:00
parent bd423eec4e
commit 4b52b9b53b
No known key found for this signature in database
GPG Key ID: 9171A4571C27920A
11 changed files with 0 additions and 1185 deletions

View File

@ -1,4 +0,0 @@
#!/usr/bin/env python2
# -*- coding:utf-8 -*-
from .tunasync import TUNASync
# vim: ts=4 sw=4 sts=4 expandtab

View File

@ -1,62 +0,0 @@
#!/usr/bin/env python2
# -*- coding:utf-8 -*-
import sh
import os
from datetime import datetime
from .hook import JobHook
class BtrfsVolumeError(Exception):
pass
class BtrfsHook(JobHook):
def __init__(self, service_dir, working_dir, gc_dir):
self.service_dir = service_dir
self.working_dir = working_dir
self.gc_dir = gc_dir
def before_job(self, ctx={}, *args, **kwargs):
self._create_working_snapshot()
ctx['current_dir'] = self.working_dir
def after_job(self, status=None, ctx={}, *args, **kwargs):
if status == "success":
self._commit_changes()
ctx['current_dir'] = self.service_dir
def _ensure_subvolume(self):
# print(self.service_dir)
try:
ret = sh.btrfs("subvolume", "show", self.service_dir)
except Exception, e:
print(e)
raise BtrfsVolumeError("Invalid subvolume")
if ret.stderr != '':
raise BtrfsVolumeError("Invalid subvolume")
def _create_working_snapshot(self):
self._ensure_subvolume()
if os.path.exists(self.working_dir):
print("Warning: working dir existed, are you sure no rsync job is running?")
else:
# print("btrfs subvolume snapshot {} {}".format(self.service_dir, self.working_dir))
sh.btrfs("subvolume", "snapshot", self.service_dir, self.working_dir)
def _commit_changes(self):
self._ensure_subvolume()
self._ensure_subvolume()
gc_dir = self.gc_dir.format(timestamp=datetime.now().strftime("%s"))
out = sh.mv(self.service_dir, gc_dir)
assert out.exit_code == 0 and out.stderr == ""
out = sh.mv(self.working_dir, self.service_dir)
assert out.exit_code == 0 and out.stderr == ""
# print("btrfs subvolume delete {}".format(self.tmp_dir))
# sh.sleep(3)
# out = sh.btrfs("subvolume", "delete", self.tmp_dir)
# assert out.exit_code == 0 and out.stderr == ""
# vim: ts=4 sw=4 sts=4 expandtab

View File

@ -1,57 +0,0 @@
#!/usr/bin/env python2
# -*- coding:utf-8 -*-
import socket
import os
import json
import struct
class ControlServer(object):
valid_commands = set((
"start", "stop", "restart", "status", "log",
))
def __init__(self, address, mgr_chan, cld_chan):
self.address = address
self.mgr_chan = mgr_chan
self.cld_chan = cld_chan
try:
os.unlink(self.address)
except OSError:
if os.path.exists(self.address):
raise Exception("file exists: {}".format(self.address))
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.sock.bind(self.address)
os.chmod(address, 0o700)
print("Control Server listening on: {}".format(self.address))
self.sock.listen(1)
def serve_forever(self):
while 1:
conn, _ = self.sock.accept()
try:
length = struct.unpack('!H', conn.recv(2))[0]
content = conn.recv(length)
cmd = json.loads(content)
if cmd['cmd'] not in self.valid_commands:
raise Exception("Invalid Command")
self.mgr_chan.put(("CMD", (cmd['cmd'], cmd['target'], cmd["kwargs"])))
except Exception as e:
print(e)
res = "Invalid Command"
else:
res = self.cld_chan.get()
conn.sendall(struct.pack('!H', len(res)))
conn.sendall(res)
conn.close()
def run_control_server(address, mgr_chan, cld_chan):
cs = ControlServer(address, mgr_chan, cld_chan)
cs.serve_forever()
# vim: ts=4 sw=4 sts=4 expandtab

View File

@ -1,36 +0,0 @@
#!/usr/bin/env python2
# -*- coding:utf-8 -*-
import os
import sh
import shlex
from .hook import JobHook
class CmdExecHook(JobHook):
POST_SYNC = "post_sync"
PRE_SYNC = "pre_sync"
def __init__(self, command, exec_at=POST_SYNC):
self.command = shlex.split(command)
if exec_at == self.POST_SYNC:
self.before_job = self._keep_calm
self.after_job = self._exec
elif exec_at == self.PRE_SYNC:
self.before_job = self._exec
self.after_job = self._keep_calm
def _keep_calm(self, ctx={}, **kwargs):
pass
def _exec(self, ctx={}, **kwargs):
new_env = os.environ.copy()
new_env["TUNASYNC_MIRROR_NAME"] = ctx["mirror_name"]
new_env["TUNASYNC_WORKING_DIR"] = ctx["current_dir"]
new_env["TUNASYNC_JOB_EXIT_STATUS"] = kwargs.get("status", "")
_cmd = self.command[0]
_args = [] if len(self.command) == 1 else self.command[1:]
cmd = sh.Command(_cmd)
cmd(*_args, _env=new_env)
# vim: ts=4 sw=4 sts=4 expandtab

View File

@ -1,19 +0,0 @@
#!/usr/bin/env python2
# -*- coding:utf-8 -*-
class JobHook(object):
def before_job(self, *args, **kwargs):
raise NotImplementedError("")
def after_job(self, *args, **kwargs):
raise NotImplementedError("")
def before_exec(self, *args, **kwargs):
pass
def after_exec(self, *args, **kwargs):
pass
# vim: ts=4 sw=4 sts=4 expandtab

View File

@ -1,135 +0,0 @@
#!/usr/bin/env python2
# -*- coding:utf-8 -*-
import sh
import sys
from setproctitle import setproctitle
import signal
import Queue
import traceback
def run_job(sema, child_q, manager_q, provider, **settings):
aquired = False
setproctitle("tunasync-{}".format(provider.name))
def before_quit(*args):
provider.terminate()
if aquired:
print("{} release semaphore".format(provider.name))
sema.release()
sys.exit(0)
def sleep_wait(timeout):
try:
msg = child_q.get(timeout=timeout)
if msg == "terminate":
manager_q.put(("CONFIG_ACK", (provider.name, "QUIT")))
return True
except Queue.Empty:
return False
signal.signal(signal.SIGTERM, before_quit)
if provider.delay > 0:
if sleep_wait(provider.delay):
return
max_retry = settings.get("max_retry", 1)
def _real_run(idx=0, stage="job_hook", ctx=None):
"""\
4 stages:
0 -> job_hook, 1 -> set_retry, 2 -> exec_hook, 3 -> exec
"""
assert(ctx is not None)
if stage == "exec":
# exec_job
try:
provider.run(ctx=ctx)
provider.wait()
except sh.ErrorReturnCode:
status = "fail"
else:
status = "success"
return status
elif stage == "set_retry":
# enter stage 3 with retry
for retry in range(max_retry):
status = "syncing"
manager_q.put(("UPDATE", (provider.name, status, ctx)))
print("start syncing {}, retry: {}".format(provider.name, retry))
status = _real_run(idx=0, stage="exec_hook", ctx=ctx)
if status == "success":
break
return status
# job_hooks
elif stage == "job_hook":
if idx == len(provider.hooks):
return _real_run(idx=idx, stage="set_retry", ctx=ctx)
hook = provider.hooks[idx]
hook_before, hook_after = hook.before_job, hook.after_job
status = "pre-syncing"
elif stage == "exec_hook":
if idx == len(provider.hooks):
return _real_run(idx=idx, stage="exec", ctx=ctx)
hook = provider.hooks[idx]
hook_before, hook_after = hook.before_exec, hook.after_exec
status = "syncing"
try:
# print("%s run before_%s, %d" % (provider.name, stage, idx))
hook_before(provider=provider, ctx=ctx)
status = _real_run(idx=idx+1, stage=stage, ctx=ctx)
except Exception:
traceback.print_exc()
status = "fail"
finally:
# print("%s run after_%s, %d" % (provider.name, stage, idx))
# job may break when syncing
if status != "success":
status = "fail"
try:
hook_after(provider=provider, status=status, ctx=ctx)
except Exception:
traceback.print_exc()
return status
while 1:
try:
sema.acquire(True)
except:
break
aquired = True
ctx = {} # put context info in it
ctx['current_dir'] = provider.local_dir
ctx['mirror_name'] = provider.name
status = "pre-syncing"
manager_q.put(("UPDATE", (provider.name, status, ctx)))
try:
status = _real_run(idx=0, stage="job_hook", ctx=ctx)
except Exception:
traceback.print_exc()
status = "fail"
finally:
sema.release()
aquired = False
print("syncing {} finished, sleep {} minutes for the next turn".format(
provider.name, provider.interval
))
manager_q.put(("UPDATE", (provider.name, status, ctx)))
if sleep_wait(timeout=provider.interval * 60):
break
# vim: ts=4 sw=4 sts=4 expandtab

View File

@ -1,88 +0,0 @@
#!/usr/bin/env python2
# -*- coding:utf-8 -*-
import sh
import os
from .hook import JobHook
from datetime import datetime
class LogLimitHook(JobHook):
def __init__(self, limit=10):
self.limit = limit
def before_job(self, *args, **kwargs):
pass
def after_job(self, *args, **kwargs):
pass
def before_exec(self, provider, ctx={}, *args, **kwargs):
log_dir = provider.log_dir
self.ensure_log_dir(log_dir)
log_file = provider.log_file.format(
date=datetime.now().strftime("%Y-%m-%d_%H-%M"))
ctx['log_file'] = log_file
if log_file == "/dev/null":
return
log_link = os.path.join(log_dir, "latest")
ctx['log_link'] = log_link
lfiles = [os.path.join(log_dir, lfile)
for lfile in os.listdir(log_dir)
if lfile.startswith(provider.name)]
lfiles_set = set(lfiles)
# sort to get the newest 10 files
lfiles_ts = sorted(
[(os.path.getmtime(lfile), lfile) for lfile in lfiles],
key=lambda x: x[0],
reverse=True)
lfiles_keep = set([x[1] for x in lfiles_ts[:self.limit]])
lfiles_rm = lfiles_set - lfiles_keep
# remove old files
for lfile in lfiles_rm:
try:
sh.rm(lfile)
except:
pass
# create a soft link
self.create_link(log_link, log_file)
def after_exec(self, status=None, ctx={}, *args, **kwargs):
log_file = ctx.get('log_file', None)
log_link = ctx.get('log_link', None)
if log_file == "/dev/null":
return
if status == "fail":
log_file_save = log_file + ".fail"
try:
sh.mv(log_file, log_file_save)
except:
pass
self.create_link(log_link, log_file_save)
def ensure_log_dir(self, log_dir):
if not os.path.exists(log_dir):
sh.mkdir("-p", log_dir)
def create_link(self, log_link, log_file):
if log_link == log_file:
return
if not (log_link and log_file):
return
if os.path.lexists(log_link):
try:
sh.rm(log_link)
except:
return
try:
sh.ln('-s', log_file, log_link)
except:
return
# vim: ts=4 sw=4 sts=4 expandtab

View File

@ -1,156 +0,0 @@
#!/usr/bin/env python2
# -*- coding:utf-8 -*-
import os
from datetime import datetime
from .mirror_provider import RsyncProvider, TwoStageRsyncProvider, ShellProvider
from .btrfs_snapshot import BtrfsHook
from .loglimit import LogLimitHook
from .exec_pre_post import CmdExecHook
class MirrorConfig(object):
_valid_providers = set(("rsync", "two-stage-rsync", "shell", ))
def __init__(self, parent, options):
self._parent = parent
self._popt = self._parent._settings
self.options = dict(options.items()) # copy
self._validate()
def _validate(self):
provider = self.options.get("provider", None)
assert provider in self._valid_providers
if provider == "rsync":
assert "upstream" in self.options
elif provider == "shell":
assert "command" in self.options
local_dir_tmpl = self.options.get(
"local_dir", self._popt["global"]["local_dir"])
self.options["local_dir"] = local_dir_tmpl.format(
mirror_root=self._popt["global"]["mirror_root"],
mirror_name=self.name,
)
if "interval" not in self.options:
self.options["interval"] = self._popt["global"]["interval"]
assert isinstance(self.options["interval"], int)
log_dir = self.options.get(
"log_dir", self._popt["global"]["log_dir"])
if "log_file" not in self.options:
self.options["log_file"] = os.path.join(
log_dir, self.name, self.name + "_{date}.log")
self.log_dir = os.path.dirname(self.log_file)
if "use_btrfs" not in self.options:
self.options["use_btrfs"] = self._parent.use_btrfs
assert self.options["use_btrfs"] in (True, False)
if "env" in self.options:
assert isinstance(self.options["env"], dict)
def __getattr__(self, key):
if key in self.__dict__:
return self.__dict__[key]
else:
return self.__dict__["options"].get(key, None)
def to_provider(self, hooks=[], no_delay=False):
kwargs = {
'name': self.name,
'upstream_url': self.upstream,
'local_dir': self.local_dir,
'log_dir': self.log_dir,
'log_file': self.log_file,
'interval': self.interval,
'env': self.env,
'hooks': hooks,
}
if self.provider == "rsync":
kwargs.update({
'useIPv6': self.use_ipv6,
'password': self.password,
'exclude_file': self.exclude_file,
})
provider = RsyncProvider(**kwargs)
elif self.provider == "two-stage-rsync":
kwargs.update({
'useIPv6': self.use_ipv6,
'password': self.password,
'exclude_file': self.exclude_file,
})
provider = TwoStageRsyncProvider(**kwargs)
provider.set_stage1_profile(self.stage1_profile)
elif self.options["provider"] == "shell":
kwargs.update({
'command': self.command,
'log_stdout': self.options.get("log_stdout", True),
})
provider = ShellProvider(**kwargs)
if not no_delay:
sm = self._parent.status_manager
last_update = sm.get_info(self.name, 'last_update')
if last_update not in (None, '-'):
last_update = datetime.strptime(
last_update, '%Y-%m-%d %H:%M:%S')
delay = int(last_update.strftime("%s")) \
+ self.interval * 60 - int(datetime.now().strftime("%s"))
if delay < 0:
delay = 0
provider.set_delay(delay)
return provider
def compare(self, other):
assert self.name == other.name
for key, val in self.options.iteritems():
if other.options.get(key, None) != val:
return False
return True
def hooks(self):
hooks = []
parent = self._parent
if self.options["use_btrfs"]:
working_dir = parent.btrfs_working_dir_tmpl.format(
mirror_root=parent.mirror_root,
mirror_name=self.name
)
service_dir = parent.btrfs_service_dir_tmpl.format(
mirror_root=parent.mirror_root,
mirror_name=self.name
)
gc_dir = parent.btrfs_gc_dir_tmpl.format(
mirror_root=parent.mirror_root,
mirror_name=self.name
)
hooks.append(BtrfsHook(service_dir, working_dir, gc_dir))
hooks.append(LogLimitHook())
if self.exec_pre_sync:
hooks.append(
CmdExecHook(self.exec_pre_sync, CmdExecHook.PRE_SYNC))
if self.exec_post_sync:
hooks.append(
CmdExecHook(self.exec_post_sync, CmdExecHook.POST_SYNC))
return hooks
# vim: ts=4 sw=4 sts=4 expandtab

View File

@ -1,226 +0,0 @@
#!/usr/bin/env python2
# -*- coding:utf-8 -*-
import sh
import os
import shlex
from datetime import datetime
class MirrorProvider(object):
'''
Mirror method class, can be `rsync', `debmirror', etc.
'''
def __init__(self, name, local_dir, log_dir, log_file="/dev/null",
interval=120, hooks=[]):
self.name = name
self.local_dir = local_dir
self.log_file = log_file
self.log_dir = log_dir
self.interval = interval
self.hooks = hooks
self.p = None
self.delay = 0
# deprecated
def ensure_log_dir(self):
log_dir = os.path.dirname(self.log_file)
if not os.path.exists(log_dir):
sh.mkdir("-p", log_dir)
def get_log_file(self, ctx={}):
if 'log_file' in ctx:
log_file = ctx['log_file']
else:
now = datetime.now().strftime("%Y-%m-%d_%H")
log_file = self.log_file.format(date=now)
ctx['log_file'] = log_file
return log_file
def set_delay(self, sec):
''' Set start delay '''
self.delay = sec
def run(self, ctx={}):
raise NotImplementedError("run method should be implemented")
def terminate(self):
if self.p is not None:
self.p.process.terminate()
print("{} terminated".format(self.name))
self.p = None
def wait(self):
if self.p is not None:
self.p.wait()
self.p = None
class RsyncProvider(MirrorProvider):
_default_options = ['-aHvh', '--no-o', '--no-g', '--stats',
'--exclude', '.~tmp~/',
'--delete', '--delete-after', '--delay-updates',
'--safe-links', '--timeout=120', '--contimeout=120']
def __init__(self, name, upstream_url, local_dir, log_dir,
useIPv6=True, password=None, exclude_file=None,
log_file="/dev/null", interval=120, env=None, hooks=[]):
super(RsyncProvider, self).__init__(name, local_dir, log_dir, log_file,
interval, hooks)
self.upstream_url = upstream_url
self.useIPv6 = useIPv6
self.exclude_file = exclude_file
self.password = password
self.env = env
@property
def options(self):
_options = [o for o in self._default_options] # copy
if self.useIPv6:
_options.append("-6")
if self.exclude_file:
_options.append("--exclude-from")
_options.append(self.exclude_file)
return _options
def run(self, ctx={}):
_args = self.options
_args.append(self.upstream_url)
working_dir = ctx.get("current_dir", self.local_dir)
_args.append(working_dir)
log_file = self.get_log_file(ctx)
new_env = os.environ.copy()
if self.password is not None:
new_env["RSYNC_PASSWORD"] = self.password
if self.env is not None and isinstance(self.env, dict):
for k, v in self.env.items():
new_env[k] = v
self.p = sh.rsync(*_args, _env=new_env, _out=log_file,
_err_to_out=True, _out_bufsize=1, _bg=True)
class TwoStageRsyncProvider(RsyncProvider):
_stage1_options = ['-aHvh', '--no-o', '--no-g',
'--exclude', '.~tmp~/',
'--safe-links', '--timeout=120', '--contimeout=120']
_stage2_options = ['-aHvh', '--no-o', '--no-g', '--stats',
'--exclude', '.~tmp~/',
'--delete', '--delete-after', '--delay-updates',
'--safe-links', '--timeout=120', '--contimeout=120']
_stage1_profiles = {
"debian": [
'dists/',
],
"debian-oldstyle": [
'Packages*', 'Sources*', 'Release*',
'InRelease', 'i18n/*', 'ls-lR*', 'dep11/*',
]
}
def set_stage1_profile(self, profile):
if profile not in self._stage1_profiles:
raise Exception("Profile Undefined: %s, %s" % (profile, self.name))
self._stage1_excludes = self._stage1_profiles[profile]
def options(self, stage):
_default_options = self._stage1_options \
if stage == 1 else self._stage2_options
_options = [o for o in _default_options] # copy
if stage == 1:
for _exc in self._stage1_excludes:
_options.append("--exclude")
_options.append(_exc)
if self.useIPv6:
_options.append("-6")
if self.exclude_file:
_options.append("--exclude-from")
_options.append(self.exclude_file)
return _options
def run(self, ctx={}):
working_dir = ctx.get("current_dir", self.local_dir)
log_file = self.get_log_file(ctx)
new_env = os.environ.copy()
if self.password is not None:
new_env["RSYNC_PASSWORD"] = self.password
if self.env is not None and isinstance(self.env, dict):
for k, v in self.env.items():
new_env[k] = v
with open(log_file, 'w', buffering=1) as f:
def log_output(line):
f.write(line)
for stage in (1, 2):
_args = self.options(stage)
_args.append(self.upstream_url)
_args.append(working_dir)
f.write("==== Stage {} Begins ====\n\n".format(stage))
self.p = sh.rsync(
*_args, _env=new_env, _out=log_output,
_err_to_out=True, _out_bufsize=1, _bg=False
)
self.p.wait()
class ShellProvider(MirrorProvider):
def __init__(self, name, command, upstream_url, local_dir, log_dir,
log_file="/dev/null", log_stdout=True, interval=120, env=None,
hooks=[]):
super(ShellProvider, self).__init__(name, local_dir, log_dir, log_file,
interval, hooks)
self.upstream_url = str(upstream_url)
self.command = shlex.split(command)
self.log_stdout = log_stdout
self.env = env
def run(self, ctx={}):
log_file = self.get_log_file(ctx)
new_env = os.environ.copy()
new_env["TUNASYNC_MIRROR_NAME"] = self.name
new_env["TUNASYNC_LOCAL_DIR"] = self.local_dir
new_env["TUNASYNC_WORKING_DIR"] = ctx.get("current_dir", self.local_dir)
new_env["TUNASYNC_UPSTREAM_URL"] = self.upstream_url
new_env["TUNASYNC_LOG_FILE"] = log_file
if self.env is not None and isinstance(self.env, dict):
for k, v in self.env.items():
new_env[k] = v
_cmd = self.command[0]
_args = [] if len(self.command) == 1 else self.command[1:]
cmd = sh.Command(_cmd)
if self.log_stdout:
self.p = cmd(*_args, _env=new_env, _out=log_file,
_err_to_out=True, _out_bufsize=1, _bg=True)
else:
self.p = cmd(*_args, _env=new_env, _out='/dev/null',
_err='/dev/null', _out_bufsize=1, _bg=True)
# vim: ts=4 sw=4 sts=4 expandtab

View File

@ -1,123 +0,0 @@
#!/usr/bin/env python2
# -*- coding:utf-8 -*-
import json
from datetime import datetime
class StatusManager(object):
def __init__(self, parent, dbfile):
self.parent = parent
self.dbfile = dbfile
self.init_mirrors()
def init_mirrors(self):
mirrors = {}
for name, cfg in self.parent.mirrors.iteritems():
mirrors[name] = {
'name': name,
'last_update': '-',
'status': 'unknown',
'upstream': cfg.upstream or '-',
}
try:
with open(self.dbfile) as f:
_mirrors = json.load(f)
for m in _mirrors:
name = m["name"]
mirrors[name]["last_update"] = m["last_update"]
mirrors[name]["status"] = m["status"]
except:
pass
self.mirrors = mirrors
self.mirrors_ctx = {key: {} for key in self.mirrors}
def get_info(self, name, key):
if key == "ctx":
return self.mirrors_ctx.get(name, {})
_m = self.mirrors.get(name, {})
return _m.get(key, None)
def refresh_mirror(self, name):
cfg = self.parent.mirrors.get(name, None)
if cfg is None:
return
_m = self.mirrors.get(name, {
'name': name,
'last_update': '-',
'status': '-',
})
_m['upstream'] = cfg.upstream or '-'
self.mirrors[name] = dict(_m.items())
self.commit_db()
def update_status(self, name, status, ctx={}):
_m = self.mirrors.get(name, {
'name': name,
'last_update': '-',
'status': '-',
})
if status in ("syncing", "fail", "pre-syncing"):
update_time = _m["last_update"]
elif status == "success":
update_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
else:
print("Invalid status: {}, from {}".format(status, name))
_m['last_update'] = update_time
_m['status'] = status
self.mirrors[name] = dict(_m.items())
self.mirrors_ctx[name] = ctx
self.commit_db()
print("Updated status file, {}:{}".format(name, status))
def list_status(self, _format=False):
_mirrors = sorted(
[m for _, m in self.mirrors.items()],
key=lambda x: x['name']
)
if not _format:
return _mirrors
name_len = max([len(_m['name']) for _m in _mirrors])
update_len = max([len(_m['last_update']) for _m in _mirrors])
status_len = max([len(_m['status']) for _m in _mirrors])
heading = ' '.join([
'name'.ljust(name_len),
'last update'.ljust(update_len),
'status'.ljust(status_len)
])
line = ' '.join(['-'*name_len, '-'*update_len, '-'*status_len])
tabular = '\n'.join(
[
' '.join(
(_m['name'].ljust(name_len),
_m['last_update'].ljust(update_len),
_m['status'].ljust(status_len))
) for _m in _mirrors
]
)
return '\n'.join((heading, line, tabular))
def get_status(self, name, _format=False):
if name not in self.mirrors:
return None
mir = self.mirrors[name]
if not _format:
return mir
tmpl = "{name} last_update: {last_update} status: {status}"
return tmpl.format(**mir)
def commit_db(self):
with open(self.dbfile, 'wb') as f:
_mirrors = self.list_status()
json.dump(_mirrors, f, indent=2, separators=(',', ':'))
# vim: ts=4 sw=4 sts=4 expandtab

View File

@ -1,279 +0,0 @@
#!/usr/bin/env python2
# -*- coding:utf-8 -*-
import signal
import sys
import toml
from multiprocessing import Process, Semaphore, Queue
from . import jobs
from .hook import JobHook
from .mirror_config import MirrorConfig
from .status_manager import StatusManager
from .clt_server import run_control_server
class TUNASync(object):
_instance = None
_settings = None
_inited = False
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(TUNASync, cls).__new__(cls, *args, **kwargs)
return cls._instance
def read_config(self, config_file):
self._config_file = config_file
with open(self._config_file) as f:
self._settings = toml.loads(f.read())
self._inited = True
self._mirrors = {}
self._providers = {}
self.processes = {}
self.semaphore = Semaphore(self._settings["global"]["concurrent"])
self.channel = Queue()
self._hooks = []
self.mirror_root = self._settings["global"]["mirror_root"]
self.use_btrfs = self._settings["global"]["use_btrfs"]
self.btrfs_service_dir_tmpl = self._settings["btrfs"]["service_dir"]
self.btrfs_working_dir_tmpl = self._settings["btrfs"]["working_dir"]
self.btrfs_gc_dir_tmpl = self._settings["btrfs"]["gc_dir"]
self.status_file = self._settings["global"]["status_file"]
self.status_manager = StatusManager(self, self.status_file)
self.ctrl_addr = self._settings["global"]["ctrl_addr"]
self.ctrl_channel = Queue()
p = Process(
target=run_control_server,
args=(self.ctrl_addr, self.channel, self.ctrl_channel),
)
p.start()
self.processes["CTRL_SERVER"] = (self.ctrl_channel, p)
def add_hook(self, h):
assert isinstance(h, JobHook)
self._hooks.append(h)
def hooks(self):
return self._hooks
@property
def mirrors(self):
if self._mirrors:
return self._mirrors
for mirror_opt in self._settings["mirrors"]:
name = mirror_opt["name"]
self._mirrors[name] = \
MirrorConfig(self, mirror_opt)
return self._mirrors
@property
def providers(self):
if self._providers:
return self._providers
for name, mirror in self.mirrors.iteritems():
hooks = mirror.hooks() + self.hooks()
provider = mirror.to_provider(hooks, no_delay=mirror.no_delay)
self._providers[name] = provider
return self._providers
def run_jobs(self):
for name in self.providers:
self.run_provider(name)
def sig_handler(*args):
print("terminate subprocesses")
for _, np in self.processes.iteritems():
_, p = np
p.terminate()
print("Good Bye")
sys.exit(0)
signal.signal(signal.SIGINT, sig_handler)
signal.signal(signal.SIGTERM, sig_handler)
signal.signal(signal.SIGUSR1, self.reload_mirrors)
signal.signal(signal.SIGUSR2, self.reload_mirrors_force)
self.run_forever()
def run_provider(self, name):
if name not in self.providers:
print("{} doesnot exist".format(name))
return
provider = self.providers[name]
child_queue = Queue()
p = Process(
target=jobs.run_job,
args=(self.semaphore, child_queue, self.channel, provider, ),
kwargs={
'max_retry': self._settings['global']['max_retry']}
)
p.start()
provider.set_delay(0) # clear delay after first start
self.processes[name] = (child_queue, p)
def reload_mirrors(self, signum, frame):
try:
return self._reload_mirrors(signum, frame, force=False)
except Exception as e:
print(e)
def reload_mirrors_force(self, signum, frame):
try:
return self._reload_mirrors(signum, frame, force=True)
except Exception as e:
print(e)
def _reload_mirrors(self, signum, frame, force=False):
print("reload mirror configs, force restart: {}".format(force))
with open(self._config_file) as f:
self._settings = toml.loads(f.read())
for mirror_opt in self._settings["mirrors"]:
name = mirror_opt["name"]
newMirCfg = MirrorConfig(self, mirror_opt)
if name in self._mirrors:
if newMirCfg.compare(self._mirrors[name]):
continue
self._mirrors[name] = newMirCfg
hooks = newMirCfg.hooks() + self.hooks()
newProvider = newMirCfg.to_provider(hooks, no_delay=True)
self._providers[name] = newProvider
if name in self.processes:
q, p = self.processes[name]
if force:
p.terminate()
print("Terminated Job: {}".format(name))
self.run_provider(name)
else:
q.put("terminate")
print("New configuration queued to {}".format(name))
else:
print("New mirror: {}".format(name))
self.run_provider(name)
self.status_manager.refresh_mirror(name)
def run_forever(self):
while 1:
try:
msg_hdr, msg_body = self.channel.get()
except IOError:
continue
if msg_hdr == "UPDATE":
mirror_name, status, ctx = msg_body
try:
self.status_manager.update_status(
mirror_name, status, dict(ctx.items()))
except Exception as e:
print(e)
elif msg_hdr == "CONFIG_ACK":
mirror_name, status = msg_body
if status == "QUIT":
print("New configuration applied to {}".format(mirror_name))
self.run_provider(mirror_name)
elif msg_hdr == "CMD":
cmd, mirror_name, kwargs = msg_body
if (mirror_name not in self.mirrors) and (mirror_name != "__ALL__"):
self.ctrl_channel.put("Invalid target")
continue
res = self.handle_cmd(cmd, mirror_name, kwargs)
self.ctrl_channel.put(res)
def handle_cmd(self, cmd, mirror_name, kwargs):
if cmd == "restart":
if mirror_name not in self.providers:
res = "Invalid job: {}".format(mirror_name)
return res
if mirror_name in self.processes:
_, p = self.processes[mirror_name]
p.terminate()
self.providers[mirror_name].set_delay(0)
self.run_provider(mirror_name)
res = "Restarted Job: {}".format(mirror_name)
elif cmd == "stop":
if mirror_name not in self.processes:
res = "{} not running".format(mirror_name)
return res
_, p = self.processes.pop(mirror_name)
p.terminate()
res = "Stopped Job: {}".format(mirror_name)
elif cmd == "start":
if mirror_name in self.processes:
res = "{} already running".format(mirror_name)
return res
self.run_provider(mirror_name)
res = "Started Job: {}".format(mirror_name)
elif cmd == "status":
if mirror_name == "__ALL__":
res = self.status_manager.list_status(_format=True)
else:
res = self.status_manager.get_status(mirror_name, _format=True)
elif cmd == "log":
job_ctx = self.status_manager.get_info(mirror_name, "ctx")
n = kwargs.get("n", 0)
if n == 0:
res = job_ctx.get(
"log_link",
job_ctx.get("log_file", "/dev/null"),
)
else:
import os
log_file = job_ctx.get("log_file", None)
if log_file is None:
return "/dev/null"
log_dir = os.path.dirname(log_file)
lfiles = [
os.path.join(log_dir, lfile)
for lfile in os.listdir(log_dir)
if lfile.startswith(mirror_name) and lfile != "latest"
]
if len(lfiles) <= n:
res = "Only {} log files available".format(len(lfiles))
return res
lfiles_set = set(lfiles)
# sort to get the newest 10 files
lfiles_ts = sorted(
[(os.path.getmtime(lfile), lfile) for lfile in lfiles_set],
key=lambda x: x[0],
reverse=True,
)
return lfiles_ts[n][1]
else:
res = "Invalid command"
return res
# vim: ts=4 sw=4 sts=4 expandtab