We add an infrastructure for correcting the timestamps of the entries. This is needed in order to correlate Data streams that have been recorded using non-synchronized clocks. The infrastructure can handle an arbitrary timestamps correction formula, however for the moment we only provide calibration that adds a constant offset. Signed-off-by: Yordan Karadzhov (VMware) <y.karadz@xxxxxxxxx> --- src/libkshark-tepdata.c | 13 +++++- src/libkshark.c | 99 +++++++++++++++++++++++++++++++++++++++++ src/libkshark.h | 27 +++++++++++ 3 files changed, 138 insertions(+), 1 deletion(-) diff --git a/src/libkshark-tepdata.c b/src/libkshark-tepdata.c index d9d57843..31cb33d5 100644 --- a/src/libkshark-tepdata.c +++ b/src/libkshark-tepdata.c @@ -296,6 +296,9 @@ static ssize_t get_records(struct kshark_context *kshark_ctx, entry = &temp_rec->entry; missed_events_action(stream, rec, entry); + /* Apply time calibration. */ + kshark_postprocess_entry(stream, rec, entry); + entry->stream_id = stream->stream_id; temp_next = &temp_rec->next; @@ -316,6 +319,12 @@ static ssize_t get_records(struct kshark_context *kshark_ctx, entry->stream_id = stream->stream_id; + /* + * Post-process the content of the entry. This includes + * time calibration and event-specific plugin actions. + */ + kshark_postprocess_entry(stream, rec, entry); + pid = entry->pid; /* Apply Id filtering. */ @@ -481,8 +490,10 @@ static ssize_t tepdata_load_matrix(struct kshark_data_stream *stream, if (cpu_array) (*cpu_array)[count] = e->cpu; - if (ts_array) + if (ts_array) { + kshark_calib_entry(stream, e); (*ts_array)[count] = e->ts; + } if (pid_array) (*pid_array)[count] = e->pid; diff --git a/src/libkshark.c b/src/libkshark.c index 6f99dd6c..33476ed0 100644 --- a/src/libkshark.c +++ b/src/libkshark.c @@ -131,6 +131,7 @@ static void kshark_stream_free(struct kshark_data_stream *stream) kshark_hash_id_free(stream->tasks); + free(stream->calib_array); free(stream->file); free(stream->name); free(stream); @@ -862,6 +863,37 @@ void kshark_plugin_actions(struct kshark_data_stream *stream, } } +/** + * @brief Time calibration of the timestamp of the entry. + * + * @param stream: Input location for a Trace data stream pointer. + * @param entry: Output location for entry. + */ +void kshark_calib_entry(struct kshark_data_stream *stream, + struct kshark_entry *entry) +{ + if (stream->calib && stream->calib_array) { + /* Calibrate the timestamp of the entry. */ + stream->calib(&entry->ts, stream->calib_array); + } +} + +/** + * @brief Post-process the content of the entry. This includes time calibration + * and all registered event-specific plugin actions. + * + * @param stream: Input location for a Trace data stream pointer. + * @param record: Input location for the trace record. + * @param entry: Output location for entry. + */ +void kshark_postprocess_entry(struct kshark_data_stream *stream, + void *record, struct kshark_entry *entry) +{ + kshark_calib_entry(stream, entry); + + kshark_plugin_actions(stream, record, entry); +} + static inline void free_ptr(void *ptr) { if (ptr) @@ -1259,6 +1291,73 @@ kshark_get_entry_back(const struct kshark_entry_request *req, return get_entry(req, data, index, req->first, end, -1); } +static int compare_time(const void* a, const void* b) +{ + const struct kshark_entry *entry_a, *entry_b; + + entry_a = *(const struct kshark_entry **) a; + entry_b = *(const struct kshark_entry **) b; + + if (entry_a->ts > entry_b->ts) + return 1; + + if (entry_a->ts < entry_b->ts) + return -1; + + return 0; +} + +static void kshark_data_qsort(struct kshark_entry **entries, size_t size) +{ + qsort(entries, size, sizeof(struct kshark_entry *), compare_time); +} + +/** + * Add constant offset to the timestamp of the entry. To be used by the sream + * object as a System clock calibration callback function. + */ +void kshark_offset_calib(int64_t *ts, int64_t *argv) +{ + *ts += argv[0]; +} + +/** + * @brief Apply constant offset to the timestamps of all entries from a given + * Data stream. + * + * @param kshark_ctx: Input location for the session context pointer. + * @param entries: Input location for the trace data. + * @param size: The size of the trace data. + * @param sd: Data stream identifier. + * @param offset: The constant offset to be added (in nanosecond). + */ +void kshark_set_clock_offset(struct kshark_context *kshark_ctx, + struct kshark_entry **entries, size_t size, + int sd, int64_t offset) +{ + struct kshark_data_stream *stream; + int64_t correction; + + stream = kshark_get_data_stream(kshark_ctx, sd); + if (!stream) + return; + + if (!stream->calib_array) { + stream->calib = kshark_offset_calib; + stream->calib_array = calloc(1, sizeof(*stream->calib_array)); + stream->calib_array_size = 1; + } + + correction = offset - stream->calib_array[0]; + stream->calib_array[0] = offset; + + for (size_t i = 0; i < size; ++i) + if (entries[i]->stream_id == sd) + entries[i]->ts += correction; + + kshark_data_qsort(entries, size); +} + static int first_in_time_entry(struct kshark_entry_data_set *buffer, int n_buffers, size_t *count) { int64_t t_min = INT64_MAX; diff --git a/src/libkshark.h b/src/libkshark.h index fbe1b2ac..dc30665b 100644 --- a/src/libkshark.h +++ b/src/libkshark.h @@ -115,6 +115,12 @@ void kshark_hash_id_free(struct kshark_hash_id *hash); int *kshark_hash_ids(struct kshark_hash_id *hash); +/** + * Timestamp calibration function type. To be user for system clock + * calibration. + */ +typedef void (*time_calib_func) (int64_t *, int64_t *); + struct kshark_data_stream; /** A function type to be used to initialize the interface of the data stream. */ @@ -315,6 +321,15 @@ struct kshark_data_stream { /** The number of plugins registered for this stream.*/ int n_plugins; + /** System clock calibration function. */ + time_calib_func calib; + + /** An array of time calibration constants. */ + int64_t *calib_array; + + /** The size of the array of time calibration constants. */ + size_t calib_array_size; + /** List of Plugin's Event handlers. */ struct kshark_event_proc_handler *event_handlers; @@ -669,6 +684,12 @@ void kshark_clear_all_filters(struct kshark_context *kshark_ctx, void kshark_plugin_actions(struct kshark_data_stream *stream, void *record, struct kshark_entry *entry); +void kshark_calib_entry(struct kshark_data_stream *stream, + struct kshark_entry *entry); + +void kshark_postprocess_entry(struct kshark_data_stream *stream, + void *record, struct kshark_entry *entry); + /** Search failed identifiers. */ enum kshark_search_failed { /** All entries have greater timestamps. */ @@ -1059,6 +1080,12 @@ struct kshark_config_doc *kshark_open_config_file(const char *file_name, struct kshark_config_doc *kshark_json_to_conf(struct json_object *jobj); +void kshark_offset_calib(int64_t *ts, int64_t *atgv); + +void kshark_set_clock_offset(struct kshark_context *kshark_ctx, + struct kshark_entry **entries, size_t size, + int sd, int64_t offset); + /** Structure representing a data set made of KernelShark entries. */ struct kshark_entry_data_set { /** Array of entries pointers. */ -- 2.25.1