paint-brush
Torch.multiprocessing을 사용하여 Torch 데이터 로더의 병렬화를 개선하는 방법~에 의해@pixelperfectionist
467 판독값
467 판독값

Torch.multiprocessing을 사용하여 Torch 데이터 로더의 병렬화를 개선하는 방법

~에 의해 Prerak Mody13m2024/06/10
Read on Terminal Reader

너무 오래; 읽다

PyTorch 데이터로더는 딥 러닝 모델 훈련을 위해 데이터를 효율적으로 로드하고 전처리하는 도구입니다. 이 게시물에서는 torch.multiprocessing과 함께 사용자 정의 데이터로더를 사용하여 이 프로세스의 속도를 높이는 방법을 살펴봅니다. 우리는 3D 의료 스캔 데이터 세트에서 여러 2D 슬라이스를 로드하는 실험을 합니다.
featured image - Torch.multiprocessing을 사용하여 Torch 데이터 로더의 병렬화를 개선하는 방법
Prerak Mody HackerNoon profile picture
0-item

소개

PyTorch의 DataLoader( torch.utils.data.Dataloader )는 이미 딥 러닝 모델 훈련을 위해 데이터를 효율적으로 로드하고 전처리하는 데 유용한 도구입니다. 기본적으로 PyTorch는 단일 작업자 프로세스 ( num_workers=0 )를 사용하지만 사용자는 병렬성을 활용하고 데이터 로딩 속도를 높이기 위해 더 높은 숫자를 지정할 수 있습니다.


그러나 범용 데이터로더이고 병렬화를 제공하더라도 특정 사용자 정의 사용 사례에는 여전히 적합하지 않습니다. 이 게시물에서는 torch.multiprocessing() 사용하여 3D 의료 스캔 데이터세트에서 여러 2D 슬라이스의 로드 속도를 높이는 방법을 살펴봅니다.


우리는 각 환자의 3D 스캔에서 일련의 조각을 추출하려고 합니다. 이 환자들은 대규모 데이터 세트의 일부입니다.



torch.utils.data.Dataset

나는 환자에 대한 3D 스캔 세트(예: P1, P2, P3 등)와 해당 슬라이스 목록이 있는 사용 사례를 상상합니다. 우리의 목표는 모든 반복에서 슬라이스를 출력하는 데이터로더를 구축하는 것입니다. myDataset 라는 토치 데이터세트를 빌드하고 이를 torch.utils.data.Dataloader() 에 전달하는 아래 Python 코드를 확인하세요.


 # check full code here: https://gist.github.com/prerakmody/0c5e9263d42b2fab26a48dfb6b818cca#file-torchdataloader-py import tqdm import time import torch # v1.12.1 import numpy as np ################################################## # myDataset ################################################## def getPatientArray(patientName): # return patients 3D scan def getPatientSliceArray(patientName, sliceId, patientArray=None): # return patientArray and a slice class myDataset(torch.utils.data.Dataset): def __init__(self, patientSlicesList, patientsInMemory=1): ... self.patientObj = {} # To store one patients 3D array. More patients lead to more memory usage. def _managePatientObj(self, patientName): if len(self.patientObj) > self.patientsInMemory: self.patientObj.pop(list(self.patientObj.keys())[0]) def __getitem__(self, idx): # Step 0 - Init patientName, sliceId = ... # Step 1 - Get patient slice array patientArrayThis = self.patientObj.get(patientName, None) patientArray, patientSliceArray = getPatientSliceArray(patientName, sliceId, patientArray=patientArrayThis) if patientArray is not None: self.patientObj[patientName] = patientArray self._managePatientObj(patientName) return patientSliceArray, [patientName, sliceId] ################################################## # Main ################################################## if __name__ == '__main__': # Step 1 - Setup patient slices (fixed count of slices per patient) patientSlicesList = { 'P1': [45, 62, 32, 21, 69] , 'P2': [13, 23, 87, 54, 5] , 'P3': [34, 56, 78, 90, 12] , 'P4': [34, 56, 78, 90, 12] } workerCount, batchSize, epochs = 4, 1, 3 # Step 2.1 - Create dataset and dataloader dataset = myDataset(patientSlicesList) dataloader = torch.utils.data.DataLoader(dataset, batch_size=3, num_workers=4) # Step 2.2 - Iterate over dataloader print ('\n - [main] Iterating over (my) dataloader...') for epochId in range(epochs): print (' - [main] --------------------------------------- Epoch {}/{}'.format(epochId+1, epochs)) for i, (patientSliceArray, meta) in enumerate(dataloader): print (' - [main] meta: ', meta) pbar.update(patientSliceArray.shape[0])


우리 사용 사례의 주요 관심사는 3D 의료 스캔의 크기가 크다는 것입니다(여기에서는 time.sleep() 작업 으로 에뮬레이션됨 ).

  • 디스크에서 읽는 데 시간이 많이 걸릴 수 있습니다.

  • 대부분의 경우 대규모 3D 스캔 데이터 세트를 메모리로 미리 읽을 수 없습니다.


이상적으로는 관련된 모든 슬라이스에 대해 각 환자 스캔을 한 번만 읽어야 합니다. 하지만 데이터는 torch.utils.data.dataloader(myDataset, batch_size=b, workers=n) 에 의해 배치 크기에 따라 작업자로 분할되므로 여러 작업자가 환자를 두 번 읽을 가능성이 있습니다( 이미지 및 로그 확인) 아래에 ).

Torch는 배치 크기(이 경우 =3)에 따라 데이터세트 로딩을 각 작업자로 나눕니다. 이로 인해 각 환자는 여러 작업자가 판독합니다.


 - [main] Iterating over (my) dataloader... - [main] --------------------------------------- Epoch 1/3 - [getPatientArray()][worker=3] Loading volumes for patient: P2 - [getPatientArray()][worker=1] Loading volumes for patient: P1 - [getPatientArray()][worker=2] Loading volumes for patient: P2 - [getPatientArray()][worker=0] Loading volumes for patient: P1 - [getPatientArray()][worker=3] Loading volumes for patient: P3 - [main] meta: [('P1', 'P1', 'P1'), tensor([45, 62, 32])] - [getPatientArray()][worker=1] Loading volumes for patient: P2 - [main] meta: [('P1', 'P1', 'P2'), tensor([21, 69, 13])] - [main] meta: [('P2', 'P2', 'P2'), tensor([23, 87, 54])] - [main] meta: [('P2', 'P3', 'P3'), tensor([ 5, 34, 56])] - [getPatientArray()][worker=2] Loading volumes for patient: P4 - [getPatientArray()][worker=0] Loading volumes for patient: P3 - [getPatientArray()][worker=1] Loading volumes for patient: P4 - [main] meta: [('P3', 'P3', 'P3'), tensor([78, 90, 12])] - [main] meta: [('P4', 'P4', 'P4'), tensor([34, 56, 78])] - [main] meta: [('P4', 'P4'), tensor([90, 12])]


요약하자면, torch.utils.data.Dataloader 의 기존 구현과 관련된 문제는 다음과 같습니다.

  • 각 작업자에는 myDataset() 의 복사본이 전달됩니다(참조: 토치 v1.2. 0 ), 공유 메모리가 없기 때문에 환자의 3D 스캔에 대한 이중 디스크 읽기가 발생합니다.


  • 또한 토치는 patientSliceList ( 아래 이미지 참조 )를 순차적으로 반복하므로 (patientId, SliceId) 콤보 간에 자연스러운 순서 섞기가 불가능합니다. ( 참고: 섞을 수 있지만 여기에는 출력을 메모리에 저장하는 작업이 포함됩니다 )


표준 torch.utils.data.Dataloader()에는 작업자로부터 출력이 추출되는 방법을 전체적으로 관리하는 내부 대기열이 있습니다. 특정 작업자가 데이터를 준비하더라도 이 전역 대기열을 존중해야 하므로 출력할 수 없습니다.



참고: 각 환자의 3D 스캔에서 여러 조각을 함께 반환할 수도 있습니다. 그러나 슬라이스 종속 3D 배열도 반환하려는 경우(예: 대화형 미세 조정 네트워크( 이 작업의 그림 1 참조 )) 이로 인해 데이터로더의 메모리 공간이 크게 늘어납니다.



torch.multiprocessing 사용

환자 스캔을 여러 번 읽는 것을 방지 하려면 특정 작업자가 각 환자( 8명의 환자를 상상해 보자 )를 읽는 것이 이상적으로 필요합니다.

여기에서 각 작업자는 환자 세트를 읽는 데 집중합니다.


이를 달성하기 위해 우리는 토치 데이터로더 클래스(예: torch.multiprocessing() )와 동일한 내부 도구를 사용하지만 약간의 차이가 있습니다. 사용자 정의 데이터로더인 myDataloader 에 대한 워크플로 그림과 코드를 아래에서 확인하세요.

여기서 출력 대기열(하단)에는 각 작업자의 출력이 포함됩니다. 각 작업자는 특정 환자 집합에 대해서만 입력 정보(상단에 표시된 입력 대기열)를 받습니다. 따라서 이는 환자의 3D 스캔을 여러 번 판독하는 것을 방지합니다.



 # check full code here: https://gist.github.com/prerakmody/0c5e9263d42b2fab26a48dfb6b818cca#file-mydataloader-py class myDataloader: def __init__(self, patientSlicesList, numWorkers, batchSize) -> None: ... self._initWorkers() def _initWorkers(self): # Step 1 - Initialize vas self.workerProcesses = [] self.workerInputQueues = [torchMP.Queue() for _ in range(self.numWorkers)] self.workerOutputQueue = torchMP.Queue() for workerId in range(self.numWorkers): p = torchMP.Process(target=getSlice, args=(workerId, self.workerInputQueues[workerId], self.workerOutputQueue)) p.start() def fillInputQueues(self): """ This function allows to split patients and slices across workers. One can implement custom logic here. """ patientNames = list(self.patientSlicesList.keys()) for workerId in range(self.numWorkers): idxs = ... for patientName in patientNames[idxs]: for sliceId in self.patientSlicesList[patientName]: self.workerInputQueues[workerId].put((patientName, sliceId)) def emptyAllQueues(self): # empties the self.workerInputQueues and self.workerOutputQueue def __iter__(self): try: # Step 0 - Init self.fillInputQueues() # once for each epoch batchArray, batchMeta = [], [] # Step 1 - Continuously yield results while True: if not self.workerOutputQueue.empty(): # Step 2.1 - Get data point patientSliceArray, patientName, sliceId = self.workerOutputQueue.get(timeout=QUEUE_TIMEOUT) # Step 2.2 - Append to batch ... # Step 2.3 - Yield batch if len(batchArray) == self.batchSize: batchArray = collate_tensor_fn(batchArray) yield batchArray, batchMeta batchArray, batchMeta = [], [] # Step 3 - End condition if np.all([self.workerInputQueues[i].empty() for i in range(self.numWorkers)]) and self.workerOutputQueue.empty(): break except GeneratorExit: self.emptyAllQueues() except KeyboardInterrupt: self.closeProcesses() except: traceback.print_exc() def closeProcesses(self): pass if __name__ == "__main__": # Step 1 - Setup patient slices (fixed count of slices per patient) patientSlicesList = { 'P1': [45, 62, 32, 21, 69] , 'P2': [13, 23, 87, 54, 5] , 'P3': [34, 56, 78, 90, 12] , 'P4': [34, 56, 78, 90, 12] , 'P5': [45, 62, 32, 21, 69] , 'P6': [13, 23, 87, 54, 5] , 'P7': [34, 56, 78, 90, 12] , 'P8': [34, 56, 78, 90, 12, 21] } workerCount, batchSize, epochs = 4, 1, 3 # Step 2 - Create new dataloader dataloaderNew = None try: dataloaderNew = myDataloader(patientSlicesList, numWorkers=workerCount, batchSize=batchSize) print ('\n - [main] Iterating over (my) dataloader...') for epochId in range(epochs): with tqdm.tqdm(total=len(dataset), desc=' - Epoch {}/{}'.format(epochId+1, epochs)) as pbar: for i, (X, meta) in enumerate(dataloaderNew): print (' - [main] {}'.format(meta.tolist())) pbar.update(X.shape[0]) dataloaderNew.closeProcesses() except KeyboardInterrupt: if dataloader is not None: dataloader.closeProcesses() except: traceback.print_exc() if dataloaderNew is not None: dataloaderNew.closeProcesses()


위의 스니펫( 대신 8명의 환자 포함 )에는 다음 기능이 포함되어 있습니다.

  • __iter__() - myDataloader() 는 루프이므로 이것이 실제로 루프를 반복하는 함수입니다.


  • _initWorkers() - 여기서는 개별 입력 큐인 workerInputQueues[workerId] 사용하여 작업자 프로세스를 생성합니다. 클래스가 초기화될 때 호출됩니다.


  • fillInputQueues() - 이 함수는 루프를 시작할 때( 기본적으로 모든 에포크가 시작될 때 ) 호출됩니다. 개별 작업자의 입력 대기열을 채웁니다.


  • getSlice() - 이는 환자 볼륨에서 슬라이스를 반환하는 기본 논리 함수입니다. 여기에서 코드를 확인하세요.


  • collate_tensor_fn() - 이 함수는 토치 저장소인 torchv1.12.0 에서 직접 복사되며 데이터를 함께 일괄 처리하는 데 사용됩니다.


성능

데이터로더가 기본 옵션에 비해 속도 향상을 제공하는지 테스트하기 위해 다양한 작업자 수를 사용하여 각 데이터로더 루프의 속도를 테스트합니다. 우리는 실험에서 두 가지 매개변수를 변경했습니다.


  • 근로자 수 : 1, 2, 4, 8개의 작업자 프로세스를 테스트했습니다.
  • 배치 크기 : 1부터 8까지 다양한 배치 크기를 평가했습니다.

장난감 데이터세트

먼저 장난감 데이터 세트를 실험하고 데이터 로더가 훨씬 빠르게 작동하는 것을 확인했습니다. 아래 그림을 참조하세요(또는 이 코드 로 재현).
총 시간이 낮고 초당 반복 횟수가 높을수록 데이터 로더가 더 우수하다는 의미입니다.

여기서 우리는 다음을 볼 수 있습니다

  • 단일 작업자를 사용하는 경우 두 데이터로더는 모두 동일합니다.


  • 추가 작업자(예: 2,4,8)를 사용하면 두 데이터로더 모두 속도가 향상되지만 사용자 정의 데이터로더의 속도 향상이 훨씬 더 높습니다.


  • 배치 크기 6(1,2,3,4와 비교)을 사용하면 성능이 약간 저하됩니다. 이는 장난감 데이터세트에서 patientSlicesList 변수에 환자당 5개의 슬라이스가 포함되어 있기 때문입니다. 따라서 작업자는 배치의 마지막 인덱스에 추가할 두 번째 환자를 읽을 때까지 기다려야 합니다.

실제 데이터 세트

그런 다음 3D 스캔이 로드되고 슬라이스가 추출되는 실제 데이터 세트를 벤치마킹합니다. 일부 추가 전처리가 완료되었습니다. , 슬라이스 및 기타 배열이 반환됩니다. 결과는 아래 그림을 참조하세요.


우리는 그것을 관찰했습니다 작업자(및 배치 크기) 프로세스 수를 늘리면 일반적으로 데이터 로드 속도가 빨라집니다. 따라서 더 빠른 훈련으로 이어질 수 있습니다. 더 작은 배치 크기(예: 1 또는 2)의 경우 작업자 수를 두 배로 늘리면 속도가 훨씬 더 빨라집니다. 그러나 배치 크기가 증가함에 따라 작업자 추가로 인한 한계 개선 효과는 감소했습니다.

반복/초가 높을수록 데이터 로더가 더 빨라집니다.

자원 활용

또한 다양한 작업자 수를 사용하여 데이터를 로드하는 동안 리소스 활용도를 모니터링했습니다. 작업자 수가 많을수록 CPU 및 메모리 사용량이 증가하는 것으로 나타났습니다. 이는 추가 프로세스에 의해 도입된 병렬 처리로 인해 예상되는 현상입니다. 사용자는 최적의 작업자 수를 선택할 때 하드웨어 제약 조건과 리소스 가용성을 고려해야 합니다.

요약

  1. 이 블로그 게시물에서는 대규모 3D 의료 스캔이 포함된 데이터 세트를 처리할 때 PyTorch의 표준 DataLoader의 한계를 살펴보고 데이터 로딩 효율성을 향상시키기 위해 torch.multiprocessing 사용하는 맞춤형 솔루션을 제시했습니다.


  2. 이러한 3D 의료 스캔에서 슬라이스를 추출하는 경우 작업자가 메모리를 공유하지 않기 때문에 기본 dataLoader가 잠재적으로 동일한 환자 스캔을 여러 번 읽을 수 있습니다. 이러한 중복으로 인해 특히 대규모 데이터 세트를 처리할 때 상당한 지연이 발생합니다.


  3. 우리의 맞춤형 dataLoader는 작업자 간에 환자를 분할하여 각 3D 스캔이 작업자당 한 번만 읽히도록 보장합니다. 이 접근 방식은 중복된 디스크 읽기를 방지하고 병렬 처리를 활용하여 데이터 로드 속도를 높입니다.


  4. 성능 테스트에 따르면 사용자 정의 dataLoader는 일반적으로 특히 작은 배치 크기와 여러 작업자 프로세스에서 표준 dataLoader보다 성능이 뛰어난 것으로 나타났습니다.


    1. 그러나 배치 크기가 커지면 성능 향상이 줄어듭니다.


우리의 맞춤형 dataLoader는 중복 읽기를 줄이고 병렬성을 최대화하여 대규모 3D 의료 데이터 세트의 데이터 로딩 효율성을 향상시킵니다. 이러한 개선을 통해 교육 시간이 단축되고 하드웨어 리소스 활용도가 향상될 수 있습니다.


이 블로그는 내 동료 Jingnan Jia 와 함께 작성되었습니다.