data_list = data.to_numpy()
CPU_CORE = multiprocessing.cpu_count()
# 한 리스트에 몇개씩 담을지 결정
n = len(data_list) // CPU_CORE
data_list_list = [data_list[i * n:(i + 1) * n] for i in range((len(data_list) + n - 1) // n )]
save_path = os.getenv("HOME")+"/aiffel/news_summarization/data/preprocessed/"
def func_map(data_list) :
start = time.time()
clean_text_fold = []#text를 전처리한 결과를 담을 리스트
textfile = open(os.path.join(save_path, "clean_text_fold_" + str(os.getpid()%len(data_list_list)) + ".csv" ), "w")
for s in data_list:
textfile.write( preprocess_sentence(s[0]) + ',' + preprocess_sentence(s[1]) + "\n" )
# clean_text_fold.append(preprocess_sentence(s))
print("pid : " + str(os.getpid()) + "에서 한 fold 완료. 걸린시간 : ", time.time()- start)
# 멀티프로세싱 CPU 사용 수
if __name__=='__main__':
pool = multiprocessing.Pool(processes=CPU_CORE)
pool.map(func_map, data_list_list)
pool.close()
pool.join()
allData = []
for p in tf.io.gfile.glob(save_path + '*.csv') :
df = pd.read_csv(p, header=None)
allData.append(df)
dataCombine = pd.concat(allData, axis = 0, ignore_index = True)
tfrecord를 생성할때도 Multiprocessing을 이용하면 더욱 빠르게 생성할 수 있습니다.
tfrecord생성과정을 여러개의 process로 병렬 처리 하기 위해서
ilist에 i값을 bath_size단위로 샘플개수 범위만큼 저장합니다.
그후 ilist_list에 ilist값을 cpu코어 개수만큼 등분하여 나눠담습니다.
ilist=[]
for i in tqdm(range(prev_index, prev_index + len(img_name_vector), BATCH_SIZE)):
ilist.append(i)
# 한 리스트에 몇개씩 담을지 결정
n = len(ilist) // multiprocessing.cpu_count()
ilist_list = [ilist[i * n:(i + 1) * n] for i in range((len(ilist) + n - 1) // n )]
def g_tf(ilist) :
for i in ilist:
tfrecord_path = perm_path + 'batch_' + str(i + BATCH_SIZE) #저장할 디렉토리와 파일명 정의
img_name_vector_new = img_name_vector[i:i + BATCH_SIZE] #배치 단위로 이미지 정보 저장하기 위해 이미지 이름 불러오기
cap_vector_new = cap_vector[i:i + BATCH_SIZE] #배치 단위로 이미지 라벨 저장
tfrecord(img_name_vector_new, cap_vector_new, tfrecord_path) #tfrecord_path에 저장
print("pid : " + str(os.getpid()) + "에서 batch " + str(i+BATCH_SIZE)+ "완료")
import multiprocessing
CPU_CORE = multiprocessing.cpu_count() # 멀티프로세싱 CPU 사용 수
if __name__=='__main__':
pool = multiprocessing.Pool(processes=CPU_CORE)
pool.map(g_tf, ilist_list)
pool.close()
pool.join()
import multiprocessing as mp # 멀티 프로세싱으로 전처리 속도를 획기적으로 줄여봅시다
from multiprocessing import Pool
import numpy as np
import time
from functools import partial # map을 할 때 함수에 여러 인자를 넣어줄 수 있도록 합니다
start = time.time()
def appendTexts(sentences, remove_stopwords):
texts = []
for s in sentences:
texts += preprocess_sentence(s, remove_stopwords),
return texts
def preprocess_data(data, remove_stopwords=True):
start_time = time.time()
num_cores = mp.cpu_count() # 컴퓨터의 코어 수를 구합니다
text_data_split = np.array_split(data, num_cores) # 코어 수만큼 데이터를 배분하여 병렬적으로 처리할 수 있게 합니다
pool = Pool(num_cores)
processed_data = np.concatenate(pool.map(partial(appendTexts, remove_stopwords=remove_stopwords), text_data_split)) # 각자 작업한 데이터를 하나로 합쳐줍니다
# processed_data = pd.Series(processed_data)
pool.close()
pool.join()
print(time.time() - start_time, " seconds")
return processed_data
made by 동현