Reviewed-by: Steven Dake <sdake@xxxxxxxxxx> On 01/24/2012 06:15 AM, Angus Salkeld wrote: > Signed-off-by: Angus Salkeld <asalkeld@xxxxxxxxxx> > --- > cts/corosync.py | 110 ++++++++++++++++++++++++++++++------------------------ > 1 files changed, 61 insertions(+), 49 deletions(-) > > diff --git a/cts/corosync.py b/cts/corosync.py > index 2b79b23..89b0808 100644 > --- a/cts/corosync.py > +++ b/cts/corosync.py > @@ -258,11 +258,15 @@ class corosync_needle(ClusterManager): > # 2 - up and stable > # 1 - unstable > # 0 - down > - out = self.rsh(node, self["StatusCmd"], 1) > - is_stopped = string.find(out, 'stopped') > - is_dead = string.find(out, 'dead') > + (rc, lines) = self.rsh(node, self["StatusCmd"], stdout=2) > + out = str(lines) > > - ret = (is_dead is -1 and is_stopped is -1) > + if 'systemctl' in out: > + ret= (string.find(out, 'inactive (dead)') == -1) > + else: > + is_stopped = string.find(out, 'stopped') > + is_dead = string.find(out, 'dead') > + ret = (is_dead is -1 and is_stopped is -1) > > try: > if ret: > @@ -373,10 +377,16 @@ class TestAgentComponent(ScenarioComponent): > raise RuntimeError ("corosync not up") > > if self.CM.start_cpg: > - self.CM.cpg_agent[node] = CpgTestAgent(node, CM.Env) > - self.CM.cpg_agent[node].start() > - self.CM.sam_agent[node] = SamTestAgent(node, CM.Env) > - self.CM.sam_agent[node].start() > + if self.CM.cpg_agent.has_key(node): > + self.CM.cpg_agent[node].clean_start() > + else: > + self.CM.cpg_agent[node] = CpgTestAgent(node, CM.Env) > + self.CM.cpg_agent[node].start() > + if self.CM.sam_agent.has_key(node): > + self.CM.sam_agent[node].clean_start() > + else: > + self.CM.sam_agent[node] = SamTestAgent(node, CM.Env) > + self.CM.sam_agent[node].start() > # votequorum agent started as needed. > if self.CM.applied_config.has_key('quorum/provider'): > if CM.applied_config['quorum/provider'] is 'corosync_votequorum': > @@ -411,12 +421,13 @@ class TestAgent(object): > self.send_recv = False > > def restart(self): > + self.env.debug('%s:%s restarting' % (self.node, self.binary)) > self.stop() > self.start() > > def clean_start(self): > if self.used or not self.status(): > - self.env.debug('test agent: cleaning %s on node %s' % (self.binary, self.node)) > + self.env.debug('%s:%s cleaning' % (self.node, self.binary)) > self.stop() > self.start() > > @@ -425,20 +436,23 @@ class TestAgent(object): > return False > > try: > - self.send (["are_you_ok_dude"]) > - self.read () > + self.send_internal(["are_you_ok_dude"]) > + self.read() > self.started = True > return True > except RuntimeError, msg: > self.started = False > return False > - > + > def start(self): > '''Set up the given ScenarioComponent''' > - self.env.debug('test agent: starting %s on node %s' % (self.binary, self.node)) > + if self.status(): > + return > + self.env.debug('%s:%s starting ' % (self.node, self.binary)) > + > + self.rsh(self.node, self.binary, synchronous=False) > self.sock = socket.socket (socket.AF_INET, socket.SOCK_STREAM) > ip = socket.gethostbyname(self.node) > - self.rsh(self.node, self.binary, synchronous=False) > is_connected = False > retries = 0 > while not is_connected: > @@ -448,44 +462,52 @@ class TestAgent(object): > is_connected = True > except socket.error, msg: > if retries > 10: > - self.env.log("Tried connecting to %s on node %s %d times. %s" % (self.binary, self.node, retries, str(msg))) > + self.env.log("%s:%s Tried connecting %d times. %s" % (self.node, self.binary, retries, str(msg))) > if retries > 30: > - raise RuntimeError("can't connect to " % self.binary) > + raise RuntimeError("%s:%s can't connect" % (self.node, self.binary)) > time.sleep(1) > self.started = True > self.used = False > > def stop(self): > '''Tear down (undo) the given ScenarioComponent''' > - self.env.debug('test agent: stopping %s on node %s' % (self.binary, self.node)) > + self.env.debug('%s:%s stopping' % (self.binary, self.node)) > self.rsh(self.node, "killall " + self.binary + " 2>/dev/null") > - self.sock.close () > - self.rsh(self.node, "killall -9 " + self.binary + " 2>/dev/null") > + if self.sock: > + self.sock.close () > + del self.sock > + self.sock = None > + while self.getpid() != '': > + time.sleep(1) > self.started = False > > def kill(self): > '''Tear down (undo) the given ScenarioComponent''' > - self.env.debug('test agent: killing %s on node %s' % (self.binary, self.node)) > + self.env.log('%s:%s killing' % (self.node, self.binary)) > self.rsh(self.node, "killall -9 " + self.binary + " 2>/dev/null") > self.started = False > > def getpid(self): > return self.rsh(self.node, 'pidof ' + self.binary, 1) > > - def send (self, args): > - if not self.started: > - self.start() > + def send_internal(self, args): > > real_msg = str (len (args)) > for a in args: > a_str = str(a) > real_msg += ":" + str (len (a_str)) + ":" + a_str > real_msg += ";" > - sent = 0 > try: > - sent = self.sock.send (real_msg) > + return self.sock.send (real_msg) > except socket.error, msg: > self.env.log("send(%s): %s; error: %s" % (self.node, real_msg, msg)) > + return 0 > + > + def send (self, args): > + if not self.started: > + self.start() > + > + sent = self.send_internal(args) > > if sent == 0: > raise RuntimeError ("socket connection broken") > @@ -516,7 +538,7 @@ class TestAgent(object): > > def send_dynamic (self, *args): > if not self.started: > - self.start() > + raise RuntimeError ("agent not started") > > # number of args+func > real_msg = str (len (args) + 1) + ":" + str(len(self.func_name)) + ":" + self.func_name > @@ -536,10 +558,10 @@ class TestAgent(object): > > def read (self): > > - msg = self.sock.recv (4096) > - if msg == '': > - raise RuntimeError("socket connection broken") > - return msg > + msg = self.sock.recv (4096) > + if msg == '': > + raise RuntimeError("socket connection broken") > + return msg > > > class CpgConfigEvent: > @@ -553,9 +575,9 @@ class CpgConfigEvent: > self.is_member = False > else: > self.is_member = True > - > + > def __str__ (self): > - > + > str = self.group_name + "," + self.node_id + "," + self.pid + "," > if self.is_member: > return str + "joined" > @@ -571,36 +593,26 @@ class CpgTestAgent(TestAgent): > self.nodeid = None > > def start(self): > - if not self.started: > + if not self.status(): > TestAgent.start(self) > self.cpg_initialize() > self.used = False > > - def stop(self): > - try: > - if self.started: > - self.cpg_finalize() > - except RuntimeError, msg: > - # if cpg_agent is down, we are not going to stress > - self.env.debug("CpgTestAgent::cpg_finalize() - %s" % msg) > - > - TestAgent.stop(self) > - > def cpg_local_get(self): > if self.nodeid == None: > - self.send (["cpg_local_get"]) > + self.send (["cpg_local_get"]) > self.nodeid = self.read () > return self.nodeid > > def record_config_events(self, truncate=True): > if truncate: > - self.send (["record_config_events", "truncate"]) > + self.send (["record_config_events", "truncate"]) > else: > - self.send (["record_config_events", "append"]) > + self.send (["record_config_events", "append"]) > return self.read () > > def read_config_event(self): > - self.send (["read_config_event"]) > + self.send (["read_config_event"]) > msg = self.read () > > if "None" in msg: > @@ -609,7 +621,7 @@ class CpgTestAgent(TestAgent): > return CpgConfigEvent(msg) > > def read_messages(self, atmost): > - self.send (["read_messages", atmost]) > + self.send (["read_messages", atmost]) > msg = self.read () > > if "None" in msg: > @@ -618,7 +630,7 @@ class CpgTestAgent(TestAgent): > return msg > > def context_test(self): > - self.send (["context_test"]) > + self.send (["context_test"]) > return self.read () > > ################################################################### _______________________________________________ discuss mailing list discuss@xxxxxxxxxxxx http://lists.corosync.org/mailman/listinfo/discuss