control server

This commit is contained in:
bigeagle 2014-10-25 20:30:35 +08:00
parent 5725113a91
commit 199ca93e66
5 changed files with 158 additions and 17 deletions

View File

@ -10,6 +10,7 @@ concurrent = 2
# interval in minutes
interval = 1
max_retry = 2
ctrl_addr = "/tmp/tunasync.sock"
[btrfs]
service_dir = "{mirror_root}/_current/{mirror_name}"

49
tunasync/clt_server.py Normal file
View File

@ -0,0 +1,49 @@
#!/usr/bin/env python2
# -*- coding:utf-8 -*-
import socket
import os
import json
class ControlServer(object):
def __init__(self, address, mgr_chan, cld_chan):
self.address = address
self.mgr_chan = mgr_chan
self.cld_chan = cld_chan
try:
os.unlink(self.address)
except OSError:
if os.path.exists(self.address):
raise Exception("file exists: {}".format(self.address))
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.sock.bind(self.address)
print("Control Server listening on: {}".format(self.address))
self.sock.listen(1)
def serve_forever(self):
while 1:
conn, _ = self.sock.accept()
try:
length = ord(conn.recv(1))
content = conn.recv(length)
cmd = json.loads(content)
self.mgr_chan.put(("CMD", (cmd['cmd'], cmd['target'])))
except Exception as e:
print(e)
res = "Invalid Command"
else:
res = self.cld_chan.get()
conn.sendall(chr(len(res)) + res)
conn.close()
def run_control_server(address, mgr_chan, cld_chan):
cs = ControlServer(address, mgr_chan, cld_chan)
cs.serve_forever()
# vim: ts=4 sw=4 sts=4 expandtab

View File

@ -29,7 +29,7 @@ def run_job(sema, child_q, manager_q, provider, **settings):
aquired = True
status = "syncing"
manager_q.put((provider.name, status))
manager_q.put(("UPDATE", (provider.name, status)))
try:
for hook in provider.hooks:
hook.before_job(name=provider.name)
@ -66,12 +66,12 @@ def run_job(sema, child_q, manager_q, provider, **settings):
provider.name, provider.interval
))
manager_q.put((provider.name, status))
manager_q.put(("UPDATE", (provider.name, status)))
try:
msg = child_q.get(timeout=provider.interval * 60)
if msg == "terminate":
manager_q.put((provider.name, "QUIT"))
manager_q.put(("CONFIG_ACK", (provider.name, "QUIT")))
break
except Queue.Empty:
pass

View File

@ -9,6 +9,7 @@ from . import jobs
from .hook import JobHook
from .mirror_config import MirrorConfig
from .status_manager import StatusManager
from .clt_server import run_control_server
class TUNASync(object):
@ -46,6 +47,15 @@ class TUNASync(object):
self.status_file = self._settings["global"]["status_file"]
self.status_manager = StatusManager(self, self.status_file)
self.ctrl_addr = self._settings["global"]["ctrl_addr"]
self.ctrl_channel = Queue()
p = Process(
target=run_control_server,
args=(self.ctrl_addr, self.channel, self.ctrl_channel),
)
p.start()
self.processes["CTRL_SERVER"] = (self.ctrl_channel, p)
def add_hook(self, h):
assert isinstance(h, JobHook)
self._hooks.append(h)
@ -94,20 +104,7 @@ class TUNASync(object):
signal.signal(signal.SIGUSR1, self.reload_mirrors)
signal.signal(signal.SIGUSR2, self.reload_mirrors_force)
while 1:
try:
name, status = self.channel.get()
except IOError:
continue
if status == "QUIT":
print("New configuration applied to {}".format(name))
self.run_provider(name)
else:
try:
self.status_manager.update_status(name, status)
except Exception as e:
print(e)
self.run_forever()
def run_provider(self, name):
if name not in self.providers:
@ -170,5 +167,59 @@ class TUNASync(object):
print("New mirror: {}".format(name))
self.run_provider(name)
def run_forever(self):
while 1:
try:
msg_hdr, msg_body = self.channel.get()
except IOError:
continue
if msg_hdr == "UPDATE":
name, status = msg_body
try:
self.status_manager.update_status(name, status)
except Exception as e:
print(e)
elif msg_hdr == "CONFIG_ACK":
name, status = msg_body
if status == "QUIT":
print("New configuration applied to {}".format(name))
self.run_provider(name)
elif msg_hdr == "CMD":
cmd, name = msg_body
if name not in self.mirrors:
self.ctrl_channel.put("Invalid target")
continue
if cmd == "restart":
_, p = self.processes[name]
p.terminate()
self.run_provider(name)
res = "Restarted Job: {}".format(name)
elif cmd == "stop":
if name not in self.processes:
res = "{} not running".format(name)
self.ctrl_channel.put(res)
continue
_, p = self.processes.pop(name)
p.terminate()
res = "Stopped Job: {}".format(name)
elif cmd == "start":
if name in self.processes:
res = "{} already running".format(name)
self.ctrl_channel.put(res)
continue
self.run_provider(name)
res = "Started Job: {}".format(name)
else:
res = "Invalid command"
self.ctrl_channel.put(res)
# vim: ts=4 sw=4 sts=4 expandtab

40
tunasynctl.py Executable file
View File

@ -0,0 +1,40 @@
#!/usr/bin/env python2
# -*- coding:utf-8 -*-
import sys
import socket
import argparse
import json
if __name__ == "__main__":
parser = argparse.ArgumentParser(prog="tunasynctl")
parser.add_argument("-s", "--socket",
default="/var/run/tunasync.sock", help="socket file")
parser.add_argument("command", help="command")
parser.add_argument("target", help="target")
args = parser.parse_args()
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
sock.connect(args.socket)
except socket.error as msg:
print(msg)
sys.exit(1)
pack = json.dumps({
'cmd': args.command,
'target': args.target,
})
try:
sock.sendall(chr(len(pack)) + pack)
length = ord(sock.recv(1))
print(sock.recv(length))
except Exception as e:
print(e)
finally:
sock.close()
# vim: ts=4 sw=4 sts=4 expandtab