
Python Multiprocessing: Multi-Process Parallel Computing

Using the multiprocessing module to run computations in parallel across multiple processes.


When running long tasks (for example, loops in Python), using more CPU resources for multi-process parallel computation can greatly reduce a program's running time. This article records the basic usage of multiprocessing and my understanding gained while learning it.

Creating and Running Processes

Process, start, join

Example code:

Python
import multiprocessing
import time
import datetime


def sleep(t):
    time.sleep(t)


if __name__ == "__main__":
    starttime = datetime.datetime.now()
    # creating processes
    p1 = multiprocessing.Process(target=sleep, args=(5,))
    p2 = multiprocessing.Process(target=sleep, args=(5,))

    # starting process 1
    p1.start()
    # starting process 2
    p2.start()

    # wait until process 1 is finished
    p1.join()
    # wait until process 2 is finished
    p2.join()

    endtime = datetime.datetime.now()
    print("多进程一共耗时{}秒".format((endtime - starttime)))

Each of the two processes executes sleep(5), so the total time is roughly 5 seconds. Without multiprocessing, the same work would take about 10 seconds.


  • p1 = multiprocessing.Process(target=sleep, args=(5,)) creates a new process. The function this process will execute is sleep(), and the argument is 5.

  • If there is only one argument, nothing follows the comma after 5, but the comma itself cannot be omitted, because args must be a tuple.

  • If there are two or more arguments, simply list all of them inside the parentheses (see the sketch after this list).

  • p1.start() starts the process.

  • p1.join() blocks the main process at this point. That is, the code after p1.join() will not run until process p1 has finished.

  • Without p1.join() and p2.join(), the main program would continue immediately without waiting for p1 and p2 to finish, so remember to add them.
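
A quick illustration of the args rules above (the helper functions sleep_for and sleep_and_print are made up for this sketch):

Python
import multiprocessing
import time


def sleep_for(t):
    time.sleep(t)


def sleep_and_print(t, name):
    time.sleep(t)
    print(name)


if __name__ == "__main__":
    # single argument: the trailing comma is required so that (2,) is a tuple
    p1 = multiprocessing.Process(target=sleep_for, args=(2,))
    # two arguments: simply list them all inside the parentheses
    p2 = multiprocessing.Process(target=sleep_and_print, args=(2, "p2"))

    p1.start()
    p2.start()
    p1.join()
    p2.join()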


pid, is_alive

Example code:

Python
# importing the multiprocessing module
import multiprocessing
import os


def worker1():
    # printing process id
    print("ID of process running worker1: {}".format(os.getpid()))


def worker2():
    # printing process id
    print("ID of process running worker2: {}".format(os.getpid()))


if __name__ == "__main__":
    # printing main program process id
    print("ID of main process: {}".format(os.getpid()))

    # creating processes
    p1 = multiprocessing.Process(target=worker1)
    p2 = multiprocessing.Process(target=worker2)

    # starting processes
    p1.start()
    p2.start()

    # process IDs
    print("ID of process p1: {}".format(p1.pid))
    print("ID of process p2: {}".format(p2.pid))

    # wait until processes are finished
    p1.join()
    p2.join()

    # both processes finished
    print("Both processes finished execution!")

    # check if processes are alive
    print("Process p1 is alive: {}".format(p1.is_alive()))
    print("Process p2 is alive: {}".format(p2.is_alive()))


  • pid is the process ID, i.e. the ID of the current process.

  • For a multiprocessing Process object, p1.pid gives the process ID of p1 directly.

  • While the program is running, os.getpid() returns the ID of the process executing that call.

  • p1.is_alive() tells you whether the process is still running. After p1.join() returns, p1 has already exited, so it returns False. A sketch of how this changes over a process's lifetime follows this list.
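
A minimal sketch of how is_alive() changes over a process's lifetime (reusing a hypothetical sleep-style worker):

Python
import multiprocessing
import time


def sleep_for(t):
    time.sleep(t)


if __name__ == "__main__":
    p1 = multiprocessing.Process(target=sleep_for, args=(2,))

    print(p1.is_alive())  # False: the process has not been started yet
    p1.start()
    print(p1.is_alive())  # True: the child process is running
    p1.join()
    print(p1.is_alive())  # False: the child process has exited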

Sharing Memory and Passing Data Between Processes

multiprocessing.Array and multiprocessing.Value: creating shared variables

In the multi-process code above, each process runs independently and has its own memory space. A computation performed in one process does not affect the memory of any other process. To share memory between processes, you can use the Array and Value objects from multiprocessing. A small demonstration of this isolation is sketched below.
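
A minimal sketch of this isolation (the list name data and the helper append_item are made up for illustration): the child process modifies its own copy of an ordinary Python list, and the parent's list stays unchanged.

Python
import multiprocessing


def append_item(data):
    # this changes only the child process's copy of the list
    data.append(100)
    print("In child process:", data)


if __name__ == "__main__":
    data = [1, 2, 3]
    p1 = multiprocessing.Process(target=append_item, args=(data,))
    p1.start()
    p1.join()
    # the main process's list is unchanged
    print("In main process:", data)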

Example code using Array and Value:

Python
import multiprocessing


def square_list(mylist, result, square_sum):
    """
    function to square a given list
    """
    # append squares of mylist to result array
    for idx, num in enumerate(mylist):
        result[idx] = num * num

    # square_sum value
    square_sum.value = sum(result)

    # print result Array
    print("Result(in process p1): {}".format(result[:]))

    # print square_sum Value
    print("Sum of squares(in process p1): {}".format(square_sum.value))


if __name__ == "__main__":
    # input list
    mylist = [1, 2, 3, 4]

    # creating Array of int data type with space for 4 integers
    result = multiprocessing.Array("i", 4)

    # creating Value of int data type
    square_sum = multiprocessing.Value("i")

    # creating new process
    p1 = multiprocessing.Process(target=square_list, args=(mylist, result, square_sum))

    # starting process
    p1.start()

    # wait until the process is finished
    p1.join()

    # print result array
    print("Result(in main program): {}".format(result[:]))

    # print square_sum Value
    print("Sum of squares(in main program): {}".format(square_sum.value))


  • result = multiprocessing.Array('i', 4) creates the variable result, which is shared between the main process and process p1. When the computation in p1 updates result, the main process sees the updated values too.
  • 'i' stands for integer. Other type codes exist as well, for example 'd' for double-precision floating point.
  • 4 is the length of the array.
  • square_sum = multiprocessing.Value('i') creates the variable square_sum.
  • Likewise, 'i' stands for integer.
  • An initial value (for example 10) can also be given: square_sum = multiprocessing.Value('i', 10). See the sketch below.
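
For instance, a short sketch of supplying initial values (the variable names and values here are arbitrary):

Python
import multiprocessing

if __name__ == "__main__":
    # integer Value initialized to 10
    square_sum = multiprocessing.Value("i", 10)

    # double-precision Value initialized to 0.5
    ratio = multiprocessing.Value("d", 0.5)

    # integer Array initialized from a Python list (length inferred)
    result = multiprocessing.Array("i", [1, 2, 3, 4])

    print(square_sum.value)  # 10
    print(ratio.value)       # 0.5
    print(result[:])         # [1, 2, 3, 4]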

Manager: shared variables that support more data structures

The data types that Array and Value can hold are limited. To share richer data structures, you can create them with a Manager.

Example code:

Python
import multiprocessing


def print_records(records):
    """
    function to print record(tuples) in records(list)
    """
    for record in records:
        print("Name: {0}\nScore: {1}\n".format(record[0], record[1]))


def insert_record(record, records):
    """
    function to add a new record to records(list)
    """
    records.append(record)
    print("New record added!\n")


if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        # creating a list in server process memory
        records = manager.list([("Sam", 10), ("Adam", 9), ("Kevin", 9)])
        # new record to be inserted in records
        new_record = ("Jeff", 8)

        # creating new processes
        p1 = multiprocessing.Process(target=insert_record, args=(new_record, records))
        p2 = multiprocessing.Process(target=print_records, args=(records,))

        # running process p1 to insert new record
        p1.start()
        p1.join()

        # running process p2 to print records
        p2.start()
        p2.join()


  • with multiprocessing.Manager() as manager: creates a manager object; the objects it creates are kept in a separate server process and can be shared between processes.
  • records = manager.list([('Sam', 10), ('Adam', 9), ('Kevin', 9)]) creates the variable records, a list that can be shared between processes.

  • Similarly, manager.dict() creates a shareable dictionary (see the sketch after this list).
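
A minimal sketch using manager.dict() in the same style (the dictionary contents and the helper update_scores are made up):

Python
import multiprocessing


def update_scores(scores):
    # changes made here are visible in the main process as well
    scores["Jeff"] = 8


if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        scores = manager.dict({"Sam": 10, "Adam": 9})

        p1 = multiprocessing.Process(target=update_scores, args=(scores,))
        p1.start()
        p1.join()

        print(dict(scores))  # {'Sam': 10, 'Adam': 9, 'Jeff': 8}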

Queue: sharing data through a queue

Example code:

Python
import multiprocessing


def square_list(mylist, q):
    """
    function to square a given list
    """
    # append squares of mylist to queue
    for num in mylist:
        q.put(num * num)


def print_queue(q):
    """
    function to print queue elements
    """
    print("Queue elements:")
    while not q.empty():
        print(q.get())
    print("Queue is now empty!")


if __name__ == "__main__":
    # input list
    mylist = [1, 2, 3, 4]

    # creating multiprocessing Queue
    q = multiprocessing.Queue()

    # creating new processes
    p1 = multiprocessing.Process(target=square_list, args=(mylist, q))
    p2 = multiprocessing.Process(target=print_queue, args=(q,))

    # running process p1 to square list
    p1.start()
    p1.join()

    # running process p2 to get queue elements
    p2.start()
    p2.join()


  • q = multiprocessing.Queue() creates the variable q.
  • First, in process p1, each element of mylist is squared and put into q.
  • Then, in process p2, the elements are taken out of q one by one and printed. A variant in which both processes run concurrently is sketched below.
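
In the example above, p1 is joined before p2 starts, so looping on q.empty() works. When both processes run at the same time, q.empty() is no longer a reliable stop condition; a common variant (sketched below, using None as a made-up sentinel value) is to block on q.get() and stop when the sentinel arrives:

Python
import multiprocessing


def square_list(mylist, q):
    for num in mylist:
        q.put(num * num)
    # a sentinel value marks the end of the data
    q.put(None)


def print_queue(q):
    while True:
        item = q.get()  # blocks until an item is available
        if item is None:
            break
        print(item)


if __name__ == "__main__":
    q = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=square_list, args=([1, 2, 3, 4], q))
    p2 = multiprocessing.Process(target=print_queue, args=(q,))

    # both processes run concurrently; q.get() waits for data as it arrives
    p1.start()
    p2.start()
    p1.join()
    p2.join()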

Pipes: transferring data between two processes

Example code:

Python
import multiprocessing


def sender(conn, msgs):
    """
    function to send messages to other end of pipe
    """
    for msg in msgs:
        conn.send(msg)
        print("Sent the message: {}".format(msg))
    conn.close()


def receiver(conn):
    """
    function to print the messages received from other
    end of pipe
    """
    while 1:
        msg = conn.recv()
        if msg == "END":
            break
        print("Received the message: {}".format(msg))


if __name__ == "__main__":
    # messages to be sent
    msgs = ["hello", "hey", "hru?", "END"]

    # creating a pipe
    parent_conn, child_conn = multiprocessing.Pipe()

    # creating new processes
    p1 = multiprocessing.Process(target=sender, args=(parent_conn, msgs))
    p2 = multiprocessing.Process(target=receiver, args=(child_conn,))

    # running processes
    p1.start()
    p2.start()

    # wait until processes finish
    p1.join()
    p2.join()


  • parent_conn, child_conn = multiprocessing.Pipe() creates a Pipe.
  • conn.send(msg) sends msg to the other end of the pipe.
  • msg = conn.recv() receives a message from the pipe and stores it in msg.

  • Note that p1 and p2 run at the same time, so the output printed by sender and receiver may be interleaved (for example, missing line breaks); this is normal. The order of sending and receiving is preserved, though: if hello is sent before hey, it is also received before hey. A two-way variant is sketched after this list.
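
Pipe() connections are bidirectional by default, so either end can both send and receive. A minimal sketch of a reply going back the other way (the message strings and the echo function are made up):

Python
import multiprocessing


def echo(conn):
    # receive one message and send a reply back through the same connection
    msg = conn.recv()
    conn.send("echo: " + msg)
    conn.close()


if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=echo, args=(child_conn,))
    p1.start()

    parent_conn.send("hello")
    print(parent_conn.recv())  # echo: hello

    p1.join()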

Preventing race conditions: locking resources with Lock

Shared resources can lead to race conditions between processes.

Example code:

Python
# Python program to illustrate
# the concept of race condition
# in multiprocessing
import multiprocessing


# function to withdraw from account
def withdraw(balance):
    for _ in range(10000):
        balance.value = balance.value - 1


# function to deposit to account
def deposit(balance):
    for _ in range(10000):
        balance.value = balance.value + 1


def perform_transactions():
    # initial balance (in shared memory)
    balance = multiprocessing.Value("i", 100)

    # creating new processes
    p1 = multiprocessing.Process(target=withdraw, args=(balance,))
    p2 = multiprocessing.Process(target=deposit, args=(balance,))

    # starting processes
    p1.start()
    p2.start()

    # wait until processes are finished
    p1.join()
    p2.join()

    # print final balance
    print("Final balance = {}".format(balance.value))


if __name__ == "__main__":
    for _ in range(10):
        # perform same transaction process 10 times
        perform_transactions()

The two processes run at the same time and access the shared resource in no particular order. Each performs its computation based on the value it read, without taking the other's updates into account, so the final balance is not what we expect.


Code with a lock added:

Python
# Python program to illustrate
# the concept of locks
# in multiprocessing
import multiprocessing


# function to withdraw from account
def withdraw(balance, lock):
    for _ in range(10000):
        lock.acquire()
        balance.value = balance.value - 1
        lock.release()


# function to deposit to account
def deposit(balance, lock):
    for _ in range(10000):
        lock.acquire()
        balance.value = balance.value + 1
        lock.release()


def perform_transactions():
    # initial balance (in shared memory)
    balance = multiprocessing.Value("i", 100)

    # creating a lock object
    lock = multiprocessing.Lock()

    # creating new processes
    p1 = multiprocessing.Process(target=withdraw, args=(balance, lock))
    p2 = multiprocessing.Process(target=deposit, args=(balance, lock))

    # starting processes
    p1.start()
    p2.start()

    # wait until processes are finished
    p1.join()
    p2.join()

    # print final balance
    print("Final balance = {}".format(balance.value))


if __name__ == "__main__":
    for _ in range(10):
        # perform same transaction process 10 times
        perform_transactions()

  • lock = multiprocessing.Lock() creates a Lock object.
  • The variable lock is passed as an argument into both processes' functions.
  • Wrapping the update between lock.acquire() and lock.release() temporarily locks the resource so that the other process cannot touch it at the same time. A with-statement variant is sketched below.
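
Lock also works as a context manager, so acquire() and release() can be replaced by a with statement, which releases the lock even if an exception occurs. A sketch of withdraw rewritten this way (deposit would change in the same manner):

Python
def withdraw(balance, lock):
    for _ in range(10000):
        # "with lock:" acquires the lock on entry and releases it on exit
        with lock:
            balance.value = balance.value - 1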

Process Pools: Pool

In the first section we created processes by constructing Process objects one at a time. If we need many processes, do we really have to write a separate Process statement for each one?

A Pool object makes it easy to create many processes at once and put them into a "pool". Whenever one process finishes its task, the freed CPU picks up one of the remaining tasks, which makes the most of the available computing power.

Python
# Python program to understand
# the concept of pool
import multiprocessing
import os


def square(n):
    print("Worker process id for {0}: {1}".format(n, os.getpid()))
    return n * n


if __name__ == "__main__":
    # input list
    mylist = [1, 2, 3, 4, 5]

    # creating a pool object
    p = multiprocessing.Pool()

    # map list to target function
    result = p.map(square, mylist)

    print(result)

  • p = multiprocessing.Pool() creates a Pool object. The maximum number of worker processes to use can be specified inside the parentheses.
  • result = p.map(square, mylist) hands the task (the square function) to the pool p; the arguments passed to the task function come from mylist.
  • The values returned by the workers are collected into the result list, in the same order as mylist. See the sketch below.
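
For example, a sketch that limits the pool to 2 worker processes and uses it as a context manager so the pool is closed automatically (square is the same worker function as above):

Python
import multiprocessing
import os


def square(n):
    print("Worker process id for {0}: {1}".format(n, os.getpid()))
    return n * n


if __name__ == "__main__":
    mylist = [1, 2, 3, 4, 5]

    # use at most 2 worker processes; the with block closes the pool on exit
    with multiprocessing.Pool(processes=2) as p:
        result = p.map(square, mylist)

    print(result)  # [1, 4, 9, 16, 25]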
