Synchronization Mechanism

In concurrent programming, if there is no synchronization mechanism to protect variables shared by multiple threads, data races may occur.

The Cangjie programming language provides three common synchronization mechanisms to ensure thread safety for shared data: atomic operations, mutexes, and condition variables.

Atomic Operations

Cangjie provides atomic operations of the integer, Bool, and reference types.

The integer types include Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, and UInt64.

Atomic operations of the integer type support basic read, write, swap, and arithmetic operations.

Operation      | Function
---------------|---------
load           | Read.
store          | Write.
swap           | Swap. The value before swap is returned.
compareAndSwap | Compare and swap. If the swap is successful, true is returned. Otherwise, false is returned.
fetchAdd       | Addition. The value before the addition operation is returned.
fetchSub       | Subtraction. The value before the subtraction operation is returned.
fetchAnd       | AND. The value before the AND operation is returned.
fetchOr        | OR. The value before the OR operation is returned.
fetchXor       | XOR. The value before the XOR operation is returned.

Note that:

  1. The return values of the swap and arithmetic operations are values before the operations.
  2. compareAndSwap checks whether the value of the current atomic variable is equal to the old value. If they are equal, it replaces the old value with the new value. Otherwise, it does not perform the replacement.
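
For example, compareAndSwap is commonly used in a retry loop that re-reads the current value and retries until the swap succeeds. The following is a minimal sketch of such a CAS loop, based on the AtomicInt64 type shown later in this section:

import std.sync.*

let counter = AtomicInt64(0)

// Multiply the current value by 3 atomically using a CAS retry loop.
func multiplyBy3(): Unit {
    while (true) {
        let old = counter.load()
        if (counter.compareAndSwap(old, old * 3)) {
            break
        }
        // Another thread modified the value in between; retry with the new value.
    }
}

main(): Int64 {
    counter.store(7)
    multiplyBy3()
    println("counter = ${counter.load()}") // counter = 21
    return 0
}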

Take the Int8 type as an example. The corresponding atomic operation type is declared as follows:

class AtomicInt8 {
    public func load(): Int8
    public func store(val: Int8): Unit
    public func swap(val: Int8): Int8
    public func compareAndSwap(old: Int8, new: Int8): Bool
    public func fetchAdd(val: Int8): Int8
    public func fetchSub(val: Int8): Int8
    public func fetchAnd(val: Int8): Int8
    public func fetchOr(val: Int8): Int8
    public func fetchXor(val: Int8): Int8
}

Each method of the atomic types listed above has a corresponding overload that accepts a memory-ordering parameter; currently, only sequential consistency is supported.

Similarly, other integer types correspond to the following atomic operation types:

class AtomicInt16 {...}
class AtomicInt32 {...}
class AtomicInt64 {...}
class AtomicUInt8 {...}
class AtomicUInt16 {...}
class AtomicUInt32 {...}
class AtomicUInt64 {...}

The following example shows how to use atomic operations to count in a multithreaded program:

import std.sync.*
import std.time.*
import std.collection.*

let count = AtomicInt64(0)

main(): Int64 {
    let list = ArrayList<Future<Int64>>()

    // create 1000 threads.
    for (_ in 0..1000) {
        let fut = spawn {
            sleep(Duration.millisecond) // sleep for 1 ms.
            count.fetchAdd(1)
        }
        list.add(fut)
    }

    // Wait for all threads to finish.
    for (f in list) {
        f.get()
    }

    let val = count.load()
    println("count = ${val}")
    return 0
}

The output is as follows:

count = 1000

The following are some examples of using atomic operations of the integer type:

var obj: AtomicInt32 = AtomicInt32(1)
var x = obj.load() // x: 1, the type is Int32
x = obj.swap(2) // x: 1
x = obj.load() // x: 2
var y = obj.compareAndSwap(2, 3) // y: true, the type is Bool.
y = obj.compareAndSwap(2, 3) // y: false, the value in obj is no longer 2 but 3. Therefore, the CAS operation fails.
x = obj.fetchAdd(1) // x: 3
x = obj.load() // x: 4

Atomic operations of the Bool and reference types support only read, write, and swap operations.

Operation      | Function
---------------|---------
load           | Read.
store          | Write.
swap           | Swap. The value before swap is returned.
compareAndSwap | Compare and swap. If the swap is successful, true is returned. Otherwise, false is returned.

NOTE

Atomic operations of the reference type are valid only for reference types.

The atomic reference type is AtomicReference. The following are some examples of using atomic operations of the Bool and reference types:

import std.sync.*

class A {}

main() {
    var obj = AtomicBool(true)
    var x1 = obj.load() // x1: true, the type is Bool
    println(x1)
    var t1 = A()
    var obj2 = AtomicReference(t1)
    var x2 = obj2.load() // x2 and t1 are the same object
    var y1 = obj2.compareAndSwap(x2, t1) // x2 and t1 are the same object, y1: true
    println(y1)
    var t2 = A()
    var y2 = obj2.compareAndSwap(t2, A()) // t2 and the current value in obj2 (t1) are not the same object, so the CAS fails, y2: false
    println(y2)
    y2 = obj2.compareAndSwap(t1, A()) // CAS succeeds, y2: true
    println(y2)
}

The result is as follows:

true
true
false
true

Mutex

Mutex protects the critical section so that only one thread can execute the code in the critical section at any time. When a thread attempts to acquire a lock that is already held by another thread, it is blocked and is not woken up until the lock is released. Mutex is reentrant, which means that a thread can acquire the same lock multiple times.

When using a Mutex, you must keep two rules in mind:

  1. You must acquire the lock before accessing shared data.
  2. After the shared data is processed, you must release the lock so that other threads can acquire it.

Mutex provides the following member functions:

public class Mutex <: UniqueLock {
    // Create a Mutex.
    public init()

    // Locks the mutex, blocks if the mutex is not available.
    public func lock(): Unit

    // Unlocks the mutex. If there are other threads blocking on this
    // lock, then wake up one of them.
    public func unlock(): Unit

    // Tries to lock the mutex, returns false if the mutex is not
    // available, otherwise returns true.
    public func tryLock(): Bool

    // Generate a Condition instance for the mutex.
    public func condition(): Condition
}

The following example shows how to use Mutex to protect access to the global shared variable count, where operations on count are considered a critical section:

import std.sync.*
import std.time.*
import std.collection.*

var count: Int64 = 0
let mtx = Mutex()

main(): Int64 {
    let list = ArrayList<Future<Unit>>()

    // create 1000 threads.
    for (i in 0..1000) {
        let fut = spawn {
            sleep(Duration.millisecond) // sleep for 1ms.
            mtx.lock()
            count++
            mtx.unlock()
        }
        list.add(fut)
    }

    // Wait for all threads to finish.
    for (f in list) {
        f.get()
    }

    println("count = ${count}")
    return 0
}

The output is as follows:

count = 1000

The following example shows how to use tryLock:

import std.sync.*
import std.time.*

main(): Int64 {
    let mtx: Mutex = Mutex()
    var future: Future<Unit> = spawn {
        mtx.lock()
        println("get the lock, do something")
        sleep(Duration.millisecond * 10)
        mtx.unlock()
    }
    try {
        future.get(Duration.millisecond * 10)
    } catch (e: TimeoutException) {
        if (mtx.tryLock()) {
            println("tryLock sucess, do something")
            mtx.unlock()
            return 1
        }
        println("tryLock failed, do nothing")
        return 0
    }
    return 2
}

One possible output is as follows:

get the lock, do something

The following are some error examples involving mutexes:

Error example 1: A thread operates on the critical section without unlocking, causing other threads to be blocked and unable to acquire the lock.

import std.sync.*

var sum: Int64 = 0
let mutex = Mutex()

main() {
    let foo = spawn { =>
        mutex.lock()
        sum = sum + 1
    }
    let bar = spawn { =>
        mutex.lock()
        sum = sum + 1
    }
    foo.get()
    println("${sum}")
    bar.get() // Because the mutex is never unlocked, other threads waiting to acquire it are blocked forever.
}

Error example 2: If the current thread does not hold a lock, an exception is thrown when unlock is called.

import std.sync.*

var sum: Int64 = 0
let mutex = Mutex()

main() {
    let foo = spawn { =>
        sum = sum + 1
        mutex.unlock() // Error: unlocking without holding the lock throws an IllegalSynchronizationStateException.
    }
    foo.get()
    0
}

Error example 3: tryLock() does not guarantee that the lock is acquired. This can lead to the critical section being operated on without the lock's protection, and to an exception being thrown when unlock is called by a thread that does not hold the lock.

import std.sync.*
var sum: Int64 = 0
let mutex = Mutex()

main() {
    for (i in 0..100) {
        spawn { =>
            mutex.tryLock() // Error: tryLock() only attempts to acquire the lock; there is no guarantee of success, which can lead to unprotected access and an exception from unlock().
            sum = sum + 1
            mutex.unlock()
        }
    }
}

In addition, Mutex is designed to be a reentrant lock, meaning that if a thread already holds Mutex, it can immediately acquire the same Mutex again.

NOTE

Although Mutex is a reentrant lock, the number of calls to unlock() must be the same as the number of calls to lock() so that the lock can be successfully released.
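
For example, the following minimal sketch locks the same Mutex twice in one thread; the lock is fully released only after the second unlock:

import std.sync.*

let m = Mutex()

main(): Int64 {
    m.lock()
    m.lock()   // Reentrant: the same thread acquires the lock again without blocking.
    m.unlock() // The lock is still held by the current thread here ...
    m.unlock() // ... and is fully released only after this second unlock.
    println("lock released")
    return 0
}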

The following example code demonstrates the reentrant feature of Mutex:

import std.sync.*
import std.time.*

var count: Int64 = 0
let mtx = Mutex()

func foo() {
    mtx.lock()
    count += 10
    bar()
    mtx.unlock()
}

func bar() {
    mtx.lock()
    count += 100
    mtx.unlock()
}

main(): Int64 {
    let fut = spawn {
        sleep(Duration.millisecond) // sleep for 1ms.
        foo()
    }

    foo()

    fut.get()

    println("count = ${count}")
    return 0
}

The output is as follows:

count = 220

In the example above, whether in the main thread or in the newly created thread, if the lock has already been acquired in foo(), calling bar() allows the thread to immediately acquire the same Mutex again without causing a deadlock, since it is the same reentrant lock.

Condition

Condition is a condition variable (a waiting queue) bound to a mutex. Condition instances are created by a mutex, and one mutex can create multiple Condition instances. A Condition can block a thread until another thread sends a signal to resume its execution. This mechanism uses shared variables to synchronize threads. The following methods are provided:

public class Mutex <: UniqueLock {
    // ...
    // Generate a Condition instance for the mutex.
    public func condition(): Condition
}

public interface Condition {
    // Wait for a signal, blocking the current thread.
    func wait(): Unit
    func wait(timeout!: Duration): Bool

    // Wait for a signal and predicate, blocking the current thread.
    func waitUntil(predicate: ()->Bool): Unit
    func waitUntil(predicate: ()->Bool, timeout!: Duration): Bool

    // Wake up one thread of those waiting on the monitor, if any.
    func notify(): Unit

    // Wake up all threads waiting on the monitor, if any.
    func notifyAll(): Unit
}

Before calling the wait, notify, or notifyAll method of the Condition interface, ensure that the current thread holds the bound lock. The wait method performs the following actions:

  1. Add the current thread to the waiting queue of the corresponding lock.
  2. Block the current thread, completely release the lock, and record its reentrancy count.
  3. Wait for another thread to use the notify or notifyAll method of the same Condition instance to send a signal to the thread.
  4. After the current thread is woken up, it automatically attempts to re-acquire the lock, restoring the reentrancy count recorded in step 2. If the attempt to re-acquire the lock fails, the current thread blocks on the lock.

The wait method accepts an optional timeout parameter. Note that most mainstream operating systems do not guarantee real-time scheduling, so a thread cannot be guaranteed to block for "exactly N nanoseconds"; there may be system-dependent inaccuracies. In addition, the current language specification explicitly allows implementations to produce spurious wake-ups, in which case the return value of wait is implementation-determined and may be either true or false. Therefore, developers are encouraged to always wrap wait in a loop:

synchronized(mtx) {
  while (<condition is not true>) {
    condition.wait()
  }
}

The following is an example of using Condition:

import std.sync.*
import std.time.*

let mtx = Mutex()
let condition = synchronized(mtx) {
    mtx.condition()
}
var flag: Bool = true

main(): Int64 {
    let fut = spawn {
        mtx.lock()
        while (flag) {
            println("New thread: before wait")
            condition.wait()
            println("New thread: after wait")
        }
        mtx.unlock()
    }

    // Sleep for 10ms, to make sure the new thread can be executed.
    sleep(10 * Duration.millisecond)

    mtx.lock()
    println("Main thread: set flag")
    flag = false
    mtx.unlock()

    mtx.lock()
    println("Main thread: notify")
    condition.notifyAll()
    mtx.unlock()

    // Wait for the new thread to finish.
    fut.get()
    return 0
}

The output is as follows:

New thread: before wait
Main thread: set flag
Main thread: notify
New thread: after wait

When wait is executed on a Condition object, the call must be protected by the bound lock; otherwise, the unlock operation performed inside wait throws an exception.

The following are some error examples of using condition variables:

import std.sync.*

let m1 = Mutex()
let c1 = synchronized(m1) {
    m1.condition()
}
let m2 = Mutex()
var flag: Bool = true
var count: Int64 = 0

func foo1() {
    spawn {
        m2.lock()
        while (flag) {
            c1.wait() // Error: The lock used together with the condition variable must be the same lock and in the locked state. Otherwise, the unlock operation in wait throws an exception.
        }
        count = count + 1
        m2.unlock()
    }
    m1.lock()
    flag = false
    c1.notifyAll()
    m1.unlock()
}

func foo2() {
    spawn {
        while (flag) {
            c1.wait() // Error: The wait of a conditional variable must be called with a lock held.
        }
        count = count + 1
    }
    m1.lock()
    flag = false
    c1.notifyAll()
    m1.unlock()
}

main() {
    foo1()
    foo2()
    c1.wait() // Error: wait must be called while holding the lock bound to the condition variable.
    return 0
}

In some complex inter-thread synchronization scenarios, multiple Condition instances need to be generated for the same lock object. The following example implements a fixed-length bounded FIFO queue. When the queue is empty, get() will be blocked. When the queue is full, put() will be blocked.

import std.sync.*

class BoundedQueue {
    // Create a Mutex, two Conditions.
    let m: Mutex = Mutex()
    var notFull: Condition
    var notEmpty: Condition

    var count: Int64 // Object count in buffer.
    var head: Int64  // Write index.
    var tail: Int64  // Read index.

    // Queue's length is 100.
    let items: Array<Object> = Array<Object>(100, {i => Object()})

    init() {
        count = 0
        head = 0
        tail = 0

        synchronized(m) {
          notFull  = m.condition()
          notEmpty = m.condition()
        }
    }

    // Insert an object, if the queue is full, block the current thread.
    public func put(x: Object) {
        // Acquire the mutex.
        synchronized(m) {
          while (count == 100) {
            // If the queue is full, wait for the "queue notFull" event.
            notFull.wait()
          }
          items[head] = x
          head++
          if (head == 100) {
            head = 0
          }
          count++

          // An object has been inserted and the current queue is no longer
          // empty, so wake up the thread previously blocked on get()
          // because the queue was empty.
          notEmpty.notify()
        } // Release the mutex.
    }

    // Pop an object, if the queue is empty, block the current thread.
    public func get(): Object {
        // Acquire the mutex.
        synchronized(m) {
          while (count == 0) {
            // If the queue is empty, wait for the "queue notEmpty" event.
            notEmpty.wait()
          }
          let x: Object = items[tail]
          tail++
          if (tail == 100) {
            tail = 0
          }
          count--

          // An object has been popped and the current queue is no longer
          // full, so wake up the thread previously blocked on put()
          // because the queue was full.
          notFull.notify()

          return x
        } // Release the mutex.
    }
}
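
The following is a minimal usage sketch of the BoundedQueue defined above (assuming the class is in scope): a producer thread inserts objects while a consumer thread removes them, each blocking when the queue is full or empty, respectively.

import std.sync.*

main(): Int64 {
    let queue = BoundedQueue()

    // Producer: insert 10 objects; put() blocks whenever the queue is full.
    let producer = spawn {
        for (_ in 0..10) {
            queue.put(Object())
        }
    }

    // Consumer: remove 10 objects; get() blocks whenever the queue is empty.
    let consumer = spawn {
        for (_ in 0..10) {
            let _ = queue.get()
        }
    }

    producer.get()
    consumer.get()
    println("producer and consumer finished")
    return 0
}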

synchronized

Locks such as Mutex provide a convenient and flexible locking mechanism. However, because of this flexibility, it is possible to forget to unlock, or for an exception to be thrown while holding the mutex, in which case the held lock is never released. To address these problems, the Cangjie programming language provides the synchronized keyword, which is used together with a Lock instance to automatically lock and unlock the code block that follows it.

The following example code demonstrates how to use the synchronized keyword to protect shared data:

import std.sync.*
import std.time.*
import std.collection.*

var count: Int64 = 0
let mtx = Mutex()

main(): Int64 {
    let list = ArrayList<Future<Unit>>()

    // create 1000 threads.
    for (i in 0..1000) {
        let fut = spawn {
            sleep(Duration.millisecond) // sleep for 1ms.
            // Use synchronized(mtx), instead of mtx.lock() and mtx.unlock().
            synchronized(mtx) {
                count++
            }
        }
        list.add(fut)
    }

    // Wait for all threads to finish.
    for (f in list) {
        f.get()
    }

    println("count = ${count}")
    return 0
}

The output is as follows:

count = 1000

A Lock instance follows the synchronized keyword to protect the code block it modifies, so that only one thread can execute the protected code at any time.

  1. Before entering the code block modified by synchronized, a thread automatically acquires the lock corresponding to the Lock instance. If the lock cannot be acquired, the current thread is blocked.
  2. Before exiting the code block modified by synchronized, a thread automatically releases the lock of the Lock instance.

Control transfer expressions (such as break, continue, return, and throw) that cause execution to leave the synchronized code block also follow the description above: the lock corresponding to the synchronized expression is automatically released.

The following example demonstrates a case where a break statement appears in a synchronized code block:

import std.sync.*
import std.collection.*

var count: Int64 = 0
var mtx: Mutex = Mutex()

main(): Int64 {
    let list = ArrayList<Future<Unit>>()
    for (i in 0..10) {
        let fut = spawn {
            while (true) {
                synchronized(mtx) {
                    count = count + 1
                    break
                    println("in thread")
                }
            }
        }
        list.add(fut)
    }

    // Wait for all threads to finish.
    for (f in list) {
        f.get()
    }

    synchronized(mtx) {
        println("in main, count = ${count}")
    }
    return 0
}

The output is as follows:

in main, count = 10

Note that the in thread line is never printed, because the break statement causes the program to exit the while loop: it first exits the synchronized code block (releasing the lock) and then exits the while loop.
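
Similarly, when an exception propagates out of a synchronized code block, the lock is released automatically as well. The following minimal sketch illustrates this (the lock name mtx2 is illustrative):

import std.sync.*

let mtx2 = Mutex()

main(): Int64 {
    try {
        synchronized(mtx2) {
            throw Exception("error in critical section") // Leaving the block via throw releases the lock.
        }
    } catch (e: Exception) {
        println("caught an exception")
    }

    // The lock was released when the exception left the synchronized block,
    // so it can be acquired again here.
    synchronized(mtx2) {
        println("lock is available again")
    }
    return 0
}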

ThreadLocal

You can use ThreadLocal in the core package to create and use thread local variables. Each thread has an independent storage space to store these thread local variables. Therefore, each thread can safely access its own thread local variables without being affected by other threads.

public class ThreadLocal<T> {
    /* Constructs a Cangjie thread local variable with an empty initial value (Option<T>.None). */
    public init()

    /*
     * Obtains the value of a Cangjie thread local variable. If the value does not exist, Option<T>.None is returned.
     * Option<T>: return value of the Cangjie thread local variable
     */
    public func get(): Option<T>

    /*
     * Sets the value of the Cangjie thread local variable.
     * If Option<T>.None is passed, the value of the local variable will be deleted and cannot be acquired in subsequent operations of the thread.
     * value: value of the local variable to be set
     */
    public func set(value: Option<T>): Unit
}

The following example code demonstrates how to use the ThreadLocal class to create and use local variables of each thread:


main(): Int64 {
    let tl = ThreadLocal<Int64>()
    let fut1 = spawn {
        tl.set(123)
        println("tl in spawn1 = ${tl.get().getOrThrow()}")
    }
    let fut2 = spawn {
        tl.set(456)
        println("tl in spawn2 = ${tl.get().getOrThrow()}")
    }
    fut1.get()
    fut2.get()
    0
}

The possible output is as follows:

tl in spawn1 = 123
tl in spawn2 = 456

Or:

tl in spawn2 = 456
tl in spawn1 = 123
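
As described in the class declaration above, passing Option<T>.None to set removes the thread-local value. The following minimal sketch illustrates this:

main(): Int64 {
    let tl = ThreadLocal<Int64>()
    tl.set(42)
    println("before: ${tl.get().getOrThrow()}") // before: 42
    tl.set(Option<Int64>.None) // Delete the thread-local value.
    match (tl.get()) {
        case Some(v) => println("still has value ${v}")
        case None => println("value removed") // This branch is taken.
    }
    return 0
}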