I would like to make a tcp server with a pool of worker threads using the ioqueue framework, but I am not exactly sure how to make it work. The code below (with error checking, etc. omitted) does the following : - pj_caching_pool_init() - pj_pool_create() - pj_sock_socket() - pj_sockaddr_in_init() - pj_sock_bind() - pj_sock_listen() - pj_ioqueue_create() - pj_ioqueue_register_sock() The pj_ioqueue_callback argument to pj_ioqueue_register_sock() points to handlers for each event type. The main thread then creates a pool of threads and waits forever (for now) while each of the spawned threads does the following : - pj_ioqueue_op_key_init() - pj_ioqueue_accept() Each thread then calls pj_ioqueue_poll() in a loop to process events. The on_accept_complete handler calls pj_pool_calloc() to allocate buffer then calls pj_ioqueue_recv()/pj_ioqueue_recv from() -- I have tried both -- assuming that the on_read_complete handler will receive an event when pj_ioqueue_poll() runs. Unfortunately, that much does not quite work. On Windows, the status of pj_ioqueue_recv()/pj_ioqueue_recvfrom() is OS error 10057 ("Socket is not connected"). On Linux, the status of pj_ioqueue_recv()/pj_ioqueue_recvfrom() is PJ_EPENDING, but the on_read_complete handler never gets called. I am sure I am doing something wrong, but I do not know what. Any advice would be appreciated. -- gifford // endpoint.c // #include <pjlib.h> typedef struct { pj_caching_pool m_caching_pool ; pj_pool_t * m_pPool ; pj_atomic_t * m_pAtomic ; pj_ioqueue_t * m_pQueue ; pj_ioqueue_key_t * m_pKey ; pj_thread_t ** m_pThreads ; pj_sock_t m_socket ; int m_nThreads ; } endpoint_t ; void endpoint_accept( pj_ioqueue_key_t * pKey, pj_ioqueue_op_key_t * pOp, pj_sock_t socket, pj_status_t status ) { endpoint_t * pEndpoint = ( endpoint_t * ) pj_ioqueue_get_user_data( pKey ) ; pj_ssize_t nData = 4096 ; PJ_LOG( 4, ( __FILE__, "endpoint_accept()" ) ) ; pOp->user_data = pj_pool_calloc( pEndpoint->m_pPool, 1, nData ) ; status = pj_ioqueue_recvfrom( pKey, pOp, pOp->user_data, &( nData ), PJ_IOQUEUE_ALWAYS_ASYNC, 0, 0 ) ; switch( status ) { case PJ_SUCCESS : PJ_LOG( 4, ( __FILE__, "pj_ioqueue_recv() : PJ_SUCCESS" ) ) ; break ; case PJ_EPENDING : PJ_LOG( 4, ( __FILE__, "pj_ioqueue_recv() : PJ_EPENDING" ) ) ; break ; default : PJ_LOG( 4, ( __FILE__, "pj_ioqueue_recv() : %d", PJ_STATUS_TO_OS( status ) ) ) ; break ; } // switch } // endpoint_accept() void endpoint_connect( pj_ioqueue_key_t * pKey, pj_status_t status ) { PJ_LOG( 4, ( __FILE__, "endpoint_connect()" ) ) ; } // endpoint_connect() void endpoint_read( pj_ioqueue_key_t * pKey, pj_ioqueue_op_key_t * pOp, pj_ssize_t nBytes ) { PJ_LOG( 4, ( __FILE__, "endpoint_read()" ) ) ; } // endpoint_read() void endpoint_write( pj_ioqueue_key_t * pKey, pj_ioqueue_op_key_t * pOp, pj_ssize_t nBytes ) { PJ_LOG( 4, ( __FILE__, "endpoint_write()" ) ) ; } // endpoint_write() int endpoint_thread( endpoint_t * pEndpoint ) { pj_sock_t sock ; pj_sockaddr_in addr ; pj_status_t status ; pj_time_val timeout ; timeout.sec = 1 ; timeout.msec = 0 ; while( pj_atomic_get( pEndpoint->m_pAtomic ) ) { pj_ioqueue_op_key_t * pOp = ( pj_ioqueue_op_key_t * ) pj_pool_calloc( pEndpoint->m_pPool, 1, sizeof( *( pOp ) ) ) ; pj_ioqueue_op_key_init( pOp, sizeof( *( pOp ) ) ) ; status = pj_ioqueue_accept( pEndpoint->m_pKey, pOp, &( sock ), 0, &( addr ), 0 ) ; switch( status ) { case PJ_SUCCESS : PJ_LOG( 4, ( __FILE__, "pj_ioqueue_accept() : PJ_SUCCESS" ) ) ; break ; case PJ_EPENDING : PJ_LOG( 4, ( __FILE__, "pj_ioqueue_accept() : PJ_EPENDING" ) ) ; break ; default : PJ_LOG( 4, ( __FILE__, "pj_ioqueue_accept() : %d", PJ_STATUS_TO_OS( status ) ) ) ; break ; } // switch while( pj_atomic_get( pEndpoint->m_pAtomic ) ) { pj_ioqueue_poll( pEndpoint->m_pQueue, &( timeout ) ) ; } // while } // while return 0 ; } // endpoint_thread() void endpoint_create( endpoint_t * pEndpoint ) { pj_ioqueue_callback callback ; pj_sockaddr_in address ; int nThread ; int nAddress = sizeof( address ) ; pEndpoint->m_nThreads = 4 ; pj_init() ; pj_caching_pool_init( &( pEndpoint->m_caching_pool ), 0, 4096 * 4096 ) ; pEndpoint->m_pPool = pj_pool_create( ( pj_pool_factory * ) &( pEndpoint->m_caching_pool ), 0, 4096, 4096, 0 ) ; pj_sock_socket( pj_AF_INET(), pj_SOCK_STREAM(), 0, &( pEndpoint->m_socket ) ) ; pj_sockaddr_in_init( &( address ), 0, 0 ) ; pj_sock_bind( pEndpoint->m_socket, &( address ), nAddress ) ; pj_sock_getsockname( pEndpoint->m_socket, &( address ), &( nAddress ) ) ; PJ_LOG( 4, ( __FILE__, "pj_sockaddr_get_port() : %u", pj_sockaddr_get_port( &( address ) ) ) ) ; pj_sock_listen( pEndpoint->m_socket, 4096 ) ; pj_ioqueue_create( pEndpoint->m_pPool, 4096, &( pEndpoint->m_pQueue ) ) ; callback.on_accept_complete = &( endpoint_accept ) ; callback.on_connect_complete = &( endpoint_connect ) ; callback.on_read_complete = &( endpoint_read ) ; callback.on_write_complete = &( endpoint_write ) ; pj_ioqueue_register_sock( pEndpoint->m_pPool, pEndpoint->m_pQueue, pEndpoint->m_socket, pEndpoint, &( callback ), &( pEndpoint->m_pKey ) ) ; pj_ioqueue_set_user_data( pEndpoint->m_pKey, pEndpoint, 0 ) ; pj_atomic_create( pEndpoint->m_pPool, 1, &( pEndpoint->m_pAtomic ) ) ; pEndpoint->m_pThreads = ( pj_thread_t ** ) pj_pool_alloc( pEndpoint->m_pPool, pEndpoint->m_nThreads * sizeof( *( pEndpoint->m_pThreads ) ) ) ; for( nThread = 0; nThread < pEndpoint->m_nThreads; ++( nThread ) ) { pj_thread_create( pEndpoint->m_pPool, 0, ( pj_thread_proc * ) &( endpoint_thread ), pEndpoint, 0, 0, &( pEndpoint->m_pThreads[ nThread ] ) ) ; } while( pj_atomic_get( pEndpoint->m_pAtomic ) ) { pj_thread_sleep( 1000 ) ; } // while } // endpoint_create() void endpoint_delete( endpoint_t * pEndpoint ) { int nThread ; pj_atomic_set( pEndpoint->m_pAtomic, 0 ) ; pj_ioqueue_unregister( pEndpoint->m_pKey ) ; pj_ioqueue_destroy( pEndpoint->m_pQueue ) ; pj_sock_close( pEndpoint->m_socket ) ; for( nThread = 0; nThread < pEndpoint->m_nThreads; ++( nThread ) ) { pj_ioqueue_post_completion( pEndpoint->m_pKey, 0, 0 ) ; } for( nThread = 0; nThread < pEndpoint->m_nThreads; ++( nThread ) ) { pj_thread_join( pEndpoint->m_pThreads[ nThread ] ) ; pj_thread_destroy( pEndpoint->m_pThreads[ nThread ] ) ; } pj_atomic_destroy( pEndpoint->m_pAtomic ) ; pj_pool_release( pEndpoint->m_pPool ) ; pj_caching_pool_destroy( &( pEndpoint->m_caching_pool ) ) ; pj_shutdown() ; } // endpoint_delete() int main( int nArgs, char ** pArgs ) { endpoint_t Endpoint ; endpoint_create( &( Endpoint ) ) ; endpoint_delete( &( Endpoint ) ) ; return 0 ; } // main() -------------- next part -------------- An HTML attachment was scrubbed... URL: <http://lists.pjsip.org/pipermail/pjsip_lists.pjsip.org/attachments/20110901/4d5b516b/attachment.html>