Hi Petr, I've made changes to the patch that hopefully align with what you are looking for. I would appreciate it if you could go over it and see if the changes are in the right direction. And if so, you should decide whether I should make these kinds of changes for the whole series and submit a v2 before you continue with the review. The list of changes: - Added comments everywhere I think they could be useful. Is it too much? - Renamed struct prb_handle to prb_reserved_entry (more appropriate). - Fixed up macros as you requested. - The implementation from prb_commit() has been moved to a new prb_commit_all_reserved(). This should resolve the confusion in the "failed to push_tail()" code. - I tried moving calc_next() into prb_reserve(), but it was pure insanity. I played with refactoring for a while until I found something that I think looks nice. I moved the implementation of calc_next() along with its containing loop into a new function find_res_ptrs(). This function does what calc_next() and push_tail() did. With this solution, I think prb_reserve() looks pretty clean. However, the optimization of communicating about the wrap is gone. So even though find_res_ptrs() knew if a wrap occurred, prb_reserve() figures it out again for itself. If we want the optimization, I still think the best approach is the -1,0,1 return value of find_res_ptrs(). I'm looking forward to your response. John Ogness diff --git a/include/linux/printk_ringbuffer.h b/include/linux/printk_ringbuffer.h index 4239dc86e029..ab6177c9fe0a 100644 --- a/include/linux/printk_ringbuffer.h +++ b/include/linux/printk_ringbuffer.h @@ -25,6 +25,23 @@ struct printk_ringbuffer { atomic_t ctx; }; +/* + * struct prb_reserved_entry: Reserved but not yet committed entry. + * @rb: The printk_ringbuffer where the entry was reserved. + * + * This is a handle used by the writer to represent an entry that has been + * reserved but not yet committed. 
+ * + * The structure does not actually store any information about the entry that + * has been reserved because this information is not required by the + * implementation. The struct could prove useful if extra tracking or even + * fundamental changes to the ringbuffer were to be implemented, because such + * changes would then not require changes to writers. + */ +struct prb_reserved_entry { + struct printk_ringbuffer *rb; +}; + #define DECLARE_STATIC_PRINTKRB_CPULOCK(name) \ static struct prb_cpulock name = { \ .owner = ATOMIC_INIT(-1), \ @@ -46,6 +63,11 @@ static struct printk_ringbuffer name = { \ .ctx = ATOMIC_INIT(0), \ } +/* writer interface */ +char *prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb, + unsigned int size); +void prb_commit(struct prb_reserved_entry *e); + /* utility functions */ void prb_lock(struct prb_cpulock *cpu_lock); void prb_unlock(struct prb_cpulock *cpu_lock); diff --git a/lib/printk_ringbuffer.c b/lib/printk_ringbuffer.c index 54c750092810..fbe1d92b9b60 100644 --- a/lib/printk_ringbuffer.c +++ b/lib/printk_ringbuffer.c @@ -2,6 +2,59 @@ #include <linux/smp.h> #include <linux/printk_ringbuffer.h> +/* + * struct prb_entry: An entry within the ringbuffer. + * @size: The size in bytes of the entry or -1 if terminating. + * @seq: The unique sequence number of the entry. + * @data: The data bytes of the entry. + * + * The struct is typecast directly into the ringbuffer data array to access + * an entry. The @size specifies the complete size of the entry including any + * padding. The next entry will be located at &this_entry + this_entry.size. + * The only exception is if the entry is terminating (size = -1). In this case + * @seq and @data are invalid and the next entry is at the beginning of the + * ringbuffer data array. 
+ */ +struct prb_entry { + unsigned int size; + u64 seq; + char data[0]; +}; + +/* the size and size bitmask of the ringbuffer data array */ +#define PRB_SIZE(rb) (1L << rb->size_bits) +#define PRB_SIZE_BITMASK(rb) (PRB_SIZE(rb) - 1) + +/* given a logical position, return its index in the ringbuffer data array */ +#define PRB_INDEX(rb, lpos) (lpos & PRB_SIZE_BITMASK(rb)) + +/* + * given a logical position, return how many times the data buffer has + * wrapped, where logical position 0 begins at index 0 with no wraps + */ +#define PRB_WRAPS(rb, lpos) (lpos >> rb->size_bits) + +/* + * given a logical position, return the logical position that represents the + * beginning of the ringbuffer data array for this wrap + */ +#define PRB_THIS_WRAP_START_LPOS(rb, lpos) \ + (PRB_WRAPS(rb, lpos) << rb->size_bits) + +/* + * given a logical position, return the logical position that represents the + * beginning of the ringbuffer data array for the next wrap + */ +#define PRB_NEXT_WRAP_START_LPOS(rb, lpos) \ + ((PRB_WRAPS(rb, lpos) + 1) << rb->size_bits) + +/* + * entries are aligned to allow direct typecasts as struct prb_entry + */ +#define PRB_ENTRY_ALIGN sizeof(long) +#define PRB_ENTRY_ALIGN_SIZE(sz) \ + ((sz + (PRB_ENTRY_ALIGN - 1)) & ~(PRB_ENTRY_ALIGN - 1)) + /* * prb_lock: Perform a processor-reentrant spin lock. * @cpu_lock: A pointer to the lock object. 
@@ -58,3 +111,257 @@ void prb_unlock(struct prb_cpulock *cpu_lock) } put_cpu(); } + +/* translate a logical position to an entry in the data array */ +static struct prb_entry *to_entry(struct printk_ringbuffer *rb, + unsigned long lpos) +{ + char *buffer = rb->buffer; + + buffer += PRB_INDEX(rb, lpos); + return (struct prb_entry *)buffer; +} + +/* try to move the tail pointer forward, thus removing the oldest entry */ +static bool push_tail(struct printk_ringbuffer *rb, unsigned long tail) +{ + unsigned long new_tail, head; + struct prb_entry *e; + + /* maybe another context already pushed the tail */ + if (tail != atomic_long_read(&rb->tail)) + return true; + + /* + * Determine what the new tail should be. If the tail is a + * terminating entry, the new tail will be beyond the entry + * at the beginning of the data array. + */ + e = to_entry(rb, tail); + if (e->size != -1) + new_tail = tail + e->size; + else + new_tail = PRB_NEXT_WRAP_START_LPOS(rb, tail); + + /* make sure the new tail does not overtake the head */ + head = atomic_long_read(&rb->head); + if (head - new_tail > PRB_SIZE(rb)) + return false; + + /* + * The result of this cmpxchg does not matter. If it succeeds, + * this context pushed the tail. If it fails, some other context + * pushed the tail. Either way, the tail was pushed. + */ + atomic_long_cmpxchg(&rb->tail, tail, new_tail); + return true; +} + +/* + * If this context incremented rb->ctx to 1, move the head pointer + * beyond all reserved entries. + */ +static void prb_commit_all_reserved(struct printk_ringbuffer *rb) +{ + unsigned long head, res; + struct prb_entry *e; + + for (;;) { + if (atomic_read(&rb->ctx) > 1) { + /* another context will adjust the head pointer */ + atomic_dec(&rb->ctx); + break; + } + + /* + * This is the only context that will adjust the head pointer. + * If NMIs interrupt at any time, they can reserve/commit new + * entries, but they will not adjust the head pointer. 
+ */ + + /* assign sequence numbers before moving the head pointer */ + head = atomic_long_read(&rb->head); + res = atomic_long_read(&rb->reserve); + while (head != res) { + e = to_entry(rb, head); + if (e->size == -1) { + head = PRB_NEXT_WRAP_START_LPOS(rb, head); + continue; + } + e->seq = ++rb->seq; + head += e->size; + } + + /* + * move the head pointer, thus making all reserved entries + * visible to any readers + */ + atomic_long_set_release(&rb->head, res); + + atomic_dec(&rb->ctx); + if (atomic_long_read(&rb->reserve) == res) + break; + /* + * The reserve pointer is different than previously read. New + * entries were reserved/committed by NMI contexts, possibly + * before ctx was decremented by this context. Go back and move + * the head pointer beyond those entries as well. + */ + atomic_inc(&rb->ctx); + } + + /* Enable interrupts and allow other CPUs to reserve/commit. */ + prb_unlock(rb->cpulock); +} + +/* + * prb_commit: Commit a reserved entry to the ring buffer. + * @e: A structure referencing the reserved entry to commit. + * + * Commit data that has been reserved using prb_reserve(). Once the entry + * has been committed, it can be invalidated at any time. If a writer is + * interested in using the data after committing, the writer should make + * its own copy first or use the prb_iter_ reader functions to access the + * data in the ring buffer. + * + * It is safe to call this function from any context and state. + */ +void prb_commit(struct prb_reserved_entry *e) +{ + prb_commit_all_reserved(e->rb); +} + +/* given the size to reserve, determine current and next reserve pointers */ +static bool find_res_ptrs(struct printk_ringbuffer *rb, unsigned long *res_old, + unsigned long *res_new, unsigned int size) +{ + unsigned long tail, entry_begin; + + /* + * The reserve pointer is not allowed to overtake the index of the + * tail pointer. If this would happen, the tail pointer must be + * pushed, thus removing the oldest entry. 
+ */ + for (;;) { + tail = atomic_long_read(&rb->tail); + *res_old = atomic_long_read(&rb->reserve); + + /* + * If the new reserve pointer wraps, the new entry will + * begin at the beginning of the data array. This loop + * exists only to handle the wrap. + */ + for (entry_begin = *res_old;;) { + + *res_new = entry_begin + size; + + if (*res_new - tail > PRB_SIZE(rb)) { + /* would overtake tail, push tail */ + + if (!push_tail(rb, tail)) { + /* couldn't push tail, can't reserve */ + return false; + } + + /* tail pushed, try again */ + break; + } + + if (PRB_WRAPS(rb, entry_begin) == + PRB_WRAPS(rb, *res_new)) { + /* reserve pointer values determined */ + return true; + } + + /* + * The new entry will wrap. Calculate the new reserve + * pointer based on the beginning of the data array + * for the wrap of the new reserve pointer. + */ + entry_begin = PRB_THIS_WRAP_START_LPOS(rb, *res_new); + } + } +} + +/* + * prb_reserve: Reserve an entry within a ring buffer. + * @e: A structure to be set up to reference a reserved entry. + * @rb: A ring buffer to reserve data within. + * @size: The number of bytes to reserve. + * + * Reserve an entry of at least @size bytes to be used by the caller. If + * successful, the data region of the entry belongs to the caller and cannot + * be invalidated by any other task/context. For this reason, the caller + * should call prb_commit() as quickly as possible in order to avoid preventing + * other tasks/contexts from reserving data in the case that the ring buffer + * has wrapped. + * + * It is safe to call this function from any context and state. + * + * Returns a pointer to the reserved data (and @e is set up to reference the + * entry containing that data) or NULL if it was not possible to reserve data. 
+ */ +char *prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb, + unsigned int size) +{ + unsigned long res_old, res_new; + struct prb_entry *entry; + + if (size == 0) + return NULL; + + /* add entry header to size and align for the following entry */ + size = PRB_ENTRY_ALIGN_SIZE(sizeof(struct prb_entry) + size); + + if (size >= PRB_SIZE(rb)) + return NULL; + + /* + * Lock out all other CPUs and disable interrupts. Only NMIs will + * be able to interrupt. It will stay this way until the matching + * commit is called. + */ + prb_lock(rb->cpulock); + + /* + * Clarify the responsibility of this context. If this context + * increments ctx to 1, this context is responsible for pushing + * the head pointer beyond all reserved entries on commit. + */ + atomic_inc(&rb->ctx); + + /* + * Move the reserve pointer forward. Since NMIs can interrupt at any + * time, modifying the reserve pointer is done in a cmpxchg loop. + */ + do { + if (!find_res_ptrs(rb, &res_old, &res_new, size)) { + /* + * Not possible to move the reserve pointer. Try to + * commit all reserved entries because this context + * might have that responsibility (if it incremented + * ctx to 1). + */ + prb_commit_all_reserved(rb); + return NULL; + } + } while (!atomic_long_try_cmpxchg_acquire(&rb->reserve, + &res_old, res_new)); + + entry = to_entry(rb, res_old); + if (PRB_WRAPS(rb, res_old) != PRB_WRAPS(rb, res_new)) { + /* + * The reserve wraps. Create the terminating entry and get the + * pointer to the actually reserved entry at the beginning of + * the data array on the wrap of the new reserve pointer. + */ + entry->size = -1; + entry = to_entry(rb, PRB_THIS_WRAP_START_LPOS(rb, res_new)); + } + + /* The size is set now. The seq is set later, on commit. */ + entry->size = size; + + e->rb = rb; + return &entry->data[0]; +}