how to use ioqueue for tcp server

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



I would like to make a tcp server with a pool of worker threads using the
ioqueue framework, but I am not exactly sure how to make it work.  The code
below (with error checking, etc. omitted) does the following :

   - pj_caching_pool_init()
   - pj_pool_create()
   - pj_sock_socket()
   - pj_sockaddr_in_init()
   - pj_sock_bind()
   - pj_sock_listen()
   - pj_ioqueue_create()
   - pj_ioqueue_register_sock()

The pj_ioqueue_callback argument to pj_ioqueue_register_sock() points to
handlers for each event type.  The main thread then creates a pool of
threads and waits forever (for now) while each of the spawned threads does
the following :

   - pj_ioqueue_op_key_init()
   - pj_ioqueue_accept()

Each thread then calls pj_ioqueue_poll() in a loop to process events.

The on_accept_complete handler calls pj_pool_calloc() to allocate buffer
then calls pj_ioqueue_recv()/pj_ioqueue_recv  from() -- I have tried both --
assuming that the on_read_complete handler will receive an event when
pj_ioqueue_poll() runs.

Unfortunately, that much does not quite work.  On Windows, the status of
pj_ioqueue_recv()/pj_ioqueue_recvfrom() is OS error 10057 ("Socket is not
connected").  On Linux, the status
of pj_ioqueue_recv()/pj_ioqueue_recvfrom() is PJ_EPENDING, but
the on_read_complete handler never gets called.

I am sure I am doing something wrong, but I do not know what.  Any advice
would be appreciated.

-- gifford


// endpoint.c
//
#include <pjlib.h>


typedef struct
{
pj_caching_pool m_caching_pool ;
pj_pool_t * m_pPool ;
pj_atomic_t * m_pAtomic ;
pj_ioqueue_t * m_pQueue ;
pj_ioqueue_key_t * m_pKey ;
 pj_thread_t ** m_pThreads ;
pj_sock_t m_socket ;

int m_nThreads ;

} endpoint_t ;


void endpoint_accept( pj_ioqueue_key_t * pKey, pj_ioqueue_op_key_t * pOp,
pj_sock_t socket, pj_status_t status )
{
endpoint_t * pEndpoint = ( endpoint_t * ) pj_ioqueue_get_user_data( pKey ) ;

pj_ssize_t nData = 4096 ;

PJ_LOG( 4, ( __FILE__, "endpoint_accept()" ) ) ;

pOp->user_data = pj_pool_calloc( pEndpoint->m_pPool, 1, nData ) ;

status = pj_ioqueue_recvfrom( pKey, pOp, pOp->user_data, &( nData ),
PJ_IOQUEUE_ALWAYS_ASYNC, 0, 0 ) ;

switch( status )
 {
case PJ_SUCCESS :
PJ_LOG( 4, ( __FILE__, "pj_ioqueue_recv() : PJ_SUCCESS" ) ) ;
 break ;

case PJ_EPENDING :
PJ_LOG( 4, ( __FILE__, "pj_ioqueue_recv() : PJ_EPENDING" ) ) ;
 break ;

default :
PJ_LOG( 4, ( __FILE__, "pj_ioqueue_recv() : %d", PJ_STATUS_TO_OS( status ) )
) ;
 break ;

} // switch

} // endpoint_accept()


void endpoint_connect( pj_ioqueue_key_t * pKey, pj_status_t status )
{
PJ_LOG( 4, ( __FILE__, "endpoint_connect()" ) ) ;

} // endpoint_connect()


void endpoint_read( pj_ioqueue_key_t * pKey, pj_ioqueue_op_key_t * pOp,
pj_ssize_t nBytes )
{
PJ_LOG( 4, ( __FILE__, "endpoint_read()" ) ) ;

} // endpoint_read()


void endpoint_write( pj_ioqueue_key_t * pKey, pj_ioqueue_op_key_t * pOp,
pj_ssize_t nBytes )
{
PJ_LOG( 4, ( __FILE__, "endpoint_write()" ) ) ;

} // endpoint_write()


int endpoint_thread( endpoint_t * pEndpoint )
{
pj_sock_t sock ;
pj_sockaddr_in addr ;
pj_status_t status ;

pj_time_val timeout ;

timeout.sec = 1 ;
timeout.msec = 0 ;

while( pj_atomic_get( pEndpoint->m_pAtomic ) )
{
pj_ioqueue_op_key_t * pOp = ( pj_ioqueue_op_key_t * ) pj_pool_calloc(
pEndpoint->m_pPool, 1, sizeof( *( pOp ) ) ) ;

pj_ioqueue_op_key_init( pOp, sizeof( *( pOp ) ) ) ;

status = pj_ioqueue_accept( pEndpoint->m_pKey, pOp, &( sock ), 0, &( addr ),
0 ) ;

switch( status )
{
case PJ_SUCCESS :
PJ_LOG( 4, ( __FILE__, "pj_ioqueue_accept() : PJ_SUCCESS" ) ) ;
break ;

case PJ_EPENDING :
PJ_LOG( 4, ( __FILE__, "pj_ioqueue_accept() : PJ_EPENDING" ) ) ;
 break ;

default :
PJ_LOG( 4, ( __FILE__, "pj_ioqueue_accept() : %d", PJ_STATUS_TO_OS( status )
) ) ;
 break ;

} // switch

 while( pj_atomic_get( pEndpoint->m_pAtomic ) )
{
pj_ioqueue_poll( pEndpoint->m_pQueue, &( timeout ) ) ;

} // while

} // while

return 0 ;

} // endpoint_thread()


void endpoint_create( endpoint_t * pEndpoint )
{
pj_ioqueue_callback callback ;
 pj_sockaddr_in address ;

int nThread ;
int nAddress = sizeof( address ) ;

pEndpoint->m_nThreads = 4 ;

 pj_init() ;

pj_caching_pool_init( &( pEndpoint->m_caching_pool ), 0, 4096 * 4096 ) ;

pEndpoint->m_pPool = pj_pool_create( ( pj_pool_factory * ) &(
pEndpoint->m_caching_pool ), 0, 4096, 4096, 0 ) ;

pj_sock_socket( pj_AF_INET(), pj_SOCK_STREAM(), 0, &( pEndpoint->m_socket )
) ;

pj_sockaddr_in_init( &( address ), 0, 0 ) ;

pj_sock_bind( pEndpoint->m_socket, &( address ), nAddress ) ;

pj_sock_getsockname( pEndpoint->m_socket, &( address ), &( nAddress ) ) ;
PJ_LOG( 4, ( __FILE__, "pj_sockaddr_get_port() : %u", pj_sockaddr_get_port(
&( address ) ) ) ) ;

pj_sock_listen( pEndpoint->m_socket, 4096 ) ;

pj_ioqueue_create( pEndpoint->m_pPool, 4096, &( pEndpoint->m_pQueue ) ) ;

callback.on_accept_complete = &( endpoint_accept ) ;
callback.on_connect_complete = &( endpoint_connect ) ;
 callback.on_read_complete = &( endpoint_read ) ;
callback.on_write_complete = &( endpoint_write ) ;

pj_ioqueue_register_sock( pEndpoint->m_pPool, pEndpoint->m_pQueue,
pEndpoint->m_socket, pEndpoint, &( callback ), &( pEndpoint->m_pKey ) ) ;

pj_ioqueue_set_user_data( pEndpoint->m_pKey, pEndpoint, 0 ) ;

pj_atomic_create( pEndpoint->m_pPool, 1, &( pEndpoint->m_pAtomic ) ) ;

pEndpoint->m_pThreads = ( pj_thread_t ** ) pj_pool_alloc(
pEndpoint->m_pPool, pEndpoint->m_nThreads * sizeof( *( pEndpoint->m_pThreads
) ) ) ;

for( nThread = 0; nThread < pEndpoint->m_nThreads; ++( nThread ) )
{
 pj_thread_create( pEndpoint->m_pPool, 0, ( pj_thread_proc * ) &(
endpoint_thread ), pEndpoint, 0, 0, &( pEndpoint->m_pThreads[ nThread ] ) )
;
}

while( pj_atomic_get( pEndpoint->m_pAtomic ) )
{
pj_thread_sleep( 1000 ) ;

} // while

} // endpoint_create()


void endpoint_delete( endpoint_t * pEndpoint )
{
int nThread ;

pj_atomic_set( pEndpoint->m_pAtomic, 0 ) ;

pj_ioqueue_unregister( pEndpoint->m_pKey ) ;

pj_ioqueue_destroy( pEndpoint->m_pQueue ) ;

pj_sock_close( pEndpoint->m_socket ) ;

for( nThread = 0; nThread < pEndpoint->m_nThreads; ++( nThread ) )
{
 pj_ioqueue_post_completion( pEndpoint->m_pKey, 0, 0 ) ;
}

for( nThread = 0; nThread < pEndpoint->m_nThreads; ++( nThread ) )
{
pj_thread_join( pEndpoint->m_pThreads[ nThread ] ) ;
 pj_thread_destroy( pEndpoint->m_pThreads[ nThread ] ) ;
}

pj_atomic_destroy( pEndpoint->m_pAtomic ) ;

pj_pool_release( pEndpoint->m_pPool ) ;

pj_caching_pool_destroy( &( pEndpoint->m_caching_pool ) ) ;

pj_shutdown() ;

} // endpoint_delete()


int main( int nArgs, char ** pArgs )
{
endpoint_t Endpoint ;

endpoint_create( &( Endpoint ) ) ;

endpoint_delete( &( Endpoint ) ) ;

return 0 ;

} // main()
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.pjsip.org/pipermail/pjsip_lists.pjsip.org/attachments/20110902/2f5d347e/attachment.html>


[Index of Archives]     [Asterisk Users]     [Asterisk App Development]     [Linux ARM Kernel]     [Linux ARM]     [Linux Omap]     [Fedora ARM]     [IETF Annouce]     [Security]     [Bugtraq]     [Linux]     [Linux OMAP]     [Linux MIPS]     [Linux API]
  Powered by Linux