On Thu 2020-06-18 16:55:18, John Ogness wrote: > Introduce a multi-reader multi-writer lockless ringbuffer for storing > the kernel log messages. Readers and writers may use their API from > any context (including scheduler and NMI). This ringbuffer will make > it possible to decouple printk() callers from any context, locking, > or console constraints. It also makes it possible for readers to have > full access to the ringbuffer contents at any time and context (for > example from any panic situation). Please, find few comments below. They are primary about improving the comments. There are also 3 ideas how to clean up the code a bit[*]. Everything might be solved by followup patches. Well, I would prefer to update the comments in the next respin if you do it because of the third patch.[**] The small code changes can be done later. I do not want to introduce eventual problems at this stage. Feel free to use: Reviewed-by: Petr Mladek <pmladek@xxxxxxxx> [*] I have pretty good feeling about the code. And I really like the extensive comments. The remaining ideas are mostly from the nits department. [**] We are getting close to push this into linux-next. I think that it makes sense only when also the 3rd patch is ready. Also we should have a solution for the crashdump/crash tools at hands. But I think that this is already the case. > --- /dev/null > +++ b/kernel/printk/printk_ringbuffer.c > + * Data Rings > + * ~~~~~~~~~~ > + * The two data rings (text and dictionary) function identically. They exist > + * separately so that their buffer sizes can be individually set and they do > + * not affect one another. > + * > + * Data rings are byte arrays composed of data blocks. Data blocks are > + * referenced by blk_lpos structs that point to the logical position of the > + * beginning of a data block and the beginning of the next adjacent data > + * block. Logical positions are mapped directly to index values of the byte > + * array ringbuffer. > + * > + * Each data block consists of an ID followed by the raw data. The ID is the > + * identifier of a descriptor that is associated with the data block. A data > + * block is considered valid if all of the following conditions are met: > + * > + * 1) The descriptor associated with the data block is in the committed > + * or reusable queried state. This sounds strange. The information in the descriptor is valid when the descriptor is in the committed or reusable state. But the data in the data block are valid only when the related descriptor is in the committed state. The data block might get reused faster than the descriptor that owned it earlier. > + * 2) The blk_lpos struct within the descriptor associated with the data > + * block references back to the same data block. This is true. But this cross check is needed only when the assocoated descriptor is found via the ID stored in the data block. It is not needed when the data block is found via id/seq number stored in the descriptor that is in commited state. > + * 3) The data block is within the head/tail logical position range. Also this is true. But it works by design. It does not need to be checked when validating the data. I would like to make this clear. So, I would write something like: The raw data stored in a data block can be considered valid when the the associated descriptor is in the committed state. The above condition is enough when the data block is found via information from the descriptor that was chosen by its ID. One more check is needed when the data block is found via blk_lpos.next stored in the descriptor for the previous data block. The data block might point to a descriptor in a committed state just by chance. It is the right one only when blk_lpos.begin points back to the data block. I do not know. It might be to complicated. I actually did not mind about the original text in previous versions. But I realize now that it might have been the reason why the checks in data_make_reusable() were too complicated in the previous versions. > + * Sample reader code:: > + * > + * struct printk_info info; > + * struct printk_record r; > + * char text_buf[32]; > + * char dict_buf[32]; > + * u64 seq; > + * > + * prb_rec_init_rd(&r, &info, &text_buf[0], sizeof(text_buf), > + * &dict_buf[0], sizeof(dict_buf)); > + * > + * prb_for_each_record(0, &test_rb, &seq, &r) { > + * if (info.seq != seq) > + * pr_warn("lost %llu records\n", info.seq - seq); > + * > + * if (info.text_len > r.text_buf_size) { > + * pr_warn("record %llu text truncated\n", info.seq); > + * text_buf[sizeof(text_buf) - 1] = 0; > + * } AFAIK, the trailing '\0' is missing even the the text is not truncated. We need to always add it here. Otherwise, we could not print the text using %s below. Also it means that it is not safe to copy the text using memcpy(new_buf, &r.text_buf[0], r.text_len) because r.text_len might be greater than r.text_bug_size. I would prefer a more safe API where r.text_len is set to the length that is really stored r.text_buf. The information about truncation might be passed via another struct member, e.g. r.text_truncated or so. Well, this might be done later. The printk() code does not have this problem because the length of the stored messages is limited on the store side. The reader side uses buffers that are always big enough. But I consider this error prone and non-intuitive. It might hit us in the long term. > + * > + * if (info.dict_len > r.dict_buf_size) { > + * pr_warn("record %llu dict truncated\n", info.seq); > + * dict_buf[sizeof(dict_buf) - 1] = 0; > + * } > + * > + * pr_info("%llu: %llu: %s;%s\n", info.seq, info.ts_nsec, > + * &text_buf[0], info.dict_len ? &dict_buf[0] : ""); > + * } > + * > + * Note that additional less convenient reader functions are available to > + * allow complex record access. > + * > + * ABA Issues > + * ~~~~~~~~~~ > + * To help avoid ABA issues, descriptors are referenced by IDs (array index > + * values with tagged states) "array index values" makes the feeling that it is a direct position in the array. We should make it clear that it is growing with each overlap. Also my understanding is that "tagged states" are the two bits. See below for an alternative text. > + * and data blocks are referenced by logical > + * positions (array index values with tagged states). Finally, lpos does does not include the tagged state. Or the meaning of "tagged state" is rather confusing. I would replace this with something like: To help avoid ABA issues, both descriptors and data blocks are referenced by logical positions that are growing with each wrap. Where the position of the descriptor is stored together with the state bits so that they can be manipulated atomically. > + * However, on 32-bit > + * systems the number of tagged states is relatively small such that an ABA "tagged states" ? Should it be "number of logical positions" > + * incident is (at least theoretically) possible. For example, if 4 million > + * maximally sized (1KiB) printk messages were to occur in NMI context on a > + * 32-bit system, the interrupted context would not be able to recognize that > + * the 32-bit integer completely wrapped and thus represents a different > + * data block than the one the interrupted context expects. > + * > + * To help combat this possibility, additional state checking is performed > + * (such as using cmpxchg() even though set() would suffice). These extra > + * checks are commented as such and will hopefully catch any ABA issue that > + * a 32-bit system might experience. > + * > + * Memory Barriers > + * ~~~~~~~~~~~~~~~ > + * Multiple memory barriers are used. To simplify proving correctness and > + * generating litmus tests, lines of code related to memory barriers > + * (loads, stores, and the associated memory barriers) are labeled:: > + * > + * LMM(function:letter) > + * > + * Comments reference the labels using only the "function:letter" part. > + * > + * The memory barrier pairs and their ordering are: > + * > + * desc_reserve:D / desc_reserve:B > + * push descriptor tail (id), then push descriptor head (id) > + * > + * desc_reserve:D / data_push_tail:B > + * push data tail (lpos), then set new descriptor reserved (state) > + * > + * desc_reserve:D / desc_push_tail:C > + * push descriptor tail (id), then set new descriptor reserved (state) > + * > + * desc_reserve:D / prb_first_seq:C > + * push descriptor tail (id), then set new descriptor reserved (state) > + * > + * desc_reserve:F / desc_read:D > + * set new descriptor id and reserved (state), then allow writer changes > + * > + * data_alloc:A / desc_read:D > + * set old descriptor reusable (state), then modify new data block area > + * > + * data_alloc:A / data_push_tail:B > + * push data tail (lpos), then modify new data block area > + * > + * prb_commit:B / desc_read:B > + * store writer changes, then set new descriptor committed (state) > + * > + * data_push_tail:D / data_push_tail:A > + * set descriptor reusable (state), then push data tail (lpos) > + * > + * desc_push_tail:B / desc_reserve:D > + * set descriptor reusable (state), then push descriptor tail (id) I am not sure that this list is really needs. It might get outdated easily. It does not help me to make sure that all these barriers are enough to keep the consistency. That said, feel free to keep it if it helps anyone. > + */ > + > +#define DATA_SIZE(data_ring) _DATA_SIZE((data_ring)->size_bits) > +#define DATA_SIZE_MASK(data_ring) (DATA_SIZE(data_ring) - 1) > + > +#define DESCS_COUNT(desc_ring) _DESCS_COUNT((desc_ring)->count_bits) > +#define DESCS_COUNT_MASK(desc_ring) (DESCS_COUNT(desc_ring) - 1) > + > +/* Determine the data array index from a logical position. */ > +#define DATA_INDEX(data_ring, lpos) ((lpos) & DATA_SIZE_MASK(data_ring)) > + > +/* Determine the desc array index from an ID or sequence number. */ > +#define DESC_INDEX(desc_ring, n) ((n) & DESCS_COUNT_MASK(desc_ring)) > + > +/* Determine how many times the data array has wrapped. */ > +#define DATA_WRAPS(data_ring, lpos) ((lpos) >> (data_ring)->size_bits) > + > +/* Get the logical position at index 0 of the current wrap. */ > +#define DATA_THIS_WRAP_START_LPOS(data_ring, lpos) \ > +((lpos) & ~DATA_SIZE_MASK(data_ring)) > + > +/* Get the ID for the same index of the previous wrap as the given ID. */ > +#define DESC_ID_PREV_WRAP(desc_ring, id) \ > +DESC_ID((id) - DESCS_COUNT(desc_ring)) > + > +/* A data block: maps to the raw data within the data ring. */ "raw data" is ambiguous here. Does it mean "data" array or the entire structure. See below for an alternative text. > +struct prb_data_block { > + unsigned long id; > + char data[0]; > +}; I think that the relation between desc.state_val, id, and, and state bits is not descried anywhere. It might be confusing for someone trying to understand the code for the first time. Also we should make it clear that id is the logical position without the state bits. For example, the meaning of DESC_COUNT, DATA_SIZE It might really help to describe of all struct members here, e.g. /** * struct prb_data_block: maps to the data block stored in the data ring * * @id: logical position in desc array (without state bits) * @data[0]: raw data stored by the writer */ struct prb_data_block { unsigned long id; char data[0]; }; > + > +static struct prb_desc *to_desc(struct prb_desc_ring *desc_ring, u64 n) This might deserve a description of possible n values: id, seq, state_val. > +{ > + return &desc_ring->descs[DESC_INDEX(desc_ring, n)]; > +} > + > +/* > + * Allocate a new data block, invalidating the oldest data block(s) > + * if necessary. This function also associates the data block with > + * a specified descriptor. > + */ > +static char *data_alloc(struct printk_ringbuffer *rb, > + struct prb_data_ring *data_ring, unsigned int size, > + struct prb_data_blk_lpos *blk_lpos, unsigned long id) > +{ > + struct prb_data_block *blk; > + unsigned long begin_lpos; > + unsigned long next_lpos; > + > + if (!data_ring->data || size == 0) { Is there any specific reason to check data_ring->data is not NULL, please? It is not a bit deal. I am just curious because it is not checked on other locations. > + /* Specify a data-less block. */ > + blk_lpos->begin = INVALID_LPOS; > + blk_lpos->next = INVALID_LPOS; > + return NULL; > + } > +/** > + * prb_next_seq() - Get the sequence number after the last available record. > + * > + * @rb: The ringbuffer to get the sequence number from. > + * > + * This is the public function available to readers to see what the next > + * newest sequence number available to readers will be. > + * > + * This provides readers a sequence number to jump to if all currently > + * available records should be skipped. > + * > + * Context: Any context. > + * Return: The sequence number of the next newest (not yet available) record > + * for readers. > + */ > +u64 prb_next_seq(struct printk_ringbuffer *rb) > +{ > + u64 seq = 0; > + > + do { > + /* Search forward from the oldest descriptor. */ > + if (!_prb_read_valid(rb, &seq, NULL, NULL)) > + return seq; > + seq++; > + } while (seq); seq should never become zero here. It would take ages to overflow and just creates false assumption about the logic. I would replace this with for(;;) > + > + return 0; > +} > + > --- /dev/null > +++ b/kernel/printk/printk_ringbuffer.h > @@ -0,0 +1,352 @@ > +/* SPDX-License-Identifier: GPL-2.0 */ > + > +#ifndef _KERNEL_PRINTK_RINGBUFFER_H > +#define _KERNEL_PRINTK_RINGBUFFER_H > + > +#include <linux/atomic.h> > + It might be worth to add some information. And maybe also define who is supposed to manipulate the struct members. /* * Meta information about each stored message. * * They are stored and used by printk code. The only exception are: * seq, text_len, dict_len that are handled also in the ringbuffer * code. */ > +struct printk_info { > + u64 seq; /* sequence number */ > + u64 ts_nsec; /* timestamp in nanoseconds */ > + u16 text_len; /* length of text message */ > + u16 dict_len; /* length of dictionary message */ > + u8 facility; /* syslog facility */ > + u8 flags:5; /* internal record flags */ > + u8 level:3; /* syslog level */ > + u32 caller_id; /* thread id or processor id */ > +}; > +/* Specifies the position/span of a data block. */ s/position/logical position/ > +struct prb_data_blk_lpos { > + unsigned long begin; > + unsigned long next; > +}; > + > +/* A descriptor: the complete meta-data for a record. */ state_var might deserve some description, e.g. * @state_var combines logical position of the descriptor * in the array ring buffer (id) and two state bits. > +struct prb_desc { > + struct printk_info info; > + atomic_long_t state_var; > + struct prb_data_blk_lpos text_blk_lpos; > + struct prb_data_blk_lpos dict_blk_lpos; > +}; > + > +/* A ringbuffer of "ID + data" elements. */ > +struct prb_data_ring { > + unsigned int size_bits; > + char *data; > + atomic_long_t head_lpos; > + atomic_long_t tail_lpos; > +}; > + > +/* A ringbuffer of "struct prb_desc" elements. */ > +struct prb_desc_ring { > + unsigned int count_bits; > + struct prb_desc *descs; > + atomic_long_t head_id; > + atomic_long_t tail_id; > +}; > + > +/* The high level structure representing the printk ringbuffer. */ @fault might desrve some comment, .e.g. * @fail: automatically counted number of failed prb_reserve() calls. > +struct printk_ringbuffer { > + struct prb_desc_ring desc_ring; > + struct prb_data_ring text_data_ring; > + struct prb_data_ring dict_data_ring; > + atomic_long_t fail; > +}; > + > +/* Used by writers as a reserve/commit handle. */ We should emphasise that is should be manipulated only by the ring_buffer code. Also @id and @text_space might deserve some explanation. I think that it is better to use the full description here: /** * struct prb_reserved_entry - reserve/commit handle used * by writers and readers. * * @rb: ring buffer where the entry has been reserved * @irqflags: saved irq flags to be used when the entry gets committed * @id: logical position of the reserved descriptor * @text_space: Really used space in text data ring, including * the stored "id" and and alignment * * It must be manipulated only by the ring buffer code. */ > +struct prb_reserved_entry { > + struct printk_ringbuffer *rb; > + unsigned long irqflags; > + unsigned long id; > + unsigned int text_space; > +}; > +/** > + * prb_for_each_record() - Iterate over a ringbuffer. > + * > + * @from: The sequence number to begin with. > + * @rb: The ringbuffer to iterate over. > + * @s: A u64 to store the sequence number on each iteration. > + * @r: A printk_record to store the record on each iteration. > + * > + * This is a macro for conveniently iterating over a ringbuffer. It might be worth emphasizing that @s is internal value. The reader should check r->seq to see that acutally read sequence number. IMHO, it is not clearly visible from the code. At least, I had to check prb_read_valid() to realize this, > + * > + * Context: Any context. > + */ > +#define prb_for_each_record(from, rb, s, r) \ > +for ((s) = from; prb_read_valid(rb, s, r); (s) = (r)->info->seq + 1) That's it. Uff, it is a huge relief that there was nothing memory barriers related ;-) Best Regards, Petr _______________________________________________ kexec mailing list kexec@xxxxxxxxxxxxxxxxxxx http://lists.infradead.org/mailman/listinfo/kexec