Signed-off-by: Michal Privoznik <mprivozn@xxxxxxxxxx> --- tests/iohelpermessagetest.c | 295 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 292 insertions(+), 3 deletions(-) diff --git a/tests/iohelpermessagetest.c b/tests/iohelpermessagetest.c index 293c107..8c83261 100644 --- a/tests/iohelpermessagetest.c +++ b/tests/iohelpermessagetest.c @@ -27,6 +27,7 @@ #include "internal.h" #include "iohelper_message.h" #include "virlog.h" +#include "virtime.h" VIR_LOG_INIT("tests.iohelpermessagetest"); @@ -35,6 +36,8 @@ VIR_LOG_INIT("tests.iohelpermessagetest"); typedef struct { const char * const *msg; unsigned int *len; + bool blockR; + bool blockW; } testData; typedef testData *testDataPtr; @@ -77,7 +80,7 @@ testBlocking(const void *opaque) char *genMsg = NULL; char buf[1024]; - if (testInit(ioCtl, pipeFD, true, true) < 0) + if (testInit(ioCtl, pipeFD, data->blockR, data->blockW) < 0) goto cleanup; while (!quit) { @@ -143,15 +146,253 @@ testBlocking(const void *opaque) return ret; } +typedef struct { + virMutexPtr lock; + virCondPtr cond; + bool finished; + int ret; + iohelperCtlPtr ioCtl; + char *msg; + size_t msgLen; +} threadData; + +typedef threadData *threadDataPtr; + +static void +readerThread(void *opaque) +{ + threadDataPtr data = opaque; + char *bigBuf = NULL; + size_t bigBufSize = 0; + + virObjectRef(data->ioCtl); + /* Sleep some random time to simulate out of sync read & + * write */ + usleep((rand() % 100) * 1000); + + while (true) { + char buf[10]; /* Simulate reads of small chunks of data */ + ssize_t nread; + + reread: + nread = iohelperRead(data->ioCtl, buf, sizeof(buf)); + if (nread < 0) { + if (errno == EAGAIN) { + usleep(20 * 1000); + goto reread; + } + + virFilePrintf(stderr, "Unable to read message (errno=%d)\n", errno); + goto cleanup; + } + + if (!nread) + break; + + if (VIR_REALLOC_N(bigBuf, bigBufSize + nread) < 0) + goto cleanup; + + memcpy(bigBuf + bigBufSize, buf, nread); + bigBufSize += nread; + } + + if (bigBufSize != data->msgLen) { + virFilePrintf(stderr, "Message length mismatch: expected %zu got %zu", + data->msgLen, bigBufSize); + goto cleanup; + } + + if (memcmp(bigBuf, data->msg, data->msgLen)) { + virFilePrintf(stderr, "Mismatched data"); + goto cleanup; + } + + data->ret = 0; + + cleanup: + VIR_FREE(bigBuf); + virObjectUnref(data->ioCtl); + virMutexLock(data->lock); + data->finished = true; + virCondSignal(data->cond); + virMutexUnlock(data->lock); +} + +static void +writerThread(void *opaque ATTRIBUTE_UNUSED) +{ + threadDataPtr data = opaque; + size_t writeOff = 0; + + virObjectRef(data->ioCtl); + /* Sleep some random time to simulate out of sync read & + * write */ + usleep((rand() % 100) * 1000); + + while (true) { + ssize_t nwritten; + size_t want = data->msgLen - writeOff; + + if (!want) + break; + + rewrite: + nwritten = iohelperWrite(data->ioCtl, + data->msg + writeOff, + want); + + if (nwritten < 0) { + if (errno == EAGAIN) { + usleep(20 * 1000); + goto rewrite; + } + + virFilePrintf(stderr, "Unable to write message (errno=%d)\n", errno); + goto cleanup; + } + + if (!nwritten) + break; + + writeOff += nwritten; + } + + if (writeOff != data->msgLen) { + virFilePrintf(stderr, "Message length mismatch: expected %zu written %zu", + data->msgLen, writeOff); + goto cleanup; + } + + data->ret = 0; + + cleanup: + virObjectUnref(data->ioCtl); + virMutexLock(data->lock); + data->finished = true; + virCondSignal(data->cond); + virMutexUnlock(data->lock); +} + +/* How long wait (in ms) for both reader & writer + * threads to finish? */ +#define WAIT_TIME 10000 + +static int +testNonblocking(const void *opaque) +{ + int ret = -1; + const testData *data = opaque; + iohelperCtlPtr ioCtl[2] = {NULL, NULL}; + int pipeFD[2] = {-1, -1}; + virThread reader, writer; + threadData readerD, writerD; + virMutex lock; + virCond cond; + unsigned long long now; + unsigned long long then; + char *msg = NULL; + size_t msgLen = 0, idx; + + for (idx = 0; data->msg && data->msg[idx]; idx++) { + const char *tmp = data->msg[idx]; + size_t tmpLen = strlen(tmp); + + if (VIR_REALLOC_N(msg, msgLen + tmpLen + 1) < 0) + goto cleanup; + + memcpy(msg + msgLen, tmp, tmpLen + 1); + msgLen += tmpLen; + } + + for (idx = 0; data->len && data->len[idx]; idx++) { + size_t tmpLen = data->len[idx]; + + if (VIR_REALLOC_N(msg, msgLen + tmpLen) < 0) + goto cleanup; + msgLen += tmpLen; + + /* Here @msg contains some garbage that was on the heap + * when the memory was allocated. That's okay, we want to + * be sure iohelper can deal with binary garbage. */ + } + + if (virMutexInit(&lock) < 0 || + virCondInit(&cond) < 0) + goto cleanup; + + if (testInit(ioCtl, pipeFD, data->blockR, data->blockW) < 0) + goto cleanup; + + readerD = writerD = (threadData) {.lock = &lock, .cond = &cond, + .ret = -1, .finished = false, .msg = msg, msgLen = msgLen}; + readerD.ioCtl = ioCtl[0]; + writerD.ioCtl = ioCtl[1]; + + /* Now, ideally we would set the kernel's pipe buffer to be + * small. Really small. Couple of bytes perhaps so that we + * can be sure writes wrap around it just nicely. But the + * smallest possible size is PAGESIZE. Trying to set anything + * smaller than that is silently rounded up to PAGESIZE. + * Okay, in that case we should write multiple of that. */ + fcntl(pipeFD[0], F_SETPIPE_SZ, 0); + + virMutexLock(&lock); + + if (virThreadCreate(&reader, false, readerThread, &readerD) < 0 || + virThreadCreate(&writer, false, writerThread, &writerD) < 0) + goto cleanup; + + if (virTimeMillisNow(&now) < 0) + goto cleanup; + + then = now + WAIT_TIME; + + while (!readerD.finished || + !writerD.finished) { + if (virCondWaitUntil(&cond, &lock, then) < 0) { + if (errno == ETIMEDOUT) { + if (!readerD.finished) + virThreadCancel(&reader); + if (!writerD.finished) + virThreadCancel(&writer); + } + + goto cleanup; + } + if (readerD.finished) + VIR_FORCE_CLOSE(pipeFD[0]); + if (writerD.finished) + VIR_FORCE_CLOSE(pipeFD[1]); + } + + if (readerD.ret < 0 || + writerD.ret < 0) + goto cleanup; + + ret = 0; + cleanup: + virMutexUnlock(&lock); + virMutexDestroy(&lock); + virCondDestroy(&cond); + virObjectUnref(ioCtl[0]); + virObjectUnref(ioCtl[1]); + VIR_FORCE_CLOSE(pipeFD[0]); + VIR_FORCE_CLOSE(pipeFD[1]); + return ret; +} + static int mymain(void) { int ret = 0; + srand(time(NULL)); + #define DO_TEST_BLOCKING_SIMPLE(...) \ do { \ const char *msg[] = { __VA_ARGS__, NULL}; \ - testData data = {.msg = msg, .len = NULL }; \ + testData data = {.blockR = true, .blockW = true, \ + .msg = msg, .len = NULL }; \ if (virTestRun("Blocking simple", testBlocking, &data) < 0) \ ret = -1; \ } while (0) @@ -159,11 +400,47 @@ mymain(void) #define DO_TEST_BLOCKING_LEN(...) \ do { \ unsigned int len[] = { __VA_ARGS__, 0}; \ - testData data = {.msg = NULL, .len = len }; \ + testData data = {.blockR = true, .blockW = true, \ + .msg = NULL, .len = len }; \ if (virTestRun("Blocking len", testBlocking, &data) < 0) \ ret = -1; \ } while (0) +#define DO_TEST_BLOCKR_SIMPLE(...) \ + do { \ + const char *msg[] = { __VA_ARGS__, NULL}; \ + testData data = {.blockR = true, .blockW = false, \ + .msg = msg, .len = NULL }; \ + if (virTestRun("Blocking read simple", testNonblocking, &data) < 0) \ + ret = -1; \ + } while (0) + +#define DO_TEST_BLOCKR_LEN(...) \ + do { \ + unsigned int len[] = { __VA_ARGS__, 0}; \ + testData data = {.blockR = true, .blockW = false, \ + .msg = NULL, .len = len }; \ + if (virTestRun("Blocking read len", testNonblocking, &data) < 0) \ + ret = -1; \ + } while (0) + +#define DO_TEST_BLOCKW_SIMPLE(...) \ + do { \ + const char *msg[] = { __VA_ARGS__, NULL}; \ + testData data = {.blockR = false, .blockW = true, \ + .msg = msg, .len = NULL }; \ + if (virTestRun("Blocking write simple", testNonblocking, &data) < 0) \ + ret = -1; \ + } while (0) +#define DO_TEST_BLOCKW_LEN(...) \ + do { \ + unsigned int len[] = { __VA_ARGS__, 0}; \ + testData data = {.blockR = false, .blockW = true, \ + .msg = NULL, .len = len }; \ + if (virTestRun("Blocking write len", testNonblocking, &data) < 0) \ + ret = -1; \ + } while (0) + DO_TEST_BLOCKING_SIMPLE("Hello world"); DO_TEST_BLOCKING_SIMPLE("Hello world", "Hello", "world"); @@ -171,6 +448,18 @@ mymain(void) DO_TEST_BLOCKING_LEN(1024); DO_TEST_BLOCKING_LEN(32, 64, 128, 512, 1024); + DO_TEST_BLOCKR_SIMPLE("Hello world"); + DO_TEST_BLOCKR_SIMPLE("Hello world", "Hello", "world"); + + DO_TEST_BLOCKR_LEN(1024); + DO_TEST_BLOCKR_LEN(409600); + + DO_TEST_BLOCKW_SIMPLE("Hello world"); + DO_TEST_BLOCKW_SIMPLE("Hello world", "Hello", "world"); + + DO_TEST_BLOCKW_LEN(1024); + DO_TEST_BLOCKW_LEN(409600); + return ret == 0 ? EXIT_SUCCESS : EXIT_FAILURE; } -- 2.8.4 -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list