Re: custom notifications

Hi Wolfgang,
On Tuesday, 15.02.2011, at 10:37 +0100, Wolfgang Hennerbichler wrote:
> Hi, 
> 
> I'd like to write a custom notification system (using XMPP or something like that, I don't know yet :)) for cyrus. 
> I've had a look at the notify_unix/simple_notify - file in the contrib-directory. It doesn't seem to work in my installation (the script doesn't log any notifications, although notifyd does for example notify zephyr, which works), or maybe I don't understand the concept of notifications. So I'd like to ask a couple of questions: 
> * does anybody have custom notifications up and running by reading the notification-socket of cyrus?
> * does notifyd need to be running in order to make the notify-socket readable, or is the notify-socket filled by the cyrus-master process? 
> * where would I find instructions on that? 
I have attached a python implementation of a notifyd, which we are using
together with cyrus imapd 2.2.12.

It has to be started as the cyrus user (or whichever user you run imapd
as), and it has to be running before you start cyrus imapd.

It will try to remove an existing socket and recreate it, so it needs
sufficient permissions on the socket directory to do that.
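
The socket handling described above can be sketched as follows. This is a minimal Python 3 sketch (the attached script targets Python 2); the helper names `open_notify_socket` and `parse_notification` are illustrative only, and the field order is simply the one the attached script assumes, not a documented specification:

```python
import os
import socket

# Field order as used by the attached script; treat it as an assumption
# about the notify protocol, not a specification.
FIELDS = ('method', 'message_class', 'priority', 'user',
          'mailbox', 'nopt', 'message', 'opts')


def open_notify_socket(path):
    """Remove a stale socket file, then bind a fresh datagram socket."""
    if os.path.exists(path):
        os.unlink(path)  # needs write permission on the directory
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    sock.bind(path)
    return sock


def parse_notification(datagram):
    """Split a NUL-separated datagram into a dict; missing fields become ''."""
    parts = datagram.decode('utf-8', 'replace').split('\0')
    parts += [''] * (len(FIELDS) - len(parts))
    return dict(zip(FIELDS, parts))
```

Padding short datagrams with empty strings keeps a malformed notification from raising an `IndexError` and ending the receive loop.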

To forward the messages (events) it needs the stomp.py library
(http://code.google.com/p/stomppy/) and a stomp server. We are using
Apache ActiveMQ for that.
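
The forwarding step decouples receiving from delivery: notifications go into a bounded in-memory queue, and a worker thread hands them on. Below is a minimal Python 3 sketch of that pattern, with a plain callable standing in for the stomp connection so that it runs without a broker. `Forwarder`, `send_func` and `_STOP` are illustrative names, not part of stomp.py or of the attached script:

```python
import queue
import threading

_STOP = object()  # sentinel telling the worker to exit


class Forwarder(threading.Thread):
    """Drains a bounded queue and hands each message to send_func."""

    def __init__(self, send_func, maxsize=300):
        super().__init__()
        self.send_func = send_func  # hypothetical stand-in for a stomp send
        self.queue = queue.Queue(maxsize=maxsize)

    def queue_message(self, message, timeout=3):
        """Return True if the message was queued, False if the queue stayed full."""
        try:
            self.queue.put(message, block=True, timeout=timeout)
        except queue.Full:
            return False
        return True

    def shutdown(self):
        """Ask the worker to stop after the messages already queued."""
        self.queue.put(_STOP)

    def run(self):
        while True:
            item = self.queue.get()  # blocks until something arrives
            if item is _STOP:
                return
            self.send_func(item)
```

With a bounded queue, a slow or unreachable broker eventually makes `queue_message` return False instead of letting memory grow without limit.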

Regards
 Felix

> 
> thanks a bunch, 
> wolfgang

#!/usr/bin/python
# -*- coding: utf8 -*-
'''notifyd for cyrus imapd.

To use this daemon with cyrus imapd as a notifyd replacement, you have to

 * start the daemon before starting cyrus
 * install the daemon on the backends, if you are running it in a murder setup
 * set the values 'notifysocket' and 'mailnotifier' in /etc/imapd.conf
   e.g.
        notifysocket: /var/lib/imap/socket/notify
        mailnotifier: MAIL

 * start the daemon with the same user as imapd
'''
import logging
import Queue
import signal
import stomp
import socket
from threading import Thread
import time
from xml.sax.saxutils import escape as xml_escape

class QueueSender(Thread):
    """Sends messages from a python Queue to a stomp queue

    >>> sender = QueueSender([('127.0.0.2', 61613), ('localhost', 61613)], '/topic/test')
    >>> sender.start()
    >>> for i in range(1, 400):
    ...    if not sender.queue_message('this is a test %d' % i):
    ...        print "could not queue message %d" % i
    >>> sender.force_shutdown()
    >>> sender.join()
    """
    def __init__(self, config, destination, user=None, password=None):
        """Constructor for a QueueSender

        \param config configuration for the stomp-server
                 [('localhost', 61613), ('other.server.de', 61613)]

        \param destination name of the queue or topic
                 '/topic/test'
        """
        Thread.__init__(self)
        self.conn        = None
        self.config      = config
        self.user        = user
        self.password    = password
        self.destination = destination
        self.queue       = Queue.Queue(maxsize=300)

    def __get_connection(self):
        """tries to get a connection to the configured stomp server. 
        In case of exceptions, it will return None
        
        returns:
            configured connection, or None if an exception has occurred
        """
        if self.conn:
            return self.conn
        self.conn = stomp.Connection(self.config, user=self.user, passcode=self.password)
        try:
            self.conn.start()
        except stomp.exception.ReconnectFailedException:
            logging.warn(
                "Problems connecting to the stomp server. Will try again later...")
            self.conn = None
            return
        self.conn.connect(wait=True)
        return self.conn

    def __invalidate_connection(self):
        self.conn = None

    def queue_message(self, message, headers={}, timeout=3):
        """queues a message to the internal queue and will send it to
        the stomp server as soon as it can.

        \param message to be queued

        \param headers for the message, default empty dict

        \param timeout for the internal queue, default 3 seconds

        returns:
            True if message could be queued internally, False else
        """
        try:
            self.queue.put(MessageElement(message, headers), True, timeout)
        except Queue.Full:
            return False
        return True

    def force_shutdown(self):
        """forces a shutdown of the sender"""
        self.queue.put(KillElement())

    def __send(self, message):
        """tries to send a message to the configured stomp server
    
        \param message to be sent to the stomp queue

        returns:
            True if message could be sent, False else
        """
        conn = self.__get_connection()
        if not conn:
            return False
        try:
            conn.send(message.message, message.headers, destination=self.destination)
        except stomp.exception.NotConnectedException:
            logging.warn("Error while sending message: %s" % message)
            self.__invalidate_connection()
            return False
        return True

    def run(self):
        while (True):
            try:
                # block for at most 1 second, then raise Queue.Empty
                message = self.queue.get(True, 1)
            except Queue.Empty:
                logging.debug("Queue Empty. Sleep 1 second")
                time.sleep(1)
                continue
            if isinstance(message, MessageElement):
                if self.__send(message):
                    logging.debug("message sent: %s" % (message))
                else:
                    logging.info("message NOT sent (retry?): %s" % (message))
                    time.sleep(10)
            if isinstance(message, KillElement):
                logging.info("got KillMessage...")
                if self.conn:
                    self.conn.disconnect()
                return

class QueueElement(object):
    """Base class to represent a message used for internal python queue"""
    pass

class KillElement(QueueElement):
    """Special Message to end the clients listening to queue"""
    pass

class MessageElement(QueueElement):
    """Message element, which contains a message"""
    def __init__(self, message, headers={}):
        self.message = message
        self.headers = headers

class Notifyd(object):
    '''Daemon to listen on a unix file socket for messages from cyrus-imapd. It will reemit
       those messages via stomp.

       >>> notifyd = Notifyd(notifyd_socket, queue_sender, ('always_bcc', ))
       >>> notifyd.run_loop()
    '''
    def __init__(self,
            socket_file_name,
            destination,
            ignore_users=[]):
        '''Constructor for notifyd

        \param socket_file_name
        Name of the unix file socket

        \param destination
        QueueSender, which reemits the messages

        \param ignore_users
        List of users, for which no events should be reemitted
        '''
        self.socket_file_name = socket_file_name
        self.destination      = destination
        self.ignore_users     = set(ignore_users)

    def run_loop(self):
        '''main loop in which we listen for new messages on the socket and
        route them to the destination.

        Uses the socket file name and the QueueSender instance that were
        passed to the constructor.
        '''
        notify_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        notify_socket.bind(self.socket_file_name)
        while True:
            data = notify_socket.recv(8192).split('\0')
            data_dict = {}
            key_nr=0
            for key in ('method', 'message_class', 
                'priority', 'user', 'mailbox', 
                'nopt', 'message', 'opts'):
                data_dict[key] = xml_escape(data[key_nr])
                key_nr+=1
            if data_dict['user'] in self.ignore_users:
                logging.debug("Ignoring message: %s to %s" % (data_dict, data_dict['user']))
            else:
                logging.debug("Sending message: %s to %s" % (data_dict, data_dict['user']))
                self.destination.queue_message(data_dict,
                                               headers={
                                                 'user': data_dict['user'], 
                                                 'mailbox': data_dict['mailbox'],
                                               })

class SignalHandler(object):
    '''May be used as a signal handler, to kill the queue
    '''
    def __init__(self, queue):
        '''Constructor for SignalHandler

        \param queue that should be killed
        '''
        self.queue = queue

    def handle_signal(self, signo, frame):
        # SIGKILL cannot be caught, so only SIGTERM can arrive here
        if signo == signal.SIGTERM:
            self.queue.force_shutdown()

if __name__ == '__main__':
    import ConfigParser
    import os
    import os.path
    import logging.config
    import logging.handlers
    config=ConfigParser.ConfigParser()
    config.read('/etc/notifyd.ini')
    try:
        logging.config.fileConfig(config.get('logging', 'file'))
        logger = logging.getLogger(config.get('logging', 'handler'))
    except Exception, e:
        print "Using default config for logging. Reason: %s: %s" % (e.__class__.__name__, e)
        logging.basicConfig()
        logger = logging.getLogger()
        logger.setLevel(logging.INFO)
        hdlr = logging.handlers.SysLogHandler(facility=logging.handlers.SysLogHandler.LOG_DAEMON)
        formatter = logging.Formatter('%(filename)s: %(levelname)s: %(message)s')
        hdlr.setFormatter(formatter)
        logger.addHandler(hdlr)
    stomp_host        = config.get('stomp', 'host')
    stomp_port        = config.getint('stomp', 'port')
    stomp_user        = config.get('stomp', 'user')
    stomp_password    = config.get('stomp', 'password')
    stomp_destination = config.get('stomp', 'destination')
    notifyd_socket    = config.get('notifyd', 'socket')
    if config.has_option('notifyd', 'ignore_users'):
        ignore_users = [ x.strip() for x in config.get('notifyd', 'ignore_users').split(',')]
    else:
        ignore_users = []
    if os.path.exists(notifyd_socket):
        os.unlink(notifyd_socket)    
    sender = QueueSender([(stomp_host, stomp_port)], 
        stomp_destination, 
        stomp_user, 
        stomp_password)
    sender.start()
    sig_handler = SignalHandler(sender)
    for sig in (signal.SIGTERM, ):
        signal.signal(sig, sig_handler.handle_signal)
    try:
        notifyd = Notifyd(notifyd_socket, sender, ignore_users)
        notifyd.run_loop()
    except Exception, e:
        logging.error("Error in run_loop: %s" % e)
    sender.force_shutdown()
    sender.join()
[stomp]
host=activemq.server.name
port=61613
user=guest
password=password
destination=/queue/GUEST.new_mails
[notifyd]
socket=/var/lib/imap/socket/notify
ignore_users=always_bcc
[logging]
file=/etc/notifyd.logging
handler=notifyd
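
The settings above map directly onto what the script reads at startup. A short Python 3 sketch using the standard `configparser` module (the attached script uses the Python 2 `ConfigParser` module); `load_settings` is an illustrative helper name, and the returned dict layout is an assumption made for this example:

```python
import configparser


def load_settings(text):
    """Parse notifyd.ini-style text into a plain dict."""
    config = configparser.ConfigParser()
    config.read_string(text)
    ignore = []
    if config.has_option('notifyd', 'ignore_users'):
        raw = config.get('notifyd', 'ignore_users')
        # comma-separated list; drop surrounding whitespace and empty entries
        ignore = [u.strip() for u in raw.split(',') if u.strip()]
    return {
        'stomp_hosts': [(config.get('stomp', 'host'),
                         config.getint('stomp', 'port'))],
        'destination': config.get('stomp', 'destination'),
        'socket': config.get('notifyd', 'socket'),
        'ignore_users': ignore,
    }
```

The `ignore_users` option is optional, so a file without it yields an empty list rather than an error.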

# Logging configuration for notifyd and the python logging module
[loggers]
keys=root,notifyd

[handlers]
keys=syslog

[formatters]
keys=form05

[logger_root]
level=NOTSET
handlers=syslog

[logger_notifyd]
level=INFO
handlers=syslog
qualname=notifyd
channel=notifyd
parent=root
propagate=0

[handler_syslog]
class=handlers.SysLogHandler
level=WARN
formatter=form05
args=(('localhost', handlers.SYSLOG_UDP_PORT), handlers.SysLogHandler.LOG_USER)

[formatter_form05]
format="%(name)s: %(asctime)s %(levelname)s %(message)s"
datefmt=
----
Cyrus Home Page: http://www.cyrusimap.org/
List Archives/Info: http://lists.andrew.cmu.edu/pipermail/info-cyrus/
