All incoming messages currently get routed to the generic method
remoteDispatchClientRequest() for processing. To allow incoming
data stream messages to bypass this and be routed to a specific
location, a concept of dispatch filters is introduced.

* qemud/qemud.h: Add a qemud_client_filter struct and a callback
  qemud_client_filter_func. Maintain a list of filters on every
  struct qemud_client.
* qemud/qemud.c: Move remoteDecodeClientMessageHeader() out of
  qemudWorker() into qemudDispatchClientRead(). Check registered
  message filters in qemudDispatchClientRead() to decide where
  to send incoming messages for dispatch.

Signed-off-by: Daniel P. Berrange <berrange@xxxxxxxxxx>
---
 qemud/qemud.c | 28 ++++++++++++++++++++++++----
 qemud/qemud.h | 16 ++++++++++++++++
 2 files changed, 40 insertions(+), 4 deletions(-)

diff --git a/qemud/qemud.c b/qemud/qemud.c index 42bc00e..e393db4 100644 --- a/qemud/qemud.c +++ b/qemud/qemud.c @@ -1457,8 +1457,7 @@ static void *qemudWorker(void *data) /* This function drops the lock during dispatch, * and re-acquires it before returning */ - if (remoteDecodeClientMessageHeader(msg) < 0 || - remoteDispatchClientRequest (server, client, msg) < 0) { + if (remoteDispatchClientRequest (server, client, msg) < 0) { VIR_FREE(msg); qemudDispatchClientFailure(client); client->refs--; @@ -1705,9 +1704,30 @@ readmore: waiting for us */ goto readmore; } else { + /* Grab the completed message */ + struct qemud_client_message *msg = qemudClientMessageQueueServe(&client->rx); + struct qemud_client_filter *filter; + + /* Decode the header so we can use it for routing decisions */ + if (remoteDecodeClientMessageHeader(msg) < 0) { + VIR_FREE(msg); + qemudDispatchClientFailure(client); + } + + /* Check if any filters match this message */ + filter = client->filters; + while (filter) { + if ((filter->query)(msg, filter->opaque)) { + qemudClientMessageQueuePush(&filter->dx, msg); + msg = NULL; + break; + } + filter = filter->next; + } + /* Move completed message to
the end of the dispatch queue */ - qemudClientMessageQueuePush(&client->dx, client->rx); - client->rx = NULL; + if (msg) + qemudClientMessageQueuePush(&client->dx, msg); client->nrequests++; /* Possibly need to create another receive buffer */ diff --git a/qemud/qemud.h b/qemud/qemud.h index 86b893d..abacbbb 100644 --- a/qemud/qemud.h +++ b/qemud/qemud.h @@ -90,6 +90,19 @@ struct qemud_client_message { struct qemud_client_message *next; }; +/* Allow for filtering of incoming messages to a custom + * dispatch processing queue, instead of client->dx. + */ +typedef int (*qemud_client_filter_func)(struct qemud_client_message *msg, void *opaque); +struct qemud_client_filter { + qemud_client_filter_func query; + void *opaque; + + struct qemud_client_message *dx; + + struct qemud_client_filter *next; +}; + /* Stores the per-client connection state */ struct qemud_client { virMutex lock; @@ -134,6 +147,9 @@ struct qemud_client { /* Zero or many messages waiting for transmit * back to client, including async events */ struct qemud_client_message *tx; + /* Filters to capture messages that would otherwise + * end up on the 'dx' queue */ + struct qemud_client_filter *filters; /* This is only valid if a remote open call has been made on this * connection, otherwise it will be NULL. Also if remote close is -- 1.6.2.5 -- |: Red Hat, Engineering, London -o- http://people.redhat.com/berrange/ :| |: http://libvirt.org -o- http://virt-manager.org -o- http://ovirt.org :| |: http://autobuild.org -o- http://search.cpan.org/~danberr/ :| |: GnuPG: 7D3B9505 -o- F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 :| -- Libvir-list mailing list Libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list