Multithreading

Objectives: Create a queue of data to be processed and a queue of processing results. Spawn T threads, where each thread is a worker that takes items from the first queue, processes them, and puts the results on the second.

Python

CPython 3.8.10 has a GIL (global interpreter lock) that prevents threads from executing Python bytecode in parallel. However, multiple threads can still be spawned and run concurrently.
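
The difference between concurrent and parallel execution can be illustrated with a small sketch. It assumes that the work is dominated by waiting (here time.sleep, which releases the GIL in the same way blocking I/O does); the helper names wait and timed_run are made up for this illustration. Under that assumption the waits of several threads overlap, so the total time stays close to a single wait rather than the sum of all of them.

```python
from threading import Thread
from time import perf_counter, sleep

def wait(seconds):
    # sleep() releases the GIL, so other threads can run while this one waits.
    sleep(seconds)

def timed_run(workers=4, seconds=0.2):
    # Start all threads, wait for all of them, and return the elapsed wall time.
    threads = [Thread(target=wait, args=(seconds,)) for _ in range(workers)]
    start = perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return perf_counter() - start

if __name__ == "__main__":
    elapsed = timed_run()
    print(f"4 threads x 0.2 s finished in {elapsed:.2f} s")
```

CPU-bound Python code would not overlap this way, because only one thread can hold the GIL at a time. The full worker and queue program for this exercise follows.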

from random import randint
from time import sleep
from threading import Thread
from queue import Queue

N = 100
T = 3

def shift(q_in, q_out, offset):
    while True:
        x = q_in.get()
        if x is None:
            break
        q_out.put(x + offset)
        sleep(0.001 * randint(0, 10))

q_in = Queue()
q_out = Queue()
offsets = ((i+1)*N for i in range(T))
threads = [Thread(target=shift, args=(q_in, q_out, offset))
           for offset in offsets]

for t in threads:
    t.start()

for x in range(N):
    q_in.put(x)

# One None sentinel per worker tells each thread to stop.
for _ in threads:
    q_in.put(None)

for t in threads:
    t.join()

for x in range(N):
    print(q_out.get())

In practice, ThreadPoolExecutor from concurrent.futures can simplify this code considerably. However, for the sake of this exercise we use Thread directly in order to explore more of this field.
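
For comparison, a minimal sketch of the ThreadPoolExecutor variant might look as follows. It is not an exact equivalent of the program above: the pool decides which thread handles which item, so this sketch assumes a single offset for all items instead of one offset per thread. The function run and its parameters are made up for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from random import randint
from time import sleep

N = 100
T = 3

def shift(x, offset):
    # Same per-item work as the worker above: idle briefly, then add an offset.
    sleep(0.001 * randint(0, 10))
    return x + offset

def run(n=N, workers=T, offset=N):
    # The executor owns the input queue, the worker threads and their shutdown,
    # so no None sentinels and no explicit join() calls are needed.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # map() returns results in input order, unlike the hand-written q_out.
        return list(pool.map(lambda x: shift(x, offset), range(n)))

if __name__ == "__main__":
    for y in run():
        print(y)
```

Note that pool.map preserves the order of the inputs, whereas the results in q_out arrive in whatever order the workers finish.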

Rust

Rust supports threads that run in parallel. The flume crate is used here as a replacement for std::sync::mpsc because it provides multiple-producer, multiple-consumer channels, whereas a std::sync::mpsc receiver cannot be cloned and shared among workers. Additionally, the rand crate provides random numbers for demonstration purposes.

use std::time;
use std::thread;
use rand::Rng;

const N: i32 = 100;
const T: i32 = 3;

fn shift(q_in: flume::Receiver<Option<i32>>, q_out: flume::Sender<Option<i32>>, offset: i32) {
    let ms = time::Duration::from_millis(1);
    let mut rng = rand::thread_rng();
    loop {
        let x = q_in.recv().unwrap();
        match x {
            Some(v) => { q_out.send( Some(v + offset) ).unwrap(); }
            None => { break; }
        }
        thread::sleep(rng.gen_range(0..10) * ms);
    }
}

fn main() {
    let (q_in_tx, q_in_rx) = flume::unbounded();
    let (q_out_tx, q_out_rx) = flume::unbounded();

    let mut t = Vec::<thread::JoinHandle<()>>::new();

    for x in 0..T {
        let q_in_rx_c = q_in_rx.clone();
        let q_out_tx_c = q_out_tx.clone();
        t.push(thread::spawn(move || {
            shift(q_in_rx_c, q_out_tx_c, (x + 1) * N);
        }));
    }

    for x in 0..N {
        q_in_tx.send(Some(x)).unwrap();
    }

    for _ in 0..T {
        q_in_tx.send(None).unwrap();
    }

    // Wait for every worker to finish.
    for handle in t {
        handle.join().unwrap();
    }

    for _ in 0..N {
        if let Some(res) = q_out_rx.recv().unwrap() {
            println!("{:?}", res);
        }
    }
}

Crystal

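Crystal 1.8.2 doesn't support multithreading by default. The entire application runs in a single thread, except for the garbage collector, which runs in a separate thread. Experimental multithreading support is available behind the -Dpreview_mt compiler flag.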