rewrite tunasynctl, add command

This commit is contained in:
bigeagle 2015-05-16 14:09:57 +08:00
parent caac9b4950
commit a99ce6575f
5 changed files with 125 additions and 54 deletions

View File

@ -8,6 +8,10 @@ import struct
class ControlServer(object):
valid_commands = set((
"start", "stop", "restart", "status", "log",
))
def __init__(self, address, mgr_chan, cld_chan):
self.address = address
self.mgr_chan = mgr_chan
@ -19,7 +23,7 @@ class ControlServer(object):
raise Exception("file exists: {}".format(self.address))
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.sock.bind(self.address)
os.chmod(address, 0700)
os.chmod(address, 0o700)
print("Control Server listening on: {}".format(self.address))
self.sock.listen(1)
@ -32,7 +36,9 @@ class ControlServer(object):
length = struct.unpack('!H', conn.recv(2))[0]
content = conn.recv(length)
cmd = json.loads(content)
self.mgr_chan.put(("CMD", (cmd['cmd'], cmd['target'])))
if cmd['cmd'] not in self.valid_commands:
raise Exception("Invalid Command")
self.mgr_chan.put(("CMD", (cmd['cmd'], cmd['target'], cmd["kwargs"])))
except Exception as e:
print(e)
res = "Invalid Command"

View File

@ -36,11 +36,11 @@ def run_job(sema, child_q, manager_q, provider, **settings):
break
aquired = True
status = "syncing"
manager_q.put(("UPDATE", (provider.name, status)))
ctx = {} # put context info in it
ctx['current_dir'] = provider.local_dir
ctx['mirror_name'] = provider.name
status = "pre-syncing"
manager_q.put(("UPDATE", (provider.name, status, ctx)))
try:
for hook in provider.hooks:
@ -50,7 +50,9 @@ def run_job(sema, child_q, manager_q, provider, **settings):
traceback.print_exc()
status = "fail"
else:
status = "syncing"
for retry in range(max_retry):
manager_q.put(("UPDATE", (provider.name, status, ctx)))
print("start syncing {}, retry: {}".format(provider.name, retry))
provider.run(ctx=ctx)
@ -78,7 +80,7 @@ def run_job(sema, child_q, manager_q, provider, **settings):
provider.name, provider.interval
))
manager_q.put(("UPDATE", (provider.name, status)))
manager_q.put(("UPDATE", (provider.name, status, ctx)))
try:
msg = child_q.get(timeout=provider.interval * 60)

View File

@ -32,8 +32,11 @@ class StatusManager(object):
pass
self.mirrors = mirrors
self.mirrors_ctx = {key: {} for key in self.mirrors}
def get_info(self, name, key):
if key == "ctx":
return self.mirrors_ctx.get(name, {})
_m = self.mirrors.get(name, {})
return _m.get(key, None)
@ -50,7 +53,7 @@ class StatusManager(object):
self.mirrors[name] = dict(_m.items())
self.commit_db()
def update_status(self, name, status):
def update_status(self, name, status, ctx={}):
_m = self.mirrors.get(name, {
'name': name,
@ -68,6 +71,7 @@ class StatusManager(object):
_m['last_update'] = update_time
_m['status'] = status
self.mirrors[name] = dict(_m.items())
self.mirrors_ctx[name] = ctx
self.commit_db()
print("Updated status file, {}:{}".format(name, status))

View File

@ -173,63 +173,99 @@ class TUNASync(object):
def run_forever(self):
while 1:
try:
msg_hdr, msg_body = self.channel.get()
except IOError:
continue
if msg_hdr == "UPDATE":
name, status = msg_body
mirror_name, status, ctx = msg_body
try:
self.status_manager.update_status(name, status)
self.status_manager.update_status(
mirror_name, status, dict(ctx.items()))
except Exception as e:
print(e)
elif msg_hdr == "CONFIG_ACK":
name, status = msg_body
mirror_name, status = msg_body
if status == "QUIT":
print("New configuration applied to {}".format(name))
self.run_provider(name)
print("New configuration applied to {}".format(mirror_name))
self.run_provider(mirror_name)
elif msg_hdr == "CMD":
cmd, name = msg_body
if (name not in self.mirrors) and (name != "__ALL__"):
cmd, mirror_name, kwargs = msg_body
if (mirror_name not in self.mirrors) and (mirror_name != "__ALL__"):
self.ctrl_channel.put("Invalid target")
continue
res = self.handle_cmd(cmd, mirror_name, kwargs)
self.ctrl_channel.put(res)
def handle_cmd(self, cmd, mirror_name, kwargs):
if cmd == "restart":
_, p = self.processes[name]
_, p = self.processes[mirror_name]
p.terminate()
self.providers[name].set_delay(0)
self.run_provider(name)
res = "Restarted Job: {}".format(name)
self.providers[mirror_name].set_delay(0)
self.run_provider(mirror_name)
res = "Restarted Job: {}".format(mirror_name)
elif cmd == "stop":
if name not in self.processes:
res = "{} not running".format(name)
self.ctrl_channel.put(res)
continue
if mirror_name not in self.processes:
res = "{} not running".format(mirror_name)
return res
_, p = self.processes.pop(name)
_, p = self.processes.pop(mirror_name)
p.terminate()
res = "Stopped Job: {}".format(name)
res = "Stopped Job: {}".format(mirror_name)
elif cmd == "start":
if name in self.processes:
res = "{} already running".format(name)
self.ctrl_channel.put(res)
continue
if mirror_name in self.processes:
res = "{} already running".format(mirror_name)
return res
self.run_provider(mirror_name)
res = "Started Job: {}".format(mirror_name)
self.run_provider(name)
res = "Started Job: {}".format(name)
elif cmd == "status":
if name == "__ALL__":
if mirror_name == "__ALL__":
res = self.status_manager.list_status(_format=True)
else:
res = self.status_manager.get_status(name, _format=True)
res = self.status_manager.get_status(mirror_name, _format=True)
elif cmd == "log":
job_ctx = self.status_manager.get_info(mirror_name, "ctx")
n = kwargs.get("n", 0)
if n == 0:
res = job_ctx.get("log_file", "/dev/null")
else:
import os
log_file = job_ctx.get("log_file", None)
if log_file is None:
return "/dev/null"
log_dir = os.path.dirname(log_file)
lfiles = [
os.path.join(log_dir, lfile)
for lfile in os.listdir(log_dir)
if lfile.startswith(mirror_name) and lfile != "latest"
]
if len(lfiles) <= n:
res = "Only {} log files available".format(len(lfiles))
return res
lfiles_set = set(lfiles)
# sort to get the newest 10 files
lfiles_ts = sorted(
[(os.path.getmtime(lfile), lfile) for lfile in lfiles],
key=lambda x: x[0],
reverse=True,
)
return lfiles_ts[n][1]
else:
res = "Invalid command"
self.ctrl_channel.put(res)
return res
# vim: ts=4 sw=4 sts=4 expandtab

View File

@ -10,22 +10,45 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser(prog="tunasynctl")
parser.add_argument("-s", "--socket",
default="/var/run/tunasync.sock", help="socket file")
parser.add_argument("command", help="command")
parser.add_argument("target", nargs="?", default="__ALL__", help="target")
args = parser.parse_args()
subparsers = parser.add_subparsers(dest="command", help='sub-command help')
sp = subparsers.add_parser('start', help="start job")
sp.add_argument("target", help="mirror job name")
sp = subparsers.add_parser('stop', help="stop job")
sp.add_argument("target", help="mirror job name")
sp = subparsers.add_parser('restart', help="restart job")
sp.add_argument("target", help="mirror job name")
sp = subparsers.add_parser('status', help="show mirror status")
sp.add_argument("target", nargs="?", default="__ALL__", help="mirror job name")
sp = subparsers.add_parser('log', help="return log file path")
sp.add_argument("-n", type=int, default=0, help="last n-th log, default 0 (latest)")
sp.add_argument("target", help="mirror job name")
sp = subparsers.add_parser('help', help="show help message")
args = vars(parser.parse_args())
if args['command'] == "help":
parser.print_help()
sys.exit(0)
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
sock.connect(args.socket)
sock.connect(args.pop("socket"))
except socket.error as msg:
print(msg)
sys.exit(1)
pack = json.dumps({
'cmd': args.command,
'target': args.target,
"cmd": args.pop("command"),
"target": args.pop("target"),
"kwargs": args,
})
try: