PyTorch의 DataLoader( torch.utils.data.Dataloader
)는 이미 딥 러닝 모델 훈련을 위해 데이터를 효율적으로 로드하고 전처리하는 데 유용한 도구입니다. 기본적으로 PyTorch는 단일 작업자 프로세스 ( num_workers=0
)를 사용하지만 사용자는 병렬성을 활용하고 데이터 로딩 속도를 높이기 위해 더 높은 숫자를 지정할 수 있습니다.
그러나 범용 데이터로더이고 병렬화를 제공하더라도 특정 사용자 정의 사용 사례에는 여전히 적합하지 않습니다. 이 게시물에서는 torch.multiprocessing()
사용하여 3D 의료 스캔 데이터세트에서 여러 2D 슬라이스의 로드 속도를 높이는 방법을 살펴봅니다.
torch.utils.data.Dataset
나는 환자에 대한 3D 스캔 세트(예: P1, P2, P3 등)와 해당 슬라이스 목록이 있는 사용 사례를 상상합니다. 우리의 목표는 모든 반복에서 슬라이스를 출력하는 데이터로더를 구축하는 것입니다. myDataset
라는 토치 데이터세트를 빌드하고 이를 torch.utils.data.Dataloader()
에 전달하는 아래 Python 코드를 확인하세요.
# check full code here: https://gist.github.com/prerakmody/0c5e9263d42b2fab26a48dfb6b818cca#file-torchdataloader-py import tqdm import time import torch # v1.12.1 import numpy as np ################################################## # myDataset ################################################## def getPatientArray(patientName): # return patients 3D scan def getPatientSliceArray(patientName, sliceId, patientArray=None): # return patientArray and a slice class myDataset(torch.utils.data.Dataset): def __init__(self, patientSlicesList, patientsInMemory=1): ... self.patientObj = {} # To store one patients 3D array. More patients lead to more memory usage. def _managePatientObj(self, patientName): if len(self.patientObj) > self.patientsInMemory: self.patientObj.pop(list(self.patientObj.keys())[0]) def __getitem__(self, idx): # Step 0 - Init patientName, sliceId = ... # Step 1 - Get patient slice array patientArrayThis = self.patientObj.get(patientName, None) patientArray, patientSliceArray = getPatientSliceArray(patientName, sliceId, patientArray=patientArrayThis) if patientArray is not None: self.patientObj[patientName] = patientArray self._managePatientObj(patientName) return patientSliceArray, [patientName, sliceId] ################################################## # Main ################################################## if __name__ == '__main__': # Step 1 - Setup patient slices (fixed count of slices per patient) patientSlicesList = { 'P1': [45, 62, 32, 21, 69] , 'P2': [13, 23, 87, 54, 5] , 'P3': [34, 56, 78, 90, 12] , 'P4': [34, 56, 78, 90, 12] } workerCount, batchSize, epochs = 4, 1, 3 # Step 2.1 - Create dataset and dataloader dataset = myDataset(patientSlicesList) dataloader = torch.utils.data.DataLoader(dataset, batch_size=3, num_workers=4) # Step 2.2 - Iterate over dataloader print ('\n - [main] Iterating over (my) dataloader...') for epochId in range(epochs): print (' - [main] --------------------------------------- Epoch {}/{}'.format(epochId+1, epochs)) for i, (patientSliceArray, meta) in enumerate(dataloader): print (' - [main] meta: ', meta) pbar.update(patientSliceArray.shape[0])
우리 사용 사례의 주요 관심사는 3D 의료 스캔의 크기가 크다는 것입니다(여기에서는 time.sleep()
작업 으로 에뮬레이션됨 ).
디스크에서 읽는 데 시간이 많이 걸릴 수 있습니다.
대부분의 경우 대규모 3D 스캔 데이터 세트를 메모리로 미리 읽을 수 없습니다.
이상적으로는 관련된 모든 슬라이스에 대해 각 환자 스캔을 한 번만 읽어야 합니다. 하지만 데이터는 torch.utils.data.dataloader(myDataset, batch_size=b, workers=n)
에 의해 배치 크기에 따라 작업자로 분할되므로 여러 작업자가 환자를 두 번 읽을 가능성이 있습니다( 이미지 및 로그 확인) 아래에 ).
- [main] Iterating over (my) dataloader... - [main] --------------------------------------- Epoch 1/3 - [getPatientArray()][worker=3] Loading volumes for patient: P2 - [getPatientArray()][worker=1] Loading volumes for patient: P1 - [getPatientArray()][worker=2] Loading volumes for patient: P2 - [getPatientArray()][worker=0] Loading volumes for patient: P1 - [getPatientArray()][worker=3] Loading volumes for patient: P3 - [main] meta: [('P1', 'P1', 'P1'), tensor([45, 62, 32])] - [getPatientArray()][worker=1] Loading volumes for patient: P2 - [main] meta: [('P1', 'P1', 'P2'), tensor([21, 69, 13])] - [main] meta: [('P2', 'P2', 'P2'), tensor([23, 87, 54])] - [main] meta: [('P2', 'P3', 'P3'), tensor([ 5, 34, 56])] - [getPatientArray()][worker=2] Loading volumes for patient: P4 - [getPatientArray()][worker=0] Loading volumes for patient: P3 - [getPatientArray()][worker=1] Loading volumes for patient: P4 - [main] meta: [('P3', 'P3', 'P3'), tensor([78, 90, 12])] - [main] meta: [('P4', 'P4', 'P4'), tensor([34, 56, 78])] - [main] meta: [('P4', 'P4'), tensor([90, 12])]
요약하자면, torch.utils.data.Dataloader
의 기존 구현과 관련된 문제는 다음과 같습니다.
myDataset()
의 복사본이 전달됩니다(참조:
patientSliceList
( 아래 이미지 참조 )를 순차적으로 반복하므로 (patientId, SliceId) 콤보 간에 자연스러운 순서 섞기가 불가능합니다. ( 참고: 섞을 수 있지만 여기에는 출력을 메모리에 저장하는 작업이 포함됩니다 )
참고: 각 환자의 3D 스캔에서 여러 조각을 함께 반환할 수도 있습니다. 그러나 슬라이스 종속 3D 배열도 반환하려는 경우(예: 대화형 미세 조정 네트워크( 이 작업의 그림 1 참조 )) 이로 인해 데이터로더의 메모리 공간이 크게 늘어납니다.
torch.multiprocessing
사용환자 스캔을 여러 번 읽는 것을 방지 하려면 특정 작업자가 각 환자( 8명의 환자를 상상해 보자 )를 읽는 것이 이상적으로 필요합니다.
이를 달성하기 위해 우리는 토치 데이터로더 클래스(예: torch.multiprocessing()
)와 동일한 내부 도구를 사용하지만 약간의 차이가 있습니다. 사용자 정의 데이터로더인 myDataloader
에 대한 워크플로 그림과 코드를 아래에서 확인하세요.
# check full code here: https://gist.github.com/prerakmody/0c5e9263d42b2fab26a48dfb6b818cca#file-mydataloader-py class myDataloader: def __init__(self, patientSlicesList, numWorkers, batchSize) -> None: ... self._initWorkers() def _initWorkers(self): # Step 1 - Initialize vas self.workerProcesses = [] self.workerInputQueues = [torchMP.Queue() for _ in range(self.numWorkers)] self.workerOutputQueue = torchMP.Queue() for workerId in range(self.numWorkers): p = torchMP.Process(target=getSlice, args=(workerId, self.workerInputQueues[workerId], self.workerOutputQueue)) p.start() def fillInputQueues(self): """ This function allows to split patients and slices across workers. One can implement custom logic here. """ patientNames = list(self.patientSlicesList.keys()) for workerId in range(self.numWorkers): idxs = ... for patientName in patientNames[idxs]: for sliceId in self.patientSlicesList[patientName]: self.workerInputQueues[workerId].put((patientName, sliceId)) def emptyAllQueues(self): # empties the self.workerInputQueues and self.workerOutputQueue def __iter__(self): try: # Step 0 - Init self.fillInputQueues() # once for each epoch batchArray, batchMeta = [], [] # Step 1 - Continuously yield results while True: if not self.workerOutputQueue.empty(): # Step 2.1 - Get data point patientSliceArray, patientName, sliceId = self.workerOutputQueue.get(timeout=QUEUE_TIMEOUT) # Step 2.2 - Append to batch ... # Step 2.3 - Yield batch if len(batchArray) == self.batchSize: batchArray = collate_tensor_fn(batchArray) yield batchArray, batchMeta batchArray, batchMeta = [], [] # Step 3 - End condition if np.all([self.workerInputQueues[i].empty() for i in range(self.numWorkers)]) and self.workerOutputQueue.empty(): break except GeneratorExit: self.emptyAllQueues() except KeyboardInterrupt: self.closeProcesses() except: traceback.print_exc() def closeProcesses(self): pass if __name__ == "__main__": # Step 1 - Setup patient slices (fixed count of slices per patient) patientSlicesList = { 'P1': [45, 62, 32, 21, 69] , 'P2': [13, 23, 87, 54, 5] , 'P3': [34, 56, 78, 90, 12] , 'P4': [34, 56, 78, 90, 12] , 'P5': [45, 62, 32, 21, 69] , 'P6': [13, 23, 87, 54, 5] , 'P7': [34, 56, 78, 90, 12] , 'P8': [34, 56, 78, 90, 12, 21] } workerCount, batchSize, epochs = 4, 1, 3 # Step 2 - Create new dataloader dataloaderNew = None try: dataloaderNew = myDataloader(patientSlicesList, numWorkers=workerCount, batchSize=batchSize) print ('\n - [main] Iterating over (my) dataloader...') for epochId in range(epochs): with tqdm.tqdm(total=len(dataset), desc=' - Epoch {}/{}'.format(epochId+1, epochs)) as pbar: for i, (X, meta) in enumerate(dataloaderNew): print (' - [main] {}'.format(meta.tolist())) pbar.update(X.shape[0]) dataloaderNew.closeProcesses() except KeyboardInterrupt: if dataloader is not None: dataloader.closeProcesses() except: traceback.print_exc() if dataloaderNew is not None: dataloaderNew.closeProcesses()
위의 스니펫( 대신 8명의 환자 포함 )에는 다음 기능이 포함되어 있습니다.
__iter__()
- myDataloader()
는 루프이므로 이것이 실제로 루프를 반복하는 함수입니다.
_initWorkers()
- 여기서는 개별 입력 큐인 workerInputQueues[workerId]
사용하여 작업자 프로세스를 생성합니다. 클래스가 초기화될 때 호출됩니다.
fillInputQueues()
- 이 함수는 루프를 시작할 때( 기본적으로 모든 에포크가 시작될 때 ) 호출됩니다. 개별 작업자의 입력 대기열을 채웁니다.
getSlice()
- 이는 환자 볼륨에서 슬라이스를 반환하는 기본 논리 함수입니다. 여기에서 코드를 확인하세요.
collate_tensor_fn()
- 이 함수는 토치 저장소인 torchv1.12.0 에서 직접 복사되며 데이터를 함께 일괄 처리하는 데 사용됩니다.데이터로더가 기본 옵션에 비해 속도 향상을 제공하는지 테스트하기 위해 다양한 작업자 수를 사용하여 각 데이터로더 루프의 속도를 테스트합니다. 우리는 실험에서 두 가지 매개변수를 변경했습니다.
먼저 장난감 데이터 세트를 실험하고 데이터 로더가 훨씬 빠르게 작동하는 것을 확인했습니다. 아래 그림을 참조하세요(또는 이 코드 로 재현).
여기서 우리는 다음을 볼 수 있습니다
patientSlicesList
변수에 환자당 5개의 슬라이스가 포함되어 있기 때문입니다. 따라서 작업자는 배치의 마지막 인덱스에 추가할 두 번째 환자를 읽을 때까지 기다려야 합니다. 그런 다음 3D 스캔이 로드되고 슬라이스가 추출되는 실제 데이터 세트를 벤치마킹합니다.
우리는 그것을 관찰했습니다
또한 다양한 작업자 수를 사용하여 데이터를 로드하는 동안 리소스 활용도를 모니터링했습니다. 작업자 수가 많을수록 CPU 및 메모리 사용량이 증가하는 것으로 나타났습니다. 이는 추가 프로세스에 의해 도입된 병렬 처리로 인해 예상되는 현상입니다. 사용자는 최적의 작업자 수를 선택할 때 하드웨어 제약 조건과 리소스 가용성을 고려해야 합니다.
이 블로그 게시물에서는 대규모 3D 의료 스캔이 포함된 데이터 세트를 처리할 때 PyTorch의 표준 DataLoader의 한계를 살펴보고 데이터 로딩 효율성을 향상시키기 위해 torch.multiprocessing
사용하는 맞춤형 솔루션을 제시했습니다.
이러한 3D 의료 스캔에서 슬라이스를 추출하는 경우 작업자가 메모리를 공유하지 않기 때문에 기본 dataLoader가 잠재적으로 동일한 환자 스캔을 여러 번 읽을 수 있습니다. 이러한 중복으로 인해 특히 대규모 데이터 세트를 처리할 때 상당한 지연이 발생합니다.
우리의 맞춤형 dataLoader는 작업자 간에 환자를 분할하여 각 3D 스캔이 작업자당 한 번만 읽히도록 보장합니다. 이 접근 방식은 중복된 디스크 읽기를 방지하고 병렬 처리를 활용하여 데이터 로드 속도를 높입니다.
성능 테스트에 따르면 사용자 정의 dataLoader는 일반적으로 특히 작은 배치 크기와 여러 작업자 프로세스에서 표준 dataLoader보다 성능이 뛰어난 것으로 나타났습니다.
우리의 맞춤형 dataLoader는 중복 읽기를 줄이고 병렬성을 최대화하여 대규모 3D 의료 데이터 세트의 데이터 로딩 효율성을 향상시킵니다. 이러한 개선을 통해 교육 시간이 단축되고 하드웨어 리소스 활용도가 향상될 수 있습니다.
이 블로그는 내 동료 Jingnan Jia 와 함께 작성되었습니다.