📜 ⬆️ ⬇️

Data science and quality code

Usually, machine learning models are built in jupyter laptops, the code of which looks, to put it mildly, not very good - long sheets of noodles of expressions and calls "on the knee" of written functions. It is clear that such code is almost impossible to maintain, so each project is rewritten almost from scratch. And about the introduction of this code in production even scary to think.


Therefore, today we are submitting to your strict court a preview of the Python library for working with datasets and data science models. With it, your python code might look like this:


my_dataset. load('/some/path'). normalize(). resize(shape=(256, 256, 256)). random_rotate(angle=(-30, 30)). random_crop(shape=(64, 64, 64)) for i in range(MAX_ITER): batch = my_dataset.next_batch(BATCH_SIZE, shuffle=True) #  ,      

In this article you will learn about the main classes and methods that will help make your code simple, understandable and convenient.



The library is undergoing final polishing and has not yet been made publicly available.
This article is not a complete documentation, but only a brief description of the library and examples of its use.
Your comments will help finalize the library and incorporate the features you need into it.


Dataset


The amount of data can be very large, and by the beginning of data processing you may not have all the data at all, for example, if they arrive gradually. Therefore, the Dataset class does not store data in it. It includes an index - a list of items of your data (these can be identifiers or just ordinal numbers), as well as the Batch class, which defines methods for working with data.


 dataset = Dataset(index = some_index, batch_class=DataFrameBatch) 

The main purpose of Dataset is the formation of batches.


 batch = dataset.next_batch(BATCH_SIZE, shuffle=True) # batch -   DataFrameBatch, #  BATCH_SIZE   

or you can call a generator:


 for batch in dataset.gen_batch(BATCH_SIZE, shuffle=False, one_pass=True): # batch -   DataFrameBatch 

Butches can be collected in a strictly orderly or chaotic manner, iterate endlessly or make exactly 1 cycle according to your data. You can even create batches of different sizes at every step, if it makes sense in your situation.


In addition to the iteration in Dataset , another useful operation is cv_split - cv_split - which divides dataset into train, test, and validation. And, which is especially convenient, each of them is again dataset.


 dataset.cv_split([0.7, 0.2, 0.1]) #    70 / 20 / 10 #     for i in range(MAX_ITER): batch = dataset.train.next_batch(BATCH_SIZE, shuffle=True) #  ,      

Index


Dataset elements are addressed using an index. This may be a set of identifiers (clients, transactions, CT images) or just sequence numbers (for example, numpy.arange(N) ). Dataset can be (almost) arbitrarily large and not fit in RAM. But this is not required. After all, data processing is performed by batch.


Creating an index is very simple:


 ds_index = DatasetIndex(sequence_of_item_ids) 

The sequence can be a list, numpy array, pandas.Series or any other iterated data type.


When the source data is stored in separate files, it is convenient to build an index immediately from the list of these files:


 ds_index = FilesIndex(path='/some/path/*.dat', no_ext=True) 

Here, the elements of the index will be the file names (without extensions) from the specified directory.


It happens that elements of dataset (for example, 3-dimensional CT images) are stored in separate directories.


 ds_index = FilesIndex(path='/ct_images_??/*', dirs=True) 

This will build the general index of all subdirectories of /ct_images_01 , /ct_images_02 , /ct_images_02 , etc. The file index remembers the full paths of its elements. Therefore, later in the load or save method you can conveniently get the path index.get_fullpath(index_item) .


Although most often you don’t have to operate with indices at all - all the necessary work is done inside, and you already work only with the whole batch.


Class batch


All storage logic and methods for processing your data are defined in the Batch class. Let's create a class for working with CT images as an example. The base class Batch , the descendant of which will become our CTImagesBatch , already has an index attribute that stores the list of elements of this batch, as well as the data attribute, which is initialized to None . And since this is enough for us, we will not redefine the constructor.


Therefore, we will immediately proceed to the creation of an action load method:


 class CTImagesBatch(Batch): @action def load(self, src, fmt): if fmt == 'dicom': self.data = self._load_dicom(src) elif fmt == 'blosc': self.data = self._load_blosc(src) elif fmt == 'npz': self.data = self._load_npz(src) else: raise ValueError("Incorrect format") return self 

First, the method must be preceded by the @action decorator (later you will know why).


Secondly, it must return a Batch object. This may be a new object of the same class (in this case, CTImagesBatch), or an object of another class (but necessarily a descendant of Batch ), or you can simply return self .


This approach allows you to describe the chain of actions on data. Moreover, in the course of processing, data may change not only in content, but also in format and structure.


We will not spend time on private methods _load_dicom , _load_blosc and _load_npz . They are able to load data from files of a specific format and return a 3-dimensional numpy array — [batch size, image width, image height]. The main thing is that it was here that we determined how the data of each batch is arranged, and we will continue to work with this array.


Now let's write the very_complicated_processing method, which performs some extremely complex image processing. Since the images in the batch are independent of each other, it would be convenient to process them in parallel.


 class CTImagesBatch(Batch): ... @action @inbatch_parallel(target='threads') def very_complicated_processing(self, item, *args, **kwargs): #   ... return processed_image_as_array 

That is, the method should be written as if it processes one snapshot, and the index of this snapshot is passed in the first parameter.


In order for the parallelism magic to work, the method needs to be wrapped by the decorator, where the parallelism technology is defined (processes, threads, etc.), as well as the pre- and post-processing functions that are called before and after the parallelization.


By the way, it is better to write operations with intensive input-output as async methods and parallelize through target='async' , which will significantly speed up data loading and unloading.


It is clear that this all adds to the convenience of programming, but it does not at all relieve the “ thinking ”, is there any need for parallelism, which one and what will not make it worse?


When all the action methods are written, you can work with the batch:


 for i in range(MAX_ITER): batch = ct_images_dataset.next_batch(BATCH_SIZE, shuffle=True) processed_batch = batch.load('/some/path/', 'dicom') .very_complicated_processing(some_arg=some_value) .resize(shape=(256, 256, 256)) .random_rotate(angle=(-30, 30)) .random_crop(shape=(64, 64, 64)) #  ,   processed_batch    

It looks good ... but somehow it is wrong that the iteration over the batch is mixed with data processing. Yes, and I want to shorten the cycle of learning the model, so that there is nothing at all except next_batch .


In general, it is necessary to move the chain of action methods to the dataset level.


Pipeline


And it can be done. It’s not for nothing that we have guarded all these action decorators. They contain the cunning magic of transferring methods to the level of dataset. So just write:


 ct_images_pipeline = ct_images_dataset.pipeline(). .load('/some/path/', 'dicom') .very_complicated_processing(some_arg=some_value) .resize(shape=(256, 256, 256)). .random_rotate(angle=(-30, 30)) .random_crop(shape=(64, 64, 64)) # ... for i in range(MAX_ITER): batch = ct_images_pipeline.next_batch(BATCH_SIZE, shuffle=True) #  ,       

You do not need to create a new descendant class Dataset and describe all these methods in it. They are in the corresponding Batch classes and are marked by the @action decorator, @action means you can safely call them as if they were in the Dataset class.


Another trick is that with this approach, all action methods become "lazy" (lazy) and are executed deferred. That is, loading, processing, resizing and other actions are performed for each batch at the moment of the formation of this batch by calling next_batch .


And since the processing of each batch can take a lot of time, it would be nice to form the batch in advance. This is especially important if the model is trained on the GPU, because then a simple GPU, while waiting for a new batch, can easily “eat” all the advantages of its high performance.


 batch = ct_images_pipeline.next_batch(BATCH_SIZE, shuffle=True, prefetch=3) 

The prefetch parameter indicates that 3 batch should be considered in parallel. Additionally, you can specify the parallelization technology (processes, threads).


We unite datasets


In real machine learning tasks, you rarely have to deal with a single dataset. Most often you will have at least two data sets: X and Y. For example, data on the parameters of houses and data on their cost. In computer vision tasks, in addition to the images themselves, there are class labels, segmenting masks and bounding boxes.


In general, it is useful to be able to form parallel batches from several datasets. And for this you can perform the join operation or create a JointDataset .


JointDataset


If you need only a parallel iteration of the batches, it will be more convenient to create a single dataset:


 joint_dataset = JointDataset((ds_X, ds_Y)) 

If ds_X and ds_Y are not based on the same index, then it is important that the indexes are of the same length and are equally ordered, that is, the value of ds_Y[i] corresponds to the value of ds_X[i] . In this case, the creation of dataset will look a little different:


 joint_dataset = JointDataset((ds_X, ds_Y), align='order') 

And then everything happens in a completely standard way:


 for i in range(MAX_ITER): batch_X, batch_Y = joint_dataset.next_batch(BATCH_SIZE, shuffle=True) 

Only now next_batch returns not one batch, but a tuple with batches from each dataset.


Naturally, JointDataset can also consist of pipelines:


 pl_images = ct_images_ds.pipeline() .load('/some/path', 'dicom') .hu_normalize() .resize(shape=(256,256,256)) .segment_lungs() pl_labels = labels_ds.pipeline() .load('/other/path', 'csv') .apply(lambda x: (x['diagnosis'] == 'C').astype('int')) full_ds = JointDataset((pl_images, pl_labels), align='same') for i in range(MAX_ITER): images_batch, labels_batch = full_ds.next_batch(BATCH_SIZE, shuffle=True) #    ,        

And since the components of the dataset are pipelines, the loading and processing of images and tags is started only by calling next_batch . That is, all calculations are performed and the batch is formed only when it is needed.


Join operation


However, there are other situations when you need to perform an operation with a dataset, applying data from another dataset to it.


It is better to demonstrate this with the example of CT images. We load the coordinates and dimensions of the cancer and form three-dimensional masks from them.


 pl_masks = nodules_ds.pipeline() .load('/other/path', 'csv') .calculate_3d_masks() 

Load CT images and apply masks to them to isolate only cancerous areas.


 pl_images = ct_images_ds.pipeline(). .load('/some/path', 'dicom') .hu_normalize() .resize(shape=(256, 256, 256)) .join(pl_masks) .apply_masks(op='mult') 

In join you specify. Due to this, the next action method (in this example, in apply_masks ) will be passed the batch from this dataset as the first argument. And not just any batches, but exactly those that are needed. For example, if the current batch from ct_images_ds contains images 117, 234, 186 and 14, then the attached batch with masks will also apply to images 117, 234, 186 and 14.


Naturally, the apply_masks method should be written with this argument in mind, because it can be passed explicitly, without first join . And in the action method you can no longer think about the indices and identifiers of the elements of the batch - you just apply an array of masks to the image array.


Again, I note that no downloads and calculations, neither with images nor with masks, will be launched until you call pl_images.next_batch


Putting it all together


So, let's see how the full workflow data science of the project will look like.


  1. Create an index and dataset
     ct_images_index = FilesIndex(path='/ct_images_??/*', dirs=True) ct_images_dataset = Dataset(index = ct_images_index, batch_class=CTImagesBatch) 
  2. We perform preprocessing and save the processed images.


     ct_images_dataset.pipeline() .load(None, 'dicom') #     dicom     .hu_normalize() .resize(shape=(256, 256, 256)) .segment_lungs() .save('/preprocessed/images', 'blosc') .run(BATCH_SIZE, shuffle=False, one_pass=True) 

  3. Describe the preparation and augmentation of data for the model.


     ct_preprocessed_index = FilesIndex(path='/preprocessed/images/*') ct_preprocessed_dataset = Dataset(index = ct_preprocessed_index, batch_class=CTImagesBatch) # ct_images_pipeline = ct_preprocessed_dataset.pipeline() .load(None, 'blosc') .split_to_patches(shape=(64, 64, 64)) # ct_masks_ds = Dataset(index = ct_preprocessed_index, batch_class=CTImagesBatch) ct_masks_pipeline = ct_masks_ds.pipeline(). .load('/preprocessed/masks', 'blosc') .split_to_patches(shape=(64, 64, 64)) # full_ds = JointDataset((ct_images_pipeline, ct_masks_pipeline)) 

  4. We form training batches and train the model


     full_ds.cv_split([0.8, 0.2]) for i in range(MAX_ITER): images, masks = full_ds.train.next_batch(BATCH_SIZE, shuffle=True) #  ,       

  5. Check the quality of the model
     for images, masks in full_ds.test.gen_batch(BATCH_SIZE, shuffle=False, one_pass=True): #     

This is such a convenient library that helps to develop significantly faster high-quality code, reuse previously created models with complex data preprocessing, and even develop production-ready systems.


And now the question: what else should I add to the library? What are you sorely lacking when working with data and models?


')

Source: https://habr.com/ru/post/326656/


All Articles