Large File Parallel Processing in Python

Image by author

In parallel processing, we divide a task into sub-parts that are processed simultaneously. This increases the number of jobs the program can handle and reduces the overall processing time.

For example, if you are working with a large CSV file and want to modify a single column, we can feed the data as an array to the function, and it will process multiple values in parallel based on the number of available workers. These workers are based on the number of cores in your CPU.

Note: Using parallel processing on a smaller dataset will not improve processing time.

In this blog, we will learn how to reduce the processing time of large files using the multiprocessing, joblib, and tqdm Python packages. It is a simple tutorial that can be applied to any file, database, image, video, or audio.

Note: We use Kaggle notebook for experiments. Processing time may vary from machine to machine.

We will use the US Accidents (2016 – 2021) dataset from Kaggle, which consists of 2.8 million records and 47 columns.

We will import multiprocessing, joblib, and tqdm for parallel processing, pandas for data ingestion, and re, nltk, and string for text processing.

# Parallel Computing
import multiprocessing as mp
from joblib import Parallel, delayed
from tqdm.notebook import tqdm

# Data Ingestion 
import pandas as pd

# Text Processing 
import re 
from nltk.corpus import stopwords
import string

Before we jump right in, let's set n_workers by doubling cpu_count(). As you can see, we have 8 workers.

n_workers = 2 * mp.cpu_count()

print(f"{n_workers} workers are available")

>>> 8 workers are available

In the next step, we will ingest the large CSV file using the pandas read_csv function. Then, we will print out the shape of the dataframe, the column names, and the processing time.

Note: The Jupyter magic function %%time displays the CPU times and wall time at the end of the process.

df = pd.read_csv(file_name)

print(f"Shape:{df.shape}\n\nColumn Names:\n{df.columns}\n")


Shape:(2845342, 47)

Column Names:

Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng',
'End_Lat', 'End_Lng', 'Distance(mi)', 'Description', 'Number', 'Street',
'Side', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone',
'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)',
'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction',
'Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity',
'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway',
'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal',
'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight',
'Astronomical_Twilight'],
dtype='object')

CPU times: user 33.9 s, sys: 3.93 s, total: 37.9 s
Wall time: 46.9 s

The clean_text is a simple text processing and cleaning function. We will get the English stopwords using nltk.corpus and use them to filter out stop words from a text string. After that, we will remove special characters and extra spaces from the sentence. It will be the baseline function for determining the processing time of serial, parallel, and batch processing.

def clean_text(text): 
  # Remove stop words
  stops = stopwords.words("english")
  text = " ".join([word for word in text.split() if word not in stops])
  # Remove Special Characters
  text = text.translate(str.maketrans('', '', string.punctuation))
  # removing the extra spaces
  text = re.sub(' +',' ', text)
  return text
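To see what clean_text does step by step, here is a minimal, self-contained sketch of the same three operations. It uses a small hardcoded stopword set as a stand-in for nltk's English stopword list (an assumption for the demo, so it runs without downloading the corpus):

```python
import re
import string

# Tiny stand-in for stopwords.words("english"); an assumption for this demo
STOPS = {"the", "is", "and"}

def clean_text_demo(text):
    # Remove stop words
    text = " ".join(word for word in text.split() if word not in STOPS)
    # Remove special characters (punctuation)
    text = text.translate(str.maketrans('', '', string.punctuation))
    # Collapse extra spaces
    text = re.sub(' +', ' ', text)
    return text

print(clean_text_demo("the traffic is slow and the road is closed!"))
# → traffic slow road closed
```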

For serial processing, we can use the pandas .apply() function, but if you want to see a progress bar, you need to activate tqdm for pandas and then use the .progress_apply() function.

We are going to process the 2.8 million records and store the result in the Description column.


tqdm.pandas()

df['Description'] = df['Description'].progress_apply(clean_text)


It took 9 minutes and 5 seconds for the high-end CPU to serially process 2.8 million rows.

100%  2845342/2845342 [09:05<00:00, 5724.25it/s]

CPU times: user 8min 14s, sys: 53.6 s, total: 9min 7s
Wall time: 9min 5s

There are different ways to parallelize file processing, and we are going to learn about all of them. The multiprocessing is a built-in Python package that is commonly used for parallel processing of large files.

We will create a multiprocessing Pool with 8 workers and use the map function to initiate the process. To display progress bars, we are using tqdm.

The map function consists of two sections. The first requires a function, and the second requires an argument or a list of arguments.

Learn more by reading the documentation.
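As a toy illustration of that signature (not the accidents data), a pool of two workers can map a function over a list of arguments:

```python
import multiprocessing as mp

def square(x):
    return x * x

if __name__ == "__main__":
    # map(function, iterable): workers pull values and results come back in order
    with mp.Pool(2) as p:
        print(p.map(square, [1, 2, 3, 4]))
```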

p = mp.Pool(n_workers) 

df['Description'] = p.map(clean_text, tqdm(df['Description']))


We have improved our processing time almost 3X. The processing time dropped from 9 minutes 5 seconds to 3 minutes 51 seconds.

100%  2845342/2845342 [02:58<00:00, 135646.12it/s]

CPU times: user 5.68 s, sys: 1.56 s, total: 7.23 s
Wall time: 3min 51s

Now we will learn about another Python package to perform parallel processing. In this section, we will use joblib's Parallel and delayed to replicate the map function.

  • Parallel requires two arguments: n_jobs = 8 and backend = multiprocessing.
  • Then, we will add clean_text to the delayed function.
  • Create a loop to feed a single value at a time.

The process below is quite general and you can modify your function and array according to your needs. I have used it to process thousands of audio and video files without any problems.

It is recommended to add exception handling using try/except.

def text_parallel_clean(array):
  result = Parallel(n_jobs=n_workers, backend="multiprocessing")(
    delayed(clean_text)(text)
    for text in tqdm(array)
  )
  return result

Apply the text_parallel_clean() to the “Description” column.

df['Description'] = text_parallel_clean(df['Description'])


Our function took 13 seconds more than the multiprocessing Pool. Even then, Parallel is 4 minutes and 59 seconds faster than serial processing.

100%  2845342/2845342 [04:03<00:00, 10514.98it/s]

CPU times: user 44.2 s, sys: 2.92 s, total: 47.1 s
Wall time: 4min 4s
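To illustrate the exception-handling advice above, here is a minimal sketch. The safe_clean wrapper is hypothetical (not part of the tutorial), and it returns a bad value unchanged instead of letting one record crash the whole run:

```python
def safe_clean(text):
    # Hypothetical wrapper: real data often contains NaN/None instead of strings
    try:
        return " ".join(text.split())  # stand-in for clean_text(text)
    except AttributeError:
        return text  # leave non-string values untouched

print([safe_clean(t) for t in ["Hello   World", None]])
# → ['Hello World', None]
```

Wrapped in delayed(), the same pattern keeps one malformed record from failing the entire Parallel call.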

There is a better way to process large files: by splitting them into batches and processing them in parallel. Let's start by creating a batch function that will run the clean_text function on a batch of values.

Batch processing function

def proc_batch(batch):
  return [
    clean_text(text)
    for text in batch
  ]

Splitting a file into batches

The function below will split the file into multiple batches based on the number of workers. In our case, we get 8 batches.

def batch_file(array, n_workers):
  file_len = len(array)
  batch_size = round(file_len / n_workers)
  batches = [
    array[ix:ix + batch_size]
    for ix in tqdm(range(0, file_len, batch_size))
  ]
  return batches

batches = batch_file(df['Description'],n_workers)

>>> 100%  8/8 [00:00<00:00, 280.01it/s]
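A quick sanity check of the same slicing logic on a toy list (8 items, 4 workers) shows how the batches line up; note that round() can produce one extra, shorter batch when the length does not divide evenly:

```python
def batch_demo(array, n_workers):
    # Same slicing logic as batch_file, without the tqdm progress bar
    batch_size = round(len(array) / n_workers)
    return [array[ix:ix + batch_size]
            for ix in range(0, len(array), batch_size)]

print(batch_demo(list(range(8)), 4))
# → [[0, 1], [2, 3], [4, 5], [6, 7]]
```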

Parallel batch processing

Finally, we will use Parallel and delayed to process the batches.

Note: To get a single array of values, we need to run the list comprehension as shown below.

batch_output = Parallel(n_jobs=n_workers, backend="multiprocessing")(
  delayed(proc_batch)(batch)
  for batch in tqdm(batches)
)

df['Description'] = [j for i in batch_output for j in i]
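The nested comprehension above flattens the list of per-batch results back into a single array. On a toy batch_output (hypothetical per-batch results, not the accidents data) it behaves like this:

```python
# Hypothetical results returned by three batch workers
batch_output = [["a", "b"], ["c"], ["d", "e"]]

# Flatten: outer loop over batches, inner loop over items in each batch
flat = [j for i in batch_output for j in i]
print(flat)
# → ['a', 'b', 'c', 'd', 'e']
```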


We have improved the processing time. This technique is known for processing complex data and training deep learning models.

100%  8/8 [00:00<00:00, 2.19it/s]

CPU times: user 3.39 s, sys: 1.42 s, total: 4.81 s
Wall time: 3min 56s

tqdm takes multiprocessing to the next level. It’s simple and powerful. I would recommend it to every data scientist.

Check the documentation to learn more about multiprocessing.

The process_map requires:

  1. Function name
  2. A dataframe column
  3. max_workers
  4. chunksize, which is similar to batch size. We will calculate the batch size using the number of workers, or you can adjust the number based on your preference.

from tqdm.contrib.concurrent import process_map

batch = round(len(df) / n_workers)

df["Description"] = process_map(
    clean_text, df["Description"], max_workers=n_workers, chunksize=batch
)


With one line of code we get the best result.

100%  2845342/2845342 [03:48<00:00, 1426320.93it/s]

CPU times: user 7.32 s, sys: 1.97 s, total: 9.29 s
Wall time: 3min 51s

You have to find a balance and choose the technique that works best for your case. It can be serial, parallel, or batch processing. Parallel processing can backfire if you are working with a smaller, less complex dataset.

In this mini-tutorial, we learned about various Python packages and techniques that allow us to parallelize our data functions.

If you only work with a tabular dataset and want to improve your processing performance, then I would suggest you try Dask, DataTable, and RAPIDS.


Abid Ali Awan (@1abidaliawan) is a certified data scientist who loves building machine learning models. He currently focuses on content creation and writes technical blogs on machine learning and data science technologies. Abid holds an MSc in Technology Management and a BS in Telecommunications Engineering. His vision is to create an AI product using a graph neural network for students struggling with mental illness.
