I am trying to do a calculation and write the results to another txt file using a multiprocessing program. I am getting a count mismatch in the output txt file: every time I execute it, I get a different output count.
I am new to Python; could someone please help?
import pandas as pd
import multiprocessing as mp

source = "\\share\usr\data.txt"
target = "\\share\usr\data_masked.txt"
Chunk = 10000

def process_calc(df):
    '''
    get source df do calc and return newdf
    ...
    '''
    return(newdf)

def calc_frame(df):
    output_df = process_calc(df)
    output_df.to_csv(target,index=None,sep='|',mode='a',header=False)

if __name__ == '__main__':
    reader = pd.read_table(source,sep='|',chunksize = chunk,encoding='ANSI')
    pool = mp.Pool(mp.cpu_count())
    jobs = []
    for each_df in reader:
        process = mp.Process(target=calc_frame,args=(each_df)
        jobs.append(process)
        process.start()
    for j in jobs:
        j.join()
You have several issues in your source as posted that would prevent it from even compiling, let alone running. I have attempted to correct those in an effort to also solve your main problem. But do check the code below thoroughly to make sure the corrections make sense.
First, the args argument to the Process constructor should be specified as a tuple. You have specified (each_df), which is not a tuple; it is just a parenthesized expression. You need (each_df,) to make it a tuple (the statement is also missing a closing parenthesis).
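A minimal, self-contained illustration of the difference, using a hypothetical worker function in place of calc_frame:

```python
import multiprocessing as mp

def worker(x):
    # Stand-in for calc_frame; just uses its single argument
    print(x)

if __name__ == "__main__":
    # (42) is just the integer 42 in parentheses, not a tuple;
    # args must be an iterable of arguments, so the trailing comma matters
    p = mp.Process(target=worker, args=(42,))
    p.start()
    p.join()
```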
The problem you have, in addition to making no provision against multiple processes simultaneously attempting to append to the same file, is that you cannot be assured of the order in which the processes complete, and thus you have no real control over the order in which the dataframes will be appended to the csv file.
The solution is to use a processing pool with the imap method. The iterable to pass to this method is just the reader, which when iterated returns the next dataframe to process. The return value from imap is an iterable that when iterated will return the next return value from process_calc in task-submission order, i.e. the same order in which the dataframes were submitted. So as these new, modified dataframes are returned, the main process can simply append them to the output file one by one:
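To see that ordering guarantee in isolation, here is a small sketch with a toy square function standing in for process_calc and a range of integers standing in for the reader:

```python
import multiprocessing as mp

def square(n):
    # Stand-in for process_calc; in the real code each item is a dataframe chunk
    return n * n

if __name__ == "__main__":
    with mp.Pool(2) as pool:
        # imap yields results in submission order, regardless of which
        # worker process finishes its task first
        results = list(pool.imap(square, range(5)))
    print(results)  # [0, 1, 4, 9, 16]
```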
import pandas as pd
import multiprocessing as mp

source = r"\\share\usr\data.txt"
target = r"\\share\usr\data_masked.txt"
Chunk = 10000

def process_calc(df):
    '''
    get source df do calc and return newdf
    ...
    '''
    return(newdf)

def calc_frame(df):
    output_df = process_calc(df)
    return output_df

if __name__ == '__main__':
    with mp.Pool() as pool:
        reader = pd.read_table(source, sep='|', chunksize=Chunk, encoding='ANSI')
        for output_df in pool.imap(process_calc, reader):
            output_df.to_csv(target, index=None, sep='|', mode='a', header=False)
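One side note: because every chunk is appended with header=False, the output file will contain no header row. If you want one, a sketch of the usual fix is to write the header only for the first chunk (shown here with toy in-memory dataframes and an io.StringIO buffer standing in for the target file):

```python
import io
import pandas as pd

# Hypothetical stand-ins for the dataframes process_calc would return
chunks = [pd.DataFrame({"a": [1, 2]}), pd.DataFrame({"a": [3, 4]})]

buf = io.StringIO()  # stands in for the target file
first = True
for output_df in chunks:
    # Write the header only on the first append, then suppress it
    output_df.to_csv(buf, index=None, sep='|', header=first)
    first = False

# The buffer now holds one header row followed by all data rows
print(buf.getvalue())
```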
Answered By – Booboo