how to use ioqueue for tcp server

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



 
I would like to write a tcp server with a pool of worker threads using the ioqueue framework, but I am not exactly sure how to make it work.  The code below (with error checking, etc. omitted) does the following :

pj_caching_pool_init(), pj_pool_create(), pj_sock_socket(), pj_sockaddr_in_init(), pj_sock_bind(), pj_sock_listen(), pj_ioqueue_create(), pj_ioqueue_register_sock()

The pj_ioqueue_callback argument to pj_ioqueue_register_sock() points to handlers for each event type.  The main thread then creates a pool of threads and waits forever (for now) while each of the spawned threads does the following :

pj_ioqueue_op_key_init(), pj_ioqueue_accept()
Each thread then calls pj_ioqueue_poll() in a loop to process events.
The on_accept_complete handler calls pj_pool_calloc() to allocate a buffer, then calls pj_ioqueue_recv()/pj_ioqueue_recvfrom()
 -- I have tried both -- assuming that the on_read_complete handler will receive an event when pj_ioqueue_poll() runs.

Unfortunately, that much does not quite work.  On Windows, the status of pj_ioqueue_recv()/pj_ioqueue_recvfrom() is OS error 10057 ("Socket is not connected").  On Linux, the status of pj_ioqueue_recv()/pj_ioqueue_recvfrom() is PJ_EPENDING, but the on_read_complete handler never gets called.

I am sure I am doing something wrong, but I do not know what.  Any advice would be appreciated.
-- gifford

// endpoint.c
//#include <pjlib.h>

/*
 * All state for one listening endpoint. Filled in by endpoint_create()
 * and torn down by endpoint_delete().
 */
typedef struct
{
	/* Pool factory; initialised with pj_caching_pool_init(). */
	pj_caching_pool m_caching_pool ;

	/* Pool created from m_caching_pool; every allocation in this file comes from it. */
	pj_pool_t * m_pPool ;

	/* Run flag: created with value 1, set to 0 by endpoint_delete(); loops spin while it is non-zero. */
	pj_atomic_t * m_pAtomic ;

	/* The ioqueue that the worker threads poll. */
	pj_ioqueue_t * m_pQueue ;

	/* Key returned when m_socket is registered with m_pQueue. */
	pj_ioqueue_key_t * m_pKey ;

	/* Array of m_nThreads worker thread handles. */
	pj_thread_t ** m_pThreads ;

	/* The bound, listening socket. */
	pj_sock_t m_socket ;

	/* Number of worker threads. */
	int m_nThreads ;
} endpoint_t ;

void endpoint_accept( pj_ioqueue_key_t * pKey, pj_ioqueue_op_key_t * pOp, pj_sock_t socket, pj_status_t status ) 
{	endpoint_t * pEndpoint = ( endpoint_t * ) pj_ioqueue_get_user_data( pKey ) ;

	pj_ssize_t nData = 4096 ;
	PJ_LOG( 4, ( __FILE__, "endpoint_accept()" ) ) ;

	pOp->user_data = pj_pool_calloc( pEndpoint->m_pPool, 1, nData ) ;

	status = pj_ioqueue_recvfrom( pKey, pOp, pOp->user_data, &( nData ), PJ_IOQUEUE_ALWAYS_ASYNC, 0, 0 ) ;
	switch( status )
	{		case PJ_SUCCESS :			PJ_LOG( 4, ( __FILE__, "pj_ioqueue_recv() : PJ_SUCCESS" ) ) ;
			break ;
		case PJ_EPENDING :			PJ_LOG( 4, ( __FILE__, "pj_ioqueue_recv() : PJ_EPENDING" ) ) ;
			break ;
		default :			PJ_LOG( 4, ( __FILE__, "pj_ioqueue_recv() : %d", PJ_STATUS_TO_OS( status ) ) ) ;
			break ;
	} // switch

} // endpoint_accept()

/*
 * on_connect_complete handler. Does nothing except emit a level-4 log line;
 * both arguments are ignored.
 */
void endpoint_connect( pj_ioqueue_key_t * pKey, pj_status_t status )
{
	( void ) pKey ;
	( void ) status ;

	PJ_LOG( 4, ( __FILE__, "endpoint_connect()" ) ) ;
} // endpoint_connect()

/*
 * on_read_complete handler. Currently a stub: it logs that it was invoked
 * and ignores the key, the op key and the byte count.
 */
void endpoint_read( pj_ioqueue_key_t * pKey, pj_ioqueue_op_key_t * pOp, pj_ssize_t nBytes )
{
	( void ) pKey ;
	( void ) pOp ;
	( void ) nBytes ;

	PJ_LOG( 4, ( __FILE__, "endpoint_read()" ) ) ;
} // endpoint_read()

/*
 * on_write_complete handler. Currently a stub: it logs that it was invoked
 * and ignores the key, the op key and the byte count.
 */
void endpoint_write( pj_ioqueue_key_t * pKey, pj_ioqueue_op_key_t * pOp, pj_ssize_t nBytes )
{
	( void ) pKey ;
	( void ) pOp ;
	( void ) nBytes ;

	PJ_LOG( 4, ( __FILE__, "endpoint_write()" ) ) ;
} // endpoint_write()

/*
 * Worker thread body: posts one asynchronous accept on the listening key,
 * then polls the ioqueue (1 second timeout per poll) until the endpoint's
 * run flag drops to zero.
 *
 * pEndpoint  endpoint whose ioqueue, listening key and run flag are used.
 * Returns 0 always.
 *
 * sock, addr and nAddr are handed to pj_ioqueue_accept() by address and must
 * stay alive while the accept is pending; they live in this frame, which is
 * not left until the poll loop ends.
 *
 * NOTE(review): m_pPool is shared by all worker threads and PJLIB pools are
 * documented as not thread-safe -- confirm and use a per-thread pool.
 */
int endpoint_thread( endpoint_t * pEndpoint ) 
{
	pj_sock_t sock ;
	pj_sockaddr_in addr ;
	int nAddr = sizeof( addr ) ;	/* in: size of addr, out: length actually written */
	pj_status_t status ;
	pj_time_val timeout ;

	timeout.sec = 1 ;
	timeout.msec = 0 ;

	/*
	 * The original wrapped this in a second while() on the same flag, but the
	 * inner poll loop only exits once the flag is zero, so the body could
	 * never run twice. A plain if() states that directly.
	 */
	if( pj_atomic_get( pEndpoint->m_pAtomic ) )
	{
		pj_ioqueue_op_key_t * pOp = ( pj_ioqueue_op_key_t * ) pj_pool_calloc( pEndpoint->m_pPool, 1, sizeof( *( pOp ) ) ) ;

		pj_ioqueue_op_key_init( pOp, sizeof( *( pOp ) ) ) ;

		/* A remote-address buffer is supplied, so its length must be supplied too. */
		status = pj_ioqueue_accept( pEndpoint->m_pKey, pOp, &( sock ), 0, &( addr ), &( nAddr ) ) ;
		switch( status )
		{
			case PJ_SUCCESS :
				PJ_LOG( 4, ( __FILE__, "pj_ioqueue_accept() : PJ_SUCCESS" ) ) ;
				/*
				 * Immediate completion: per the PJLIB ioqueue documentation the
				 * callback is not invoked in this case, so hand the connection
				 * to the handler here instead of dropping it.
				 */
				endpoint_accept( pEndpoint->m_pKey, pOp, sock, PJ_SUCCESS ) ;
				break ;
			case PJ_EPENDING :
				PJ_LOG( 4, ( __FILE__, "pj_ioqueue_accept() : PJ_EPENDING" ) ) ;
				break ;
			default :
				PJ_LOG( 4, ( __FILE__, "pj_ioqueue_accept() : %d", PJ_STATUS_TO_OS( status ) ) ) ;
				break ;
		} // switch

		/* Dispatch completions until shutdown is requested. */
		while( pj_atomic_get( pEndpoint->m_pAtomic ) )
		{
			pj_ioqueue_poll( pEndpoint->m_pQueue, &( timeout ) ) ;
		} // while
	} // if

	return 0 ;

} // endpoint_thread()


/*
 * Brings the endpoint up and then blocks.
 *
 * Steps, in order: initialise PJLIB and the memory pools, create a TCP
 * socket, bind it with address 0 / port 0, log the port reported by
 * getsockname, listen, create the ioqueue, register the listening socket
 * with the four endpoint_* handlers, create the run flag (value 1), start
 * m_nThreads (4) worker threads running endpoint_thread(), and finally sleep
 * in one-second steps for as long as the run flag stays non-zero.
 *
 * No return code of any PJLIB call is checked.
 */
void endpoint_create( endpoint_t * pEndpoint )
{
	pj_ioqueue_callback handlers ;
	pj_sockaddr_in bindAddr ;
	int addrLen = sizeof( bindAddr ) ;
	int i ;

	pEndpoint->m_nThreads = 4 ;

	/* Library and memory set-up. */
	pj_init() ;
	pj_caching_pool_init( &pEndpoint->m_caching_pool, 0, 4096 * 4096 ) ;
	pEndpoint->m_pPool = pj_pool_create( ( pj_pool_factory * ) &pEndpoint->m_caching_pool, 0, 4096, 4096, 0 ) ;

	/* Listening socket. */
	pj_sock_socket( pj_AF_INET(), pj_SOCK_STREAM(), 0, &pEndpoint->m_socket ) ;
	pj_sockaddr_in_init( &bindAddr, 0, 0 ) ;
	pj_sock_bind( pEndpoint->m_socket, &bindAddr, addrLen ) ;

	/* Read the bound address back so the port in use can be logged. */
	pj_sock_getsockname( pEndpoint->m_socket, &bindAddr, &addrLen ) ;
	PJ_LOG( 4, ( __FILE__, "pj_sockaddr_get_port() : %u", pj_sockaddr_get_port( &bindAddr ) ) ) ;
	pj_sock_listen( pEndpoint->m_socket, 4096 ) ;

	/* Event queue plus registration of the listening socket. */
	pj_ioqueue_create( pEndpoint->m_pPool, 4096, &pEndpoint->m_pQueue ) ;

	handlers.on_read_complete = endpoint_read ;
	handlers.on_write_complete = endpoint_write ;
	handlers.on_accept_complete = endpoint_accept ;
	handlers.on_connect_complete = endpoint_connect ;

	pj_ioqueue_register_sock( pEndpoint->m_pPool, pEndpoint->m_pQueue, pEndpoint->m_socket, pEndpoint, &handlers, &pEndpoint->m_pKey ) ;
	pj_ioqueue_set_user_data( pEndpoint->m_pKey, pEndpoint, 0 ) ;

	/* Run flag starts at 1; the loops in this file spin until it becomes 0. */
	pj_atomic_create( pEndpoint->m_pPool, 1, &pEndpoint->m_pAtomic ) ;

	/* Worker threads. */
	pEndpoint->m_pThreads = ( pj_thread_t ** ) pj_pool_alloc( pEndpoint->m_pPool, pEndpoint->m_nThreads * sizeof( *( pEndpoint->m_pThreads ) ) ) ;

	i = 0 ;
	while( i < pEndpoint->m_nThreads )
	{
		pj_thread_create( pEndpoint->m_pPool, 0, ( pj_thread_proc * ) &( endpoint_thread ), pEndpoint, 0, 0, &pEndpoint->m_pThreads[ i ] ) ;
		++i ;
	}

	/* Park the calling thread until the run flag is cleared. */
	for( ;; )
	{
		if( !pj_atomic_get( pEndpoint->m_pAtomic ) )
		{
			break ;
		}
		pj_thread_sleep( 1000 ) ;
	}
} // endpoint_create()


/*
 * Shuts the endpoint down and releases everything endpoint_create() set up.
 *
 * pEndpoint  endpoint previously initialised by endpoint_create().
 *
 * Order matters: the worker threads poll m_pQueue and reference m_pKey, so
 * they are stopped and joined BEFORE the key is unregistered and the ioqueue
 * destroyed. The original tore the queue down first and then posted
 * completions on the already-unregistered key.
 */
void endpoint_delete( endpoint_t * pEndpoint )
{
	int nThread ;

	/* 1. Ask the workers to stop; they re-check this flag after every poll. */
	pj_atomic_set( pEndpoint->m_pAtomic, 0 ) ;

	/*
	 * 2. Best-effort nudge, issued while the key is still registered.
	 *    Presumably meant to wake blocked pollers; either way each worker
	 *    leaves its loop once its current one-second poll returns.
	 */
	for( nThread = 0; nThread < pEndpoint->m_nThreads; ++( nThread ) )
	{
		pj_ioqueue_post_completion( pEndpoint->m_pKey, 0, 0 ) ;
	}

	/* 3. Wait for every worker to finish before touching what they use. */
	for( nThread = 0; nThread < pEndpoint->m_nThreads; ++( nThread ) )
	{
		pj_thread_join( pEndpoint->m_pThreads[ nThread ] ) ;
		pj_thread_destroy( pEndpoint->m_pThreads[ nThread ] ) ;
	}

	/* 4. Nobody is polling any more: tear down key, queue and socket. */
	pj_ioqueue_unregister( pEndpoint->m_pKey ) ;
	pj_ioqueue_destroy( pEndpoint->m_pQueue ) ;

	/* NOTE(review): pj_ioqueue_unregister() may already close the handle;
	 * if so this call is redundant -- confirm against the PJLIB backend in use. */
	pj_sock_close( pEndpoint->m_socket ) ;

	/* 5. Release remaining PJLIB objects, then the library itself. */
	pj_atomic_destroy( pEndpoint->m_pAtomic ) ;
	pj_pool_release( pEndpoint->m_pPool ) ;

	pj_caching_pool_destroy( &( pEndpoint->m_caching_pool ) ) ;
	pj_shutdown() ;

} // endpoint_delete()

/*
 * Program entry point: runs the endpoint (endpoint_create() blocks until
 * the run flag is cleared) and then tears it down. Command-line arguments
 * are not used.
 */
int main( int nArgs, char ** pArgs )
{
	endpoint_t Endpoint ;

	( void ) nArgs ;
	( void ) pArgs ;

	endpoint_create( &Endpoint ) ;
	endpoint_delete( &Endpoint ) ;

	return 0 ;
} // main()
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.pjsip.org/pipermail/pjsip_lists.pjsip.org/attachments/20110903/06f63b7c/attachment.html>


[Index of Archives]     [Asterisk Users]     [Asterisk App Development]     [Linux ARM Kernel]     [Linux ARM]     [Linux Omap]     [Fedora ARM]     [IETF Annouce]     [Security]     [Bugtraq]     [Linux]     [Linux OMAP]     [Linux MIPS]     [Linux API]
  Powered by Linux