Intereting Posts
Имеет ли смысл создавать разделы подкачки для новых установок в настоящее время? Загрузитесь в Ubuntu с Android Tablet с Live USB Drive В разработке, основанном на Quickly, почему коды начинаются с «from gi.repository import Gtk»? Проблема с установкой Project R Двойная загрузка Dell XPS 13 не загружается в Ubuntu после обновления прошивки Символы Xmodmap, не включенные в список. Есть ли способ добавить их? Различные типы памяти в системном мониторе Как по-настоящему защитить жесткий диск? Проблема с загрузкой накопителя на диске: Дисковод для / tmp еще не готов или нет! 12.04 и проблема с NVIDIA-картой с NVIDIA Geforce 7025 Как установить плагин Jason для jEdit? Приложение текстового редактора, которое восстанавливает несохраненные файлы, которые были открыты на последнем сеансе Объединение домашних каталогов Является ли это исправление для безопасности Avast Antivirus безопасным для использования? bower install ничего не делает, даже не выдает ошибку

Как использовать многопроцессорность для параллелизации двух вызовов одной и той же функции с разными аргументами в цикле for?

В цикле for я argSet1 функцию дважды, но с разными наборами аргументов ( argSet1 , argSet2 ), которые изменяются на каждой итерации цикла for . Я хочу распараллелить эту операцию, поскольку один набор аргументов заставляет вызываемую функцию работать быстрее, а другой набор аргументов вызывает медленный запуск функции. Обратите внимание, что я не хочу иметь два цикла для этой операции. У меня также есть другое требование: каждая из этих функций выполняет некоторые параллельные операции, и поэтому я не хочу, чтобы какие-либо из функций с argSet1 или argSet2 выполнялись более одного раза из-за ограниченного количества ресурсов, которые у меня есть. Убедившись, что функция с обоими наборами аргументов работает, поможет мне как можно больше использовать ядра процессора. Вот как это делается без распараллеливания:

 def myFunc(arg1, arg2): if arg1: print ('do something that does not take too long') else: print ('do something that takes long') for i in range(10): argSet1 = arg1Storage[i] argSet1 = arg2Storage[i] myFunc(argSet1) myFunc(argSet2) 

Это, безусловно, не будет использовать преимущества вычислительных ресурсов, которые у меня есть. Вот моя попытка распараллеливать операции:

 from multiprocessing import Process def myFunc(arg1, arg2): if arg1: print ('do something that does not take too long') else: print ('do something that takes long') for i in range(10): argSet1 = arg1Storage[i] argSet1 = arg2Storage[i] p1 = Process(target=myFunc, args=argSet1) p1.start() p2 = Process(target=myFunc, args=argSet2) p2.start() 

Однако таким образом каждая функция со своими соответствующими аргументами будет называться 10 раз, и все становится очень медленным. Учитывая мои ограниченные знания в области многопроцессорности, я попытался улучшить ситуацию еще немного, добавив p1.join() и p2.join() в конец цикла for, но это все еще замедляет работу, поскольку p1 выполняется намного быстрее, и все ждут пока не будет выполнено p2 . Я также подумал об использовании multiprocessing.Value Чтобы выполнить некоторую связь с функциями, но затем я должен добавить цикл while внутри функции для каждого вызова функции, который замедляет все снова. Интересно, может ли кто-нибудь предложить практическое решение?

Поскольку я построил этот ответ в патчах, прокрутите вниз для лучшего решения этой проблемы

Вам нужно указать , как именно вы хотите, чтобы все было выполнено. Насколько я могу судить, вы хотите, чтобы два процесса работали не более, но, по крайней мере. Кроме того, вам не нужен тяжелый вызов, чтобы задержать быстрые. Один простой неоптимальный способ запуска:

 from multiprocessing import Process def func(counter,somearg): j = 0 for i in range(counter): j+=i print(somearg) def loop(counter,arglist): for i in range(10): func(counter,arglist[i]) heavy = Process(target=loop,args=[1000000,['heavy'+str(i) for i in range(10)]]) light = Process(target=loop,args=[500000,['light'+str(i) for i in range(10)]]) heavy.start() light.start() heavy.join() light.join() 

Вывод здесь (для одного примера):

 light0 heavy0 light1 light2 heavy1 light3 light4 heavy2 light5 light6 heavy3 light7 light8 heavy4 light9 heavy5 heavy6 heavy7 heavy8 heavy9 

Вы можете видеть, что последняя часть является субоптимальной, так как у вас есть последовательность тяжелых прогонов – это означает, что есть один процесс вместо двух.

Легкий способ оптимизировать это, если вы можете оценить, насколько дольше работает тяжелый процесс. Если он в два раза медленнее, как здесь, просто запустите 7 итераций тяжелого первого, присоединитесь к процессу освещения и запустите дополнительные 3.

Другой способ – запустить тяжелый процесс парами, поэтому сначала у вас есть 3 процесса, пока быстрый процесс не закончится, а затем продолжит с 2.

Главное – разделение тяжелых и легких вызовов на другой процесс полностью, поэтому, когда быстрые вызовы завершаются один за другим, вы быстро можете работать со своими медленными вещами. Как только вы закончите быстро, все зависит от вас, насколько вы хотите продолжить, но я думаю, что пока оценка того, как разбить тяжелые звонки, достаточно хороша. Это для моего примера:

 from multiprocessing import Process def func(counter,somearg): j = 0 for i in range(counter): j+=i print(somearg) def loop(counter,amount,arglist): for i in range(amount): func(counter,arglist[i]) heavy1 = Process(target=loop,args=[1000000,7,['heavy1'+str(i) for i in range(7)]]) light = Process(target=loop,args=[500000,10,['light'+str(i) for i in range(10)]]) heavy2 = Process(target=loop,args=[1000000,3,['heavy2'+str(i) for i in range(7,10)]]) heavy1.start() light.start() light.join() heavy2.start() heavy1.join() heavy2.join() 

с выходом:

 light0 heavy10 light1 light2 heavy11 light3 light4 heavy12 light5 light6 heavy13 light7 light8 heavy14 light9 heavy15 heavy27 heavy16 heavy28 heavy29 

Гораздо лучшее использование. Конечно, вы можете сделать это более продвинутым, поделившись очередью для медленных прогонов, поэтому, когда скорость будет выполнена, они могут присоединиться как рабочие в медленной очереди, но только для двух разных вызовов это может быть чрезмерным (хотя и не намного сложнее использовать очередь ). Лучшее решение :

 from multiprocessing import Queue,Process import queue def func(index,counter,somearg): j = 0 for i in range(counter): j+=i print("Worker",index,':',somearg) def worker(index): try: while True: func,args = q.get(block=False) func(index,*args) except queue.Empty: pass q = Queue() for i in range(10): q.put((func,(500000,'light'+str(i)))) q.put((func,(1000000,'heavy'+str(i)))) nworkers = 2 workers = [] for i in range(nworkers): workers.append(Process(target=worker,args=(i,))) workers[-1].start() q.close() for worker in workers: worker.join() 

Это лучшее и масштабируемое решение для того, что вы хотите. Выход:

 Worker 0 : light0 Worker 0 : light1 Worker 1 : heavy0 Worker 1 : light2 Worker 0 : heavy1 Worker 0 : light3 Worker 1 : heavy2 Worker 1 : light4 Worker 0 : heavy3 Worker 0 : light5 Worker 1 : heavy4 Worker 1 : light6 Worker 0 : heavy5 Worker 0 : light7 Worker 1 : heavy6 Worker 1 : light8 Worker 0 : heavy7 Worker 0 : light9 Worker 1 : heavy8 Worker 0 : heavy9 

Возможно, вы захотите использовать multiprocessing.Pool процесс. myFunc процессов и сопоставьте свой myFunc с ним, например:

 from multiprocessing import Pool import time def myFunc(arg1, arg2): if arg1: print ('do something that does not take too long') time.sleep(0.01) else: print ('do something that takes long') time.sleep(1) def wrap(args): return myFunc(*args) if __name__ == "__main__": p = Pool() argStorage = [(True, False), (False, True)] * 12 p.map(wrap, argStorage) 

Я добавил функцию p.map , так как функция, переданная в p.map должна принять один аргумент. Вы можете так же хорошо адаптировать myFunc чтобы принять кортеж, если это возможно в вашем случае.

Мой образец appStorage из 24 элементов, из которых 12 из них будут обрабатывать 1сек для обработки, а 12 – через 10 мс. В общем, этот скрипт работает через 3-4 секунды (у меня 4 ядра).

Одна из возможных реализаций может заключаться в следующем:

 import concurrent.futures import math list_of_args = [arg1, arg2] def my_func(arg): .... print ('do something that takes long') def main(): with concurrent.futures.ProcessPoolExecutor() as executor: for arg, result in zip(list_of_args, executor.map(is_prime, list_of_args)): print('my_func({0}) => {1}'.format(arg, result)) 

executor.map похож на встроенную функцию, метод карты позволяет несколько вызовов предоставляемой функции, передавая каждый из элементов в итерабельном для этой функции.