diff --git a/tunasync/exec_pre_post.py b/tunasync/exec_pre_post.py index 892b673..1ade5ad 100644 --- a/tunasync/exec_pre_post.py +++ b/tunasync/exec_pre_post.py @@ -26,6 +26,7 @@ class CmdExecHook(JobHook): new_env = os.environ.copy() new_env["TUNASYNC_MIRROR_NAME"] = ctx["mirror_name"] new_env["TUNASYNC_WORKING_DIR"] = ctx["current_dir"] + new_env["TUNASYNC_JOB_EXIT_STATUS"] = kwargs.get("status", "") _cmd = self.command[0] _args = [] if len(self.command) == 1 else self.command[1:] diff --git a/tunasync/jobs.py b/tunasync/jobs.py index 854cb89..e45d041 100644 --- a/tunasync/jobs.py +++ b/tunasync/jobs.py @@ -5,6 +5,7 @@ import sys from setproctitle import setproctitle import signal import Queue +import traceback def run_job(sema, child_q, manager_q, provider, **settings): @@ -18,17 +19,87 @@ def run_job(sema, child_q, manager_q, provider, **settings): sema.release() sys.exit(0) - signal.signal(signal.SIGTERM, before_quit) - if provider.delay > 0: + def sleep_wait(timeout): try: - msg = child_q.get(timeout=provider.delay) + msg = child_q.get(timeout=timeout) if msg == "terminate": manager_q.put(("CONFIG_ACK", (provider.name, "QUIT"))) - return + return True except Queue.Empty: - pass + return False + + signal.signal(signal.SIGTERM, before_quit) + + if provider.delay > 0: + if sleep_wait(provider.delay): + return max_retry = settings.get("max_retry", 1) + + def _real_run(idx=0, stage="job_hook", ctx=None): + """\ + 4 stages: + 0 -> job_hook, 1 -> set_retry, 2 -> exec_hook, 3 -> exec + """ + + assert(ctx is not None) + + if stage == "exec": + # exec_job + try: + provider.run(ctx=ctx) + provider.wait() + except sh.ErrorReturnCode: + status = "fail" + else: + status = "success" + return status + + elif stage == "set_retry": + # enter stage 3 with retry + for retry in range(max_retry): + status = "syncing" + manager_q.put(("UPDATE", (provider.name, status, ctx))) + print("start syncing {}, retry: {}".format(provider.name, retry)) + status = _real_run(idx=0, stage="exec_hook", ctx=ctx) + if status == "success": + break + return status + + # job_hooks + elif stage == "job_hook": + if idx == len(provider.hooks): + return _real_run(idx=idx, stage="set_retry", ctx=ctx) + hook = provider.hooks[idx] + hook_before, hook_after = hook.before_job, hook.after_job + status = "pre-syncing" + + elif stage == "exec_hook": + if idx == len(provider.hooks): + return _real_run(idx=idx, stage="exec", ctx=ctx) + hook = provider.hooks[idx] + hook_before, hook_after = hook.before_exec, hook.after_exec + status = "syncing" + + try: + # print("%s run before_%s, %d" % (provider.name, stage, idx)) + hook_before(provider=provider, ctx=ctx) + status = _real_run(idx=idx+1, stage=stage, ctx=ctx) + except Exception: + traceback.print_exc() + status = "fail" + finally: + # print("%s run after_%s, %d" % (provider.name, stage, idx)) + # job may break when syncing + if status != "success": + status = "fail" + try: + hook_after(provider=provider, status=status, ctx=ctx) + except Exception: + traceback.print_exc() + + return status + while 1: try: sema.acquire(True) @@ -43,43 +114,10 @@ def run_job(sema, child_q, manager_q, provider, **settings): manager_q.put(("UPDATE", (provider.name, status, ctx))) try: - # before_job hooks - for hook in provider.hooks: - hook.before_job(provider=provider, ctx=ctx) - - for retry in range(max_retry): - # before_exec hooks - for hook in provider.hooks: - hook.before_exec(provider=provider, ctx=ctx) - - status = "syncing" - manager_q.put(("UPDATE", (provider.name, status, ctx))) - print("start syncing {}, retry: {}".format(provider.name, retry)) - - try: - provider.run(ctx=ctx) - provider.wait() - except sh.ErrorReturnCode: - status = "fail" - else: - status = "success" - - # after_exec hooks - for hook in provider.hooks: - hook.after_exec(provider=provider, status=status, ctx=ctx) - - if status == "success": - break - - # after_job hooks - for hook in provider.hooks[::-1]: - hook.after_job(provider=provider, status=status, ctx=ctx) - + status = _real_run(idx=0, stage="job_hook", ctx=ctx) except Exception: - import traceback traceback.print_exc() status = "fail" - finally: sema.release() aquired = False @@ -90,13 +128,8 @@ def run_job(sema, child_q, manager_q, provider, **settings): manager_q.put(("UPDATE", (provider.name, status, ctx))) - try: - msg = child_q.get(timeout=provider.interval * 60) - if msg == "terminate": - manager_q.put(("CONFIG_ACK", (provider.name, "QUIT"))) - break - except Queue.Empty: - pass + if sleep_wait(timeout=provider.interval * 60): + break # vim: ts=4 sw=4 sts=4 expandtab