programing

기능을 병렬로 실행하는 방법은 무엇입니까?

showcode 2023. 6. 9. 22:12
반응형

기능을 병렬로 실행하는 방법은 무엇입니까?

제가 먼저 조사를 해봤는데 제 질문에 대한 답을 찾을 수 없었습니다.저는 파이썬에서 여러 기능을 병렬로 실행하려고 합니다.

나는 다음과 같은 것이 있습니다.

files.py

import common #common is a util class that handles all the IO stuff

dir1 = 'C:\folder1'
dir2 = 'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

def func1():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir1)
       c.getFiles(dir1)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir1)
       c.getFiles(dir1)

def func2():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir2)
       c.getFiles(dir2)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir2)
       c.getFiles(dir2)

저는 func1과 func2에 전화해서 동시에 실행하고 싶습니다.함수는 서로 상호 작용하지 않거나 동일한 개체에서 상호 작용하지 않습니다.지금은 func1이 끝나기를 기다려야 func2가 시작됩니다.다음과 같은 작업을 수행하는 방법:

process.py

from files import func1, func2

runBothFunc(func1(), func2())

분당 생성되는 파일 수를 계산하기 때문에 두 디렉터리를 거의 동시에 만들 수 있습니다.디렉터리가 없으면 타이밍이 맞지 않습니다.

또는 을 사용할 수 있습니다.

Cython의 특성상,threading진정한 병렬화를 달성할 가능성은 낮습니다. 이유로, 이한이유로러,multiprocessing일반적으로 더 나은 선택입니다.

다음은 완전한 예입니다.

from multiprocessing import Process


def func1():
    print("func1: starting")
    for i in range(10000000):
        pass

    print("func1: finishing")


def func2():
    print("func2: starting")
    for i in range(10000000):
        pass

    print("func2: finishing")


if __name__ == "__main__":
    p1 = Process(target=func1)
    p1.start()
    p2 = Process(target=func2)
    p2.start()
    p1.join()
    p2.join()

하위 프로세스를 시작/연결하는 메커니즘은 다음과 같은 기능으로 쉽게 캡슐화할 수 있습니다.runBothFunc:

def runInParallel(*fns):
  proc = []
  for fn in fns:
    p = Process(target=fn)
    p.start()
    proc.append(p)
  for p in proc:
    p.join()

runInParallel(func1, func2)

기능이 주로 I/O 작업을 수행하고 CPU 작업이 적은 Python 3.2+를 사용하는 경우 ThreadPool Executer를 사용할 수 있습니다.

from concurrent.futures import ThreadPoolExecutor

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

run_io_tasks_in_parallel([
    lambda: print('IO task 1 running!'),
    lambda: print('IO task 2 running!'),
])

기능이 주로 CPU 작업을 수행하고 I/O 작업이 적은 경우 Python 3.2+를 사용하는 경우 ProcessPool Executer를 사용할 수 있습니다.

from concurrent.futures import ProcessPoolExecutor

def run_cpu_tasks_in_parallel(tasks):
    with ProcessPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

def task_1():
    print('CPU task 1 running!')

def task_2():
    print('CPU task 2 running!')

if __name__ == '__main__':
    run_cpu_tasks_in_parallel([
        task_1,
        task_2,
    ])

또는 Python 2.6+만 사용하는 경우 멀티프로세싱 모듈을 직접 사용할 수 있습니다.

from multiprocessing import Process

def run_cpu_tasks_in_parallel(tasks):
    running_tasks = [Process(target=task) for task in tasks]
    for running_task in running_tasks:
        running_task.start()
    for running_task in running_tasks:
        running_task.join()

def task_1():
    print('CPU task 1 running!')

def task_2():
    print('CPU task 2 running!')

if __name__ == '__main__':
    run_cpu_tasks_in_parallel([
        task_1,
        task_2,
    ])

이것은 파이썬 코드를 쉽게 병렬화하고 배포할 수 있는 시스템인 Ray로 우아하게 수행할 수 있습니다.

예를병하다면사음함정합여야니의다해수를용하을려렬제화▁with로 함수를 .@ray.remote 식가장나, 그고서그불들러다니입들을리▁them다▁decor▁with니,로 불러옵니다..remote.

import ray

ray.init()

dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

# Define the functions. 
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in 
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
    # func1() code here...

@ray.remote
def func2(filename, addFiles, dir):
    # func2() code here...

# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)]) 

함수 인수를 에는 두함에동인전수인달고크수하이가면수다같를니습더방다과음법은효인율적는행하수일한를▁if▁using다▁를 사용하는 것이 더 효율적인 방법입니다.ray.put()이렇게 하면 큰 인수가 두 번 직렬화되고 두 개의 메모리 복사본이 생성되지 않습니다.

largeData_id = ray.put(largeData)

ray.get([func1(largeData_id), func2(largeData_id)])

중요 - 경우func1()그리고.func2()결과가 반환되면 다음과 같이 코드를 다시 작성해야 합니다.

ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func2.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])

다중 처리 모듈에 비해 Ray를 사용하면 여러 가지 이점이 있습니다.특히, 동일한 코드가 단일 시스템과 시스템 클러스터에서 실행됩니다.Ray의 더 많은 이점은 이 관련 게시물을 참조하십시오.

두 개의 다른 매개 변수에 대해 호출해야 하는 단일 함수가 있는 것 같습니다.이 작업은 다음의 조합을 사용하여 우아하게 수행할 수 있습니다.concurrent.futures그리고.mapPython 3.2+와 함께 사용

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def sleep_secs(seconds):
  time.sleep(seconds)
  print(f'{seconds} has been processed')

secs_list = [2,4, 6, 8, 10, 12]

바운드 에는 이작업제 IO 이있수습니사다다를 할 수 .ThreadPoolExecutor이와 같이:

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

방법에 주목map여기서 사용되는 것은map변수 목록에 대한 함수입니다.

바인딩되어 에 바인딩된 기능을 할 수 .ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

확실하지 않은 경우 두 가지를 모두 시도하여 어느 것이 더 나은 결과를 제공하는지 확인할 수 있습니다.

마지막으로, 결과를 인쇄하려는 경우 다음과 같이 간단히 수행할 수 있습니다.

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)
  for result in results:
    print(result)

2021년에 가장 쉬운 방법은 비동기식을 사용하는 것입니다.

import asyncio, time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():

    task1 = asyncio.create_task(
        say_after(4, 'hello'))

    task2 = asyncio.create_task(
        say_after(3, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")


asyncio.run(main())

참조:

[1] https://docs.python.org/3/library/asyncio-task.html

만약 당신이 윈도우 사용자이고 python 3을 사용한다면, 이 게시물은 당신이 python에서 병렬 프로그래밍을 하는 것을 도울 것입니다.일반적인 멀티프로세싱 라이브러리의 풀 프로그래밍을 실행하면 프로그램의 주요 기능에 대한 오류가 발생합니다.창에 포크() 기능이 없기 때문입니다.아래 게시물은 언급된 문제에 대한 해결책을 제공하는 것입니다.

http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html

저는 파이썬 3를 사용하고 있었기 때문에 다음과 같이 프로그램을 조금 변경했습니다.

from types import FunctionType
import marshal

def _applicable(*args, **kwargs):
  name = kwargs['__pw_name']
  code = marshal.loads(kwargs['__pw_code'])
  gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls'])
  defs = marshal.loads(kwargs['__pw_defs'])
  clsr = marshal.loads(kwargs['__pw_clsr'])
  fdct = marshal.loads(kwargs['__pw_fdct'])
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  del kwargs['__pw_name']
  del kwargs['__pw_code']
  del kwargs['__pw_defs']
  del kwargs['__pw_clsr']
  del kwargs['__pw_fdct']
  return func(*args, **kwargs)

def make_applicable(f, *args, **kwargs):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  kwargs['__pw_name'] = f.__name__  # edited
  kwargs['__pw_code'] = marshal.dumps(f.__code__)   # edited
  kwargs['__pw_defs'] = marshal.dumps(f.__defaults__)  # edited
  kwargs['__pw_clsr'] = marshal.dumps(f.__closure__)  # edited
  kwargs['__pw_fdct'] = marshal.dumps(f.__dict__)   # edited
  return _applicable, args, kwargs

def _mappable(x):
  x,name,code,defs,clsr,fdct = x
  code = marshal.loads(code)
  gbls = globals() #gbls = marshal.loads(gbls)
  defs = marshal.loads(defs)
  clsr = marshal.loads(clsr)
  fdct = marshal.loads(fdct)
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  return func(x)

def make_mappable(f, iterable):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  name = f.__name__    # edited
  code = marshal.dumps(f.__code__)   # edited
  defs = marshal.dumps(f.__defaults__)  # edited
  clsr = marshal.dumps(f.__closure__)  # edited
  fdct = marshal.dumps(f.__dict__)  # edited
  return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable)

이 기능 이후에는 위의 문제 코드도 다음과 같이 약간 변경됩니다.

from multiprocessing import Pool
from poolable import make_applicable, make_mappable

def cube(x):
  return x**3

if __name__ == "__main__":
  pool    = Pool(processes=2)
  results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)]
  print([result.get(timeout=10) for result in results])

그리고 다음과 같은 출력을 얻었습니다.

[1, 8, 27, 64, 125, 216]

저는 이 게시물이 일부 윈도우 사용자들에게 유용할 수 있다고 생각합니다.

당신이 원하는 것처럼 보이는 두 기능이 서로 동시에 실행된다는 것을 보장할 방법이 없습니다.

수 최선의 은 기능을 단계로 두 에서 끝날 입니다.Process.join@party의 대답이 언급하듯이.

은 이은보낫습다니다것보다 더 .time.sleep(10)정확한 시간을 보장할 수 없기 때문입니다.명시적으로 대기하는 경우, 기계에서 진행 중인 다른 상황에 따라 보장되지 않는 10ms 내에 수행될 것으로 가정하는 대신 다음 단계로 이동하기 전에 해당 단계를 실행해야 합니다.

(python에서 두 이상의 함수를 동시에 실행하려면 어떻게 해야 합니까?)

와 함께asynciosync은 다음과같이 할 수 .

import asyncio
import time

def function1():
    # performing blocking tasks
    while True:
        print("function 1: blocking task ...")
        time.sleep(1)

async def function2():
    # perform non-blocking tasks
    while True:
        print("function 2: non-blocking task ...")
        await asyncio.sleep(1)

async def main():
    loop = asyncio.get_running_loop()

    await asyncio.gather(
        # https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor
        loop.run_in_executor(None, function1),
        function2(),
    )

if __name__ == '__main__':
    asyncio.run(main())

언급URL : https://stackoverflow.com/questions/7207309/how-to-run-functions-in-parallel

반응형