Hi Wolfgang, Am Dienstag, den 15.02.2011, 10:37 +0100 schrieb Wolfgang Hennerbichler: > Hi, > > I'd like to write a custom notification system (using XMPP or something like that, I don't know yet :)) for cyrus. > I've had a look at the notify_unix/simple_notify - file in the contrib-directory. It doesn't seem to work in my installation (the script doesn't log any notifications, although notifyd does for example notify zephyr, which works), or maybe I don't understand the concept of notifications. So I'd like to ask a couple of questions: > * does anybody have custom notifications up and running by reading the notification-socket of cyrus? > * does notifyd need to be running in order to make the notify-socket readable, or is the notify-socket filled by the cyrus-master process? > * where would I find instructions on that? I have attached a Python implementation of a notifyd, which we are using together with cyrus imapd 2.2.12. It has to be started as user cyrus (or whichever user you run imapd as) and before you start cyrus imapd. It will try to remove an existing socket and recreate it, so it needs sufficient permissions to do so. To resend the messages (events) it needs http://code.google.com/p/stomppy/ and a stomp server. We are using Apache ActiveMQ for that. Regards Felix > > thanks a bunch, > wolfgang
#!/usr/bin/python
# -*- coding: utf8 -*-
'''notifyd for cyrus imapd.

To use this daemon with cyrus imapd as a notifyd replacement, you have to
 * start the daemon before starting cyrus
 * install the daemon on the backends, if you are running it in a murder
   setup
 * set the values 'notifysocket' and 'mailnotifier' in /etc/imapd.conf
   e.g.
     notifysocket: /var/lib/imap/socket/notify
     mailnotifier: MAIL
 * start the daemon with the same user as imapd

NOTE: this script targets Python 2 (it imports the ``Queue`` and
``ConfigParser`` modules and splits the received datagram as a byte string).
'''

import logging
import Queue
import signal
import stomp
import socket
from threading import Thread
import time
from xml.sax.saxutils import escape as xml_escape


class QueueSender(Thread):
    r"""Sends messages from a python Queue to a stomp queue.

    Messages are buffered in a bounded in-process queue (300 entries) and
    forwarded to the stomp destination by this thread.

    >>> sender = QueueSender([('127.0.0.2', 61613), ('localhost', 61613)], '/topic/test')
    >>> sender.start()
    >>> for i in range(1, 400):
    ...     if not sender.queue_message('this is a test %d' % i):
    ...         print("could not queue message %d" % i)
    >>> sender.force_shutdown()
    >>> sender.join()
    """

    def __init__(self, config, destination, user=None, password=None):
        r"""Constructor for a QueueSender.

        \param config configuration for the stomp-server, e.g.
               [('localhost', 61613), ('other.server.de', 61613)]
        \param destination name of the queue or topic, e.g. '/topic/test'
        \param user optional user name passed to the stomp connection
        \param password optional passcode passed to the stomp connection
        """
        Thread.__init__(self)
        self.conn = None
        self.config = config
        self.user = user
        self.password = password
        self.destination = destination
        self.queue = Queue.Queue(maxsize=300)

    def __get_connection(self):
        """Return a started and connected stomp connection, or None.

        An already established connection is reused. If starting a new
        connection raises ReconnectFailedException, a warning is logged and
        None is returned so the caller can try again later.
        """
        if self.conn:
            return self.conn
        self.conn = stomp.Connection(self.config, user=self.user,
                                     passcode=self.password)
        try:
            self.conn.start()
        except stomp.exception.ReconnectFailedException:
            logging.warning(
                "Probleme beim Verbinden mit dem Stomp Server. "
                "Probieren es später noch einmal...")
            self.conn = None
            return None
        self.conn.connect(wait=True)
        return self.conn

    def __invalidate_connection(self):
        """Forget the current connection so the next send reconnects."""
        self.conn = None

    def queue_message(self, message, headers=None, timeout=3):
        r"""Put a message on the internal queue for later delivery.

        \param message to be queued
        \param headers for the message, default: a new empty dict
        \param timeout seconds to wait for a free slot in the internal
               queue, default 3 seconds
        returns: True if the message could be queued internally, False if
                 the queue was still full after the timeout
        """
        # A fresh dict per call; a ``{}`` default would be shared by all calls.
        if headers is None:
            headers = {}
        try:
            self.queue.put(MessageElement(message, headers), True, timeout)
        except Queue.Full:
            return False
        return True

    def force_shutdown(self):
        """Ask the sender thread to stop by queueing a KillElement."""
        self.queue.put(KillElement())

    def __send(self, message):
        r"""Try to send one MessageElement to the configured stomp server.

        \param message MessageElement to be sent
        returns: True if the message could be sent, False otherwise
        """
        conn = self.__get_connection()
        if not conn:
            return False
        try:
            conn.send(message.message, message.headers,
                      destination=self.destination)
        except stomp.exception.NotConnectedException:
            logging.warning("Fehler beim Senden der Nachricht: %s", message)
            # Drop the stale connection; the next send will reconnect.
            self.__invalidate_connection()
            return False
        return True

    def run(self):
        """Thread main loop: forward queued messages until a KillElement."""
        while True:
            try:
                # Block for at most one second waiting for the next element.
                message = self.queue.get(True, 1)
            except Queue.Empty:
                logging.debug("Queue Empty. Sleep 1 second")
                time.sleep(1)
                continue
            if isinstance(message, MessageElement):
                if self.__send(message):
                    logging.debug("message sent: %s", message)
                else:
                    logging.info("message NOT sent (retry?): %s", message)
                    # Back off before touching the stomp server again.
                    time.sleep(10)
            if isinstance(message, KillElement):
                logging.info("got KillMessage...")
                if self.conn:
                    self.conn.disconnect()
                return


class QueueElement(object):
    """Base class to represent a message used for internal python queue"""
    pass


class KillElement(QueueElement):
    """Special Message to end the clients listening to queue"""
    pass


class MessageElement(QueueElement):
    """Message element, which contains a message and its headers"""

    def __init__(self, message, headers=None):
        self.message = message
        # A fresh dict per instance instead of a shared mutable default.
        self.headers = {} if headers is None else headers


class Notifyd(object):
    '''Daemon to listen on a unix file socket for messages from cyrus-imapd.

    It will reemit those messages via stomp.

    >>> notifyd = Notifyd(notifyd_socket, queue_sender, ('always_bcc', ))
    >>> notifyd.run_loop()
    '''

    # Names assigned, in order, to the NUL-separated parts of a datagram.
    _FIELDS = ('method', 'message_class', 'priority', 'user', 'mailbox',
               'nopt', 'message', 'opts')

    def __init__(self, socket_file_name, destination, ignore_users=()):
        r'''Constructor for notifyd.

        \param socket_file_name Name of the unix file socket
        \param destination QueueSender, which reemits the messages
        \param ignore_users Iterable of users, for which no events should be
               reemitted
        '''
        self.socket_file_name = socket_file_name
        self.destination = destination
        self.ignore_users = set(ignore_users)

    def run_loop(self):
        '''Main loop: read datagrams from the socket and queue them.

        Binds a unix datagram socket to ``self.socket_file_name`` and loops
        forever. Each datagram is split on NUL bytes, the parts are
        XML-escaped and stored under the names in ``_FIELDS``; the resulting
        dict is handed to ``self.destination.queue_message`` unless the
        user is in ``self.ignore_users``. Datagrams with fewer parts than
        ``_FIELDS`` are logged and skipped. The socket is closed when the
        loop is left by an exception.
        '''
        notify_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        try:
            notify_socket.bind(self.socket_file_name)
            while True:
                data = notify_socket.recv(8192).split('\0')
                if len(data) < len(self._FIELDS):
                    logging.warning(
                        "Skipping malformed notification: expected %d "
                        "fields, got %d", len(self._FIELDS), len(data))
                    continue
                data_dict = dict(
                    (key, xml_escape(value))
                    for key, value in zip(self._FIELDS, data))
                user = data_dict['user']
                if user in self.ignore_users:
                    logging.debug("Ignorier message: %s an %s",
                                  data_dict, user)
                    continue
                logging.debug("Send message: %s an %s", data_dict, user)
                queued = self.destination.queue_message(data_dict, headers={
                    'user': user,
                    'mailbox': data_dict['mailbox'],
                })
                if not queued:
                    # The internal queue stayed full; do not drop silently.
                    logging.warning(
                        "Could not queue notification for user %s", user)
        finally:
            notify_socket.close()


class SignalHandler(object):
    '''May be used as a signal handler, to kill the queue'''

    def __init__(self, queue):
        r'''Constructor for SignalHandler.

        \param queue object whose force_shutdown() is called on a signal
        '''
        self.queue = queue

    def handle_signal(self, signo, frame):
        '''Request a shutdown of the queue for SIGTERM (or SIGKILL).'''
        # NOTE: SIGKILL cannot be caught by a process, so in practice only
        # SIGTERM reaches this handler.
        if signo in (signal.SIGTERM, signal.SIGKILL):
            self.queue.force_shutdown()


if __name__ == '__main__':
    import ConfigParser
    import errno
    import os
    import os.path
    import logging.config
    import logging.handlers

    config = ConfigParser.ConfigParser()
    config.read('/etc/notifyd.ini')

    try:
        logging.config.fileConfig(config.get('logging', 'file'))
        logger = logging.getLogger(config.get('logging', 'handler'))
    except Exception as e:
        # Deliberate catch-all: any problem with the logging configuration
        # falls back to a syslog handler on the root logger.
        print("Using default config for logging. Reason: %s: %s"
              % (type(e).__name__, e))
        logging.basicConfig()
        logger = logging.getLogger()
        logger.setLevel(logging.INFO)
        hdlr = logging.handlers.SysLogHandler(
            facility=logging.handlers.SysLogHandler.LOG_DAEMON)
        formatter = logging.Formatter(
            '%(filename)s: %(levelname)s: %(message)s')
        hdlr.setFormatter(formatter)
        logger.addHandler(hdlr)

    stomp_host = config.get('stomp', 'host')
    stomp_port = config.getint('stomp', 'port')
    stomp_user = config.get('stomp', 'user')
    stomp_password = config.get('stomp', 'password')
    stomp_destination = config.get('stomp', 'destination')
    notifyd_socket = config.get('notifyd', 'socket')
    if config.has_option('notifyd', 'ignore_users'):
        ignore_users = [
            x.strip()
            for x in config.get('notifyd', 'ignore_users').split(',')]
    else:
        ignore_users = []

    # Remove a stale socket file so that bind() can recreate it.
    try:
        os.unlink(notifyd_socket)
    except OSError as err:
        if err.errno != errno.ENOENT:
            raise

    sender = QueueSender([(stomp_host, stomp_port)], stomp_destination,
                         stomp_user, stomp_password)
    sender.start()

    sig_handler = SignalHandler(sender)
    for sig in (signal.SIGTERM, ):
        signal.signal(sig, sig_handler.handle_signal)

    try:
        notifyd = Notifyd(notifyd_socket, sender, ignore_users)
        notifyd.run_loop()
    except Exception as e:
        logging.error("Error in run_loop: %s" % e)
    finally:
        # Always stop the (non-daemon) sender thread, also on Ctrl-C, so the
        # process can actually exit.
        sender.force_shutdown()
        sender.join()
[stomp] host=activemq.server.name port=61613 user=guest password=password destination=/queue/GUEST.new_mails [notifyd] socket=/var/lib/imap/socket/notify ignore_users=always_bcc [logging] file=/etc/notifyd.logging handler=notifyd
# Loggingkonfiguration fuer notifyd und python-logging modul [loggers] keys=root,notifyd [handlers] keys=syslog [formatters] keys=form05 [logger_root] level=NOTSET handlers=syslog [logger_notifyd] level=INFO handlers=syslog qualname=notifyd channel=notifyd parent=root propagate=0 [handler_syslog] class=handlers.SysLogHandler level=WARN formatter=form05 args=(('localhost', handlers.SYSLOG_UDP_PORT), handlers.SysLogHandler.LOG_USER) [formatter_form05] format="%(name)s: %(asctime)s %(levelname)s %(message)s" datefmt=
---- Cyrus Home Page: http://www.cyrusimap.org/ List Archives/Info: http://lists.andrew.cmu.edu/pipermail/info-cyrus/