[PATCH 7/8] cts: fix starting/stopping of test_agents

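[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Reuse already-created test agents (via clean_start()) instead of always
constructing new ones. Make TestAgent.start() return early when the agent
already answers a status probe. Make stop() wait until pidof no longer
finds the agent process. The corosync status check now also handles
systemctl output, CpgTestAgent no longer overrides stop(), send_dynamic()
refuses to run on an agent that is not started, and the agent log
messages are shortened.
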
Signed-off-by: Angus Salkeld <asalkeld@xxxxxxxxxx>
---
 cts/corosync.py |  110 ++++++++++++++++++++++++++++++------------------------
 1 files changed, 61 insertions(+), 49 deletions(-)

diff --git a/cts/corosync.py b/cts/corosync.py
index 2b79b23..89b0808 100644
--- a/cts/corosync.py
+++ b/cts/corosync.py
@@ -258,11 +258,15 @@ class corosync_needle(ClusterManager):
         # 2 - up and stable
         # 1 - unstable
         # 0 - down
-        out = self.rsh(node, self["StatusCmd"], 1)
-        is_stopped = string.find(out, 'stopped')
-        is_dead = string.find(out, 'dead')
+        (rc, lines) = self.rsh(node, self["StatusCmd"], stdout=2)
+        out = str(lines)
 
-        ret = (is_dead is -1 and is_stopped is -1)
+        if 'systemctl' in out:
+            ret = (string.find(out, 'inactive (dead)') == -1)
+        else:
+            is_stopped = string.find(out, 'stopped')
+            is_dead = string.find(out, 'dead')
+            ret = (is_dead == -1 and is_stopped == -1)
 
         try:
             if ret:
@@ -373,10 +377,16 @@ class TestAgentComponent(ScenarioComponent):
                 raise RuntimeError ("corosync not up")
 
             if self.CM.start_cpg:
-                self.CM.cpg_agent[node] = CpgTestAgent(node, CM.Env)
-                self.CM.cpg_agent[node].start()
-            self.CM.sam_agent[node] = SamTestAgent(node, CM.Env)
-            self.CM.sam_agent[node].start()
+                if self.CM.cpg_agent.has_key(node):
+                    self.CM.cpg_agent[node].clean_start()
+                else:
+                    self.CM.cpg_agent[node] = CpgTestAgent(node, CM.Env)
+                    self.CM.cpg_agent[node].start()
+            if self.CM.sam_agent.has_key(node):
+                self.CM.sam_agent[node].clean_start()
+            else:
+                self.CM.sam_agent[node] = SamTestAgent(node, CM.Env)
+                self.CM.sam_agent[node].start()
             # votequorum agent started as needed.
             if self.CM.applied_config.has_key('quorum/provider'):
                 if CM.applied_config['quorum/provider'] is 'corosync_votequorum':
@@ -411,12 +421,13 @@ class TestAgent(object):
         self.send_recv = False
 
     def restart(self):
+        self.env.debug('%s:%s restarting' % (self.node, self.binary))
         self.stop()
         self.start()
 
     def clean_start(self):
         if self.used or not self.status():
-            self.env.debug('test agent: cleaning %s on node %s' % (self.binary, self.node))
+            self.env.debug('%s:%s cleaning' % (self.node, self.binary))
             self.stop()
             self.start()
 
@@ -425,20 +436,23 @@ class TestAgent(object):
             return False
 
         try:
-            self.send (["are_you_ok_dude"])  
-            self.read ()
+            self.send_internal(["are_you_ok_dude"])
+            self.read()
             self.started = True
             return True
         except RuntimeError, msg:
             self.started = False
             return False
-    
+
     def start(self):
         '''Set up the given ScenarioComponent'''
-        self.env.debug('test agent: starting %s on node %s' % (self.binary, self.node))
+        if self.status():
+            return
+        self.env.debug('%s:%s starting' % (self.node, self.binary))
+
+        self.rsh(self.node, self.binary, synchronous=False)
         self.sock = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
         ip = socket.gethostbyname(self.node)
-        self.rsh(self.node, self.binary, synchronous=False)
         is_connected = False
         retries = 0
         while not is_connected:
@@ -448,44 +462,52 @@ class TestAgent(object):
                 is_connected = True
             except socket.error, msg:
                 if retries > 10:
-                    self.env.log("Tried connecting to %s on node %s %d times. %s" % (self.binary, self.node, retries, str(msg)))
+                    self.env.log("%s:%s Tried connecting %d times. %s" % (self.node, self.binary, retries, str(msg)))
                 if retries > 30:
-                    raise RuntimeError("can't connect to " % self.binary)
+                    raise RuntimeError("%s:%s can't connect" % (self.node, self.binary))
                 time.sleep(1)
         self.started = True
         self.used = False
 
     def stop(self):
         '''Tear down (undo) the given ScenarioComponent'''
-        self.env.debug('test agent: stopping %s on node %s' % (self.binary, self.node))
+        self.env.debug('%s:%s stopping' % (self.node, self.binary))
         self.rsh(self.node, "killall " + self.binary + " 2>/dev/null")
-        self.sock.close ()
-        self.rsh(self.node, "killall -9 " + self.binary + " 2>/dev/null")
+        if self.sock:
+            self.sock.close ()
+            del self.sock
+            self.sock = None
+        while self.getpid() != '':
+            time.sleep(1)
         self.started = False
 
     def kill(self):
         '''Tear down (undo) the given ScenarioComponent'''
-        self.env.debug('test agent: killing %s on node %s' % (self.binary, self.node))
+        self.env.log('%s:%s killing' % (self.node, self.binary))
         self.rsh(self.node, "killall -9 " + self.binary + " 2>/dev/null")
         self.started = False
 
     def getpid(self):
         return self.rsh(self.node, 'pidof ' + self.binary, 1)
 
-    def send (self, args):
-        if not self.started:
-            self.start()
+    def send_internal(self, args):
 
         real_msg = str (len (args))
         for a in args:
             a_str = str(a)
             real_msg += ":" + str (len (a_str)) + ":" + a_str
         real_msg += ";"
-        sent = 0
         try:
-            sent = self.sock.send (real_msg)
+            return self.sock.send (real_msg)
         except socket.error, msg:
             self.env.log("send(%s): %s; error: %s" % (self.node, real_msg, msg))
+            return 0
+
+    def send (self, args):
+        if not self.started:
+            self.start()
+
+        sent = self.send_internal(args)
 
         if sent == 0:
             raise RuntimeError ("socket connection broken")
@@ -516,7 +538,7 @@ class TestAgent(object):
 
     def send_dynamic (self, *args):
         if not self.started:
-            self.start()
+            raise RuntimeError ("agent not started")
 
         # number of args+func
         real_msg = str (len (args) + 1) + ":" + str(len(self.func_name)) + ":" + self.func_name
@@ -536,10 +558,10 @@ class TestAgent(object):
 
     def read (self):
 
-		msg = self.sock.recv (4096)
-		if msg == '':
-			raise RuntimeError("socket connection broken")
-		return msg
+        msg = self.sock.recv (4096)
+        if msg == '':
+            raise RuntimeError("socket connection broken")
+        return msg
 
 
 class CpgConfigEvent:
@@ -553,9 +575,9 @@ class CpgConfigEvent:
             self.is_member = False
         else:
             self.is_member = True
-    
+
     def __str__ (self):
-        
+
         str = self.group_name + "," + self.node_id + "," + self.pid + ","
         if self.is_member:
             return str + "joined"
@@ -571,36 +593,26 @@ class CpgTestAgent(TestAgent):
         self.nodeid = None
 
     def start(self):
-        if not self.started:
+        if not self.status():
             TestAgent.start(self)
             self.cpg_initialize()
             self.used = False
 
-    def stop(self):
-        try:
-            if self.started:
-                self.cpg_finalize()
-        except RuntimeError, msg:
-            # if cpg_agent is down, we are not going to stress
-            self.env.debug("CpgTestAgent::cpg_finalize() - %s" % msg)
-
-        TestAgent.stop(self)
-
     def cpg_local_get(self):
         if self.nodeid == None:
-            self.send (["cpg_local_get"])  
+            self.send (["cpg_local_get"])
             self.nodeid = self.read ()
         return self.nodeid
 
     def record_config_events(self, truncate=True):
         if truncate:
-            self.send (["record_config_events", "truncate"])  
+            self.send (["record_config_events", "truncate"])
         else:
-            self.send (["record_config_events", "append"])  
+            self.send (["record_config_events", "append"])
         return self.read ()
 
     def read_config_event(self):
-        self.send (["read_config_event"])  
+        self.send (["read_config_event"])
         msg = self.read ()
 
         if "None" in msg:
@@ -609,7 +621,7 @@ class CpgTestAgent(TestAgent):
             return CpgConfigEvent(msg)
 
     def read_messages(self, atmost):
-        self.send (["read_messages", atmost])  
+        self.send (["read_messages", atmost])
         msg = self.read ()
 
         if "None" in msg:
@@ -618,7 +630,7 @@ class CpgTestAgent(TestAgent):
             return msg
 
     def context_test(self):
-        self.send (["context_test"])  
+        self.send (["context_test"])
         return self.read ()
 
 ###################################################################
-- 
1.7.7.5

_______________________________________________
discuss mailing list
discuss@xxxxxxxxxxxx
http://lists.corosync.org/mailman/listinfo/discuss


[Index of Archives]     [Linux Clusters]     [Corosync Project]     [Linux USB Devel]     [Linux Audio Users]     [Photo]     [Yosemite News]    [Yosemite Photos]    [Linux Kernel]     [Linux SCSI]     [X.Org]

  Powered by Linux