On 12/11/2024 1:10 AM, Purna Pavan Chandra Aekkaladevi wrote:
On Tue, Dec 10, 2024 at 10:28:41AM -0600, Praveen K Paladugu wrote:
On 12/2/2024 3:46 AM, Purna Pavan Chandra Aekkaladevi wrote:
Implement `chReadProcessEvents` and `chProcessEvents` to read events from
event monitor FIFO file and parse them accordingly.
Signed-off-by: Purna Pavan Chandra Aekkaladevi <paekkaladevi@xxxxxxxxxxxxxxxxxxx>
Co-authored-by: Vineeth Pillai <viremana@xxxxxxxxxxxxxxxxxxx>
---
src/ch/ch_events.c | 137 +++++++++++++++++++++++++++++++++++++++++++-
src/ch/ch_events.h | 2 +
src/ch/ch_monitor.h | 6 ++
3 files changed, 144 insertions(+), 1 deletion(-)
diff --git a/src/ch/ch_events.c b/src/ch/ch_events.c
index 247f99cef0..a4c2fc4130 100644
--- a/src/ch/ch_events.c
+++ b/src/ch/ch_events.c
@@ -45,6 +45,137 @@ static int virCHEventStopProcess(virDomainObj *vm,
return 0;
}
+static int virCHProcessEvents(virCHMonitor *mon)
+{
+ virDomainObj *vm = mon->vm;
+ char *buf = mon->event_buffer.buffer;
+ ssize_t sz = mon->event_buffer.buf_fill_sz;
+ virJSONValue *obj = NULL;
+ int blocks = 0;
+ size_t i = 0;
+ char *json_start;
+ ssize_t start_index = -1;
+ ssize_t end_index = -1;
+ char tmp;
+ int ret = 0;
+
+ while (i < sz) {
+ if (buf[i] == '{') {
+ blocks++;
+ if (blocks == 1)
+ start_index = i;
+ } else if (buf[i] == '}' && blocks > 0) {
+ blocks--;
+ if (blocks == 0) {
+ /* valid json document */
+ end_index = i;
+
+ /* temporarily null terminate the JSON doc */
+ tmp = buf[end_index + 1];
+ buf[end_index + 1] = '\0';
+ json_start = buf + start_index;
+
+ if ((obj = virJSONValueFromString(json_start))) {
+ /* Process the event string (obj) here */
+ virJSONValueFree(obj);
+ } else {
+ VIR_WARN("%s: Invalid JSON event doc: %s",
+ vm->def->name, json_start);
+ ret = -1;
If you do encounter an invalid JSON, this implementation will continue
processing rest of the events. You should return from this method with
-1 here.
Returning -1 here does not halt the thread. So, even if we were to
return -1 here (after memmove-ing the rest of the json to the beginning
of buf, just as done in below 'else if' block), it essentially tries to
continue processing the rest of the events.
That is correct. I should have been clearer.
If aJ JSON event received here is invalid, it is very likely an issue
with the transport. In such a case, following events could also be
corrupt. I don't see much value in processing remaining events in
such a situation.
That is why I am suggesting that you return from this method and stop
processing further events.
+ }
+
+ /* replace the original character */
+ buf[end_index + 1] = tmp;
+ start_index = -1;
+ }
+ }
+
+ i++;
+ }
+
+ if (start_index == -1) {
+ /* We have processed all the JSON docs in the buffer */
+ mon->event_buffer.buf_fill_sz = 0;
+ } else if (start_index > 0) {
+ /* We have an incomplete JSON doc at the end of the buffer
+ * Move it to the start of the buffer
+ */
+ mon->event_buffer.buf_fill_sz = sz - start_index;
+ memmove(buf, buf+start_index, mon->event_buffer.buf_fill_sz);
+ }
+
+ return ret;
+}
+
+static void virCHReadProcessEvents(virCHMonitor *mon,
+ int event_monitor_fd)
+{
+ /* Event json string must always terminate with null char.
+ * So, reserve one byte for '\0' at the end.
+ */
+ size_t max_sz = CH_EVENT_BUFFER_SZ - 1;
+ char *buf = mon->event_buffer.buffer;
+ virDomainObj *vm = mon->vm;
+ bool incomplete = false;
+ size_t sz = 0;
+
+ memset(buf, 0, max_sz);
+ do {
+ ssize_t ret;
+
+ ret = read(event_monitor_fd, buf + sz, max_sz - sz);
+ if (ret == 0 || (ret < 0 && errno == EINTR)) {
+ g_usleep(G_USEC_PER_SEC);
+ continue;
+ } else if (ret < 0) {
+ /* We should never reach here. read(2) says possible errors
+ * are EINTR, EAGAIN, EBADF, EFAULT, EINVAL, EIO, EISDIR
+ * We handle EINTR gracefully. There is some serious issue
+ * if we encounter any of the other errors(either in our code
+ * or in the system). Better to bail out.
+ */
+ VIR_ERROR(_("%1$s: Failed to read ch events!: %2$s"),
+ vm->def->name, g_strerror(errno));
+ virCHEventStopProcess(vm, VIR_DOMAIN_SHUTOFF_FAILED);
+ virCHStopEventHandler(mon);
+ return;
+ }
+
+ sz += ret;
+ mon->event_buffer.buf_fill_sz = sz;
+
+ if (virCHProcessEvents(mon) < 0)
+ VIR_WARN("%s: Failed to parse and process events", vm->def->name);
+
+ if (mon->event_buffer.buf_fill_sz != 0)
+ incomplete = true;
+ else
+ incomplete = false;
+ sz = mon->event_buffer.buf_fill_sz;
+
+ } while (virDomainObjIsActive(vm) && (sz < max_sz) && incomplete);
+
+ return;
+}
+
--
Regards,
Praveen K Paladugu
Regards,
Purna Pavan Chandra
--
Regards,
Praveen K Paladugu