PyTorch এর DataLoader ( torch.utils.data.Dataloader
) ইতিমধ্যেই ডিপ লার্নিং মডেলের প্রশিক্ষণের জন্য দক্ষতার সাথে ডেটা লোড এবং প্রিপ্রসেস করার জন্য একটি দরকারী টুল। ডিফল্টরূপে, PyTorch একটি একক-কর্মী প্রক্রিয়া ব্যবহার করে ( num_workers=0
), কিন্তু ব্যবহারকারীরা সমান্তরালতা লাভ করতে এবং ডেটা লোডিংকে গতি বাড়ানোর জন্য একটি উচ্চ সংখ্যা নির্দিষ্ট করতে পারে।
যাইহোক, যেহেতু এটি একটি সাধারণ-উদ্দেশ্য ডেটালোডার, এবং যদিও এটি সমান্তরালতা প্রদান করে, এটি এখনও নির্দিষ্ট কাস্টম ব্যবহারের ক্ষেত্রে উপযুক্ত নয়। এই পোস্টে, আমরা কীভাবে torch.multiprocessing()
ব্যবহার করে 3D মেডিকেল স্ক্যানের ডেটাসেট থেকে একাধিক 2D স্লাইস লোড করার গতি বাড়াতে পারি তা অন্বেষণ করি।
torch.utils.data.Dataset
আমি একটি ব্যবহারের ক্ষেত্রে কল্পনা করি যেখানে রোগীদের জন্য 3D স্ক্যানের একটি সেট দেওয়া হয়েছে (যেমন, P1, P2, P3, …) এবং সংশ্লিষ্ট স্লাইসের একটি তালিকা; আমাদের লক্ষ্য হল একটি ডেটালোডার তৈরি করা যা প্রতিটি পুনরাবৃত্তিতে একটি স্লাইস আউটপুট করে । নীচের পাইথন কোডটি পরীক্ষা করুন যেখানে আমরা myDataset
নামে একটি টর্চ ডেটাসেট তৈরি করি এবং এটিকে torch.utils.data.Dataloader()
এ পাস করি।
# check full code here: https://gist.github.com/prerakmody/0c5e9263d42b2fab26a48dfb6b818cca#file-torchdataloader-py import tqdm import time import torch # v1.12.1 import numpy as np ################################################## # myDataset ################################################## def getPatientArray(patientName): # return patients 3D scan def getPatientSliceArray(patientName, sliceId, patientArray=None): # return patientArray and a slice class myDataset(torch.utils.data.Dataset): def __init__(self, patientSlicesList, patientsInMemory=1): ... self.patientObj = {} # To store one patients 3D array. More patients lead to more memory usage. def _managePatientObj(self, patientName): if len(self.patientObj) > self.patientsInMemory: self.patientObj.pop(list(self.patientObj.keys())[0]) def __getitem__(self, idx): # Step 0 - Init patientName, sliceId = ... # Step 1 - Get patient slice array patientArrayThis = self.patientObj.get(patientName, None) patientArray, patientSliceArray = getPatientSliceArray(patientName, sliceId, patientArray=patientArrayThis) if patientArray is not None: self.patientObj[patientName] = patientArray self._managePatientObj(patientName) return patientSliceArray, [patientName, sliceId] ################################################## # Main ################################################## if __name__ == '__main__': # Step 1 - Setup patient slices (fixed count of slices per patient) patientSlicesList = { 'P1': [45, 62, 32, 21, 69] , 'P2': [13, 23, 87, 54, 5] , 'P3': [34, 56, 78, 90, 12] , 'P4': [34, 56, 78, 90, 12] } workerCount, batchSize, epochs = 4, 1, 3 # Step 2.1 - Create dataset and dataloader dataset = myDataset(patientSlicesList) dataloader = torch.utils.data.DataLoader(dataset, batch_size=3, num_workers=4) # Step 2.2 - Iterate over dataloader print ('\n - [main] Iterating over (my) dataloader...') for epochId in range(epochs): print (' - [main] --------------------------------------- Epoch {}/{}'.format(epochId+1, epochs)) for i, (patientSliceArray, meta) in enumerate(dataloader): print (' - [main] meta: ', meta) pbar.update(patientSliceArray.shape[0])
আমাদের ব্যবহারের ক্ষেত্রে প্রধান উদ্বেগ হল যে 3D মেডিকেল স্ক্যানগুলি আকারে বড় ( এখানে time.sleep()
অপারেশন দ্বারা অনুকরণ করা হয়েছে) এবং তাই
ডিস্ক থেকে তাদের পড়া সময় নিবিড় হতে পারে
এবং বেশিরভাগ ক্ষেত্রে 3D স্ক্যানের একটি বড় ডেটাসেট মেমরিতে আগে থেকে পড়া যায় না
আদর্শভাবে, আমাদের প্রতিটি রোগীর স্ক্যানের সাথে যুক্ত সমস্ত স্লাইসের জন্য একবার পড়া উচিত। কিন্তু যেহেতু ডেটা torch.utils.data.dataloader(myDataset, batch_size=b, workers=n)
দ্বারা ব্যাচের আকারের উপর নির্ভর করে কর্মীদের মধ্যে বিভক্ত করা হয়, তাই বিভিন্ন কর্মীদের জন্য রোগীকে দুবার পড়ার সম্ভাবনা রয়েছে ( চিত্রটি পরীক্ষা করুন এবং লগ দেখুন নিচে ).
- [main] Iterating over (my) dataloader... - [main] --------------------------------------- Epoch 1/3 - [getPatientArray()][worker=3] Loading volumes for patient: P2 - [getPatientArray()][worker=1] Loading volumes for patient: P1 - [getPatientArray()][worker=2] Loading volumes for patient: P2 - [getPatientArray()][worker=0] Loading volumes for patient: P1 - [getPatientArray()][worker=3] Loading volumes for patient: P3 - [main] meta: [('P1', 'P1', 'P1'), tensor([45, 62, 32])] - [getPatientArray()][worker=1] Loading volumes for patient: P2 - [main] meta: [('P1', 'P1', 'P2'), tensor([21, 69, 13])] - [main] meta: [('P2', 'P2', 'P2'), tensor([23, 87, 54])] - [main] meta: [('P2', 'P3', 'P3'), tensor([ 5, 34, 56])] - [getPatientArray()][worker=2] Loading volumes for patient: P4 - [getPatientArray()][worker=0] Loading volumes for patient: P3 - [getPatientArray()][worker=1] Loading volumes for patient: P4 - [main] meta: [('P3', 'P3', 'P3'), tensor([78, 90, 12])] - [main] meta: [('P4', 'P4', 'P4'), tensor([34, 56, 78])] - [main] meta: [('P4', 'P4'), tensor([90, 12])]
সংক্ষেপে, এখানে torch.utils.data.Dataloader
এর বিদ্যমান বাস্তবায়নের সমস্যাগুলি রয়েছে
myDataset()
এর একটি অনুলিপি দেওয়া হয় (রেফারেন্স:
patientSliceList
উপর লুপ করে ( নীচের ছবিটি দেখুন ), (পেশেন্টআইডি, স্লাইসআইডি) কম্বোগুলির মধ্যে কোনও প্রাকৃতিক পরিবর্তন সম্ভব নয়। ( দ্রষ্টব্য: কেউ এলোমেলো করতে পারে, তবে এতে মেমরিতে আউটপুট সংরক্ষণ করা জড়িত )
দ্রষ্টব্য: একজন প্রতিটি রোগীর 3D স্ক্যান থেকে একসাথে একগুচ্ছ স্লাইস ফেরত দিতে পারে। কিন্তু আমরা যদি স্লাইস-নির্ভর 3D অ্যারেও ফেরত দিতে চাই (উদাহরণস্বরূপ, ইন্টারেক্টিভ রিফাইনমেন্ট নেটওয়ার্ক ( এই কাজের চিত্র 1 দেখুন ), তাহলে এটি আপনার ডেটালোডারের মেমরি পদচিহ্নকে ব্যাপকভাবে বৃদ্ধি করে।
torch.multiprocessing
ব্যবহার করেরোগীর স্ক্যানের একাধিক পঠন প্রতিরোধ করার জন্য, আমাদের আদর্শভাবে প্রতিটি রোগীর ( আসুন 8 জন রোগীকে কল্পনা করা যাক ) একটি নির্দিষ্ট কর্মীর দ্বারা পড়তে হবে।
এটি অর্জনের জন্য, আমরা টর্চ ডেটালোডার ক্লাসের মতো একই অভ্যন্তরীণ সরঞ্জাম ব্যবহার করি (অর্থাৎ, torch.multiprocessing()
) কিন্তু সামান্য পার্থক্যের সাথে। আমাদের কাস্টম ডেটালোডার - myDataloader
এর জন্য নিচের ওয়ার্কফ্লো চিত্র এবং কোডটি দেখুন
# check full code here: https://gist.github.com/prerakmody/0c5e9263d42b2fab26a48dfb6b818cca#file-mydataloader-py class myDataloader: def __init__(self, patientSlicesList, numWorkers, batchSize) -> None: ... self._initWorkers() def _initWorkers(self): # Step 1 - Initialize vas self.workerProcesses = [] self.workerInputQueues = [torchMP.Queue() for _ in range(self.numWorkers)] self.workerOutputQueue = torchMP.Queue() for workerId in range(self.numWorkers): p = torchMP.Process(target=getSlice, args=(workerId, self.workerInputQueues[workerId], self.workerOutputQueue)) p.start() def fillInputQueues(self): """ This function allows to split patients and slices across workers. One can implement custom logic here. """ patientNames = list(self.patientSlicesList.keys()) for workerId in range(self.numWorkers): idxs = ... for patientName in patientNames[idxs]: for sliceId in self.patientSlicesList[patientName]: self.workerInputQueues[workerId].put((patientName, sliceId)) def emptyAllQueues(self): # empties the self.workerInputQueues and self.workerOutputQueue def __iter__(self): try: # Step 0 - Init self.fillInputQueues() # once for each epoch batchArray, batchMeta = [], [] # Step 1 - Continuously yield results while True: if not self.workerOutputQueue.empty(): # Step 2.1 - Get data point patientSliceArray, patientName, sliceId = self.workerOutputQueue.get(timeout=QUEUE_TIMEOUT) # Step 2.2 - Append to batch ... # Step 2.3 - Yield batch if len(batchArray) == self.batchSize: batchArray = collate_tensor_fn(batchArray) yield batchArray, batchMeta batchArray, batchMeta = [], [] # Step 3 - End condition if np.all([self.workerInputQueues[i].empty() for i in range(self.numWorkers)]) and self.workerOutputQueue.empty(): break except GeneratorExit: self.emptyAllQueues() except KeyboardInterrupt: self.closeProcesses() except: traceback.print_exc() def closeProcesses(self): pass if __name__ == "__main__": # Step 1 - Setup patient slices (fixed count of slices per patient) patientSlicesList = { 'P1': [45, 62, 32, 21, 69] , 'P2': [13, 23, 87, 54, 5] , 'P3': [34, 56, 78, 90, 12] , 'P4': [34, 56, 78, 90, 12] , 'P5': [45, 62, 32, 21, 69] , 'P6': [13, 23, 87, 54, 5] , 'P7': [34, 56, 78, 90, 12] , 'P8': [34, 56, 78, 90, 12, 21] } workerCount, batchSize, epochs = 4, 1, 3 # Step 2 - Create new dataloader dataloaderNew = None try: dataloaderNew = myDataloader(patientSlicesList, numWorkers=workerCount, batchSize=batchSize) print ('\n - [main] Iterating over (my) dataloader...') for epochId in range(epochs): with tqdm.tqdm(total=len(dataset), desc=' - Epoch {}/{}'.format(epochId+1, epochs)) as pbar: for i, (X, meta) in enumerate(dataloaderNew): print (' - [main] {}'.format(meta.tolist())) pbar.update(X.shape[0]) dataloaderNew.closeProcesses() except KeyboardInterrupt: if dataloader is not None: dataloader.closeProcesses() except: traceback.print_exc() if dataloaderNew is not None: dataloaderNew.closeProcesses()
উপরের স্নিপেটে ( এর পরিবর্তে 8 জন রোগী আছে ) নিম্নলিখিত ফাংশন ধারণ করে
__iter__()
- যেহেতু myDataloader()
একটি লুপ, এটি আসলে এই ফাংশনটি লুপ করে।
_initWorkers()
- এখানে, আমরা আমাদের কর্মী প্রক্রিয়া তৈরি করি তাদের পৃথক ইনপুট সারি workerInputQueues[workerId]
দিয়ে। এটি বলা হয় যখন ক্লাস শুরু হয়।
fillInputQueues()
- এই ফাংশনটিকে বলা হয় যখন আমরা লুপ শুরু করি ( প্রত্যেক যুগের শুরুতে )। এটি পৃথক কর্মীর ইনপুট সারি পূরণ করে।
getSlice()
- এটি হল প্রধান লজিক ফাংশন যা রোগীর ভলিউম থেকে একটি স্লাইস ফেরত দেয়। এখানে কোড চেক করুন.
collate_tensor_fn()
- এই ফাংশনটি সরাসরি টর্চ রেপো - torchv1.12.0 থেকে কপি করা হয় এবং ডেটা একসাথে ব্যাচ করতে ব্যবহৃত হয়।আমাদের ডেটালোডার ডিফল্ট বিকল্পের তুলনায় গতির অফার করে কিনা তা পরীক্ষা করতে, আমরা বিভিন্ন কর্মী গণনা ব্যবহার করে প্রতিটি ডেটালোডার লুপের গতি পরীক্ষা করি। আমরা আমাদের পরীক্ষায় দুটি পরামিতি পরিবর্তন করেছি:
আমরা প্রথমে আমাদের খেলনা ডেটাসেট নিয়ে পরীক্ষা করি এবং দেখি যে আমাদের ডেটালোডার অনেক দ্রুত কাজ করে। নীচের চিত্রটি দেখুন (বা এই কোডটি দিয়ে পুনরুত্পাদন করুন)
এখানে, আমরা নিম্নলিখিত দেখতে পারেন
patientSlicesList
ভেরিয়েবলে প্রতি রোগীর জন্য 5টি স্লাইস থাকে। সুতরাং, ব্যাচের শেষ সূচকে যোগ করার জন্য কর্মীকে দ্বিতীয় রোগী পড়ার জন্য অপেক্ষা করতে হবে। তারপরে আমরা একটি বাস্তব ডেটাসেট বেঞ্চমার্ক করি যেখানে 3D স্ক্যান লোড করা হয়, একটি স্লাইস বের করা হয়,
আমরা তা পর্যবেক্ষণ করেছি
আমরা বিভিন্ন কর্মী সংখ্যার সাথে ডেটা লোডিংয়ের সময় সম্পদের ব্যবহারও পর্যবেক্ষণ করেছি। অধিক সংখ্যক কর্মীদের সাথে, আমরা CPU এবং মেমরি ব্যবহার বৃদ্ধি লক্ষ্য করেছি, যা অতিরিক্ত প্রক্রিয়া দ্বারা প্রবর্তিত সমান্তরালতার কারণে প্রত্যাশিত। সর্বোত্তম কর্মী গণনা নির্বাচন করার সময় ব্যবহারকারীদের তাদের হার্ডওয়্যার সীমাবদ্ধতা এবং সংস্থান প্রাপ্যতা বিবেচনা করা উচিত।
এই ব্লগ পোস্টে, আমরা বড় 3D মেডিকেল স্ক্যান সমন্বিত ডেটাসেটগুলির সাথে কাজ করার সময় PyTorch-এর স্ট্যান্ডার্ড ডেটালোডারের সীমাবদ্ধতাগুলি অন্বেষণ করেছি এবং ডেটা লোডিং দক্ষতা উন্নত করতে torch.multiprocessing
ব্যবহার করে একটি কাস্টম সমাধান উপস্থাপন করেছি৷
এই 3D মেডিকেল স্ক্যানগুলি থেকে স্লাইস নিষ্কাশনের প্রেক্ষাপটে, ডিফল্ট ডেটালোডার সম্ভাব্যভাবে একই রোগীর স্ক্যানের একাধিক রিড হতে পারে কারণ কর্মীরা মেমরি ভাগ করে না। এই অপ্রয়োজনীয়তা উল্লেখযোগ্য বিলম্ব ঘটায়, বিশেষ করে যখন বড় ডেটাসেট নিয়ে কাজ করা হয়।
আমাদের কাস্টম ডেটালোডার রোগীদের কর্মীদের মধ্যে বিভক্ত করে, প্রতিটি 3D স্ক্যান প্রতি কর্মী প্রতি একবার পড়া হয় তা নিশ্চিত করে। এই পদ্ধতিটি অপ্রয়োজনীয় ডিস্ক রিডকে বাধা দেয় এবং ডেটা লোড করার গতি বাড়াতে সমান্তরাল প্রক্রিয়াকরণের সুবিধা দেয়।
কর্মক্ষমতা পরীক্ষায় দেখা গেছে যে আমাদের কাস্টম ডেটালোডার সাধারণত স্ট্যান্ডার্ড ডেটালোডারকে ছাড়িয়ে যায়, বিশেষ করে ছোট ব্যাচের আকার এবং একাধিক কর্মী প্রক্রিয়ার সাথে।
আমাদের কাস্টম ডেটালোডার অপ্রয়োজনীয় পাঠ হ্রাস করে এবং সমান্তরালতাকে সর্বাধিক করে বড় 3D মেডিকেল ডেটাসেটের জন্য ডেটা লোডিং দক্ষতা বাড়ায়। এই উন্নতির ফলে দ্রুত প্রশিক্ষণের সময় এবং হার্ডওয়্যার সংস্থানগুলির আরও ভাল ব্যবহার হতে পারে।
এই ব্লগটি আমার সহকর্মী জিংনান জিয়ার সাথে একসাথে লেখা হয়েছিল।