Как использовать несколько ядер процессора python
Перейти к содержимому

Как использовать несколько ядер процессора python

  • автор:

Способы реализации параллельных вычислений в программах на Python

Параллелизм дает возможность работать над несколькими вычислениями одновременно в одной программе. Такого поведения в Python можно добиться несколькими способами:

  • Используя многопоточность threading , позволяя нескольким потокам работать по очереди.
  • Используя несколько ядер процессора multiprocessing . Делать сразу несколько вычислений, используя несколько ядер процессора. Это и называется параллелизмом.
  • Используя асинхронный ввод-вывод с модулем asyncio . Запуская какую то задачу, продолжать делать другие вычисления, вместо ожидания ответа от сетевого подключения или от операций чтения/записи.

Разница между потоками и процессами.

Поток threading — это независимая последовательность выполнения каких то вычислений. Поток thread делит выделенную память ядру процессора, а также его процессорное время со всеми другими потоками, которые создаются программой в рамках одного ядра процессора. Программы на языке Python имеют, по умолчанию, один основной поток. Можно создать их больше и позволить Python переключаться между ними. Это переключение происходит очень быстро и кажется, что они работают параллельно.

Понятие процесс в multiprocessing — представляет собой также независимую последовательность выполнения вычислений. В отличие от потоков threading , процесс имеет собственное ядро и следовательно выделенную ему память, которое не используется совместно с другими процессами. Процесс может клонировать себя, создавая два или более экземпляра в одном ядре процессора.

Асинхронный ввод-вывод не является ни потоковым ( threading ), ни многопроцессорным ( multiprocessing ). По сути, это однопоточная, однопроцессная парадигма и не относится к параллельным вычислениям.

У Python есть одна особенность, которая усложняет параллельное выполнение кода. Она называется GIL, сокращенно от Global Interpreter Lock. GIL гарантирует, что в любой момент времени работает только один поток. Из этого следует, что с потоками невозможно использовать несколько ядер процессора.

GIL был введен в Python потому, что управление памятью CPython не является потокобезопасным. Имея такую блокировку Python может быть уверен, что никогда не будет условий гонки.

Что такое условия гонки и потокобезопасность?

  • Состояние гонки возникает, когда несколько потоков могут одновременно получать доступ к общей структуре данных или местоположению в памяти и изменять их, вследствии чего могут произойти непредсказуемые вещи. Пример из жизни: если два пользователя одновременно редактируют один и тот же документ онлайн и второй пользователь сохранит данные в базу, то перезапишет работу первого пользователя. Чтобы избежать условий гонки, необходимо заставить второго пользователя ждать, пока первый закончит работу с документом и только после этого разрешить второму пользователю открыть и начать редактировать документ.
  • Потокобезопасность работает путем создания копии локального хранилища в каждом потоке, чтобы данные не сталкивались с другим потоком.

Алгоритм планирования доступа потоков к общим данным.

Как уже говорилось, потоки используют одну и ту же выделенную память. Когда несколько потоков работают одновременно, то нельзя угадать порядок, в котором потоки будут обращаются к общим данным. Результат доступа к совместно используемым данным зависит от алгоритма планирования. который решает, какой поток и когда запускать. Если такого алгоритма нет, то конечные данные могут быть не такими как ожидаешь.

Например, есть общая переменная a = 2 . Теперь предположим, что есть два потока, thread_one и thread_two . Они выполняют следующие операции:

a = 2 # функция 1 потока def thread_one(): global a a = a + 2 # функция 2 потока def thread_two(): global a a = a * 3 

Если поток thread_one получит доступ к общей переменной a первым и thread_two вторым, то результат будет 12:

или наоборот, сначала запустится thread_two , а затем thread_one , то мы получим другой результат:

Таким образом очевидно, что порядок выполнения операций потоками имеет значение

Без алгоритмов планирования доступа потоков к общим данным такие ошибки очень трудно найти и произвести отладку. Кроме того, они, как правило, происходят случайным образом, вызывая беспорядочное и непредсказуемое поведение.

Есть еще худший вариант развития событий, который может произойти без встроенной в Python блокировки потоков GIL . Например, если оба потока начинают читать глобальную переменную a одновременно, оба потока увидят, что a = 2 , а дальше, в зависимости от того какой поток произведет вычисления последним, в конечном итоге и будет равна переменная a (4 или 6). Не то, что ожидалось!

Исследование разных подходов к параллельным вычислениям в Python.

Определим функцию, которую будем использовать для сравнения различных вариантов вычислений. Во всех следующих примерах используется одна и та же функция, называемая heavy() :

def heavy(n): for x in range(1, n): for y in range(1, n): x**y 

Функция heavy() представляет собой вложенный цикл, который выполняет возведение в степень. Это функция связана со скоростью ядра процессора производить математические вычисления. Если понаблюдать за операционной системой во время выполнения функции, то можно увидеть загрузку ЦП близкую к 100%.

Будем запускать эту функцию по-разному, тем самым исследуя различия между обычной однопоточной программой Python, многопоточностью и многопроцессорностью.

Однопоточный режим работы.

Каждая программа Python имеет по крайней мере один основной поток. Ниже представлен пример кода для запуска функции heavy() в одном основном потоке одного ядра процессора, который производит все операции последовательно и будет служить эталоном с точки зрения скорости выполнения:

import time def heavy(n): for x in range(1, n): for y in range(1, n): x**y def sequential(n): for i in range(n): heavy(500) print(f"n> циклов вычислений закончены") if __name__ == "__main__": start = time.time() sequential(80) end = time.time() print("Общее время работы: ", end - start) # 80 циклов вычислений закончены # Общее время работы: 23.573118925094604 

Использование потоков threading .

В следующем примере будем использовать несколько потоков для выполнения функции heavy() . Также произведем 80 циклов вычислений. Для этого разделим вычисления на 4 потока, в каждом из которых запустим 20 циклов:

import threading import time def heavy(n, i, thead): for x in range(1, n): for y in range(1, n): x**y print(f"Цикл № i>. Поток thead>") def sequential(calc, thead): print(f"Запускаем поток № thead>") for i in range(calc): heavy(500, i, thead) print(f"calc> циклов вычислений закончены. Поток № thead>") def threaded(theads, calc): # theads - количество потоков # calc - количество операций на поток threads = [] # делим вычисления на `theads` потоков for thead in range(theads): t = threading.Thread(target=sequential, args=(calc, thead)) threads.append(t) t.start() # Подождем, пока все потоки # завершат свою работу. for t in threads: t.join() if __name__ == "__main__": start = time.time() # разделим вычисления на 4 потока # в каждом из которых по 20 циклов threaded(4, 20) end = time.time() print("Общее время работы: ", end - start) # Показано часть вывода # . # . # . # Общее время работы: 43.33752250671387 

Однопоточный режим работы, оказался почти в 2 раза быстрее, потому что один поток не имеет накладных расходов на создание потоков (в нашем случае создается 4 потока) и переключение между ними.

Если бы у Python не было GIL, то вычисления функции heavy() происходили быстрее, а общее время выполнения программы стремилось к времени выполнения однопоточной программы. Причина, по которой многопоточный режим в данном примере не будет работать быстрее однопоточного — это вычисления, связанные с процессором и заключаются в GIL!

Если бы функция heavy() имела много блокирующих операций, таких как сетевые вызовы или операции с файловой системой, то применение многопоточного режима работы было бы оправдано и дало огромное увеличение скорости!

Это утверждение можно проверить смоделировав операции ввода-вывода при помощи функции time.sleep() .

import threading import time def heavy(): # имитации операций ввода-вывода time.sleep(2) def threaded(theads): threads = [] # делим операции на `theads` потоков for thead in range(theads): t = threading.Thread(target=heavy) threads.append(t) t.start() # Подождем, пока все потоки # завершат свою работу. for t in threads: t.join() print(f"theads> циклов имитации операций ввода-вывода закончены") if __name__ == "__main__": start = time.time() # 80 потоков - это неправильно и показано # чисто в демонстрационных целях threaded(80) end = time.time() print("Общее время работы: ", end - start) # 80 циклов имитации операций ввода-вывода закончены # Общее время работы: 2.008725881576538 

Даже если воображаемый ввод-вывод делится на 80 потоков и все они будут спать в течение двух секунд, то код все равно завершится чуть более чем за две секунды, т. к. многопоточной программе нужно время на планирование и запуск потоков.

Примечание! Каждый процессор поддерживает определенное количество потоков на ядро, заложенное производителем, при которых он работает оптимально быстро. Нельзя создавать безгранично много потоков. При увеличении числа потоков на величину, большую, чем заложил производитель, программа будет выполняться дольше или вообще поведет себя непредсказуемым образом (вплоть до зависания).

Использование многопроцессорной обработки multiprocessing .

Теперь попробуем настоящую параллельную обработку с использованием модуля multiprocessing . Модуль multiprocessing во многом повторяет API модуля threading , поэтому изменения в коде будут незначительны.

Для того, чтобы произвести 80 циклов вычислений функции heavy() , узнаем сколько процессор имеет ядер, а потом поделим циклы вычислений на количество ядер.

import multiprocessing import time def heavy(n, i, proc): for x in range(1, n): for y in range(1, n): x**y print(f"Цикл № i> ядро proc>") def sequential(calc, proc): print(f"Запускаем поток № proc>") for i in range(calc): heavy(500, i, proc) print(f"calc> циклов вычислений закончены. Процессор № proc>") def processesed(procs, calc): # procs - количество ядер # calc - количество операций на ядро processes = [] # делим вычисления на количество ядер for proc in range(procs): p = multiprocessing.Process(target=sequential, args=(calc, proc)) processes.append(p) p.start() # Ждем, пока все ядра # завершат свою работу. for p in processes: p.join() if __name__ == "__main__": start = time.time() # узнаем количество ядер у процессора n_proc = multiprocessing.cpu_count() # вычисляем сколько циклов вычислений будет приходится # на 1 ядро, что бы в сумме получилось 80 или чуть больше calc = 80 // n_proc + 1 processesed(n_proc, calc) end = time.time() print(f"Всего n_proc> ядер в процессоре") print(f"На каждом ядре произведено calc> циклов вычислений") print(f"Итого n_proc*calc> циклов за: ", end - start) # Весь вывод показывать не будем # . # . # . # Всего 6 ядер в процессоре # На каждом ядре произведено 14 циклов вычислений # Итого 84 циклов вычислений за: 5.0251686573028564 

Код выполнился почти в 5 раз быстрее. Это прекрасно демонстрирует линейное увеличение скорости вычислений от количества ядер процессора.

Использование многопроцессорной обработки с пулом.

Можно сделать предыдущую версию программы немного более элегантной, используя multiprocessing.Pool() . Объект пула, управляет пулом рабочих процессов, в который могут быть отправлены задания. Используя метод Pool.starmap() , можно произвести инициализацию функции sequential () для каждого процесса.

В целях эксперимента в функции запуска пула процессов pooled(core) предусмотрено ручное указание количества ядер процессора. Если не указывать значение core , то по умолчанию будет использоваться количество ядер процессора вашей системы, что является разумным выбором:

import multiprocessing import time def heavy(n, i, proc): for x in range(1, n): for y in range(1, n): x**y print(f"Вычисление № i> процессор proc>") def sequential(calc, proc): print(f"Запускаем поток № proc>") for i in range(calc): heavy(500, i, proc) print(f"calc> циклов вычислений закончены. Процессор № proc>") def pooled(core=None): # вычисляем количество ядер процессора n_proc = multiprocessing.cpu_count() if core is None else core # вычисляем количество операций на процесс calc = int(80 / n_proc) if 80 % n_proc == 0 else int(80 // n_proc + 1) # создаем список инициализации функции # sequential(calc, proc) для каждого процесса init = map(lambda x: (calc, x), range(n_proc)) with multiprocessing.Pool() as pool: pool.starmap(sequential, init) print (calc, n_proc, core) return (calc, n_proc, core) if __name__ == "__main__": start = time.time() # в целях эксперемента, укажем количество # ядер больше чем есть на самом деле calc, n_proc, n = pooled(20) end = time.time() text = '' if n is None else 'задано ' print(f"Всего text>n_proc> ядер процессора") print(f"На каждом ядре произведено calc> циклов вычислений") print(f"Итого n_proc*calc> циклов за: ", end - start) # Весь вывод показывать не будем # . # . # . # Всего задано 20 ядер процессора # На каждом ядре произведено 4 циклов вычислений # Итого 80 циклов за: 5.422096252441406 

Из результатов работы видно, что время работы незначительно увеличилось.

Если запустить этот код, то можно проследить, что вычисления все равно происходят на том количестве ядер, которые имеются в процессоре. Только вычисления происходят поочередно — из за этого незначительное увеличение времени работы программы.

Выводы:

  • Используйте модули threading или asyncio для программ, связанных с сетевым вводом-выводом, чтобы значительно повысить производительность.
  • Используйте модуль multiprocessing для решения проблем, связанных с операциями ЦП. Этот модуль использует весь потенциал всех ядер в процессоре.
  • КРАТКИЙ ОБЗОР МАТЕРИАЛА.
  • Global Interpreter Lock (GIL)

Как использовать все процессоры при записи данных в файл используя python?

У меня есть функция код, который по определенной логике записывает данные в Базу данных. Я считываю данные с 2-х файлов построчно и на основании этих данных создаю запись в Базе Данных. Как БД использую ESRI geodatabase. Но, проблема с том, что для работы мы используем 1 ядро и тратим очень много времени на выполнения этого кода. У меня же 96 ядер и я хочу сэкономить время при использовании всех ядер. Это можно сделать через распаралеливание процессов. Вот мой код: Создаю БД и таблицу в ней:

 arcpy.CreateFileGDB_management("C:\Users\ivan\CellRebell\ESRI_New_Zeland", "%s.gdb"%Island) result = arcpy.management.CreateFeatureclass( "C:\Users\ivan\CellRebell\ESRI_New_Zeland/%s.gdb"%Island, "esri_square", "POLYGON", spatial_reference=4326) feature_class = result[0] arcpy.AddField_management(feature_class, 'ID', 'TEXT') 

Записываю данные в таблицу:

with arcpy.da.InsertCursor(feature_class, ['ID','SHAPE@']) as cursor: with open('%s_long_double.txt'%Island, 'r') as long: for i in long: i_1, i_2 = i.split() with open('%s_short_double.txt'%Island, 'r') as short: for k in short: k_1, k_2 = k.split() coord = [[float(i_1),float(k_1)],[float(i_2),float(k_1)],[float(i_2),float(k_2)],[float(i_1),float(k_2)]] count+=1 row = ['%s_%s'%(Island, count), coord] cursor.insertRow(row) del cursor 

Как использовать все ядра для записи данных в файл? Скрипт написан на Python 2.7. Основная задача, которую я хочу решить — это ускорить процесс записи данных в таблицу через использования всех процессоров. Спасибо

Параллелизм в Python – многопроцессорность

В этой главе мы сосредоточимся больше на сравнении между многопроцессорностью и многопоточностью.

многопроцессорная обработка

Это использование двух или более процессорных блоков в одной компьютерной системе. Это лучший способ получить полный потенциал от нашего оборудования, используя полное количество процессорных ядер, доступных в нашей компьютерной системе.

Многопоточность

Это способность ЦП управлять использованием операционной системы путем одновременного выполнения нескольких потоков. Основная идея многопоточности заключается в достижении параллелизма путем разделения процесса на несколько потоков.

Следующая таблица показывает некоторые важные различия между ними –

многопроцессорная обработка Мультипрограммирование
Под многопроцессорной обработкой понимается обработка нескольких процессов одновременно несколькими процессорами. Мультипрограммирование хранит несколько программ в основной памяти одновременно и выполняет их одновременно, используя один процессор.
Он использует несколько процессоров. Он использует один процессор.
Это позволяет параллельную обработку. Переключение контекста происходит.
Меньше времени уходит на обработку работ. Больше времени уходит на обработку заданий.
Это способствует гораздо более эффективному использованию устройств компьютерной системы. Менее эффективен, чем многопроцессорная.
Обычно дороже. Такие системы дешевле.

Устранение влияния глобальной блокировки интерпретатора (GIL)

При работе с параллельными приложениями в Python есть ограничение, называемое GIL (Global Interpreter Lock) . GIL никогда не позволяет нам использовать несколько ядер CPU, и поэтому мы можем сказать, что в Python нет настоящих потоков. GIL – мьютекс – блокировка взаимного исключения, которая делает вещи безопасными. Другими словами, мы можем сказать, что GIL препятствует параллельному выполнению кода Python несколькими потоками. Блокировка может удерживаться только одним потоком за раз, и если мы хотим выполнить поток, он должен сначала получить блокировку.

Используя многопроцессорность, мы можем эффективно обойти ограничение, вызванное GIL –

  • Используя многопроцессорность, мы используем возможности нескольких процессов и, следовательно, мы используем несколько экземпляров GIL.
  • В связи с этим нет ограничений на выполнение байт-кода одного потока в наших программах одновременно.

Используя многопроцессорность, мы используем возможности нескольких процессов и, следовательно, мы используем несколько экземпляров GIL.

В связи с этим нет ограничений на выполнение байт-кода одного потока в наших программах одновременно.

Запуск процессов в Python

Следующие три метода могут быть использованы для запуска процесса в Python внутри модуля многопроцессорной обработки:

Создание процесса с помощью Fork

Команда Fork – это стандартная команда в UNIX. Он используется для создания новых процессов, называемых дочерними процессами. Этот дочерний процесс выполняется одновременно с процессом, называемым родительским процессом. Эти дочерние процессы также идентичны своим родительским процессам и наследуют все ресурсы, доступные родительскому процессу. Следующие системные вызовы используются при создании процесса с помощью Fork –

  • fork () – это системный вызов, обычно реализованный в ядре. Он используется для создания копии процесса.
  • getpid () – этот системный вызов возвращает идентификатор процесса (PID) вызывающего процесса.

fork () – это системный вызов, обычно реализованный в ядре. Он используется для создания копии процесса.

getpid () – этот системный вызов возвращает идентификатор процесса (PID) вызывающего процесса.

пример

Следующий пример скрипта Python поможет вам понять, как создать новый дочерний процесс и получить PID дочернего и родительского процессов.

import os def child(): n = os.fork() if n > 0: print("PID of Parent process is : ", os.getpid()) else: print("PID of Child process is : ", os.getpid()) child()

Выход

PID of Parent process is : 25989 PID of Child process is : 25990

Создание процесса с помощью Spawn

Spawn означает начать что-то новое. Следовательно, порождение процесса означает создание нового процесса родительским процессом. Родительский процесс продолжает свое выполнение асинхронно или ожидает, пока дочерний процесс не завершит свое выполнение. Выполните следующие шаги для запуска процесса –

  • Импорт многопроцессорного модуля.
  • Создание объекта процесса.
  • Запуск процесса с помощью вызова метода start () .
  • Дождитесь, пока процесс завершит свою работу, и выйдите, вызвав метод join () .

Импорт многопроцессорного модуля.

Создание объекта процесса.

Запуск процесса с помощью вызова метода start () .

Дождитесь, пока процесс завершит свою работу, и выйдите, вызвав метод join () .

пример

Следующий пример скрипта Python помогает в порождении трех процессов

import multiprocessing def spawn_process(i): print ('This is process: %s' %i) return if __name__ == '__main__': Process_jobs = [] for i in range(3): p = multiprocessing.Process(target = spawn_process, args = (i,)) Process_jobs.append(p) p.start() p.join()

Выход

This is process: 0 This is process: 1 This is process: 2

Создание процесса с помощью Forkserver

Механизм Forkserver доступен только на тех выбранных платформах UNIX, которые поддерживают передачу файловых дескрипторов по каналам Unix. Рассмотрим следующие моменты, чтобы понять работу механизма Forkserver –

  • Сервер создается с использованием механизма Forkserver для запуска нового процесса.
  • Затем сервер получает команду и обрабатывает все запросы на создание новых процессов.
  • Для создания нового процесса наша программа на Python отправит запрос в Forkserver и создаст для нас процесс.
  • Наконец, мы можем использовать этот новый созданный процесс в наших программах.

Сервер создается с использованием механизма Forkserver для запуска нового процесса.

Затем сервер получает команду и обрабатывает все запросы на создание новых процессов.

Для создания нового процесса наша программа на Python отправит запрос в Forkserver и создаст для нас процесс.

Наконец, мы можем использовать этот новый созданный процесс в наших программах.

Демонстрационные процессы в Python

Модуль многопроцессорной обработки Python позволяет нам запускать процессы-демоны с помощью его опции-демона. Процессы демона или процессы, работающие в фоновом режиме, следуют той же концепции, что и потоки демона. Чтобы выполнить процесс в фоновом режиме, нам нужно установить для демонического флага значение true. Процесс демона будет продолжать работать до тех пор, пока выполняется основной процесс, и он будет остановлен после завершения своего выполнения или когда основная программа будет уничтожена.

пример

Здесь мы используем тот же пример, что и в потоках демона. Единственное отличие – это изменение модуля с многопоточности на многопроцессорность и установка флага демона в значение true. Тем не менее, будет изменение в выходе, как показано ниже –

import multiprocessing import time def nondaemonProcess(): print("starting my Process") time.sleep(8) print("ending my Process") def daemonProcess(): while True: print("Hello") time.sleep(2) if __name__ == '__main__': nondaemonProcess = multiprocessing.Process(target = nondaemonProcess) daemonProcess = multiprocessing.Process(target = daemonProcess) daemonProcess.daemon = True nondaemonProcess.daemon = False daemonProcess.start() nondaemonProcess.start()

Выход

starting my Process ending my Process

Вывод отличается от того, который генерируется потоками демона, потому что выход процесса не в режиме демона. Следовательно, демонический процесс завершается автоматически после завершения основных программ, чтобы избежать сохранения запущенных процессов.

Завершение процессов в Python

Мы можем немедленно завершить или завершить процесс с помощью метода terminate () . Мы будем использовать этот метод для завершения дочернего процесса, который был создан с помощью функции, непосредственно перед завершением его выполнения.

пример

import multiprocessing import time def Child_process(): print ('Starting function') time.sleep(5) print ('Finished function') P = multiprocessing.Process(target = Child_process) P.start() print("My Process has terminated, terminating main thread") print("Terminating Child Process") P.terminate() print("Child Process successfully terminated")

Выход

My Process has terminated, terminating main thread Terminating Child Process Child Process successfully terminated

Выходные данные показывают, что программа завершается до выполнения дочернего процесса, созданного с помощью функции Child_process (). Это подразумевает, что дочерний процесс был успешно завершен.

Определение текущего процесса в Python

Каждый процесс в операционной системе имеет идентификатор процесса, известный как PID. В Python мы можем узнать PID текущего процесса с помощью следующей команды –

import multiprocessing print(multiprocessing.current_process().pid)

пример

Следующий пример скрипта Python помогает узнать PID основного процесса, а также PID дочернего процесса –

import multiprocessing import time def Child_process(): print("PID of Child Process is: <>".format(multiprocessing.current_process().pid)) print("PID of Main process is: <>".format(multiprocessing.current_process().pid)) P = multiprocessing.Process(target=Child_process) P.start() P.join()

Выход

PID of Main process is: 9401 PID of Child Process is: 9402

Использование процесса в подклассе

Мы можем создавать потоки, подклассифицируя класс threading.Thread . Кроме того, мы также можем создавать процессы, подклассифицируя класс multiprocessing.Process . Для использования процесса в подклассе нам необходимо учитывать следующие моменты:

  • Нам нужно определить новый подкласс класса Process .
  • Нам нужно переопределить класс _init_ (self [, args]) .
  • Нам нужно переопределить метод run (self [, args]), чтобы реализовать какой процесс
  • Нам нужно запустить процесс, вызвав метод start () .

Нам нужно определить новый подкласс класса Process .

Нам нужно переопределить класс _init_ (self [, args]) .

Нам нужно переопределить метод run (self [, args]), чтобы реализовать какой процесс

Нам нужно запустить процесс, вызвав метод start () .

пример

import multiprocessing class MyProcess(multiprocessing.Process): def run(self): print ('called run method in process: %s' %self.name) return if __name__ == '__main__': jobs = [] for i in range(5): P = MyProcess() jobs.append(P) P.start() P.join()

Выход

called run method in process: MyProcess-1 called run method in process: MyProcess-2 called run method in process: MyProcess-3 called run method in process: MyProcess-4 called run method in process: MyProcess-5

Модуль многопроцессорной обработки Python – класс пула

Если мы говорим о простых задачах параллельной обработки в наших приложениях Python, то многопроцессорный модуль предоставляет нам класс Pool. Следующие методы класса Pool могут быть использованы для ускорения числа дочерних процессов в нашей основной программе

применить () метод

Этот метод аналогичен методу .submit () класса .ThreadPoolExecutor. Он блокируется, пока результат не будет готов.

apply_async () метод

Когда нам нужно параллельное выполнение наших задач, тогда мы должны использовать метод apply_async () для отправки задач в пул. Это асинхронная операция, которая не блокирует основной поток, пока не будут выполнены все дочерние процессы.

метод map ()

Как и метод apply () , он также блокируется, пока результат не будет готов. Это эквивалентно встроенной функции map (), которая разбивает итерируемые данные на несколько частей и отправляет их в пул процессов как отдельные задачи.

метод map_async ()

Это вариант метода map (), так как apply_async () относится к методу apply () . Возвращает объект результата. Когда результат становится готовым, к нему применяется вызываемый элемент. Призыв должен быть завершен немедленно; в противном случае поток, обрабатывающий результаты, будет заблокирован.

пример

Следующий пример поможет вам реализовать пул процессов для параллельного выполнения. Простое вычисление квадрата числа было выполнено путем применения функции square () с помощью метода multiprocessing.Pool . Затем pool.map () использовался для отправки 5, потому что input – это список целых чисел от 0 до 4. Результат будет сохранен в p_outputs и напечатан.

Оптимизации работы Jupyter notebook при помощи параллельных вычислений (Библиотека Joblib)

Меня зовут Серов Александр, я участник профессионального сообщества NTA.

Я расскажу о возможностях применения параллельных вычислений в интерактивной среде Jupyter notebook языка Python.

Для чего нам необходим параллелизм?

Параллелизм играет важную роль в задачах Data Science, так как может значительно ускорить вычисления и обработку больших объемов данных. Вот некоторые основные причины, почему мультипроцессинг важен для этих задач:

  • Ускорение вычислений: многие задачи в DS, такие как обучение моделей машинного обучения, кластеризация, обработка изображений и анализ больших данных, являются вычислительно интенсивными. Использование параллельных вычислений позволяет распределить работу между несколькими ядрами процессора или даже между несколькими компьютерами, что приводит к существенному ускорению выполнения задач.
  • Обработка больших объемов данных: параллельные вычисления позволяют эффективно распараллелить обработку данных, разделив ее на более мелкие части и выполняя их одновременно.
  • Оптимизация гиперпараметров: за счет параллельного выполнения экспериментов с различными значениями гиперпараметров можно ускорить процесс поиска оптимальных параметров модели.
  • Обработка потоковых данных: может быть необходимо обрабатывать потоковые данные в реальном времени. Мультипроцессинг позволяет эффективно обрабатывать и анализировать потоки данных, особенно в случае высоких нагрузок и необходимости обработки данных в режиме реального времени.

В языке Python уже есть реализация параллелизма на основе базового модуля — multiprocessing. Тогда почему в Jupyter notebook он не будет работать?

Почему не работает multiprocessing?

В Jupyter Notebook возникают проблемы при использовании модуля multiprocessing из‑за его особенностей взаимодействия с интерактивной средой Jupyter. Эти проблемы связаны с тем, что Jupyter Notebook запускает ядро Python в собственном процессе, который уже выполняет код ячеек.

Модуль multiprocessing в Python использует форкирование процессов для достижения параллельного выполнения. Однако в Jupyter Notebook уже есть запущенный процесс Python, и при попытке использования multiprocessing в ячейке происходит попытка создания нового дочернего процесса внутри уже существующего процесса, что вызывает конфликт.

Также следует отметить, что Jupyter Notebook сам по себе является интерактивной средой, где можно выполнять код в ячейках в любом порядке и в любое время. Однако multiprocessing требует выполнения кода в основном (главном) модуле программы, что делает его работу с Jupyter Notebook сложной.

Joblib vs. multiprocessing

Библиотека joblib предоставляет простой интерфейс для параллельного выполнения задач на нескольких ядрах процессора, и она может быть использована в Jupyter Notebook для задействования параллелизма.

Основное отличие между multiprocessing и joblib заключается в том, как они взаимодействуют с интерпретатором Python. В отличие от multiprocessing, joblib использует фоновые процессы, которые запускаются независимо от основного процесса Jupyter Notebook. Таким образом, joblib избегает проблем, связанных с созданием дочерних процессов внутри уже существующего процесса.

Перейдем к практической демонстрация работы кода без использования параллельных вычислений.

Monkey sort без параллельных вычислений

Для начала необходимо сымитировать вычислительную функцию с длительным выполнением, из самых известных и простых вариантов это monkey sort — алгоритм сортировки, который проверяет является ли массив отсортированным, если нет, то случайным образом перемешивают до тех пор, пока он не отсортируется. Его средняя асимптотика будет равна O((n+1)!), в среднем, потому что существует фактор случайности и перемешивание может случиться как быстрее, так и дольше, но при применении закона больших чисел, асимптотика устремиться к этому значению.

Импортируем необходимые библиотеки:

from joblib import Parallel, delayed import pandas as pd import random import numpy as np import warnings import random warnings.filterwarnings(‘ignore’)

Реализуем алгоритм «самой быстрой сортировки» bogosort (monkey sort) на языке Python:

def bogosort(arr): def correct(arr, comparator=lambda x: x): for i in range(1, len(arr)): if comparator(arr[i — 1]) — comparator(arr[i]) > 0: return False return True while not correct(arr): random.shuffle(arr) return arr

Для тестирования гипотез сгенерируем двумерный массив, в котором будет 8 случайно расположенных целочисленных значений и всего таких наборов в количестве 1000:

bigdata = np.array([[random.randint(0, 100) for _ in range(8)] for _ in range(1000)]) print(bigdata[:5]) # выводим первые 5 элементов

Проверить работу алгоритма можно на этом наборе данных, для учета времени используем встроенную магическую функцию Python%%time:

%%time bg = bigdata.copy() order_bg = list(map(bogosort, bg))
print(order_bg[:5]) # выводим первые 5 элементов

Всё успешно отсортировано за 4 минуты 32 секунды.

А если применить мультипроцессинг?

Теперь решим эту же задачу, применив мультипроцессинг.

Следует отметить, что здесь необходим иной подход к реализации функций, выделим в рамках этой задачи 2 подхода:

  • Первый подход состоит в декомпозиции задачи и параллельном выполнении вычислительных итераций. Однако, в конкретном случае, у нас есть только одна подзадача — случайное перемешивание. Разделение этой задачи на параллельные части не имеет смысла, поскольку процессор будет тратить время на координацию и синхронизацию параллельных процессов, что может увеличить накладные расходы и замедлить выполнение.
  • Вместо этого, я предлагаю второй подход — использование разделение данных на партиции и выполнения вычислений для каждой из них. Этот подход похож на методы, используемых в Apache Spark.

Перейдем к написанию кода для второго варианта, в данном случае функция будем сохранена изначальной, и у нас нет необходимости изолировать процесс, а после чего синхронизировать с общей очередью сбора данных, это нам позволяет сделать joblib и lambda функции python:

N_CORES = 12 # количество задействованных ядер процессора list_array = np.array_split(bigdata, N_CORES) data = Parallel(n_jobs=N_CORES, verbose=10)(delayed(lambda array: list(map(bogosort, array)))(array) for array in list_array)

Joblib предоставляет класс Parallel, который позволяет распределить выполнение итераций цикла или вызовы функций на несколько ядер процессора. Он может использовать различные методы параллелизма, включая использование процессов или потоков. В аргументе функции delayed обозначаю функцию, естественно, без вызова. Дальше должны упомянуть аргумент, для подачи в pipeline функции и объект, из которого его будем брать. Все это оформляется в формате list comprehended.

Помимо lambda, для удобства читаемости, можем объявить функцию multi_bogosort:

def multi_bogosort(ndarray): return list(map(bogosort, ndarray))

И тогда итоговый вариант с ней будет выглядеть, как:

N_CORES = 12 list_array = np.array_split(bigdata, N_CORES) data = Parallel(n_jobs=N_CORES, verbose=10)(delayed(multi_bogosort)(array) for array in list_array)

Обратите внимание, что joblib автоматически обрабатывает разделение данных и сбор результатов, поэтому вам не нужно беспокоиться о явном управлении процессами или потоками.

Посмотрим на время выполнения:

Видим значительное ускорение, но на деле не всё так «идеально», формат данных немного изменился и нам необходимо после разделения их снова слиять, допустим, следующим алгоритмом:

from functools import reduce merge_data = reduce(lambda x, y: x.extend(y) or x, data)

И проверим уже по традиции данные:

merge_data[:5]

Из‑за включения дополнительной предобработки и постобработки результатов, а также координацию и синхронизацию процессоров тратим некоторые время, а следовательно результат не будет иметь чистого t/N_cores зависимости.

Итоговое ускорение процесса с 4 мин 32 секунд (272 секунды) против 44.9 секунд, а это 6-ти кратное увеличение производительности.

Давайте также проведем тест для 6-ти процессоров для сравнения:

%%time N_CORES = 6 list_array = np.array_split(bigdata, N_CORES) data = Parallel(n_jobs=N_CORES, verbose=10)(delayed(multi_bogosort)(array) for array in list_array) merge_data = reduce(lambda x, y: x.extend(y) or x, data)

Ниже можно увидеть зависимость времени выполнения от количество задействованных ядер процессора для параллельной функции. (Важно отметить, что пункт с 12 ядрами стоит понимать, как 6 физических + 6 логических, и поэтому не увидели существенного прироста, т.к. 6 логических ядер — это потоки, и здесь уже оказывает влияние GIL).

Использование параллельных вычислений может принести значительную пользу в задачах анализа данных и машинного обучения. Особенно это важно, когда работа идет с действительно большими объемами данных. Зачастую в работе специалистов в сфере Data Science наиболее используемых инструмент — это интерактивные среды Jupyter, это обусловлено легкости проводимых экспериментов и тестирование в нём. А без возможности использовать параллелизм — функциональность ограничивается, и в этом случае нас выручает тот самый joblib.

Хочется добавить ещё один пример на реальной задаче в RL, когда нам необходимо найти оптимальное количество кластеров при помощи, так называемого, метода локтя. Кратко: алгоритм работает следующим образом, рассчитывает модель KMeans итеративно для определенной области поиска. После чего — подсчитывает определенную метрику, в данном случае буду использовать силуэт. И по итогу определяем оптимальную метрику, когда увеличения кластеров не даёт существенного прироста, интерпретируя это высказывания в график получаем что‑то наподобие сгиба локтя, когда ошибки становится относительно гладкой.

from sklearn.cluster import KMeans from sklearn.datasets import make_blobs from sklearn.metrics import silhouette_score from sklearn import preprocessing from sklearn.decomposition import PCA from sklearn.pipeline import Pipeline from sklearn.base import BaseEstimator, TransformerMixin from sklearn.model_selection import GridSearchCV class KMeansWithSilhouette(BaseEstimator, TransformerMixin): def __init__(self, n_clusters): self.n_clusters = n_clusters def fit(self, X, y=None): self.kmeans = KMeans(n_clusters=self.n_clusters) self.kmeans.fit(X) return self def transform(self, X): return self.kmeans.transform(X) def score(self, X, y=None): labels = self.kmeans.predict(X) return silhouette_score(X, labels) def calculate_silhouette_scores(X, cluster_range): pipeline = Pipeline([ (‘scaling’, preprocessing.StandardScaler()), (‘pca’, PCA(n_components=2)), (‘kmeans’, KMeansWithSilhouette(n_clusters=cluster_range)) ]) grid_search = GridSearchCV(pipeline, param_grid=<>, cv=5, n_jobs=1) grid_search.fit(X) return grid_search.best_score_

Реализуем функцию вычисления, без параллельного выполнения, также для теста возьмем случайно сгенерированы данные с 7-ю центроидами.

def calculate_elbow(X, cluster_range): silhouette_scores = [] for n in cluster_range: score = calculate_silhouette_scores(X, n) silhouette_scores.append(score) deltas = np.diff(silhouette_scores) elbow_index = np.argmax(deltas) + 1 return cluster_range[elbow_index] X, _ = make_blobs(n_samples=10000, n_features=100, centers=7, random_state=42) cluster_range = range(2, 15) start_time = time.time() elbow_value = calculate_elbow(X, cluster_range) elapsed_time = time.time() — start_time print(«The optimal number of clusters is:», elbow_value) print(«Execution time:», elapsed_time, «seconds»)

За 15,5 секунды просчитал 15 кластеров и выбрал оптимального количество = 3. Теперь сделаем это с применением joblib.

def calculate_elbow(X, cluster_range): silhouette_scores = Parallel(n_jobs=6)( delayed(calculate_silhouette_scores)(X, n) for n in cluster_range ) deltas = np.diff(silhouette_scores) elbow_index = np.argmax(deltas) + 1 return cluster_range[elbow_index] X, _ = make_blobs(n_samples=10000, n_features=100, centers=7, random_state=42) cluster_range = range(2, 15) start_time = time.time() elbow_value = calculate_elbow(X, cluster_range) elapsed_time = time.time() — start_time print(«The optimal number of clusters is:», elbow_value) print(«Execution time:», elapsed_time, «seconds»)

Результат существенно сократился в 3 раза, и получился равен = 4.9 секунды.

Важно отметить, что эффективность параллельного выполнения в Jupyter Notebook может быть ограничена некоторыми факторами, такими как наличие глобальной блокировки GIL (Global Interpreter Lock) в интерпретаторе Python. Это может снижать производительность при выполнении CPU‑интенсивных задач, даже при использовании параллельного выполнения. Также играют роль и накладные расходы (планирование, передача, синхронизация) на задействования нескольких ядер. Помимо этого, стоит не забывать про кэш и память. Следовательно необходимо находить «золотую» середину и она разнится от каждой задачи, а также и от архитектуры процессоров.

Заключение

Для достижения оптимального ускорения с помощью мультипроцессинга необходимо тщательно разработать и параллельно выполнить алгоритм, минимизировать коммуникацию и синхронизацию, а также обеспечить равномерное распределение нагрузки между ядрами. Кроме того, использование эффективных методов и техник параллелизации, таких как балансировка нагрузки и разделение данных, поможет максимизировать преимущества мультипроцессинга.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *