Machine learning models are usually built in Jupyter notebooks, and the code, to put it mildly, does not look great: long stretches of spaghetti made of expressions and hastily written, ad hoc functions. Such code is almost impossible to maintain, so every project gets rewritten nearly from scratch. And putting this code into production is scary even to think about.
Therefore, today we are presenting for your scrutiny a preview of a Python library for working with datasets and data science models. With it, your Python code might look like this:
(my_dataset.load('/some/path')
           .normalize()
           .resize(shape=(256, 256, 256))
           .random_rotate(angle=(-30, 30))
           .random_crop(shape=(64, 64, 64)))

for i in range(MAX_ITER):
    batch = my_dataset.next_batch(BATCH_SIZE, shuffle=True)
    # train the model on the batch
In this article you will learn about the main classes and methods that will help make your code simple, understandable and convenient.
The library is undergoing final polishing and has not yet been made publicly available.
This article is not a complete documentation, but only a brief description of the library and examples of its use.
Your comments will help finalize the library and incorporate the features you need into it.
The amount of data can be very large, and you may not even have all of it when processing begins, for example, if it arrives gradually. Therefore, the Dataset class does not store the data itself. It includes an index (a list of the items of your data, which can be identifiers or just ordinal numbers) and a Batch class, which defines the methods for working with the data.
dataset = Dataset(index=some_index, batch_class=DataFrameBatch)
The main purpose of Dataset is forming batches:
batch = dataset.next_batch(BATCH_SIZE, shuffle=True)
# batch is an instance of DataFrameBatch containing BATCH_SIZE items
or you can call a generator:
for batch in dataset.gen_batch(BATCH_SIZE, shuffle=False, one_pass=True):
    # batch is an instance of DataFrameBatch
    ...
Batches can be collected in strict order or randomly shuffled; you can iterate endlessly or make exactly one pass over your data. You can even create batches of a different size at every step, if that makes sense in your situation.
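As a purely illustrative sketch of varying the batch size, only next_batch itself comes from the library, while the size schedule here is made up:

batch_size = 16
for i in range(MAX_ITER):
    batch = dataset.next_batch(batch_size, shuffle=True)
    # ... train the model on the batch ...
    batch_size = min(2 * batch_size, 256)   # e.g. grow the batch size up to a limit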
Besides iteration, Dataset provides another useful operation: cv_split, which splits the dataset into train, test, and validation parts. And, which is especially convenient, each of them is again a dataset.
dataset.cv_split([0.7, 0.2, 0.1])   # 70 / 20 / 10

for i in range(MAX_ITER):
    batch = dataset.train.next_batch(BATCH_SIZE, shuffle=True)
    # train the model on the batch
Dataset elements are addressed through an index. It can be a set of identifiers (clients, transactions, CT scans) or just sequence numbers (for example, numpy.arange(N)). A dataset can be (almost) arbitrarily large and does not need to fit into RAM, since all processing is performed batch by batch.
Creating an index is very simple:
ds_index = DatasetIndex(sequence_of_item_ids)
The sequence can be a list, a numpy array, a pandas.Series, or any other iterable data type.
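For example, any of the following would work (a quick sketch; the identifiers are made up):

import numpy as np
import pandas as pd

ds_index = DatasetIndex(['client_001', 'client_002', 'client_003'])    # a list of identifiers
ds_index = DatasetIndex(np.arange(1000))                               # ordinal numbers
ds_index = DatasetIndex(pd.Series(['tx_17', 'tx_42', 'tx_99']))        # a pandas.Series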
When the source data is stored in separate files, it is convenient to build an index immediately from the list of these files:
ds_index = FilesIndex(path='/some/path/*.dat', no_ext=True)
Here, the elements of the index will be the file names (without extensions) from the specified directory.
It also happens that the elements of a dataset (for example, 3-dimensional CT images) are stored in separate directories.
ds_index = FilesIndex(path='/ct_images_??/*', dirs=True)
This will build a common index of the contents of /ct_images_01, /ct_images_02, and so on. The file index remembers the full paths of its elements, so later, in a load or save method, you can conveniently obtain the path with index.get_fullpath(index_item).
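A minimal, purely illustrative lookup (the identifier and the resulting path are hypothetical):

item_id = 'patient_0007'                       # an element of the index, i.e. a subdirectory name
full_path = ds_index.get_fullpath(item_id)     # e.g. '/ct_images_02/patient_0007'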
Most often, though, you will not have to operate with indices at all: all the necessary work happens under the hood, and you deal only with whole batches.
All the storage logic and the methods for processing your data are defined in the Batch class. As an example, let's create a class for working with CT images. The base class Batch, from which our CTImagesBatch will descend, already has an index attribute that stores the list of elements of this batch, as well as a data attribute initialized to None. Since that is enough for us, we will not redefine the constructor.
So we will proceed straight to creating the load action method:
class CTImagesBatch(Batch):

    @action
    def load(self, src, fmt):
        if fmt == 'dicom':
            self.data = self._load_dicom(src)
        elif fmt == 'blosc':
            self.data = self._load_blosc(src)
        elif fmt == 'npz':
            self.data = self._load_npz(src)
        else:
            raise ValueError("Incorrect format")
        return self
First, the method must be preceded by the @action decorator (you will see why a bit later).
Second, it must return a Batch object. This may be a new object of the same class (here, CTImagesBatch), an object of another class (but necessarily a descendant of Batch), or simply self.
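For instance, a hypothetical action that switches to another batch class might look like this (CTImagesMaskedBatch and the constructor signature are assumptions for illustration, not part of the library):

class CTImagesBatch(Batch):
    ...
    @action
    def to_masked(self):
        # a sketch: hand the index and data over to another Batch descendant
        new_batch = CTImagesMaskedBatch(self.index)
        new_batch.data = self.data
        return new_batch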
This approach lets you describe chains of actions on data. Moreover, in the course of processing, the data may change not only its content but also its format and structure.
We will not spend time on the private methods _load_dicom, _load_blosc and _load_npz. They load data from files of a specific format and return a 3-dimensional numpy array of shape [batch size, image width, image height]. The important thing is that this is where we decided how the data of each batch is laid out, and from now on we will work with this array.
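Just to make the picture concrete, one of these private loaders might look roughly like this (a sketch only: the .npz layout, the 'image' key, and iterating over self.index to get item identifiers are all assumptions):

import os
import numpy as np

def _load_npz(self, src):
    # a sketch: stack one 2D image per batch item into a single 3D array
    images = []
    for item_id in self.index:
        with np.load(os.path.join(src, str(item_id) + '.npz')) as f:
            images.append(f['image'])
    return np.stack(images)   # shape: [batch size, image width, image height]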
Now let's write the very_complicated_processing method, which performs some extremely complex image processing. Since the images in a batch are independent of each other, it would be convenient to process them in parallel.
class CTImagesBatch(Batch):
    ...
    @action
    @inbatch_parallel(target='threads')
    def very_complicated_processing(self, item, *args, **kwargs):
        # process a single image
        ...
        return processed_image_as_array
That is, the method should be written as if it processes a single image, and the index of that image is passed as the first parameter.
For the parallelism magic to work, the method must be wrapped in a decorator that specifies the parallelization technology (processes, threads, etc.), as well as the pre- and post-processing functions called before and after the parallel run.
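A hedged sketch of what such wiring might look like, assuming the decorator accepts the name of a post-processing method (the exact parameter name and the _assemble helper are assumptions):

import numpy as np

class CTImagesBatch(Batch):
    ...
    @action
    @inbatch_parallel(target='threads', post='_assemble')
    def very_complicated_processing(self, item, *args, **kwargs):
        # process a single image and return the result
        ...

    def _assemble(self, results, *args, **kwargs):
        # post-processing: gather the per-image results back into the batch data
        self.data = np.stack(results)
        return self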
By the way, it is better to write I/O-intensive operations as async methods and parallelize them with target='async', which significantly speeds up loading and saving data.
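For example, an I/O-bound dump action might be sketched like this (aiofiles is just one possible choice here, and _serialize is a hypothetical helper returning bytes):

import os
import aiofiles

class CTImagesBatch(Batch):
    ...
    @action
    @inbatch_parallel(target='async')
    async def dump(self, item, dst):
        # a sketch: asynchronously write one serialized image to disk
        async with aiofiles.open(os.path.join(dst, str(item) + '.blosc'), 'wb') as f:
            await f.write(self._serialize(item))
        return item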
Of course, all of this makes programming more convenient, but it does not relieve you of thinking about whether parallelism is needed at all, which kind to choose, and whether it might actually make things worse.
When all the action methods are written, you can work with a batch:
for i in range(MAX_ITER):
    batch = ct_images_dataset.next_batch(BATCH_SIZE, shuffle=True)
    processed_batch = (batch.load('/some/path/', 'dicom')
                            .very_complicated_processing(some_arg=some_value)
                            .resize(shape=(256, 256, 256))
                            .random_rotate(angle=(-30, 30))
                            .random_crop(shape=(64, 64, 64)))
    # train the model on processed_batch
It looks good... but it feels wrong that iterating over batches is mixed with data processing. Besides, the model training loop should be as short as possible, with nothing in it except next_batch.
In other words, the chain of action methods needs to move to the dataset level.
And it can. It is not for nothing that we set up all those action decorators: they contain the cunning magic of lifting methods to the dataset level. So you can simply write:
ct_images_pipeline = (ct_images_dataset.pipeline()
                        .load('/some/path/', 'dicom')
                        .very_complicated_processing(some_arg=some_value)
                        .resize(shape=(256, 256, 256))
                        .random_rotate(angle=(-30, 30))
                        .random_crop(shape=(64, 64, 64)))
# ...
for i in range(MAX_ITER):
    batch = ct_images_pipeline.next_batch(BATCH_SIZE, shuffle=True)
    # train the model on the batch
You do not need to create a new descendant of Dataset and describe all these methods in it. They live in the corresponding Batch classes and are marked with the @action decorator, which means you can safely call them as if they were defined in the Dataset class.
Another trick is that with this approach all action methods become lazy and are executed deferred. That is, loading, processing, resizing and all other actions are performed for each batch only at the moment that batch is formed by a call to next_batch.
And since processing each batch can take a lot of time, it would be good to form batches in advance. This is especially important when the model is trained on a GPU: the time a GPU spends idle waiting for the next batch can easily eat up all the advantages of its high performance.
batch = ct_images_pipeline.next_batch(BATCH_SIZE, shuffle=True, prefetch=3)
The prefetch parameter indicates that 3 batches should be computed in parallel ahead of time. Additionally, you can specify the parallelization technology (processes or threads).
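If the technology is chosen with an extra argument (the argument name below is an assumption, not confirmed by the library), the call might look like:

batch = ct_images_pipeline.next_batch(BATCH_SIZE, shuffle=True, prefetch=3, target='threads')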
In real machine learning tasks you rarely deal with a single dataset. Most often you have at least two sets of data, X and Y: for example, data on the parameters of houses and data on their prices. In computer vision tasks, besides the images themselves, there are also class labels, segmentation masks and bounding boxes.
In general, it is useful to be able to form parallel batches from several datasets. For this you can perform a join operation or create a JointDataset.
If you only need to iterate over the batches in parallel, it is more convenient to create a single dataset:
joint_dataset = JointDataset((ds_X, ds_Y))
If ds_X and ds_Y are not based on the same index, then it is important that the indices have the same length and the same ordering, that is, that ds_Y[i] corresponds to ds_X[i]. In this case the dataset is created slightly differently:
joint_dataset = JointDataset((ds_X, ds_Y), align='order')
And then everything happens in a completely standard way:
for i in range(MAX_ITER):
    batch_X, batch_Y = joint_dataset.next_batch(BATCH_SIZE, shuffle=True)
Only now next_batch returns not a single batch, but a tuple with batches from each dataset.
Naturally, a JointDataset can also consist of pipelines:
pl_images = (ct_images_ds.pipeline()
               .load('/some/path', 'dicom')
               .hu_normalize()
               .resize(shape=(256, 256, 256))
               .segment_lungs())

pl_labels = (labels_ds.pipeline()
               .load('/other/path', 'csv')
               .apply(lambda x: (x['diagnosis'] == 'C').astype('int')))

full_ds = JointDataset((pl_images, pl_labels), align='same')

for i in range(MAX_ITER):
    images_batch, labels_batch = full_ds.next_batch(BATCH_SIZE, shuffle=True)
    # train the model on these batches
And since the components of this dataset are pipelines, the loading and processing of images and labels start only when next_batch is called. That is, all computations are performed and the batch is formed only when it is needed.
However, there are other situations when you need to perform an operation with a dataset, applying data from another dataset to it.
This is best demonstrated with CT images. We load the coordinates and sizes of cancerous nodules and build three-dimensional masks from them.
pl_masks = (nodules_ds.pipeline()
              .load('/other/path', 'csv')
              .calculate_3d_masks())
Then we load the CT images and apply the masks to them in order to isolate only the cancerous areas.
pl_images = (ct_images_ds.pipeline()
               .load('/some/path', 'dicom')
               .hu_normalize()
               .resize(shape=(256, 256, 256))
               .join(pl_masks)
               .apply_masks(op='mult'))
In join you specify a dataset (or pipeline). As a result, the next action method (in this example, apply_masks) will receive the batch from that dataset as its first argument. And not just any batch, but exactly the one that is needed: if the current batch from ct_images_ds contains images 117, 234, 186 and 14, then the joined batch of masks will also correspond to images 117, 234, 186 and 14.
Naturally, the apply_masks method must be written with this argument in mind, since it can also be passed explicitly, without a preceding join. And inside the action method you no longer have to think about the indices and identifiers of the batch elements: you simply apply an array of masks to an array of images.
Once again, note that no loading and no computation, neither for the images nor for the masks, will start until you call pl_images.next_batch.
So, let's see what the full workflow of a data science project looks like.
ct_images_index = FilesIndex(path='/ct_images_??/*', dirs=True)
ct_images_dataset = Dataset(index=ct_images_index, batch_class=CTImagesBatch)
We perform preprocessing and save the processed images.
(ct_images_dataset.pipeline()
    .load(None, 'dicom')                     # load from the source dicom files
    .hu_normalize()
    .resize(shape=(256, 256, 256))
    .segment_lungs()
    .save('/preprocessed/images', 'blosc')
    .run(BATCH_SIZE, shuffle=False, one_pass=True))
Then we describe data preparation and augmentation for the model.
ct_preprocessed_index = FilesIndex(path='/preprocessed/images/*')
ct_preprocessed_dataset = Dataset(index=ct_preprocessed_index, batch_class=CTImagesBatch)

# pipeline with the images
ct_images_pipeline = (ct_preprocessed_dataset.pipeline()
                        .load(None, 'blosc')
                        .split_to_patches(shape=(64, 64, 64)))

# pipeline with the masks
ct_masks_ds = Dataset(index=ct_preprocessed_index, batch_class=CTImagesBatch)
ct_masks_pipeline = (ct_masks_ds.pipeline()
                        .load('/preprocessed/masks', 'blosc')
                        .split_to_patches(shape=(64, 64, 64)))

# combine the two pipelines
full_ds = JointDataset((ct_images_pipeline, ct_masks_pipeline))
We form training batches and train the model:
full_ds.cv_split([0.8, 0.2])

for i in range(MAX_ITER):
    images, masks = full_ds.train.next_batch(BATCH_SIZE, shuffle=True)
    # train the model on these batches
for images, masks in full_ds.test.gen_batch(BATCH_SIZE, shuffle=False, one_pass=True):
    # evaluate the model quality on the test data
    ...
This convenient library helps you develop high-quality code significantly faster, reuse previously created models with complex data preprocessing, and even build production-ready systems.
And now a question: what else should be added to the library? What do you sorely lack when working with data and models?
Source: https://habr.com/ru/post/326656/