diff mbox series

[v2,10/20] kernel-shark: Start using data streams

Message ID 20201012133523.469040-11-y.karadz@gmail.com (mailing list archive)
State Superseded
Headers show
Series Start KernelShark v2 transformation | expand

Commit Message

Yordan Karadzhov Oct. 12, 2020, 1:35 p.m. UTC
Here we switch to using the trace data readout, provided by the Data
stream interface. The actual implementation of the new readout was
done in the previous commits.

Signed-off-by: Yordan Karadzhov (VMware) <y.karadz@gmail.com>
---
 examples/dataload.c |  21 ++-
 src/libkshark.c     | 336 ++++++++++++++++++--------------------------
 src/libkshark.h     |  24 +++-
 3 files changed, 164 insertions(+), 217 deletions(-)

Comments

Steven Rostedt Oct. 14, 2020, 6:56 p.m. UTC | #1
On Mon, 12 Oct 2020 16:35:13 +0300
"Yordan Karadzhov (VMware)" <y.karadz@gmail.com> wrote:

> Here we switch to using the trace data readout, provided by the Data
> stream interface. The actual implementation of the new readout was
> done in the previous commits.
> 
> Signed-off-by: Yordan Karadzhov (VMware) <y.karadz@gmail.com>
> ---
> --- a/src/libkshark.c
> +++ b/src/libkshark.c
> @@ -19,6 +19,7 @@
>  
>  // KernelShark
>  #include "libkshark.h"
> +#include "libkshark-tepdata.h"
>  
>  static __thread struct trace_seq seq;
>  
> @@ -32,6 +33,9 @@ static bool kshark_default_context(struct kshark_context **context)
>  	if (!kshark_ctx)
>  		return false;
>  
> +	kshark_ctx->stream = calloc(KS_MAX_NUM_STREAMS,
> +				    sizeof(*kshark_ctx->stream));
> +
>  	kshark_ctx->event_handlers = NULL;
>  	kshark_ctx->collections = NULL;
>  	kshark_ctx->plugins = NULL;
> @@ -108,62 +112,28 @@ bool kshark_instance(struct kshark_context **kshark_ctx)
>  	return true;
>  }
>  
> -static void kshark_free_task_list(struct kshark_context *kshark_ctx)
> -{
> -	struct kshark_task_list *task;
> -	int i;
> -
> -	if (!kshark_ctx)
> -		return;
> -
> -	for (i = 0; i < KS_TASK_HASH_SIZE; ++i) {
> -		while (kshark_ctx->tasks[i]) {
> -			task = kshark_ctx->tasks[i];
> -			kshark_ctx->tasks[i] = task->next;
> -			free(task);
> -		}
> -	}
> -}
> -
>  /**
>   * @brief Open and prepare for reading a trace data file specified by "file".
> - *	  If the specified file does not exist, or contains no trace data,
> - *	  the function returns false.
>   *
>   * @param kshark_ctx: Input location for context pointer.
>   * @param file: The file to load.
>   *
> - * @returns True on success, or false on failure.
> + * @returns The Id number of the data stream associated with this file on success.
> + * 	    Otherwise a negative errno code.
>   */
> -bool kshark_open(struct kshark_context *kshark_ctx, const char *file)
> +int kshark_open(struct kshark_context *kshark_ctx, const char *file)
>  {
> -	struct tracecmd_input *handle;
> -
> -	kshark_free_task_list(kshark_ctx);
> -
> -	handle = tracecmd_open(file);
> -	if (!handle)
> -		return false;
> -
> -	if (pthread_mutex_init(&kshark_ctx->input_mutex, NULL) != 0) {
> -		tracecmd_close(handle);
> -		return false;
> -	}
> -
> -	kshark_ctx->handle = handle;
> -	kshark_ctx->pevent = tracecmd_get_pevent(handle);
> +	int sd, rt;
>  
> -	kshark_ctx->advanced_event_filter =
> -		tep_filter_alloc(kshark_ctx->pevent);
> +	sd = kshark_add_stream(kshark_ctx);
> +	if (sd < 0)
> +		return sd;
>  
> -	/*
> -	 * Turn off function trace indent and turn on show parent
> -	 * if possible.
> -	 */
> -	tep_plugin_add_option("ftrace:parent", "1");
> -	tep_plugin_add_option("ftrace:indent", "0");
> +	rt = kshark_stream_open(kshark_ctx->stream[sd], file);
> +	if (rt < 0)

On failure, we should probably destroy the stream that was created.

> +		return rt;
>  
> -	return true;
> +	return sd;
>  }
>  
>  static void kshark_stream_free(struct kshark_data_stream *stream)
> @@ -253,6 +223,56 @@ int kshark_add_stream(struct kshark_context *kshark_ctx)
>  	return stream->stream_id;
>  }
>  
> +static bool is_tep(const char *filename)
> +{
> +	/*
> +	 * TODO: This is very naive. Implement more appropriate check. Ideally
> +	 * it should be part of the trace-cmd library.
> +	 */
> +	char *ext = strrchr(filename, '.');
> +	return ext && strcmp(ext, ".dat") == 0;
> +}
> +
> +static void set_format(struct kshark_context *kshark_ctx,
> +		       struct kshark_data_stream *stream,
> +		       const char *filename)
> +{
> +	stream->format = KS_INVALIDE_DATA;
> +
> +	if (is_tep(filename)) {
> +		stream->format = KS_TEP_DATA;
> +		return;
> +	}
> +}
> +
> +/**
> + * @brief Use an existing Trace data stream to open and prepare for reading
> + *	  a trace data file specified by "file".
> + *
> + * @param stream: Input location for a Trace data stream pointer.
> + * @param file: The file to load.
> + *
> + * @returns Zero on success or a negative error code in the case of an errno.
> + */
> +int kshark_stream_open(struct kshark_data_stream *stream, const char *file)
> +{
> +	struct kshark_context *kshark_ctx = NULL;
> +
> +	if (!stream || !kshark_instance(&kshark_ctx))
> +		return -EFAULT;
> +
> +	stream->file = strdup(file);
> +	set_format(kshark_ctx, stream, file);
> +
> +	switch (stream->format) {
> +	case KS_TEP_DATA:
> +		return kshark_tep_init_input(stream, file);
> +
> +	default:
> +		return -ENODATA;
> +	}
> +}
> +
>  /**
>   * @brief Get the Data stream object having given Id.
>   *
> @@ -313,45 +333,76 @@ int *kshark_all_streams(struct kshark_context *kshark_ctx)
>  	return ids;
>  }
>  
> +static void kshark_stream_close(struct kshark_data_stream *stream)
> +{
> +	struct kshark_context *kshark_ctx = NULL;
> +
> +	if (!stream || !kshark_instance(&kshark_ctx))
> +		return;
> +
> +	/*
> +	 * All filters are file specific. Make sure that all Process Ids and
> +	 * Event Ids from this file are not going to be used with another file.
> +	 */
> +	kshark_hash_id_clear(stream->show_task_filter);
> +	kshark_hash_id_clear(stream->hide_task_filter);
> +	kshark_hash_id_clear(stream->show_event_filter);
> +	kshark_hash_id_clear(stream->hide_event_filter);
> +	kshark_hash_id_clear(stream->show_cpu_filter);
> +	kshark_hash_id_clear(stream->hide_cpu_filter);
> +
> +	switch (stream->format) {
> +	case KS_TEP_DATA:
> +		kshark_tep_close_interface(stream);
> +		break;
> +
> +	default:
> +		break;
> +	}
> +
> +	pthread_mutex_destroy(&stream->input_mutex);
> +}
> +
>  /**
>   * @brief Close the trace data file and free the trace data handle.
>   *
>   * @param kshark_ctx: Input location for the session context pointer.
> + * @param sd: Data stream identifier.
>   */
> -void kshark_close(struct kshark_context *kshark_ctx)
> +void kshark_close(struct kshark_context *kshark_ctx, int sd)
>  {
> -	if (!kshark_ctx || !kshark_ctx->handle)
> +	struct kshark_data_stream *stream;
> +
> +	stream = kshark_get_data_stream(kshark_ctx, sd);
> +	if (!stream)
>  		return;
>  
> -	/*
> -	 * All filters are file specific. Make sure that the Pids and Event Ids
> -	 * from this file are not going to be used with another file.
> -	 */
> -	tracecmd_filter_id_clear(kshark_ctx->show_task_filter);
> -	tracecmd_filter_id_clear(kshark_ctx->hide_task_filter);
> -	tracecmd_filter_id_clear(kshark_ctx->show_event_filter);
> -	tracecmd_filter_id_clear(kshark_ctx->hide_event_filter);
> -	tracecmd_filter_id_clear(kshark_ctx->show_cpu_filter);
> -	tracecmd_filter_id_clear(kshark_ctx->hide_cpu_filter);
> -
> -	if (kshark_ctx->advanced_event_filter) {
> -		tep_filter_reset(kshark_ctx->advanced_event_filter);
> -		tep_filter_free(kshark_ctx->advanced_event_filter);
> -		kshark_ctx->advanced_event_filter = NULL;
> -	}
> +	kshark_stream_close(stream);
> +	kshark_stream_free(stream);
> +	kshark_ctx->stream[sd] = NULL;
> +	kshark_ctx->n_streams--;

So, if you have multiple streams, and you close one that's not the last,
and then add a new one, this will cause the new one to be overwritten.

As add_stream has:

	kshark_ctx->stream[kshark_ctx->n_streams++] = stream;

You may want to do instead:

	kshark_ctx->stream[sd] = NULL;
	while (!kshark->streams[kshark_ctx->n_streams - 1])
		kshark_ctx->n_streams--;

You can also add a free store, where you store an index in the stream[sd]
field of the next free item. Then for adding you have:

	if (kshark_ctx->free >= 0) {
		sd = kshark_ctx->free;
		kshark_ctx->free = (int)kshark_ctx->stream[kshark_ctx->free];
	} else {
		sd = kshark_ctx->n_streams++;
	}

and on freeing:

	kshark_ctx->streams[sd] = (void *)kshark_ctx->free;
	kshark_ctx->free = sd;

Just need to initialize kshark_ctx->free to -1.

And never decrement n_streams. If you do any loops over the stream, you
could verify that it is a real stream by:

	if ((unsigned long)kshark_ctx->streams[sd] > kshark_ctx->n_streams)
		/* pointer to a stream */
	else
		/* a free item. */

Places like kshark_close() would need the above (if doing a free store).

-- Steve


> +}
> +
> +/**
> + * @brief Close all currently open trace data file and free the trace data handle.
> + *
> + * @param kshark_ctx: Input location for the session context pointer.
> + */
> +void kshark_close_all(struct kshark_context *kshark_ctx)
> +{
> +	int i, *stream_ids, n_streams;
> +
> +	stream_ids = kshark_all_streams(kshark_ctx);
>  
>  	/*
> -	 * All data collections are file specific. Make sure that collections
> -	 * from this file are not going to be used with another file.
> +	 * Get a copy of shark_ctx->n_streams befor you start closing. Be aware
> +	 * that kshark_close() will decrement shark_ctx->n_streams.
>  	 */
> -	kshark_free_collection_list(kshark_ctx->collections);
> -	kshark_ctx->collections = NULL;
> -
> -	tracecmd_close(kshark_ctx->handle);
> -	kshark_ctx->handle = NULL;
> -	kshark_ctx->pevent = NULL;
> +	n_streams = kshark_ctx->n_streams;
> +	for (i = 0; i < n_streams; ++i)
> +		kshark_close(kshark_ctx, stream_ids[i]);
>  
> -	pthread_mutex_destroy(&kshark_ctx->input_mutex);
> +	free(stream_ids);
>  }
>  
>  /**
Yordan Karadzhov Nov. 5, 2020, 2:58 p.m. UTC | #2
On 14.10.20 г. 21:56 ч., Steven Rostedt wrote:
> On Mon, 12 Oct 2020 16:35:13 +0300
> "Yordan Karadzhov (VMware)" <y.karadz@gmail.com> wrote:
> 
>> Here we switch to using the trace data readout, provided by the Data
>> stream interface. The actual implementation of the new readout was
>> done in the previous commits.
>>
>> Signed-off-by: Yordan Karadzhov (VMware) <y.karadz@gmail.com>
>> ---
>> --- a/src/libkshark.c
>> +++ b/src/libkshark.c
>> @@ -19,6 +19,7 @@
>>   
>>   // KernelShark
>>   #include "libkshark.h"
>> +#include "libkshark-tepdata.h"
>>   
>>   static __thread struct trace_seq seq;
>>   
>> @@ -32,6 +33,9 @@ static bool kshark_default_context(struct kshark_context **context)
>>   	if (!kshark_ctx)
>>   		return false;
>>   
>> +	kshark_ctx->stream = calloc(KS_MAX_NUM_STREAMS,
>> +				    sizeof(*kshark_ctx->stream));
>> +
>>   	kshark_ctx->event_handlers = NULL;
>>   	kshark_ctx->collections = NULL;
>>   	kshark_ctx->plugins = NULL;
>> @@ -108,62 +112,28 @@ bool kshark_instance(struct kshark_context **kshark_ctx)
>>   	return true;
>>   }
>>   
>> -static void kshark_free_task_list(struct kshark_context *kshark_ctx)
>> -{
>> -	struct kshark_task_list *task;
>> -	int i;
>> -
>> -	if (!kshark_ctx)
>> -		return;
>> -
>> -	for (i = 0; i < KS_TASK_HASH_SIZE; ++i) {
>> -		while (kshark_ctx->tasks[i]) {
>> -			task = kshark_ctx->tasks[i];
>> -			kshark_ctx->tasks[i] = task->next;
>> -			free(task);
>> -		}
>> -	}
>> -}
>> -
>>   /**
>>    * @brief Open and prepare for reading a trace data file specified by "file".
>> - *	  If the specified file does not exist, or contains no trace data,
>> - *	  the function returns false.
>>    *
>>    * @param kshark_ctx: Input location for context pointer.
>>    * @param file: The file to load.
>>    *
>> - * @returns True on success, or false on failure.
>> + * @returns The Id number of the data stream associated with this file on success.
>> + * 	    Otherwise a negative errno code.
>>    */
>> -bool kshark_open(struct kshark_context *kshark_ctx, const char *file)
>> +int kshark_open(struct kshark_context *kshark_ctx, const char *file)
>>   {
>> -	struct tracecmd_input *handle;
>> -
>> -	kshark_free_task_list(kshark_ctx);
>> -
>> -	handle = tracecmd_open(file);
>> -	if (!handle)
>> -		return false;
>> -
>> -	if (pthread_mutex_init(&kshark_ctx->input_mutex, NULL) != 0) {
>> -		tracecmd_close(handle);
>> -		return false;
>> -	}
>> -
>> -	kshark_ctx->handle = handle;
>> -	kshark_ctx->pevent = tracecmd_get_pevent(handle);
>> +	int sd, rt;
>>   
>> -	kshark_ctx->advanced_event_filter =
>> -		tep_filter_alloc(kshark_ctx->pevent);
>> +	sd = kshark_add_stream(kshark_ctx);
>> +	if (sd < 0)
>> +		return sd;
>>   
>> -	/*
>> -	 * Turn off function trace indent and turn on show parent
>> -	 * if possible.
>> -	 */
>> -	tep_plugin_add_option("ftrace:parent", "1");
>> -	tep_plugin_add_option("ftrace:indent", "0");
>> +	rt = kshark_stream_open(kshark_ctx->stream[sd], file);
>> +	if (rt < 0)
> 
> On failure, we should probably destroy the stream that was created.


Correct, will be fixed in v3.

> 
>> +		return rt;
>>   
>> -	return true;
>> +	return sd;
>>   }
>>   
>>   static void kshark_stream_free(struct kshark_data_stream *stream)
>> @@ -253,6 +223,56 @@ int kshark_add_stream(struct kshark_context *kshark_ctx)
>>   	return stream->stream_id;
>>   }
>>   
>> +static bool is_tep(const char *filename)
>> +{
>> +	/*
>> +	 * TODO: This is very naive. Implement more appropriate check. Ideally
>> +	 * it should be part of the trace-cmd library.
>> +	 */
>> +	char *ext = strrchr(filename, '.');
>> +	return ext && strcmp(ext, ".dat") == 0;
>> +}
>> +
>> +static void set_format(struct kshark_context *kshark_ctx,
>> +		       struct kshark_data_stream *stream,
>> +		       const char *filename)
>> +{
>> +	stream->format = KS_INVALIDE_DATA;
>> +
>> +	if (is_tep(filename)) {
>> +		stream->format = KS_TEP_DATA;
>> +		return;
>> +	}
>> +}
>> +
>> +/**
>> + * @brief Use an existing Trace data stream to open and prepare for reading
>> + *	  a trace data file specified by "file".
>> + *
>> + * @param stream: Input location for a Trace data stream pointer.
>> + * @param file: The file to load.
>> + *
>> + * @returns Zero on success or a negative error code in the case of an errno.
>> + */
>> +int kshark_stream_open(struct kshark_data_stream *stream, const char *file)
>> +{
>> +	struct kshark_context *kshark_ctx = NULL;
>> +
>> +	if (!stream || !kshark_instance(&kshark_ctx))
>> +		return -EFAULT;
>> +
>> +	stream->file = strdup(file);
>> +	set_format(kshark_ctx, stream, file);
>> +
>> +	switch (stream->format) {
>> +	case KS_TEP_DATA:
>> +		return kshark_tep_init_input(stream, file);
>> +
>> +	default:
>> +		return -ENODATA;
>> +	}
>> +}
>> +
>>   /**
>>    * @brief Get the Data stream object having given Id.
>>    *
>> @@ -313,45 +333,76 @@ int *kshark_all_streams(struct kshark_context *kshark_ctx)
>>   	return ids;
>>   }
>>   
>> +static void kshark_stream_close(struct kshark_data_stream *stream)
>> +{
>> +	struct kshark_context *kshark_ctx = NULL;
>> +
>> +	if (!stream || !kshark_instance(&kshark_ctx))
>> +		return;
>> +
>> +	/*
>> +	 * All filters are file specific. Make sure that all Process Ids and
>> +	 * Event Ids from this file are not going to be used with another file.
>> +	 */
>> +	kshark_hash_id_clear(stream->show_task_filter);
>> +	kshark_hash_id_clear(stream->hide_task_filter);
>> +	kshark_hash_id_clear(stream->show_event_filter);
>> +	kshark_hash_id_clear(stream->hide_event_filter);
>> +	kshark_hash_id_clear(stream->show_cpu_filter);
>> +	kshark_hash_id_clear(stream->hide_cpu_filter);
>> +
>> +	switch (stream->format) {
>> +	case KS_TEP_DATA:
>> +		kshark_tep_close_interface(stream);
>> +		break;
>> +
>> +	default:
>> +		break;
>> +	}
>> +
>> +	pthread_mutex_destroy(&stream->input_mutex);
>> +}
>> +
>>   /**
>>    * @brief Close the trace data file and free the trace data handle.
>>    *
>>    * @param kshark_ctx: Input location for the session context pointer.
>> + * @param sd: Data stream identifier.
>>    */
>> -void kshark_close(struct kshark_context *kshark_ctx)
>> +void kshark_close(struct kshark_context *kshark_ctx, int sd)
>>   {
>> -	if (!kshark_ctx || !kshark_ctx->handle)
>> +	struct kshark_data_stream *stream;
>> +
>> +	stream = kshark_get_data_stream(kshark_ctx, sd);
>> +	if (!stream)
>>   		return;
>>   
>> -	/*
>> -	 * All filters are file specific. Make sure that the Pids and Event Ids
>> -	 * from this file are not going to be used with another file.
>> -	 */
>> -	tracecmd_filter_id_clear(kshark_ctx->show_task_filter);
>> -	tracecmd_filter_id_clear(kshark_ctx->hide_task_filter);
>> -	tracecmd_filter_id_clear(kshark_ctx->show_event_filter);
>> -	tracecmd_filter_id_clear(kshark_ctx->hide_event_filter);
>> -	tracecmd_filter_id_clear(kshark_ctx->show_cpu_filter);
>> -	tracecmd_filter_id_clear(kshark_ctx->hide_cpu_filter);
>> -
>> -	if (kshark_ctx->advanced_event_filter) {
>> -		tep_filter_reset(kshark_ctx->advanced_event_filter);
>> -		tep_filter_free(kshark_ctx->advanced_event_filter);
>> -		kshark_ctx->advanced_event_filter = NULL;
>> -	}
>> +	kshark_stream_close(stream);
>> +	kshark_stream_free(stream);
>> +	kshark_ctx->stream[sd] = NULL;
>> +	kshark_ctx->n_streams--;
> 
> So, if you have multiple streams, and you close one that's not the last,
> and then add a new one, this will cause the new one to be overwritten.
> 
> As add_stream has:
> 
> 	kshark_ctx->stream[kshark_ctx->n_streams++] = stream;
> 

I see the problem. This is definitely wrong.

What if in addition to "n_streams" I add another counter called 
"last_stream_added" and initialize this counter to -1?

Then I can add streams like this:

	kshark_ctx->stream[++kshark_ctx->last_stream_added] = stream;
	++kshark_ctx->n_streams;

> You may want to do instead:
> 
> 	kshark_ctx->stream[sd] = NULL;
> 	while (!kshark->streams[kshark_ctx->n_streams - 1])
> 		kshark_ctx->n_streams--;
> 
> You can also add a free store, where you store an index in the stream[sd]
> field of the next free item. Then for adding you have:
> 
> 	if (kshark_ctx->free >= 0) {
> 		sd = kshark_ctx->free;
> 		kshark_ctx->free = (int)kshark_ctx->stream[kshark_ctx->free];
> 	} else {
> 		sd = kshark_ctx->n_streams++;
> 	}
> 
> and on freeing:
> 
> 	kshark_ctx->streams[sd] = (void *)kshark_ctx->free;
> 	kshark_ctx->free = sd;
> 
> Just need to initialize kshark_ctx->free to -1.
> 
> And never decrement n_streams. If you do any loops over the stream, you
> could verify that it is a real stream by:
> 

I really need n_streams to show the true number of active streams 
because the widgets are using this a lot.

The way I loop over the active streams is the following:

	int *stream_ids = kshark_all_streams(kshark_ctx);
	for (i = 0; i < kshark_ctx->n_streams; ++i) {
		stream = kshark_ctx->stream[stream_ids[i]];

		....
	}

	free(stream_ids);

and with the addition of "last_stream_added" kshark_all_streams() will 
look like this:

int *kshark_all_streams(struct kshark_context *kshark_ctx)
{
	int *ids, i, count = 0;

	ids = calloc(kshark_ctx->n_streams, (sizeof(*ids)));
	if (!ids)
		return NULL;

	for (i = 0; i <= kshark_ctx->last_stream_added; ++i)
		if (kshark_ctx->stream[i])
			ids[count++] = i;

	return ids;
}

What do you think?

Thanks a lot!
Yordan

> 	if ((unsigned long)kshark_ctx->streams[sd] > kshark_ctx->n_streams)
> 		/* pointer to a stream */
> 	else
> 		/* a free item. */
> 
> Places like kshark_close() would need the above (if doing a free store).
> 
> -- Steve
> 
> 
>> +}
>> +
>> +/**
>> + * @brief Close all currently open trace data file and free the trace data handle.
>> + *
>> + * @param kshark_ctx: Input location for the session context pointer.
>> + */
>> +void kshark_close_all(struct kshark_context *kshark_ctx)
>> +{
>> +	int i, *stream_ids, n_streams;
>> +
>> +	stream_ids = kshark_all_streams(kshark_ctx);
>>   
>>   	/*
>> -	 * All data collections are file specific. Make sure that collections
>> -	 * from this file are not going to be used with another file.
>> +	 * Get a copy of shark_ctx->n_streams befor you start closing. Be aware
>> +	 * that kshark_close() will decrement shark_ctx->n_streams.
>>   	 */
>> -	kshark_free_collection_list(kshark_ctx->collections);
>> -	kshark_ctx->collections = NULL;
>> -
>> -	tracecmd_close(kshark_ctx->handle);
>> -	kshark_ctx->handle = NULL;
>> -	kshark_ctx->pevent = NULL;
>> +	n_streams = kshark_ctx->n_streams;
>> +	for (i = 0; i < n_streams; ++i)
>> +		kshark_close(kshark_ctx, stream_ids[i]);
>>   
>> -	pthread_mutex_destroy(&kshark_ctx->input_mutex);
>> +	free(stream_ids);
>>   }
>>   
>>   /**
Steven Rostedt Nov. 5, 2020, 6:17 p.m. UTC | #3
On Thu, 5 Nov 2020 16:58:51 +0200
"Yordan Karadzhov (VMware)" <y.karadz@gmail.com> wrote:

> I see the problem. This is definitely wrong.
> 
> What if in addition to "n_streams" I add another counter called 
> "last_stream_added" and initialize this counter to -1?
> 
> Then I can add streams like this:
> 
> 	kshark_ctx->stream[++kshark_ctx->last_stream_added] = stream;
> 	++kshark_ctx->n_streams;
> 
> > You may want to do instead:

I'm thinking of doing something like this:

struct kshark_ctx {
	[..]
	unsigned int free_stream;
	[..]
};


	if (kshark_ctx->free_stream >= kshark_ctx->n_streams) {
		kshark_ctx->stream[++kshark_ctx->n_streams] = stream;
		kshark_ctx->free_stream = kshark_ctx->n_streams;

/* BTW, the stream array should be allocated to the n_streams, and
   reallocated when it grows.  I don't think we want a huge stream array to
   handle all the bits when not used. */

	} else {
		int new_stream = kshark_ctx->free_stream;

		kshark_ctx->free_stream = kshark_index(kshark_ctx->stream[new_stream]);
		kshark_ctx->stream[new_stream] = stream;
	}


For freeing (index i):

	kshark_ctx->stream[i] = kshark_ptr(kshark_ctx->free_stream);
	kshark_ctx->free_stream = i;


We could define the following (note, I just used these names for the
functions, they could be named something else):


#define KSHARK_INDEX_MASK ((1 << NR_OF_BITS_FOR_STREAM) - 1)

#define KSHARK_INVALID_STREAM (~((1UL << NR_OF_BITS_FOR_STREAM) - 1))

static inline int kshark_index(void *ptr)
{
	unsigned long index = (unsigned long)ptr;

	return (int)(index & KSHARK_INDEX_MASK);
}

static inline void *kshark_ptr(unsigned int index)
{
	unsigned long p;

	p = KSHARK_INVALID_STREAM | index;

	return (void *)p;
}

The KSHARK_INVALID_STREAM and KSHARK_INDEX_MASK, would allow us to do
something like this if we wanted to loop through all streams:

static inline bool kshark_is_valid_stream(void *ptr)
{
	unsigned long p = (unsigned long)ptr;

	return (p & KSHARK_INVALID_STREAM) == KSHARK_INVALID_STREAM;
}

The above works because the address of setting all those bits, would put
the address into the kernel space (illegal user space address).


	for (i = 0; i < kshark_ctx->n_streams; i++) {
		if (!kshark_is_valid_stream(kshark_ctx->stream[i]))
			continue;
		/* process valid stream */
	}

-- Steve
Yordan Karadzhov Nov. 6, 2020, 2:31 p.m. UTC | #4
On 5.11.20 г. 20:17 ч., Steven Rostedt wrote:
> On Thu, 5 Nov 2020 16:58:51 +0200
> "Yordan Karadzhov (VMware)" <y.karadz@gmail.com> wrote:
> 
>> I see the problem. This is definitely wrong.
>>
>> What if in addition to "n_streams" I add another counter called
>> "last_stream_added" and initialize this counter to -1?
>>
>> Then I can add streams like this:
>>
>> 	kshark_ctx->stream[++kshark_ctx->last_stream_added] = stream;
>> 	++kshark_ctx->n_streams;
>>
>>> You may want to do instead:
> 
> I'm thinking of doing something like this:
> 
> struct kshark_ctx {
> 	[..]
> 	unsigned int free_stream;
> 	[..]
> };
> 
> 
> 	if (kshark_ctx->free_stream >= kshark_ctx->n_streams) {
> 		kshark_ctx->stream[++kshark_ctx->n_streams] = stream;
> 		kshark_ctx->free_stream = kshark_ctx->n_streams;
> 
> /* BTW, the stream array should be allocated to the n_streams, and
>     reallocated when it grows.  I don't think we want a huge stream array to
>     handle all the bits when not used. */
> 
> 	} else {
> 		int new_stream = kshark_ctx->free_stream;
> 
> 		kshark_ctx->free_stream = kshark_index(kshark_ctx->stream[new_stream]);
> 		kshark_ctx->stream[new_stream] = stream;
> 	}
> 
> 
> For freeing (index i):
> 
> 	kshark_ctx->stream[i] = kshark_ptr(kshark_ctx->free_stream);
> 	kshark_ctx->free_stream = i;
> 
> 
> We could define the following (note, I just used these names for the
> functions, they could be named something else):
> 
> 
> #define KSHARK_INDEX_MASK ((1 << NR_OF_BITS_FOR_STREAM) - 1)
> 
> #define KSHARK_INVALID_STREAM (~((1UL << NR_OF_BITS_FOR_STREAM) - 1))
> 
> static inline int kshark_index(void *ptr)
> {
> 	unsigned long index = (unsigned long)ptr;
> 
> 	return (int)(index & KSHARK_INDEX_MASK);
> }
> 
> static inline void *kshark_ptr(unsigned int index)
> {
> 	unsigned long p;
> 
> 	p = KSHARK_INVALID_STREAM | index;
> 
> 	return (void *)p;
> }
> 
> The KSHARK_INVALID_STREAM and KSHARK_INDEX_MASK, would allow us to do
> something like this if we wanted to loop through all streams:
> 
> static inline bool kshark_is_valid_stream(void *ptr)
> {
> 	unsigned long p = (unsigned long)ptr;
> 
> 	return (p & KSHARK_INVALID_STREAM) == KSHARK_INVALID_STREAM;
> }
> 
> The above works because the address of setting all those bits, would put
> the address into the kernel space (illegal user space address).
> 
> 
> 	for (i = 0; i < kshark_ctx->n_streams; i++) {
> 		if (!kshark_is_valid_stream(kshark_ctx->stream[i]))
> 			continue;
> 		/* process valid stream */
> 	}
>

Hi Steven,

I am not sure I understand correctly your pseudo-code, so please correct 
me if my interpretation is wrong.

In the normal case when a new stream is added the corresponding object 
will be allocated and added to the array of pointers. Later if a stream 
is removed, instead of freeing the memory we will just manipulate it 
pointer so that it point to nowhere and this manipulation can be 
detected by the kshark_is_valid_stream(). Now if we want to add stream 
again, we will take the broken pointer, will restore its original value 
and will reuse the object without a new allocations.

And at the very end we will have to free all pointers (original or 
manipulated).

Is this what you are suggesting?

Thanks!
Y.


> -- Steve
>
Steven Rostedt Nov. 6, 2020, 3:18 p.m. UTC | #5
On Fri, 6 Nov 2020 16:31:58 +0200
"Yordan Karadzhov (VMware)" <y.karadz@gmail.com> wrote:

> Hi Steven,
> 
> I am not sure I understand correctly your pseudo-code, so please correct 
> me if my interpretation is wrong.

Note, it's not really pseudo code. I didn't compile it, but it should work
mostly unmodified.

> 
> In the normal case when a new stream is added the corresponding object 
> will be allocated and added to the array of pointers. Later if a stream 
> is removed, instead of freeing the memory we will just manipulate it 
> pointer so that it point to nowhere and this manipulation can be 
> detected by the kshark_is_valid_stream(). Now if we want to add stream 
> again, we will take the broken pointer, will restore its original value 
> and will reuse the object without a new allocations.
> 
> And at the very end we will have to free all pointers (original or 
> manipulated).
> 
> Is this what you are suggesting?

Hmm, let me explain it slightly different, as nothing is "restored".

We change the value of the array from pointer to an index. For ease of
explanation, let's consider this a 32bit machine and we allow 256 streams
(one byte). That is:


sizeof(long) == sizeof(void *) == 4

#define NR_OF_BITS_FOR_STREAM 8

#define KSHARK_INDEX_MASK 0x000000FF
#define KSHARK_INVALID_STREAM 0xFFFFFF00

Lets say we add 4 stream in a row. Each one will detect that free_stream is
equal to n_streams, and just append them to the end of the array
(reallocating the array if necessary). We end up with:


free_stream = 4
n_streams = 4

streams:

  0: 0x07123010   -> stream 1
  1: 0x07123020   -> stream 2
  2: 0x07123030   -> stream 3
  3: 0x07123040   -> stream 4


Now we free stream 3 (at location 2):

free_streams = 2
n_streams = 4

streams:
  0: 0x07123010   -> stream 1
  1: 0x07123020   -> stream 2
  2: 0xFFFFFF04	== (KSHARK_INVALID_MASK | orig:free_streams)
  3: 0x07123040   -> stream 4


Now we free stream 1 (at location 0):

free_streams = 0
n_streams = 4

streams:
  0: 0xFFFFFF02	== (KSHARK_INVALID_MASK | orig:free_streams)
  1: 0x07123020   -> stream 2
  2: 0xFFFFFF04	== (KSHARK_INVALID_MASK | 4)
  3: 0x07123040   -> stream 4


Now lets add stream 5:

free_streams = 2 == (streams[orig:free_streams] & KSHARK_INDEX_MASK)
n_streams = 4

streams:
  0: 0x07123050   -> stream 5
  1: 0x07123020   -> stream 2
  2: 0xFFFFFF04
  3: 0x07123040   -> stream 4

We are just making a link list of free pointers within the array of
streams. This is basically exactly how memory management systems work. The
free memory list is stored inside the memory itself that is being allocated.

The 0xFFFFFF is so that if we want to loop over streams, we can skip the
free slots, by checking: streams[i] & 0xFFFFFF00 != 0xFFFFFF00

Note, the free slots do not point any memory location. Think of the items
in the stream array as:

union {
	struct kshark_stream 	*stream;
	unsigned long		index;
};

You can differentiate without using typecasts with:

	if (streams[i].index & 0xFFFFFF00 == 0xFFFFFF00)
		index = streams[i].index & 0xFF;
	else
		stream = streams[i].stream;

Does that make more sense?

-- Steve
Yordan Karadzhov Nov. 9, 2020, 2:49 p.m. UTC | #6
On 6.11.20 г. 17:18 ч., Steven Rostedt wrote:
> On Fri, 6 Nov 2020 16:31:58 +0200
> "Yordan Karadzhov (VMware)" <y.karadz@gmail.com> wrote:
> 
>> Hi Steven,
>>
>> I am not sure I understand correctly your pseudo-code, so please correct
>> me if my interpretation is wrong.
> 
> Note, it's not really pseudo code. I didn't compile it, but it should work
> mostly unmodified.
> 
>>
>> In the normal case when a new stream is added the corresponding object
>> will be allocated and added to the array of pointers. Later if a stream
>> is removed, instead of freeing the memory we will just manipulate it
>> pointer so that it point to nowhere and this manipulation can be
>> detected by the kshark_is_valid_stream(). Now if we want to add stream
>> again, we will take the broken pointer, will restore its original value
>> and will reuse the object without a new allocations.
>>
>> And at the very end we will have to free all pointers (original or
>> manipulated).
>>
>> Is this what you are suggesting?
> 
> Hmm, let me explain it slightly different, as nothing is "restored".
> 
> We change the value of the array from pointer to an index. For ease of
> explanation, let's consider this a 32bit machine and we allow 256 streams
> (one byte). That is:
> 
> 
> sizeof(long) == sizeof(void *) == 4
> 
> #define NR_OF_BITS_FOR_STREAM 8
> 
> #define KSHARK_INDEX_MASK 0x000000FF
> #define KSHARK_INVALID_STREAM 0xFFFFFF00
> 
> Lets say we add 4 stream in a row. Each one will detect that free_stream is
> equal to n_streams, and just append them to the end of the array
> (reallocating the array if necessary). We end up with:
> 
> 
> free_stream = 4
> n_streams = 4
> 
> streams:
> 
>    0: 0x07123010   -> stream 1
>    1: 0x07123020   -> stream 2
>    2: 0x07123030   -> stream 3
>    3: 0x07123040   -> stream 4
> 
> 
> Now we free stream 3 (at location 2):
> 
> free_streams = 2
> n_streams = 4
> 
> streams:
>    0: 0x07123010   -> stream 1
>    1: 0x07123020   -> stream 2
>    2: 0xFFFFFF04	== (KSHARK_INVALID_MASK | orig:free_streams)
>    3: 0x07123040   -> stream 4
> 
> 
> Now we free stream 1 (at location 0):
> 
> free_streams = 0
> n_streams = 4
> 
> streams:
>    0: 0xFFFFFF02	== (KSHARK_INVALID_MASK | orig:free_streams)
>    1: 0x07123020   -> stream 2
>    2: 0xFFFFFF04	== (KSHARK_INVALID_MASK | 4)
>    3: 0x07123040   -> stream 4
> 
> 
> Now lets add stream 5:
> 
> free_streams = 2 == (streams[orig:free_streams] & KSHARK_INDEX_MASK)
> n_streams = 4
> 
> streams:
>    0: 0x07123050   -> stream 5
>    1: 0x07123020   -> stream 2
>    2: 0xFFFFFF04
>    3: 0x07123040   -> stream 4
> 
> We are just making a link list of free pointers within the array of
> streams. This is basically exactly how memory management systems work. The
> free memory list is stored inside the memory itself that is being allocated.
> 
> The 0xFFFFFF is so that if we want to loop over streams, we can skip the
> free slots, by checking: streams[i] & 0xFFFFFF00 != 0xFFFFFF00
> 
> Note, the free slots do not point any memory location. Think of the items
> in the stream array as:
> 
> union {
> 	struct kshark_stream 	*stream;
> 	unsigned long		index;
> };
> 
> You can differentiate without using typecasts with:
> 
> 	if (streams[i].index & 0xFFFFFF00 == 0xFFFFFF00)
> 		index = streams[i].index & 0xFF;
> 	else
> 		stream = streams[i].stream;
> 
> Does that make more sense?
> 


Yes, it makes a lot of sense. Thanks a lot!
I am implementing it. It requires to do some rebasing in the successive 
patches (the GUI code). I am almost ready to send v3.

thanks!
Yordan


> -- Steve
>
diff mbox series

Patch

diff --git a/examples/dataload.c b/examples/dataload.c
index 15c5de05..c13acbcb 100644
--- a/examples/dataload.c
+++ b/examples/dataload.c
@@ -19,8 +19,7 @@  int main(int argc, char **argv)
 	struct kshark_entry **data = NULL;
 	ssize_t r, n_rows, n_tasks;
 	char *entry_str;
-	bool status;
-	int *pids;
+	int sd, *pids;
 
 	/* Create a new kshark session. */
 	kshark_ctx = NULL;
@@ -29,30 +28,28 @@  int main(int argc, char **argv)
 
 	/* Open a trace data file produced by trace-cmd. */
 	if (argc > 1)
-		status = kshark_open(kshark_ctx, argv[1]);
+		sd = kshark_open(kshark_ctx, argv[1]);
 	else
-		status = kshark_open(kshark_ctx, default_file);
+		sd = kshark_open(kshark_ctx, default_file);
 
-	if (!status) {
+	if (sd < 0) {
 		kshark_free(kshark_ctx);
 		return 1;
 	}
 
 	/* Load the content of the file into an array of entries. */
-	n_rows = kshark_load_data_entries(kshark_ctx, &data);
+	n_rows = kshark_load_entries(kshark_ctx, sd, &data);
 	if (n_rows < 1) {
 		kshark_free(kshark_ctx);
 		return 1;
 	}
 
 	/* Print to the screen the list of all tasks. */
-	n_tasks = kshark_get_task_pids(kshark_ctx, &pids);
+	n_tasks = kshark_get_task_pids(kshark_ctx, sd, &pids);
 	for (r = 0; r < n_tasks; ++r) {
-		const char *task_str =
-			tep_data_comm_from_pid(kshark_ctx->pevent,
-					       pids[r]);
-
+		char *task_str = kshark_comm_from_pid(sd, pids[r]);
 		printf("task: %s-%i\n", task_str, pids[r]);
+		free(task_str);
 	}
 
 	free(pids);
@@ -82,7 +79,7 @@  int main(int argc, char **argv)
 	free(data);
 
 	/* Close the file. */
-	kshark_close(kshark_ctx);
+	kshark_close(kshark_ctx, sd);
 
 	/* Close the session. */
 	kshark_free(kshark_ctx);
diff --git a/src/libkshark.c b/src/libkshark.c
index 56721717..6eff53e5 100644
--- a/src/libkshark.c
+++ b/src/libkshark.c
@@ -19,6 +19,7 @@ 
 
 // KernelShark
 #include "libkshark.h"
+#include "libkshark-tepdata.h"
 
 static __thread struct trace_seq seq;
 
@@ -32,6 +33,9 @@  static bool kshark_default_context(struct kshark_context **context)
 	if (!kshark_ctx)
 		return false;
 
+	kshark_ctx->stream = calloc(KS_MAX_NUM_STREAMS,
+				    sizeof(*kshark_ctx->stream));
+
 	kshark_ctx->event_handlers = NULL;
 	kshark_ctx->collections = NULL;
 	kshark_ctx->plugins = NULL;
@@ -108,62 +112,28 @@  bool kshark_instance(struct kshark_context **kshark_ctx)
 	return true;
 }
 
-static void kshark_free_task_list(struct kshark_context *kshark_ctx)
-{
-	struct kshark_task_list *task;
-	int i;
-
-	if (!kshark_ctx)
-		return;
-
-	for (i = 0; i < KS_TASK_HASH_SIZE; ++i) {
-		while (kshark_ctx->tasks[i]) {
-			task = kshark_ctx->tasks[i];
-			kshark_ctx->tasks[i] = task->next;
-			free(task);
-		}
-	}
-}
-
 /**
  * @brief Open and prepare for reading a trace data file specified by "file".
- *	  If the specified file does not exist, or contains no trace data,
- *	  the function returns false.
  *
  * @param kshark_ctx: Input location for context pointer.
  * @param file: The file to load.
  *
- * @returns True on success, or false on failure.
+ * @returns The Id number of the data stream associated with this file on success.
+ * 	    Otherwise a negative errno code.
  */
-bool kshark_open(struct kshark_context *kshark_ctx, const char *file)
+int kshark_open(struct kshark_context *kshark_ctx, const char *file)
 {
-	struct tracecmd_input *handle;
-
-	kshark_free_task_list(kshark_ctx);
-
-	handle = tracecmd_open(file);
-	if (!handle)
-		return false;
-
-	if (pthread_mutex_init(&kshark_ctx->input_mutex, NULL) != 0) {
-		tracecmd_close(handle);
-		return false;
-	}
-
-	kshark_ctx->handle = handle;
-	kshark_ctx->pevent = tracecmd_get_pevent(handle);
+	int sd, rt;
 
-	kshark_ctx->advanced_event_filter =
-		tep_filter_alloc(kshark_ctx->pevent);
+	sd = kshark_add_stream(kshark_ctx);
+	if (sd < 0)
+		return sd;
 
-	/*
-	 * Turn off function trace indent and turn on show parent
-	 * if possible.
-	 */
-	tep_plugin_add_option("ftrace:parent", "1");
-	tep_plugin_add_option("ftrace:indent", "0");
+	rt = kshark_stream_open(kshark_ctx->stream[sd], file);
+	if (rt < 0)
+		return rt;
 
-	return true;
+	return sd;
 }
 
 static void kshark_stream_free(struct kshark_data_stream *stream)
@@ -253,6 +223,56 @@  int kshark_add_stream(struct kshark_context *kshark_ctx)
 	return stream->stream_id;
 }
 
+static bool is_tep(const char *filename)
+{
+	/*
+	 * TODO: This is very naive. Implement more appropriate check. Ideally
+	 * it should be part of the trace-cmd library.
+	 */
+	char *ext = strrchr(filename, '.');
+	return ext && strcmp(ext, ".dat") == 0;
+}
+
+static void set_format(struct kshark_context *kshark_ctx,
+		       struct kshark_data_stream *stream,
+		       const char *filename)
+{
+	stream->format = KS_INVALIDE_DATA;
+
+	if (is_tep(filename)) {
+		stream->format = KS_TEP_DATA;
+		return;
+	}
+}
+
+/**
+ * @brief Use an existing Trace data stream to open and prepare for reading
+ *	  a trace data file specified by "file".
+ *
+ * @param stream: Input location for a Trace data stream pointer.
+ * @param file: The file to load.
+ *
+ * @returns Zero on success or a negative error code in the case of an errno.
+ */
+int kshark_stream_open(struct kshark_data_stream *stream, const char *file)
+{
+	struct kshark_context *kshark_ctx = NULL;
+
+	if (!stream || !kshark_instance(&kshark_ctx))
+		return -EFAULT;
+
+	stream->file = strdup(file);
+	set_format(kshark_ctx, stream, file);
+
+	switch (stream->format) {
+	case KS_TEP_DATA:
+		return kshark_tep_init_input(stream, file);
+
+	default:
+		return -ENODATA;
+	}
+}
+
 /**
  * @brief Get the Data stream object having given Id.
  *
@@ -313,45 +333,76 @@  int *kshark_all_streams(struct kshark_context *kshark_ctx)
 	return ids;
 }
 
+static void kshark_stream_close(struct kshark_data_stream *stream)
+{
+	struct kshark_context *kshark_ctx = NULL;
+
+	if (!stream || !kshark_instance(&kshark_ctx))
+		return;
+
+	/*
+	 * All filters are file specific. Make sure that all Process Ids and
+	 * Event Ids from this file are not going to be used with another file.
+	 */
+	kshark_hash_id_clear(stream->show_task_filter);
+	kshark_hash_id_clear(stream->hide_task_filter);
+	kshark_hash_id_clear(stream->show_event_filter);
+	kshark_hash_id_clear(stream->hide_event_filter);
+	kshark_hash_id_clear(stream->show_cpu_filter);
+	kshark_hash_id_clear(stream->hide_cpu_filter);
+
+	switch (stream->format) {
+	case KS_TEP_DATA:
+		kshark_tep_close_interface(stream);
+		break;
+
+	default:
+		break;
+	}
+
+	pthread_mutex_destroy(&stream->input_mutex);
+}
+
 /**
  * @brief Close the trace data file and free the trace data handle.
  *
  * @param kshark_ctx: Input location for the session context pointer.
+ * @param sd: Data stream identifier.
  */
-void kshark_close(struct kshark_context *kshark_ctx)
+void kshark_close(struct kshark_context *kshark_ctx, int sd)
 {
-	if (!kshark_ctx || !kshark_ctx->handle)
+	struct kshark_data_stream *stream;
+
+	stream = kshark_get_data_stream(kshark_ctx, sd);
+	if (!stream)
 		return;
 
-	/*
-	 * All filters are file specific. Make sure that the Pids and Event Ids
-	 * from this file are not going to be used with another file.
-	 */
-	tracecmd_filter_id_clear(kshark_ctx->show_task_filter);
-	tracecmd_filter_id_clear(kshark_ctx->hide_task_filter);
-	tracecmd_filter_id_clear(kshark_ctx->show_event_filter);
-	tracecmd_filter_id_clear(kshark_ctx->hide_event_filter);
-	tracecmd_filter_id_clear(kshark_ctx->show_cpu_filter);
-	tracecmd_filter_id_clear(kshark_ctx->hide_cpu_filter);
-
-	if (kshark_ctx->advanced_event_filter) {
-		tep_filter_reset(kshark_ctx->advanced_event_filter);
-		tep_filter_free(kshark_ctx->advanced_event_filter);
-		kshark_ctx->advanced_event_filter = NULL;
-	}
+	kshark_stream_close(stream);
+	kshark_stream_free(stream);
+	kshark_ctx->stream[sd] = NULL;
+	kshark_ctx->n_streams--;
+}
+
+/**
+ * @brief Close all currently open trace data file and free the trace data handle.
+ *
+ * @param kshark_ctx: Input location for the session context pointer.
+ */
+void kshark_close_all(struct kshark_context *kshark_ctx)
+{
+	int i, *stream_ids, n_streams;
+
+	stream_ids = kshark_all_streams(kshark_ctx);
 
 	/*
-	 * All data collections are file specific. Make sure that collections
-	 * from this file are not going to be used with another file.
+	 * Get a copy of shark_ctx->n_streams befor you start closing. Be aware
+	 * that kshark_close() will decrement shark_ctx->n_streams.
 	 */
-	kshark_free_collection_list(kshark_ctx->collections);
-	kshark_ctx->collections = NULL;
-
-	tracecmd_close(kshark_ctx->handle);
-	kshark_ctx->handle = NULL;
-	kshark_ctx->pevent = NULL;
+	n_streams = kshark_ctx->n_streams;
+	for (i = 0; i < n_streams; ++i)
+		kshark_close(kshark_ctx, stream_ids[i]);
 
-	pthread_mutex_destroy(&kshark_ctx->input_mutex);
+	free(stream_ids);
 }
 
 /**
@@ -359,7 +410,7 @@  void kshark_close(struct kshark_context *kshark_ctx)
  *	  open trace data files and before your application terminates.
  *
  * @param kshark_ctx: Optional input location for session context pointer.
- *		      If it points to a context of a sessuin, that sessuin
+ *		      If it points to a context of a session, that session
  *		      will be deinitialize. If it points to NULL, it will
  *		      deinitialize the current session.
  */
@@ -373,25 +424,12 @@  void kshark_free(struct kshark_context *kshark_ctx)
 		/* kshark_ctx_handler will be set to NULL below. */
 	}
 
-	tracecmd_filter_id_hash_free(kshark_ctx->show_task_filter);
-	tracecmd_filter_id_hash_free(kshark_ctx->hide_task_filter);
+	kshark_close_all(kshark_ctx);
 
-	tracecmd_filter_id_hash_free(kshark_ctx->show_event_filter);
-	tracecmd_filter_id_hash_free(kshark_ctx->hide_event_filter);
+	free(kshark_ctx->stream);
 
-	tracecmd_filter_id_hash_free(kshark_ctx->show_cpu_filter);
-	tracecmd_filter_id_hash_free(kshark_ctx->hide_cpu_filter);
-
-	if (kshark_ctx->plugins) {
-		kshark_handle_plugins(kshark_ctx, KSHARK_PLUGIN_CLOSE);
+	if (kshark_ctx->plugins)
 		kshark_free_plugin_list(kshark_ctx->plugins);
-		kshark_free_event_handler_list(kshark_ctx->event_handlers);
-	}
-
-	kshark_free_task_list(kshark_ctx);
-
-	if (seq.buffer)
-		trace_seq_destroy(&seq);
 
 	if (kshark_ctx == kshark_context_handler)
 		kshark_context_handler = NULL;
@@ -490,57 +528,24 @@  kshark_add_task(struct kshark_context *kshark_ctx, int pid)
  *	  the loaded trace data file.
  *
  * @param kshark_ctx: Input location for context pointer.
+ * @param sd: Data stream identifier.
  * @param pids: Output location for the Pids of the tasks. The user is
  *		responsible for freeing the elements of the outputted array.
  *
  * @returns The size of the outputted array of Pids in the case of success,
  *	    or a negative error code on failure.
  */
-ssize_t kshark_get_task_pids(struct kshark_context *kshark_ctx, int **pids)
+ssize_t kshark_get_task_pids(struct kshark_context *kshark_ctx, int sd,
+			     int **pids)
 {
-	size_t i, pid_count = 0, pid_size = KS_TASK_HASH_SIZE;
-	struct kshark_task_list *list;
-	int *temp_pids;
-
-	*pids = calloc(pid_size, sizeof(int));
-	if (!*pids)
-		goto fail;
-
-	for (i = 0; i < KS_TASK_HASH_SIZE; ++i) {
-		list = kshark_ctx->tasks[i];
-		while (list) {
-			(*pids)[pid_count] = list->pid;
-			list = list->next;
-			if (++pid_count >= pid_size) {
-				pid_size *= 2;
-				temp_pids = realloc(*pids, pid_size * sizeof(int));
-				if (!temp_pids) {
-					goto fail;
-				}
-				*pids = temp_pids;
-			}
-		}
-	}
-
-	if (pid_count) {
-		temp_pids = realloc(*pids, pid_count * sizeof(int));
-		if (!temp_pids)
-			goto fail;
-
-		/* Paranoid: In the unlikely case of shrinking *pids, realloc moves it */
-		*pids = temp_pids;
-	} else {
-		free(*pids);
-		*pids = NULL;
-	}
+	struct kshark_data_stream *stream;
 
-	return pid_count;
+	stream = kshark_get_data_stream(kshark_ctx, sd);
+	if (!stream)
+		return -EBADF;
 
-fail:
-	fprintf(stderr, "Failed to allocate memory for Task Pids.\n");
-	free(*pids);
-	*pids = NULL;
-	return -ENOMEM;
+	*pids = kshark_hash_ids(stream->tasks);
+	return stream->tasks->count;
 }
 
 static bool filter_find(struct tracecmd_filter_id *filter, int pid,
@@ -1552,75 +1557,6 @@  char* kshark_dump_custom_entry(struct kshark_context *kshark_ctx,
 	return NULL;
 }
 
-/**
- * @brief Dump into a string the content of one entry. The function allocates
- *	  a null terminated string and returns a pointer to this string. The
- *	  user has to free the returned string.
- *
- * @param entry: A Kernel Shark entry to be printed.
- *
- * @returns The returned string contains a semicolon-separated list of data
- *	    fields.
- */
-char* kshark_dump_entry(const struct kshark_entry *entry)
-{
-	const char *event_name, *task, *lat, *info;
-	struct kshark_context *kshark_ctx;
-	char *temp_str, *entry_str;
-	int size = 0;
-
-	kshark_ctx = NULL;
-	if (!kshark_instance(&kshark_ctx) || !init_thread_seq())
-		return NULL;
-
-	task = tep_data_comm_from_pid(kshark_ctx->pevent, entry->pid);
-
-	if (entry->event_id >= 0) {
-		struct tep_event *event;
-		struct tep_record *data;
-
-		data = tracecmd_read_at(kshark_ctx->handle, entry->offset,
-					NULL);
-
-		event = tep_find_event(kshark_ctx->pevent, entry->event_id);
-
-		event_name = event? event->name : "[UNKNOWN EVENT]";
-		lat = get_latency(kshark_ctx->pevent, data);
-
-		size = asprintf(&temp_str, "%" PRIu64 "; %s-%i; CPU %i; %s;",
-				entry->ts,
-				task,
-				entry->pid,
-				entry->cpu,
-				lat);
-
-		info = get_info(kshark_ctx->pevent, data, event);
-
-		if (size > 0) {
-			size = asprintf(&entry_str, "%s %s; %s; 0x%x",
-					temp_str,
-					event_name,
-					info,
-					entry->visible);
-
-			free(temp_str);
-		}
-
-		free_record(data);
-		if (size < 1)
-			entry_str = NULL;
-	} else {
-		switch (entry->event_id) {
-		case KS_EVENT_OVERFLOW:
-			entry_str = kshark_dump_custom_entry(kshark_ctx, entry,
-							     missed_events_dump);
-		default:
-			entry_str = NULL;
-		}
-	}
-
-	return entry_str;
-}
 
 /**
  * @brief Binary search inside a time-sorted array of kshark_entries.
diff --git a/src/libkshark.h b/src/libkshark.h
index f26fad7c..ef48d94b 100644
--- a/src/libkshark.h
+++ b/src/libkshark.h
@@ -401,7 +401,9 @@  struct kshark_context {
 
 bool kshark_instance(struct kshark_context **kshark_ctx);
 
-bool kshark_open(struct kshark_context *kshark_ctx, const char *file);
+int kshark_open(struct kshark_context *kshark_ctx, const char *file);
+
+int kshark_stream_open(struct kshark_data_stream *stream, const char *file);
 
 int kshark_add_stream(struct kshark_context *kshark_ctx);
 
@@ -426,9 +428,12 @@  size_t kshark_load_data_matrix(struct kshark_context *kshark_ctx,
 			       uint16_t **pid_array,
 			       int **event_array);
 
-ssize_t kshark_get_task_pids(struct kshark_context *kshark_ctx, int **pids);
+ssize_t kshark_get_task_pids(struct kshark_context *kshark_ctx, int sd,
+			     int **pids);
+
+void kshark_close(struct kshark_context *kshark_ctx, int sd);
 
-void kshark_close(struct kshark_context *kshark_ctx);
+void kshark_close_all(struct kshark_context *kshark_ctx);
 
 void kshark_free(struct kshark_context *kshark_ctx);
 
@@ -450,8 +455,6 @@  const char *kshark_get_info_easy(struct kshark_entry *entry);
 
 void kshark_convert_nano(uint64_t time, uint64_t *sec, uint64_t *usec);
 
-char* kshark_dump_entry(const struct kshark_entry *entry);
-
 static inline int kshark_get_pid(const struct kshark_entry *entry)
 {
 	struct kshark_data_stream *stream =
@@ -536,6 +539,17 @@  static inline int kshark_read_event_field(const struct kshark_entry *entry,
 							field, val);
 }
 
+static inline char *kshark_dump_entry(const struct kshark_entry *entry)
+{
+	struct kshark_data_stream *stream =
+		kshark_get_stream_from_entry(entry);
+
+	if (!stream)
+		return NULL;
+
+	return stream->interface.dump_entry(stream, entry);
+}
+
 /**
  * @brief Load the content of the trace data file asociated with a given
  *	  Data stream identifie into an array of kshark_entries.