Signed-off-by: Angus Salkeld <asalkeld@xxxxxxxxxx>
---
 cts/corosync.py |  110 ++++++++++++++++++++++++++++++------------------------
 1 files changed, 61 insertions(+), 49 deletions(-)

Review notes (reviewer annotations only; git am drops everything between the
"---" line and the first "diff --git" line, so nothing here enters the commit):

* RECOVERY CAVEAT: this copy was recovered from a mailing-list archive that
  collapsed all whitespace.
  - Line breaks, indentation and blank context lines below were reconstructed
    from the hunk headers' line counts.
  - The Python indentation of context lines is a best guess.
  - The whitespace-only hunks now show identical -/+ lines, because the
    original tab/space/trailing-blank difference cannot be recovered.
    Affected: TestAgent.read, the blank line before CpgConfigEvent.__str__,
    and the CpgTestAgent self.send(...) calls.
  - Regenerate the patch from git before applying; do not apply this text
    as-is.
* TestAgent.stop(), wrong argument order: the new debug call formats
  '%s:%s stopping' with (self.binary, self.node). Every other new message in
  this patch uses (self.node, self.binary), so this one prints "binary:node".
* TestAgent.stop(), unbounded wait: the "killall -9" fallback is removed and
  replaced by "while self.getpid() != '': time.sleep(1)", which has no retry
  limit.
  - If the agent survives the plain "killall", stop() never returns.
  - The loop also relies on getpid() returning exactly '' once the process
    is gone. getpid() returns self.rsh(node, 'pidof ...', 1), and what that
    returns for empty output is not visible here -- confirm.
  - Suggest bounding the wait and falling back to kill().
* TestAgent.stop(), socket handling:
  - "del self.sock" is immediately followed by "self.sock = None", so the
    del is redundant.
  - "if self.sock:" assumes the attribute exists even when stop() runs
    before start(). Presumably __init__ sets it -- verify.
* TestAgent.start(): the message '%s:%s starting ' has a trailing space that
  the other new messages do not.
* StatusCmd hunk (corosync_needle):
  - "ret= (" is missing a space.
  - The re-added "is_dead is -1 and is_stopped is -1" compares ints by
    identity. That only works because CPython caches small integers. Use
    "== -1", as the new systemctl branch already does.
* StatusCmd hunk: "out = str(lines)" searches the repr of what is presumably
  a list of output lines. The return shape of rsh(..., stdout=2) is not shown
  here. "''.join(lines)" would search the actual text instead.
* send_dynamic() behaviour change: it now raises
  RuntimeError("agent not started") instead of starting the agent, while
  send() still auto-starts. Callers of send_dynamic() must start the agent
  first -- confirm this is intended.
* CpgTestAgent.stop() is removed, so cpg_finalize() is no longer called
  before the agent process is killed.
* Pre-existing issue in unchanged context, not introduced by this patch:
  "CM.applied_config['quorum/provider'] is 'corosync_votequorum'" compares
  strings by identity and should use ==.

diff --git a/cts/corosync.py b/cts/corosync.py
index 2b79b23..89b0808 100644
--- a/cts/corosync.py
+++ b/cts/corosync.py
@@ -258,11 +258,15 @@ class corosync_needle(ClusterManager):
         # 2 - up and stable
         # 1 - unstable
         # 0 - down
-        out = self.rsh(node, self["StatusCmd"], 1)
-        is_stopped = string.find(out, 'stopped')
-        is_dead = string.find(out, 'dead')
+        (rc, lines) = self.rsh(node, self["StatusCmd"], stdout=2)
+        out = str(lines)
 
-        ret = (is_dead is -1 and is_stopped is -1)
+        if 'systemctl' in out:
+            ret= (string.find(out, 'inactive (dead)') == -1)
+        else:
+            is_stopped = string.find(out, 'stopped')
+            is_dead = string.find(out, 'dead')
+            ret = (is_dead is -1 and is_stopped is -1)
 
         try:
             if ret:
@@ -373,10 +377,16 @@ class TestAgentComponent(ScenarioComponent):
                 raise RuntimeError ("corosync not up")
 
             if self.CM.start_cpg:
-                self.CM.cpg_agent[node] = CpgTestAgent(node, CM.Env)
-                self.CM.cpg_agent[node].start()
-                self.CM.sam_agent[node] = SamTestAgent(node, CM.Env)
-                self.CM.sam_agent[node].start()
+                if self.CM.cpg_agent.has_key(node):
+                    self.CM.cpg_agent[node].clean_start()
+                else:
+                    self.CM.cpg_agent[node] = CpgTestAgent(node, CM.Env)
+                    self.CM.cpg_agent[node].start()
+                if self.CM.sam_agent.has_key(node):
+                    self.CM.sam_agent[node].clean_start()
+                else:
+                    self.CM.sam_agent[node] = SamTestAgent(node, CM.Env)
+                    self.CM.sam_agent[node].start()
             # votequorum agent started as needed.
             if self.CM.applied_config.has_key('quorum/provider'):
                 if CM.applied_config['quorum/provider'] is 'corosync_votequorum':
@@ -411,12 +421,13 @@ class TestAgent(object):
         self.send_recv = False
 
     def restart(self):
+        self.env.debug('%s:%s restarting' % (self.node, self.binary))
         self.stop()
         self.start()
 
     def clean_start(self):
         if self.used or not self.status():
-            self.env.debug('test agent: cleaning %s on node %s' % (self.binary, self.node))
+            self.env.debug('%s:%s cleaning' % (self.node, self.binary))
             self.stop()
             self.start()
 
@@ -425,20 +436,23 @@ class TestAgent(object):
             return False
 
         try:
-            self.send (["are_you_ok_dude"])
-            self.read ()
+            self.send_internal(["are_you_ok_dude"])
+            self.read()
             self.started = True
             return True
         except RuntimeError, msg:
             self.started = False
             return False
-    
+
     def start(self):
         '''Set up the given ScenarioComponent'''
-        self.env.debug('test agent: starting %s on node %s' % (self.binary, self.node))
+        if self.status():
+            return
+        self.env.debug('%s:%s starting ' % (self.node, self.binary))
+
+        self.rsh(self.node, self.binary, synchronous=False)
         self.sock = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
         ip = socket.gethostbyname(self.node)
-        self.rsh(self.node, self.binary, synchronous=False)
         is_connected = False
         retries = 0
         while not is_connected:
@@ -448,44 +462,52 @@ class TestAgent(object):
                 is_connected = True
             except socket.error, msg:
                 if retries > 10:
-                    self.env.log("Tried connecting to %s on node %s %d times. %s" % (self.binary, self.node, retries, str(msg)))
+                    self.env.log("%s:%s Tried connecting %d times. %s" % (self.node, self.binary, retries, str(msg)))
                 if retries > 30:
-                    raise RuntimeError("can't connect to " % self.binary)
+                    raise RuntimeError("%s:%s can't connect" % (self.node, self.binary))
                 time.sleep(1)
         self.started = True
         self.used = False
 
     def stop(self):
         '''Tear down (undo) the given ScenarioComponent'''
-        self.env.debug('test agent: stopping %s on node %s' % (self.binary, self.node))
+        self.env.debug('%s:%s stopping' % (self.binary, self.node))
         self.rsh(self.node, "killall " + self.binary + " 2>/dev/null")
-        self.sock.close ()
-        self.rsh(self.node, "killall -9 " + self.binary + " 2>/dev/null")
+        if self.sock:
+            self.sock.close ()
+            del self.sock
+            self.sock = None
+        while self.getpid() != '':
+            time.sleep(1)
         self.started = False
 
     def kill(self):
         '''Tear down (undo) the given ScenarioComponent'''
-        self.env.debug('test agent: killing %s on node %s' % (self.binary, self.node))
+        self.env.log('%s:%s killing' % (self.node, self.binary))
         self.rsh(self.node, "killall -9 " + self.binary + " 2>/dev/null")
         self.started = False
 
     def getpid(self):
         return self.rsh(self.node, 'pidof ' + self.binary, 1)
 
-    def send (self, args):
-        if not self.started:
-            self.start()
+    def send_internal(self, args):
 
         real_msg = str (len (args))
         for a in args:
             a_str = str(a)
             real_msg += ":" + str (len (a_str)) + ":" + a_str
         real_msg += ";"
-        sent = 0
         try:
-            sent = self.sock.send (real_msg)
+            return self.sock.send (real_msg)
         except socket.error, msg:
             self.env.log("send(%s): %s; error: %s" % (self.node, real_msg, msg))
+            return 0
+
+    def send (self, args):
+        if not self.started:
+            self.start()
+
+        sent = self.send_internal(args)
 
         if sent == 0:
             raise RuntimeError ("socket connection broken")
@@ -516,7 +538,7 @@ class TestAgent(object):
 
     def send_dynamic (self, *args):
         if not self.started:
-            self.start()
+            raise RuntimeError ("agent not started")
 
         # number of args+func
         real_msg = str (len (args) + 1) + ":" + str(len(self.func_name)) + ":" + self.func_name
@@ -536,10 +558,10 @@ class TestAgent(object):
 
 
     def read (self):
-        msg = self.sock.recv (4096)
-        if msg == '':
-            raise RuntimeError("socket connection broken")
-        return msg
+        msg = self.sock.recv (4096)
+        if msg == '':
+            raise RuntimeError("socket connection broken")
+        return msg
 
 
 class CpgConfigEvent:
@@ -553,9 +575,9 @@ class CpgConfigEvent:
             self.is_member = False
         else:
             self.is_member = True
-    
+
     def __str__ (self):
-        
+
         str = self.group_name + "," + self.node_id + "," + self.pid + ","
         if self.is_member:
             return str + "joined"
@@ -571,36 +593,26 @@ class CpgTestAgent(TestAgent):
         self.nodeid = None
 
     def start(self):
-        if not self.started:
+        if not self.status():
             TestAgent.start(self)
             self.cpg_initialize()
             self.used = False
 
-    def stop(self):
-        try:
-            if self.started:
-                self.cpg_finalize()
-        except RuntimeError, msg:
-            # if cpg_agent is down, we are not going to stress
-            self.env.debug("CpgTestAgent::cpg_finalize() - %s" % msg)
-
-        TestAgent.stop(self)
-
     def cpg_local_get(self):
         if self.nodeid == None:
-            self.send (["cpg_local_get"])
+            self.send (["cpg_local_get"])
             self.nodeid = self.read ()
         return self.nodeid
 
     def record_config_events(self, truncate=True):
         if truncate:
-            self.send (["record_config_events", "truncate"])
+            self.send (["record_config_events", "truncate"])
         else:
-            self.send (["record_config_events", "append"])
+            self.send (["record_config_events", "append"])
         return self.read ()
 
     def read_config_event(self):
-        self.send (["read_config_event"])
+        self.send (["read_config_event"])
         msg = self.read ()
 
         if "None" in msg:
@@ -609,7 +621,7 @@ class CpgTestAgent(TestAgent):
             return CpgConfigEvent(msg)
 
     def read_messages(self, atmost):
-        self.send (["read_messages", atmost])
+        self.send (["read_messages", atmost])
         msg = self.read ()
 
         if "None" in msg:
@@ -618,7 +630,7 @@ class CpgTestAgent(TestAgent):
             return msg
 
     def context_test(self):
-        self.send (["context_test"])
+        self.send (["context_test"])
         return self.read ()
 
 ###################################################################
-- 
1.7.7.5
_______________________________________________
discuss mailing list
discuss@xxxxxxxxxxxx
http://lists.corosync.org/mailman/listinfo/discuss