Multithreading
Objectives: Create queue of data to be processed and queue of processing results. Spawn T threads, where each thread is a worker that performs data processing.
Python
CPython 3.8.10 has GIL (global interpreter lock) that prevents threads from running in parallel. However multiple threads can be spawned and run concurently.
from random import randint
from time import sleep
from threading import Thread
from queue import Queue
N = 100
T = 3
def shift(q_in, q_out, offset):
while True:
x = q_in.get()
if x is None:
break
q_out.put(x + offset)
sleep(0.001 * randint(0, 10))
q_in = Queue()
q_out = Queue()
offsets = ((i+1)*N for i in range(T))
threads = [Thread(target=shift, args=(q_in, q_out, offset))
for offset in offsets]
[t.start() for t in threads]
for x in range(N):
q_in.put(x)
[q_in.put(None) for _ in threads]
[t.join() for t in threads]
for x in range(N):
print(q_out.get())
In practice ThreadPoolExecutor
can efficiently simplify this code. However, for the sake of this exercise we use Thread
in order to explore more of this field.
Rust
Rust supports threads that can be run in parallel. Crate flume has been used as a replacement of std::sync::mpsc
. Flume provides multiple-producers-multiple-consumers channels. Additionally crate rand provides random numbers for demonstrational purposes.
use std::time; use std::thread; use rand::Rng; const N: i32 = 100; const T: i32 = 3; fn shift(q_in: flume::Receiver<Option<i32>>, q_out: flume::Sender<Option<i32>>, offset: i32) { let ms = time::Duration::from_millis(1); let mut rng = rand::thread_rng(); loop { let x = q_in.recv().unwrap(); match x { Some(v) => { q_out.send( Some(v + offset) ).unwrap(); } None => { break; } } thread::sleep(rng.gen_range(0..10) * ms); } } fn main() { let (q_in_tx, q_in_rx) = flume::unbounded(); let (q_out_tx, q_out_rx) = flume::unbounded(); let mut t = Vec::<thread::JoinHandle<()>>::new(); for x in 0..T { let q_in_rx_c = q_in_rx.clone(); let q_out_tx_c = q_out_tx.clone(); t.push(thread::spawn(move || { shift(q_in_rx_c, q_out_tx_c, (x + 1) * 100); })); } for x in 0..N { q_in_tx.send(Some(x)).unwrap(); } for _ in 0..T { q_in_tx.send(None).unwrap(); } for _ in 0..T { t.pop().unwrap().join().unwrap(); } for _ in 0..N { if let Some(res) = q_out_rx.recv().unwrap() { println!("{:?}", res); } } }
Crystal
Crystal 1.8.2 doesn't support multithreading. Entire application runs in a single thread, except for garbage collector which runs in a separate thread.