Python: programación multiproceso en acción
Table of Contents
- Directos al código
- Lanzar varios procesos
- Esperando a que terminen los procesos hijos: join
- Funciones del módulo multiprocessing
- Extender la clase Process
- Comunicación entre procesos o IPC (Interprocess Communication): el problema
- Comunicación entre procesos o IPC (Interprocess Communication): soluciones
- Sección crítica
Con la programación multiproceso podemos explotar las capacidades de paralelismo de un sistema. Con Python, tenemos un módulo, multiprocessing.Process, con el que podemos crear nuevos procesos que se ejecutan concurrentemente. Que se ejecuten en paralelo o no dependerá del sistema en el que se ejecuta, eso es algo que no podemos controlar nosotros como programadores de aplicaciones de usuario.
Cuando escribimos un programa en Python, este es el denominado proceso padre. En este programa podemos crear nuevos procesos que serán los procesos hijos. Es decir, cada vez que usemos el módulo multiprocessing.Process estaremos creando un proceso hijo.
Puedes leer la documentación al completo de este módulo en la Documentación oficial de Python.
Veremos que, dentro del módulo multiprocessing, existen más objetos (clases y funciones) como son:
Process: del que ya se ha hablado, se usa para crear procesos.Pool: es una clase con la que crear "piscinas", conjuntos, de procesos.Queue: es una clase que implementa una cola que permite compartir datos de forma segura entre procesos.Pipe: es una función que devuelve una conexión entre un par de objetos.Lock: es una clase que ofrece sincronización para el bloqueo seguro de procesos.
En los siguientes apartados, por medio de ejemplos, te muestro cómo trabajar con procesos en Python.
Directos al código
Vamos directos a ver cómo crear un proceso con el módulo multiprocessing.Process. El código está comentado para que entiendas que se hace en cada parte:
from multiprocessing import Process def worker(num): print(f"Worker: {num}") if __name__ == "__main__": # 1. Crea el objeto de tipo Process process = Process(target=worker, args=(1,)) # 2. Lanza el proceso process.start() # 3. Se espera a que termine el proceso process.join()
Este programa crear un proceso que ejecuta la función worker. El proceso padre crea el proceso con Process, lo lanza/ejecuta llamando a start() y espera que termine con join(). Esta es la estructura básica.
La clase Process necesite que indiques como target la función que se va a ejecutar cuando se llame a start(). Si la función que indicas en target necesita argumentos, estos se indican por medio de args que recibe la tupla con la lista de valores a pasar a la lista de argumentos.
Lanzar varios procesos
Primero el código del programa en Python:
from multiprocessing import Process, current_process from time import sleep from random import randint def task(n1, n2): p = current_process() sleep(randint(1, 3)) print(f"Suma proceso {p.pid}: {n1} + {n2} = {n1 + n2}") if __name__ == "__main__": for _ in range(5): process = Process(target=task, args=(10, 15)) process.start()
Este programa (proceso padre) crea 5 procesos hijos que realizan la misma suma.
Process(target=task, args=(10, 15)):la claseProcesses la que usamos para crear un proceso nuevo (subproceso o proceso hijo del actual). Necesita que le indiquemos la función que se ejecutará en el nuevo proceso hijo, a través del argumentotarget, y, si dicha función necesita argumentos, hay que pasarle los argumentos por medio de una tupla enargs.process.start(): tenemos que llamar al métodostartpara que el proceso hijo, ya creado, comience su ejecución.current_process(): con esta función podemos obtener el proceso que está ejecutando el código desde el que se llama.
Esperando a que terminen los procesos hijos: join
En el código anterior se están lanzando los procesos hijos sin ningún control sobre los mismos por parte del padre. Hay situaciones y escenarios donde vamos a necesitar sincronizar padre e hijos.
En el siguiente ejemplo tenemos una versión del programa anterior en el que el padre espera que terminen todos los hijos antes de acabar él mismo para imprimir un mensaje final por pantalla.
Se usa el método join que bloquea al padre hasta que el hijo concreto termine:
from multiprocessing import Process, current_process from time import sleep from random import randint def task(n1, n2): p = current_process() sleep(randint(1, 3)) print(f"Suma proceso {p.pid}: {n1} + {n2} = {n1 + n2}") if __name__ == "__main__": processes = [ Process(target=task, args=(10, 15)) for _ in range(5) ] for p in processes: p.start() for p in processes: p.join() print(f"Proceso padre, con PID={current_process().pid} terminado")
Primero se crean los procesos, luego se lanzan para que se ejecuten en paralelo y, por último, el proceso padre espera a que todos terminen.
Asegúrate que, llegados a este punto, sabes responder a esta pregunta: ¿por qué el siguiente código no ejecutaría los hijos en paralelo?
for p in processes: p.start() p.join()
Funciones del módulo multiprocessing
El módulo multiprocessing nos ofrece gran cantidad de propiedades y funciones con las que podemos obtener información relativa a los procesos.
Este es un ejemplo en el que se usan las funciones:
current_processpara obtener el proceso que está ejecutando la función o el código donde se llama.parent_processpara obtener el proceso padre que creó el proceso hijo desde el que se llama a esta función.cpu_countpara obtener el número de cores que tiene la CPU en la que se ejecuta el programa desde el cual se llama a dicha función.
Aquí puedes ver un ejemplo en el que se usan estas dos funciones para mostrar información adicional para completar el programa anterior:
from multiprocessing import Process, current_process, parent_process, cpu_count from time import sleep from random import randint def task(n1, n2): p = current_process() sleep(randint(1, 3)) print(f"Suma proceso {p.pid}: {n1} + {n2} = {n1 + n2} | Proceso padre: {parent_process().pid}") if __name__ == "__main__": print(f"Ejecutado con CPU con un total de {cpu_count()} cores") processes = [ Process(target=task, args=(10, 15)) for _ in range(5) ] for p in processes: p.start() for p in processes: p.join() print(f"Proceso padre, con PID={current_process().pid} terminado")
Extender la clase Process
En escenarios más complejos, donde la lógica de los procesos hijos se complica, heredar de la clase Process puede ser una buena solución.
Crear clases que heredan de la clase Process te permitirá, además, tener un código mejor organizado, más fácil de probar y mantener.
Para crar una subclase de Process solo tienes que sobreescribir el método run que es el método que se ejecuta, de la clase Process, cuando se llama al método start.
Si necesitas pasar datos de entrada al proceso hijo, hazlo a través del constructor para disponer de esos datos vía atributos de clase. Aquí, el ejemplo en el que se crean varios procesos que suman usando una clase derivada en vez de una función:
from multiprocessing import Process, current_process from time import sleep from random import randint class Adder(Process): def __init__(self, n1, n2): super().__init__() self._n1 = n1 self._n2 = n2 def run(self): sleep(randint(1, 3)) print(f"Suma proceso con PID {current_process().pid}: {self._n1} + {self._n2} = {self._n1 + self._n2}") if __name__ == "__main__": processes = [ Adder(10, 15) for _ in range(5) ] for p in processes: p.start() for p in processes: p.join() print(f"Proceso padre, con PID={current_process().pid} terminado")
Comunicación entre procesos o IPC (Interprocess Communication): el problema
Vamos a modificar el programa anterior:
- En primer lugar, cada proceso hijo va a realizar la suma de dos números aleatorios (hasta ahora siempre sumaban el número 10 y el número 15).
- Y, en segundo lugar, queremos realizar el promedio de las sumas de los 5 procesos hijos. Esta media la hará el padre.
Estos cambios nos obligan a compartir datos entre los hijos y el padre, ya que este último necesita conocer el resultado de las sumas para hacer el promedio.
Para terminar de entender lo que queremos hacer, veamos un caso de uso:
- El proceso 1 suma los número 3 y 7:
3 + 7 = 10 - El proceso 2 suma los número 1 y 3:
1 + 3 = 4 - El proceso 3 suma los número 4 y 4:
4 + 4 = 8 - El proceso 4 suma los número 9 y 5:
9 + 5 = 14 - El proceso 5 suma los número 6 y 3:
6 + 3 = 9 - De alguna manera vamos a tener que sumar los resultados:
10 + 4 + 8 + 14 + 9 = 45 - Y, por último, obtener el promedio:
45 / 5 = 9
Una solución podría ser usar una variable global que usen todos los procesos hijos para añadir sus resultados. Vamos directos al código:
from multiprocessing import Process, current_process from time import sleep from random import randint # Variable global para acumular las sumas shared_total: int = 0 class Adder(Process): def __init__(self, n1, n2): super().__init__() self._n1 = n1 self._n2 = n2 def run(self): # En Python tenemos que usar la instrucción global para indicar que # la variable que vamos a usar aquí es la variable global. # Si no lo hacemos así va a crear una variable local a este método # llamada shared_local. global shared_total sleep(randint(1, 3)) shared_total += self._n1 + self._n2 print(f"Suma proceso con PID {current_process().pid}: {self._n1} + {self._n2} = {self._n1 + self._n2}") print(f" Acumulado hasta ahora: {shared_total}") print() if __name__ == "__main__": processes = [ Adder(randint(1, 10), randint(1, 10)) for _ in range(5) ] for p in processes: p.start() for p in processes: p.join() print(f"Proceso padre, con PID={current_process().pid}") print(f" Promedio: {shared_total} / 5 = {shared_total / 5}")
Al ejecutar el programa, vemos en la salida "cosas extrañas":
Suma proceso con PID 28931: 2 + 4 = 6
Acumulado hasta ahora: 6
Suma proceso con PID 28932: 1 + 6 = 7
Acumulado hasta ahora: 7
Suma proceso con PID 28933: 8 + 10 = 18
Acumulado hasta ahora: 18
Suma proceso con PID 28934: 8 + 9 = 17
Acumulado hasta ahora: 17
Suma proceso con PID 28935: 10 + 7 = 17
Acumulado hasta ahora: 17
Proceso padre, con PID=28930
Promedio: 0 / 5 = 0.0
- El valor de la variable global
shared_totalen cada proceso coincide con la suma parcial que hacen (no acumula como esperábamos). - El proceso padre se encuentra que la variable global
shared_totales igual a0(el valor con el que se inicializó dicha variable).
No hay nada extraño, en realidad, si lo pensamos bien y recordamos la parte teórica de procesos en la que ya hablamos de que cada proceso tiene su propio espacio en memoria. Esto significa, en la práctica, y en este ejemplo, que cada proceso tiene su propia copia de sharedtotal.
Comunicación entre procesos o IPC (Interprocess Communication): soluciones
Existen varios métodos de comunicación entre procesos. Nosotros nos vamos a centrar en dos:
- Memoria Compartida: este método permite que varios procesos accedan a la misma región de memoria. Es muy eficiente porque evita la necesidad de copiar datos entre procesos. Sin embargo, requiere mecanismos de sincronización (como semáforos o mutexes) para evitar condiciones de carrera (lo veremos más adelante).
- Ficheros: los procesos pueden leer y escribir en ficheros en el sistema de archivos. Este método es más sencillo de implementar y no requiere sincronización en la memoria, pero puede ser más lento debido a la necesidad de acceder al disco.
Memoria compartida
Para implementar memoria compartida debemos usar mecanismos de sincronización en los que no hemos entrado todavía, así que, de momento vamos a obviar los problemas que esto conlleva con la intención de entender cómo se crea y se usa la memoria compartida.
La memoria compartida es un concepto general e independiente del lenguaje de programación a usar. Nosotros vamos a ver cómo se hace en Python pero se puede implementar con cualquier lenguaje como C, C++, Java, etc.
El Sistema Operativo es el encargado de crear dicha memoria compartida y mapearla en los procesos que la van a usar. Dicha memoria compartida la podríamos representar por medio del siguiente dibujo o esquema:
Figure 1: Memoria compartida entre dos procesos
Al utilizar lenguajes de alto nivel, nos abstraemos de estos detalles. En Python disponemos de dos clases, dentro del módulo multiprocessing, para compartir datos entre procesos:
multiprocessing.Valuepara compartir un único valormultiprocessing.Arraypara compartir varios valores a través de un array
Aquí te muestro el programa que intentábamos hacer anteriormente. Este sí funciona gracias al uso de Value para crear ese valor compartido:
from multiprocessing import Process, current_process, Value from time import sleep from random import randint class Adder(Process): def __init__(self, n1, n2, shared_total): super().__init__() self._n1 = n1 self._n2 = n2 self._shared_total = shared_total def run(self): self._shared_total.value += self._n1 + self._n2 print(f"Suma proceso con PID {current_process().pid}: {self._n1} + {self._n2} = {self._n1 + self._n2}") print(f" Acumulado hasta ahora: {self._shared_total.value}") print() if __name__ == "__main__": shared_total = Value("i", 0) processes = [ Adder(randint(1, 10), randint(1, 10), shared_total) for _ in range(5) ] for p in processes: p.start() for p in processes: p.join() print(f"Proceso padre, con PID={current_process().pid}") print(f" Promedio: {shared_total.value} / 5 = {shared_total.value / 5}")
Podemos crear la variable compartida en el proceso padre con el constructor de la clase Value y pasamos esa variable a los procesos.
El constructor de Value recibe dos parámetros:
- El tipo de la variable por medio de un typecode (un código de tipo en formato
str). Puedes ver la lista de typecodes aquí. - El valor inicial, que dependerá del tipo.
En nuestro caso hemos usado el constructor Value("i", 0) para crear una variable compartida de tipo entero con signo inicializado con valor 0.
Al ejecutar el programa obtenemos el resultado que esperábamos:
Suma proceso con PID 30700: 5 + 2 = 7
Acumulado hasta ahora: 7
Suma proceso con PID 30703: 5 + 6 = 11
Acumulado hasta ahora: 18
Suma proceso con PID 30699: 4 + 3 = 7
Acumulado hasta ahora: 25
Suma proceso con PID 30702: 4 + 3 = 7
Acumulado hasta ahora: 32
Suma proceso con PID 30701: 7 + 5 = 12
Acumulado hasta ahora: 44
Proceso padre, con PID=30698
Promedio: 44 / 5 = 8.8
A continuación tienes un ejemplo en el que se usa la clase Array para compartir un array de números enteros que va a ser usado por 5 procesos para añadir un valor aleatorio a dicho array. Al final de la ejecución, en el proceso padre, tendremos un array de 5 valores enteros aleatorios:
from multiprocessing import Process, Array from random import randint def append_number(shared_array, i): """Añade un número aleatorio al array en la posición i-ésima.""" shared_array[i] = randint(1, 10) if __name__ == "__main__": # Crea un array de 5 número enteros inicializados a 0 shared_array = Array("i", [0 for _ in range(5) ]) # Crea 5 procesos, cada uno de los cuales ejecuta la función append_number processes = [ Process(target=append_number, args=(shared_array, i)) for i in range(5) ] # Da compienzo a la ejecución de los 5 procesos for p in processes: p.start() # Esperamos que terminen todos los procesos for p in processes: p.join() # Resultado final print(f"Array final: {list(shared_array)}")
La clase Array no permite ni añadir ni eliminar elementos de dicho array.
Uniéndolo todo, lo que se hace internamente cuando usamos multiprocessing.Value o multiprocessing.Array es crear un espacio de memoria compartido mapeado en todos los procesos donde se guarda ese Value o ese Array. Así, todos los procesos pueden acceder a ese Value o ese Array.
En el siguiente gráfico tenemos representado el escenario del programa donde usábamos un multiprocessing.Value para ir acumulando las sumas (shared_total). Como ves, en realidad, la copia de la variable creada con multiprocessing.Value es compartida:
Figure 2: Representacion concreta de memoria compartida
Si ejecutas estos ejemplos, probablemente, comprobarás que funcionan sin ningún problema y que el resultado es el esperado. Si quitas del método run de la clase Adder el sleep verás que el resultado, en algunas ocasiones, no es el esperado. Estos ejemplos presentan un problema en potencia porque hay un riesgo de condición de carrera. Esto se dará si dos procesos intentan acceder y modificar la memoria compartida en el mismo instante. En este caso el resultado no es predecible. Veremos, más adelante, cómo resolver este problema.
Compartir datos por medio de ficheros
El uso de fichero como método de IPC es una técnica mucho más leta que la de memoria compartida pero es muy fácil de usar e implementar.
Consiste en usar ficheros del sistema para almacenar información a la que podrán acceder todos los procesos.
Sin más, veamos cómo usar este método de IPC usando el mismo programa que en el caso anterior:
from multiprocessing import Process, current_process from time import sleep from random import randint class Adder(Process): def __init__(self, n1, n2): super().__init__() self._n1 = n1 self._n2 = n2 def run(self): with open("shared_total.txt", "w+") as fd: current_value = fd.readline() or 0 accum = int(current_value) + self._n1 + self._n2 fd.write(str(accum)) print(f"Suma proceso con PID {current_process().pid}: {self._n1} + {self._n2} = {self._n1 + self._n2}") print(f" Acumulado hasta ahora: {accum}") print() if __name__ == "__main__": processes = [ Adder(randint(1, 10), randint(1, 10)) for i in range(5) ] for p in processes: p.start() for p in processes: p.join() print(f"Proceso padre, con PID={current_process().pid}") with open("shared_total.txt", "r") as fd: total = fd.readline() or 0 print(f" Promedio: {int(total)} / 5 = {int(total) / 5}")
Si has ejecutado el código anterior te habrás dado cuenta que no funciona. La salida esperada no es la que obtenemos. Por ejemplo:
Suma proceso con PID 36379: 5 + 9 = 14
Acumulado hasta ahora: 14
Suma proceso con PID 36381: 1 + 2 = 3
Acumulado hasta ahora: 3
Suma proceso con PID 36380: 3 + 10 = 13
Acumulado hasta ahora: 13
Suma proceso con PID 36383: 9 + 2 = 11
Acumulado hasta ahora: 11
Suma proceso con PID 36382: 3 + 4 = 7
Acumulado hasta ahora: 7
Proceso padre, con PID=36378
Promedio: 71 / 5 = 14.2
Se está produciendo lo que conocemos como condiciones de carrera por falta de sincronización. Estamos usando un recurso compartido: el mismo fichero para todos los procesos. Y no hemos controlado el acceso a dicho fichero, que debería ser accedido en orden y solo por un proceso a la vez.
Esto nos lleva a introducir los siguiente apartados: sección crítica y gestión de procesos a esta sección crítica.
Una solución simple al problema sería que cada proceso usara un fichero diferente pero, este caso concreto, tampoco se puede solucionar así porque quiero obtener, en los procesos hijos, la suma acumulada hasta el momento.
Sección crítica
Se llama sección o región crítica, en programación concurrente, a la porción de código de un programa en la que se accede a un recurso compartido que no debe ser accedido por más de un proceso o hilo.
Para manejar estas secciones críticas existen diferentes mecanismos que permiten sincronizar a estos procesos y/o hilos que compiten por el recurso compartido. Nosotros vamos a estudiar dos:
- Locks que permiten restringir el acceso a un recurso compartido para que solo un proceso/hilo acceda a él y no se produzcan accesos simultáneaos.
- Semáforos que son un caso concreto de lock en el que se permite el acceso a una sección crítica a uno o varios procesos.
Cuando usamos locks o semáforos, si un proceso intenta acceder al recurso compartido bajo dicho lock o semáforo, entonces el proceso/hilo quedará bloqueado hasta que se libere.
💥 Cuando dos o más procesos/hilos acceden simultáneamente a una sección crítica, se genera un bug conocido como condición de carrera, lo que puede llevar a resultados impredecibles debido a la interferencia entre ellos.
Sincronización con Lock
En los ejemplos anteriores de nuestro programa Adder teníamos un problema de sincronización con respecto al acceso a la memoria compartida.
La solución es usar exclusión mutua mediante un candado (lock). Este es un mecanismo atómico que permite a un proceso acceder a una sección crítica sin interferencia de otros procesos. Una vez que el proceso termina su operación en la sección crítica, libera el lock, permitiendo así el acceso al siguiente proceso. No utilizar un lock puede resultar en condiciones de carrera, donde múltiples procesos modifican datos compartidos de manera insegura.
En el siguiente código he usado este mecanismo de exclusión mutual por medio de un candado. He comentado las novedades añadidas y explico cómo funciona el Lock en Python:
# Hay que importar la clase Lock del módulo multiprocessing from multiprocessing import Process, current_process, Value, Lock from time import sleep from random import randint class Adder(Process): # En el constructor recibimos el objeto lock a utilizar def __init__(self, n1, n2, shared_total, lock): super().__init__() self._n1 = n1 self._n2 = n2 self._shared_total = shared_total # Lo guardo en un atributo de la clase para poder usarlo en todos los métodos self._lock = lock def run(self): # Usamos un "context manager" para proteger el bloque de código: seccion crítica # Solo un proceso podrá obtener el lock with self._lock: self._shared_total.value += self._n1 + self._n2 print(f"Suma proceso con PID {current_process().pid}: {self._n1} + {self._n2} = {self._n1 + self._n2}") print(f" Acumulado hasta ahora: {self._shared_total.value}") print() if __name__ == "__main__": shared_total = Value("i", 0) # Creamos un objeto del tipo lock, que será el que usemos en todos los procesos lock = Lock() # Hay que pasar a cada uno de los procesos el mismo lock processes = [ Adder(randint(1, 10), randint(1, 10), shared_total, lock) for _ in range(5) ] for p in processes: p.start() for p in processes: p.join() print(f"Proceso padre, con PID={current_process().pid}") print(f" Promedio: {shared_total.value} / 5 = {shared_total.value / 5}")
Sincronización con Semaphore
Un semáforo es una caso particular de lock en el que se permiten varios procesos al mismo tiempo en la sección crítica.
Por ejemplo, imagina un sistema que gestiona el acceso a tres impresoras en una empresa. Dicho sistema solo puede permitir el acceso a tres procesos porque solo hay tres impresoras. Cuando uno de estos procesos termine con la impresora se podrá dar paso al siguiente proceso.
Veamos este caso en código:
# Hay que importar la clase Sempahore del módulo multiprocessing from multiprocessing import Process, current_process, Semaphore from time import sleep from random import randint class Printer(Process): # En el constructor recibimos el objeto de tipo Semaphore def __init__(self, semaphore): super().__init__() # Lo guardo en un atributo de la clase para poder usarlo en todos los métodos self._semaphore = semaphore def run(self): # Usamos un "context manager" para proteger el bloque de código: seccion crítica # Solo un proceso podrá obtener el lock with self._semaphore: print(f"[START] Imprimiendo proceso {current_process().pid}") sleep(randint(1, 3)) print(f"[END] Imprimiendo proceso {current_process().pid}") if __name__ == "__main__": # Creamos un semáforo que permita el acceso de 3 procesos a la sección crítica semaphore = Semaphore(3) # Hay que pasar a cada uno de los procesos el mismo lock processes = [ Printer(semaphore) for _ in range(7) ] for p in processes: p.start() for p in processes: p.join()