geo-rep: Kill Geo-rep Worker when Agent process dies
author Aravinda VK <avishwan@redhat.com>
Fri, 30 Oct 2015 11:36:58 +0000 (17:06 +0530)
committer Venky Shankar <vshankar@redhat.com>
Mon, 9 Nov 2015 09:20:31 +0000 (01:20 -0800)
When the Changelog agent process dies, Geo-replication fails to detect
it and the worker keeps running without its Changelog agent. Status
shows Active/Passive without any progress.

With this patch, the Worker process is killed whenever the Changelog
agent dies.

Change-Id: I30b4cc77f924f7e8174b8bfe415ac17f0b3851b4
Signed-off-by: Aravinda VK <avishwan@redhat.com>
BUG: 1277076
Reviewed-on: http://review.gluster.org/12485
Tested-by: NetBSD Build System <jenkins@build.gluster.org>
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Venky Shankar <vshankar@redhat.com>
Reviewed-by: Kotresh HR <khiremat@redhat.com>
geo-replication/syncdaemon/changelogagent.py
geo-replication/syncdaemon/monitor.py
geo-replication/syncdaemon/resource.py

index ad5f69cfb2367f50c2571eb694e33a22163cf8af..731dbd06f57f21bb200afc5ef76f3e7dc0da9a04 100644 (file)
@@ -66,8 +66,6 @@ class Changelog(object):
 class ChangelogAgent(object):
     def __init__(self, obj, fd_tup):
         (inf, ouf, rw, ww) = fd_tup.split(',')
-        os.close(int(rw))
-        os.close(int(ww))
         repce = RepceServer(obj, int(inf), int(ouf), 1)
         t = syncdutils.Thread(target=lambda: (repce.service_loop(),
                                               syncdutils.finalize()))
index c41eb9691434530eca05e7c5a8a079df3a4dc6b7..5a6bf5033a4b1f7ae8f350540509da7e7d32c04b 100644 (file)
@@ -18,7 +18,7 @@ import xml.etree.ElementTree as XET
 from subprocess import PIPE
 from resource import Popen, FILE, GLUSTER, SSH
 from threading import Lock
-from errno import EEXIST
+from errno import ECHILD
 import re
 import random
 from gconf import gconf
@@ -188,10 +188,18 @@ class Monitor(object):
         ret = 0
 
         def nwait(p, o=0):
-            p2, r = waitpid(p, o)
-            if not p2:
-                return
-            return r
+            try:
+                p2, r = waitpid(p, o)
+                if not p2:
+                    return
+                return r
+            except OSError as e:
+                # no child process, this happens if the child process
+                # already died and has been cleaned up
+                if e.errno == ECHILD:
+                    return -1
+                else:
+                    raise
 
         def exit_signalled(s):
             """ child teminated due to receipt of SIGUSR1 """
@@ -240,6 +248,8 @@ class Monitor(object):
             # spawn the agent process
             apid = os.fork()
             if apid == 0:
+                os.close(rw)
+                os.close(ww)
                 os.execv(sys.executable, argv + ['--local-path', w[0],
                                                  '--agent',
                                                  '--rpc-fd',
@@ -249,6 +259,8 @@ class Monitor(object):
             cpid = os.fork()
             if cpid == 0:
                 os.close(pr)
+                os.close(ra)
+                os.close(wa)
                 os.execv(sys.executable, argv + ['--feedback-fd', str(pw),
                                                  '--local-path', w[0],
                                                  '--local-id',
@@ -277,30 +289,52 @@ class Monitor(object):
 
             if so:
                 ret = nwait(cpid, os.WNOHANG)
+                ret_agent = nwait(apid, os.WNOHANG)
+
+                if ret_agent is not None:
+                    # Agent has died; kill the worker
+                    logging.info("Changelog Agent died, "
+                                 "Aborting Worker(%s)" % w[0])
+                    os.kill(cpid, signal.SIGKILL)
+                    nwait(cpid)
+                    nwait(apid)
+
                 if ret is not None:
                     logging.info("worker(%s) died before establishing "
                                  "connection" % w[0])
-                    nwait(apid) #wait for agent
+                    nwait(apid)  # wait for agent
                 else:
                     logging.debug("worker(%s) connected" % w[0])
                     while time.time() < t0 + conn_timeout:
                         ret = nwait(cpid, os.WNOHANG)
+                        ret_agent = nwait(apid, os.WNOHANG)
+
                         if ret is not None:
                             logging.info("worker(%s) died in startup "
                                          "phase" % w[0])
-                            nwait(apid) #wait for agent
+                            nwait(apid)  # wait for agent
+                            break
+
+                        if ret_agent is not None:
+                            # Agent has died; kill the worker
+                            logging.info("Changelog Agent died, Aborting "
+                                         "Worker(%s)" % w[0])
+                            os.kill(cpid, signal.SIGKILL)
+                            nwait(cpid)
+                            nwait(apid)
                             break
+
                         time.sleep(1)
             else:
                 logging.info("worker(%s) not confirmed in %d sec, "
                              "aborting it" % (w[0], conn_timeout))
                 os.kill(cpid, signal.SIGKILL)
-                nwait(apid) #wait for agent
+                nwait(apid)  # wait for agent
                 ret = nwait(cpid)
             if ret is None:
                 self.status[w[0]].set_worker_status(self.ST_STABLE)
-                #If worker dies, agent terminates on EOF.
-                #So lets wait for agent first.
+                # If worker dies, agent terminates on EOF.
+                # So lets wait for agent first.
                 nwait(apid)
                 ret = nwait(cpid)
             if exit_signalled(ret):
index 51f88627a96ac187c069fff7e187ed6742b519ff..a44ca9142228faac0ed52601d21377ce0afeab10 100644 (file)
@@ -1394,8 +1394,6 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
             # g3 ==> changelog History
             changelog_register_failed = False
             (inf, ouf, ra, wa) = gconf.rpc_fd.split(',')
-            os.close(int(ra))
-            os.close(int(wa))
             changelog_agent = RepceClient(int(inf), int(ouf))
             status = GeorepStatus(gconf.state_file, gconf.local_path)
             status.reset_on_worker_start()