Synchronization Mechanism

In concurrent programming, data races may occur if no synchronization mechanism protects variables shared by multiple threads.

The Cangjie programming language provides three common synchronization mechanisms to ensure thread safety of shared data: atomic operations, mutexes, and condition variables.

Atomic Operations

Cangjie provides atomic operations of the integer, Bool, and reference types.

The integer types include Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, and UInt64.

Atomic operations of the integer type support basic read, write, swap, and arithmetic operations.

Operation        Function
load             Read.
store            Write.
swap             Swap. The value before the swap is returned.
compareAndSwap   Compare and swap. If the swap is successful, true is returned; otherwise, false is returned.
fetchAdd         Addition. The value before the addition is returned.
fetchSub         Subtraction. The value before the subtraction is returned.
fetchAnd         AND. The value before the AND operation is returned.
fetchOr          OR. The value before the OR operation is returned.
fetchXor         XOR. The value before the XOR operation is returned.

Note that:

  1. The return values of the swap and arithmetic operations are the values before the operations.
  2. compareAndSwap checks whether the value of the current atomic variable is equal to the old value. If they are equal, it replaces the old value with the new value. Otherwise, it does not perform the replacement.

Take the Int8 type as an example. The corresponding atomic operation type is declared as follows:

class AtomicInt8 {
    public func load(): Int8
    public func store(val: Int8): Unit
    public func swap(val: Int8): Int8
    public func compareAndSwap(old: Int8, new: Int8): Bool
    public func fetchAdd(val: Int8): Int8
    public func fetchSub(val: Int8): Int8
    public func fetchAnd(val: Int8): Int8
    public func fetchOr(val: Int8): Int8
    public func fetchXor(val: Int8): Int8
}

Each of the methods listed above also has an overload that accepts a memory-ordering parameter; currently, only sequential consistency is supported.
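
As an illustration only, the following sketch shows how such an overload might be called. The named parameter memoryOrder and the enum value MemoryOrder.SeqCst are assumptions about the API and may differ from the actual library:

import std.sync.*

main() {
    let a = AtomicInt64(0)
    // Assumption: the overloads take a named parameter `memoryOrder` of an enum
    // type `MemoryOrder`; only sequential consistency (SeqCst) is supported.
    a.store(5, memoryOrder: MemoryOrder.SeqCst)
    let v = a.load(memoryOrder: MemoryOrder.SeqCst) // v: 5
    println(v)
}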

Similarly, other integer types correspond to the following atomic operation types:

class AtomicInt16 {...}
class AtomicInt32 {...}
class AtomicInt64 {...}
class AtomicUInt8 {...}
class AtomicUInt16 {...}
class AtomicUInt32 {...}
class AtomicUInt64 {...}

The following example shows how to use atomic operations to count in a multithreaded program:

import std.sync.*
import std.time.*
import std.collection.*

let count = AtomicInt64(0)

main(): Int64 {
    let list = ArrayList<Future<Int64>>()

    // create 1000 threads.
    for (_ in 0..1000) {
        let fut = spawn {
            sleep(Duration.millisecond) // sleep for 1 ms.
            count.fetchAdd(1)
        }
        list.append(fut)
    }

    // Wait for all threads to finish.
    for (f in list) {
        f.get()
    }

    let val = count.load()
    println("count = ${val}")
    return 0
}

The output is as follows:

count = 1000

The following are some examples of using atomic operations of the integer type:

var obj: AtomicInt32 = AtomicInt32(1)
var x = obj.load() // x: 1, the type is Int32
x = obj.swap(2) // x: 1
x = obj.load() // x: 2
var y = obj.compareAndSwap(2, 3) // y: true, the type is Bool.
y = obj.compareAndSwap(2, 3) // y: false, the value in obj is no longer 2 but 3. Therefore, the CAS operation fails.
x = obj.fetchAdd(1) // x: 3
x = obj.load() // x: 4

Atomic operations of the Bool and reference types support only read, write, and swap operations.

Operation        Function
load             Read.
store            Write.
swap             Swap. The value before the swap is returned.
compareAndSwap   Compare and swap. If the swap is successful, true is returned; otherwise, false is returned.

Note:

Atomic operations of the reference type are supported only when the type argument is a reference type (for example, a class type).

The atomic reference type is AtomicReference. The following are some examples of using atomic operations of the Bool and reference types:

import std.sync.*

class A {}

main() {
    var obj = AtomicBool(true)
    var x1 = obj.load() // x1: true, the type is Bool
    println(x1)
    var t1 = A()
    var obj2 = AtomicReference(t1)
    var x2 = obj2.load() // x2 and t1 are the same object
    var y1 = obj2.compareAndSwap(x2, t1) // x2 and t1 are the same object, y1: true
    println(y1)
    var t2 = A()
    var y2 = obj2.compareAndSwap(t2, A()) // t2 is not the object currently stored in obj2 (that is t1), CAS fails, y2: false
    println(y2)
    y2 = obj2.compareAndSwap(t1, A()) // obj2 still stores t1, so the CAS succeeds, y2: true
    println(y2)
}

The result is as follows:

true
true
false
true

ReentrantMutex

ReentrantMutex protects a critical section so that only one thread can execute the code in the critical section at any time. A thread that attempts to acquire a lock already held by another thread is blocked and is not woken up until the lock is released. Reentrancy means that a thread can acquire the same lock multiple times.

Note:

ReentrantMutex is a built-in mutex. Do not define your own classes that inherit from it.

When using a ReentrantMutex, you must keep two rules in mind:

  1. You must attempt to acquire the lock before accessing shared data.
  2. After the shared data has been processed, you must release the lock so that other threads can acquire it, as illustrated in the sketch below.
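
One common way to follow both rules even when the critical section may throw an exception is to release the lock in a finally block. The helper below is only an illustrative sketch, not part of the standard library:

import std.sync.*

var total: Int64 = 0
let m = ReentrantMutex()

// Illustrative helper: acquire the lock, run the critical section,
// and always release the lock, even if an exception is thrown.
func addSafely(delta: Int64): Unit {
    m.lock()       // Rule 1: acquire the lock before accessing shared data.
    try {
        total += delta
    } finally {
        m.unlock() // Rule 2: always release the lock afterwards.
    }
}

main(): Int64 {
    addSafely(1)
    println("total = ${total}") // total = 1
    return 0
}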

ReentrantMutex provides the following member functions:

public open class ReentrantMutex {
    // Create a ReentrantMutex.
    public init()

    // Locks the mutex, blocks if the mutex is not available.
    public func lock(): Unit

    // Unlocks the mutex. If there are other threads blocking on this
    // lock, then wake up one of them.
    public func unlock(): Unit

    // Tries to lock the mutex, returns false if the mutex is not
    // available, otherwise returns true.
    public func tryLock(): Bool
}

The following example shows how to use ReentrantMutex to protect access to the global shared variable count, where operations on count are considered a critical section:

import std.sync.*
import std.time.*
import std.collection.*

var count: Int64 = 0
let mtx = ReentrantMutex()

main(): Int64 {
    let list = ArrayList<Future<Unit>>()

    // create 1000 threads.
    for (i in 0..1000) {
        let fut = spawn {
            sleep(Duration.millisecond) // sleep for 1 ms.
            mtx.lock()
            count++
            mtx.unlock()
        }
        list.append(fut)
    }

    // Wait for all threads to finish.
    for (f in list) {
        f.get()
    }

    println("count = ${count}")
    return 0
}

The output is as follows:

count = 1000

The following example shows how to use tryLock:

import std.sync.*
import std.time.*

main(): Int64 {
    let mtx: ReentrantMutex = ReentrantMutex()
    var future: Future<Unit> = spawn {
        mtx.lock()
        println("get the lock, do something")
        sleep(Duration.millisecond * 10)
        mtx.unlock()
    }
    let res: Option<Unit> = future.get(10 * 1000 * 1000)
    match (res) {
        case Some(v) =>
            println("after wait 10 ms, end")
        case None =>
            println("after wait 10 ms, try to get the lock")
            if (mtx.tryLock()) {
                println("tryLock sucess, do something")
                mtx.unlock()
                return 1
            }
            println("tryLock failed, do nothing")
            return 0
    }
    return 2
}

One possible output is as follows:

get the lock, do something
after wait 10 ms, try to get the lock
tryLock failed, do nothing

The following are some error examples involving mutexes:

Error example 1: A thread locks the mutex and operates on the critical section but never unlocks it, so other threads are blocked and can never acquire the lock.

import std.sync.*

var sum: Int64 = 0
let mutex = ReentrantMutex()

main() {
    let foo = spawn { =>
        mutex.lock()
        sum = sum + 1
    }
    let bar = spawn { =>
        mutex.lock()
        sum = sum + 1
    }
    foo.get()
    println("${sum}")
    bar.get() // Because the lock acquired in `foo` is never released, the `bar` thread blocks forever on `mutex.lock()`, so this `get()` never returns.
}

Error example 2: If the current thread does not hold a lock, an exception is thrown when unlock is called.

import std.sync.*

var sum: Int64 = 0
let mutex = ReentrantMutex()

main() {
    let foo = spawn { =>
        sum = sum + 1
        mutex.unlock() // Error: unlocking without holding the lock throws an IllegalSynchronizationStateException.
    }
    foo.get()
    0
}

Error example 3: tryLock() does not guarantee that the lock is acquired. This can lead to operations on the critical section without the lock's protection, and to an exception being thrown when unlock is called without holding the lock.

import std.sync.*
var sum: Int64 = 0
let mutex = ReentrantMutex()

main() {
    for (i in 0..100) {
        spawn { =>
            mutex.tryLock() // Error: `tryLock()` only attempts to acquire the lock; if it fails, `sum` is modified without protection and `unlock()` below throws an exception.
            sum = sum + 1
            mutex.unlock()
        }
    }
}

In addition, ReentrantMutex is designed to be a reentrant lock, meaning that if a thread already holds ReentrantMutex, it can immediately acquire the same ReentrantMutex again.

Note:

Although ReentrantMutex is a reentrant lock, the number of calls to unlock() must be the same as the number of calls to lock() so that the lock can be successfully released.

The following example code demonstrates the reentrant feature of ReentrantMutex:

import std.sync.*
import std.time.*

var count: Int64 = 0
let mtx = ReentrantMutex()

func foo() {
    mtx.lock()
    count += 10
    bar()
    mtx.unlock()
}

func bar() {
    mtx.lock()
    count += 100
    mtx.unlock()
}

main(): Int64 {
    let fut = spawn {
        sleep(Duration.millisecond) // sleep for 1 ms.
        foo()
    }

    foo()

    fut.get()

    println("count = ${count}")
    return 0
}

The output is as follows:

count = 220

In the example above, whether in the main thread or in the newly created thread, a thread that has already acquired the lock in foo() can immediately acquire the same ReentrantMutex again in bar() without causing a deadlock, because it is locking the same ReentrantMutex.

Monitor

Monitor is a built-in data structure that binds a mutex to a single related condition variable (wait queue). Monitor can block a thread and wait for a signal from another thread to resume execution. This mechanism uses shared variables to synchronize threads. The following methods are provided:

public class Monitor <: ReentrantMutex {
    // Create a monitor.
    public init()

    // Wait for a signal, blocking the current thread.
    public func wait(timeout!: Duration = Duration.Max): Bool

    // Wake up one thread of those waiting on the monitor, if any.
    public func notify(): Unit

    // Wake up all threads waiting on the monitor, if any.
    public func notifyAll(): Unit
}

Before calling the wait, notify, or notifyAll method of the Monitor object, ensure that the current thread has held the corresponding Monitor lock. The wait method contains the following actions:

  1. Add the current thread to the waiting queue corresponding to the Monitor.
  2. Block the current thread, release the Monitor lock, and record the number of lock re-entry times.
  3. Wait for another thread to use the notify or notifyAll method of the same Monitor instance to send a signal to the thread.
  4. After the current thread is woken up, it automatically attempts to re-acquire the Monitor lock. The re-entry status of the lock is the same as the number of re-entry times recorded in step 2. However, if the attempt to acquire the Monitor lock fails, the current thread is blocked on the Monitor lock.

The wait method accepts an optional parameter timeout. Note that most common operating systems do not provide real-time scheduling guarantees, so a thread cannot be guaranteed to be blocked for "exactly N nanoseconds"; there may be system-dependent inaccuracies. In addition, the current language specification explicitly allows implementations to produce spurious wakeups, in which case the return value of wait is implementation-determined and may be either true or false. Therefore, developers are encouraged to always wrap wait in a loop:

synchronized (obj) {
  while (<condition is not true>) {
    obj.wait()
  }
}

The following is an example of using Monitor:

import std.sync.*
import std.time.*

var mon = Monitor()
var flag: Bool = true

main(): Int64 {
    let fut = spawn {
        mon.lock()
        while (flag) {
            println("New thread: before wait")
            mon.wait()
            println("New thread: after wait")
        }
        mon.unlock()
    }

    // Sleep for 10 ms, to make sure the new thread can be executed.
    sleep(10 * Duration.millisecond)

    mon.lock()
    println("Main thread: set flag")
    flag = false
    mon.unlock()

    mon.lock()
    println("Main thread: notify")
    mon.notifyAll()
    mon.unlock()

    // Wait for the new thread to finish.
    fut.get()
    return 0
}

The output is as follows:

New thread: before wait
Main thread: set flag
Main thread: notify
New thread: after wait

A call to wait on a Monitor object must be made while holding that Monitor's lock; otherwise, the unlock operation performed inside wait throws an exception.

The following are some error examples of using condition variables:

import std.sync.*

var m1 = Monitor()
var m2 = ReentrantMutex()
var flag: Bool = true
var count: Int64 = 0

func foo1() {
    spawn {
        m2.lock()
        while (flag) {
            m1.wait() // Error: The lock used together with the condition variable must be the same lock and in the locked state. Otherwise, the unlock operation in `wait` throws an exception.
        }
        count = count + 1
        m2.unlock()
    }
    m1.lock()
    flag = false
    m1.notifyAll()
    m1.unlock()
}

func foo2() {
    spawn {
        while (flag) {
            m1.wait() // Error: The `wait` of a condition variable must be called with the lock held.
        }
        count = count + 1
    }
    m1.lock()
    flag = false
    m1.notifyAll()
    m1.unlock()
}

main() {
    foo1()
    foo2()
    m1.wait() // Error: `wait` is called without holding the lock `m1`.
    return 0
}

MultiConditionMonitor

MultiConditionMonitor is a built-in data structure that binds a mutex to a group of dynamically created condition variables. This class is used only when the Monitor class is insufficient for complex inter-thread synchronization. It primarily provides the following methods:

public class MultiConditionMonitor <: ReentrantMutex {
   // Constructor.
   init()

   // Returns a new ConditionID associated with this monitor. May be used to implement
   // "single mutex -- multiple wait queues" concurrent primitives.
   // Throws IllegalSynchronizationStateException("Mutex is not locked by the current thread") if the current thread does not hold this mutex.
   func newCondition(): ConditionID

   // Blocks until either a paired `notify` is invoked or `timeout` nanoseconds pass.
   // Returns `true` if the specified condition was signalled by another thread or `false` on timeout.
   // Spurious wakeups are allowed.
   // Throws IllegalSynchronizationStateException("Mutex is not locked by the current thread") if the current thread does not hold this mutex.
   // Throws IllegalSynchronizationStateException("Invalid condition") if `id` was not returned by `newCondition` of this MultiConditionMonitor instance.
   func wait(id: ConditionID, timeout!: Duration = Duration.Max): Bool

   // Wakes up a single thread waiting on the specified condition, if any (no particular admission policy implied).
   // Throws IllegalSynchronizationStateException("Mutex is not locked by the current thread") if the current thread does not hold this mutex.
   // Throws IllegalSynchronizationStateException("Invalid condition") if `id` was not returned by `newCondition` of this MultiConditionMonitor instance.
   func notify(id: ConditionID): Unit

   // Wakes up all threads waiting on the specified condition, if any (no particular admission policy implied).
   // Throws IllegalSynchronizationStateException("Mutex is not locked by the current thread") if the current thread does not hold this mutex.
   // Throws IllegalSynchronizationStateException("Invalid condition") if `id` was not returned by `newCondition` of this MultiConditionMonitor instance.
   func notifyAll(id: ConditionID): Unit
}
  1. newCondition(): ConditionID: creates a condition variable, associates it with the current object, and returns a specific ConditionID.
  2. wait(id: ConditionID, timeout!: Duration = Duration.Max): Bool: waits for a signal and blocks the current thread.
  3. notify(id: ConditionID): Unit: wakes up one thread waiting on the specified condition variable (if any).
  4. notifyAll(id: ConditionID): Unit: wakes up all threads waiting on the specified condition variable (if any).

During initialization, the MultiConditionMonitor does not have a related ConditionID instance. Each time newCondition is called, a new condition variable is created and associated with the current object, returning a unique identifier of the following type:

public struct ConditionID {
   private init() { ... } // constructor is intentionally private to prevent
                          // creation of such structs outside of MultiConditionMonitor
}

Note that a ConditionID returned by one MultiConditionMonitor instance cannot be passed to other instances, and a ConditionID cannot be created manually (for example, by using unsafe), because the data contained in a ConditionID (such as the index of an internal array, the direct address of an internal queue, or any other kind of data) is tied to the MultiConditionMonitor that created it. Passing a "foreign" ConditionID to a MultiConditionMonitor results in an IllegalSynchronizationStateException.
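
For example, the following illustrative error sketch (with hypothetical variable names) passes a ConditionID created by one MultiConditionMonitor to another one and therefore throws an IllegalSynchronizationStateException:

import std.sync.*

main() {
    let m1 = MultiConditionMonitor()
    let m2 = MultiConditionMonitor()

    m1.lock()
    let cond = m1.newCondition() // `cond` is tied to m1.
    m1.unlock()

    m2.lock()
    m2.notify(cond) // Error: `cond` was not created by m2, so an
                    // IllegalSynchronizationStateException is thrown.
    m2.unlock()
}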

The following describes how to use MultiConditionMonitor to implement a fixed-length bounded FIFO queue. When the queue is empty, get() blocks; when the queue is full, put() blocks.

import std.sync.*

class BoundedQueue {
    // Create a MultiConditionMonitor, two Conditions.
    let m: MultiConditionMonitor = MultiConditionMonitor()
    var notFull: ConditionID
    var notEmpty: ConditionID

    var count: Int64 // Object count in buffer.
    var head: Int64  // Write index.
    var tail: Int64  // Read index.

    // Queue's length is 100.
    let items: Array<Object> = Array<Object>(100, {i => Object()})

    init() {
        count = 0
        head = 0
        tail = 0

        synchronized(m) {
          notFull  = m.newCondition()
          notEmpty = m.newCondition()
        }
    }

    // Insert an object, if the queue is full, block the current thread.
    public func put(x: Object) {
        // Acquire the mutex.
        synchronized(m) {
          while (count == 100) {
            // If the queue is full, wait for the "queue notFull" event.
            m.wait(notFull)
          }
          items[head] = x
          head++
          if (head == 100) {
            head = 0
          }
          count++

          // An object has been inserted and the current queue is no longer
          // empty, so wake up the thread previously blocked on get()
          // because the queue was empty.
          m.notify(notEmpty)
        } // Release the mutex.
    }

    // Pop an object, if the queue is empty, block the current thread.
    public func get(): Object {
        // Acquire the mutex.
        synchronized(m) {
          while (count == 0) {
            // If the queue is empty, wait for the "queue notEmpty" event.
            m.wait(notEmpty)
          }
          let x: Object = items[tail]
          tail++
          if (tail == 100) {
            tail = 0
          }
          count--

          // An object has been popped and the current queue is no longer
          // full, so wake up the thread previously blocked on put()
          // because the queue was full.
          m.notify(notFull)

          return x
        } // Release the mutex.
    }
}
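
The following short sketch (not part of the original example) shows one possible way to use BoundedQueue with one producer thread and one consumer thread:

main(): Int64 {
    let queue = BoundedQueue()

    let producer = spawn {
        for (_ in 0..200) {
            queue.put(Object()) // Blocks whenever the queue already holds 100 objects.
        }
    }
    let consumer = spawn {
        for (_ in 0..200) {
            queue.get() // Blocks whenever the queue is empty.
        }
    }

    producer.get()
    consumer.get()
    println("producer and consumer finished")
    return 0
}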

synchronized

ReentrantMutex provides a convenient and flexible locking method, but its flexibility can lead to problems such as forgetting to unlock, or an exception being thrown while the mutex is held, in which case the lock is not released automatically. To address these problems, the Cangjie programming language offers a synchronized keyword that is used together with a ReentrantMutex: it automatically handles locking and unlocking for the code block that follows it.

The following example code demonstrates how to use the synchronized keyword to protect shared data:

import std.sync.*
import std.time.*
import std.collection.*

var count: Int64 = 0
let mtx = ReentrantMutex()

main(): Int64 {
    let list = ArrayList<Future<Unit>>()

    // create 1000 threads.
    for (i in 0..1000) {
        let fut = spawn {
            sleep(Duration.millisecond) // sleep for 1 ms.
            // Use synchronized(mtx), instead of mtx.lock() and mtx.unlock().
            synchronized(mtx) {
                count++
            }
        }
        list.append(fut)
    }

    // Wait for all threads to finish.
    for (f in list) {
        f.get()
    }

    println("count = ${count}")
    return 0
}

The output is as follows:

count = 1000

A ReentrantMutex instance is given after the synchronized keyword to protect the code block that follows, so that only one thread can execute the protected code at any time.

  1. Before entering the synchronized code block, a thread automatically acquires the lock of the ReentrantMutex instance. If the lock cannot be acquired, the current thread is blocked.
  2. Before exiting the synchronized code block, a thread automatically releases the lock of the ReentrantMutex instance.

For control transfer expressions (such as break, continue, return, and throw), when they cause execution to leave the synchronized code block, the lock corresponding to the synchronized expression is also automatically released, as described above.

The following example demonstrates a case where a break statement appears in a synchronized code block:

import std.sync.*
import std.collection.*

var count: Int64 = 0
var mtx: ReentrantMutex = ReentrantMutex()

main(): Int64 {
    let list = ArrayList<Future<Unit>>()
    for (i in 0..10) {
        let fut = spawn {
            while (true) {
                synchronized(mtx) {
                    count = count + 1
                    break
                    println("in thread")
                }
            }
        }
        list.append(fut)
    }

    // Wait for all threads to finish.
    for (f in list) {
        f.get()
    }

    synchronized(mtx) {
        println("in main, count = ${count}")
    }
    return 0
}

The output is as follows:

in main, count = 10

Note that the "in thread" line is never printed, because the break statement causes the program to exit the while loop, and in doing so it exits the synchronized code block (and releases the lock) before leaving the loop.

ThreadLocal

You can use ThreadLocal in the core package to create and use thread local variables. Each thread has an independent storage space to store these thread local variables. Therefore, each thread can safely access its own thread local variables without being affected by other threads.

public class ThreadLocal<T> {
    /*
     * Constructs a Cangjie thread local variable with a null value.
     */
    public init()

    /*
     * Obtains the value of a Cangjie thread local variable. If the value does not exist, Option<T>.None is returned.
     * Option<T>: return value of the Cangjie thread local variable
     */
    public func get(): Option<T>

    /*
     * Sets the value of the Cangjie thread local variable.
     * If Option<T>.None is passed, the value of the local variable will be deleted and cannot be acquired in subsequent operations of the thread.
     * value: value of the local variable to be set
     */
    public func set(value: Option<T>): Unit
}

The following example code demonstrates how to use the ThreadLocal class to create and use thread local variables:

main(): Int64 {
    let tl = ThreadLocal<Int64>()
    let fut1 = spawn {
        tl.set(123)
        println("tl in spawn1 = ${tl.get().getOrThrow()}")
    }
    let fut2 = spawn {
        tl.set(456)
        println("tl in spawn2 = ${tl.get().getOrThrow()}")
    }
    fut1.get()
    fut2.get()
    0
}

One possible output is as follows:

tl in spawn1 = 123
tl in spawn2 = 456

Or:

tl in spawn2 = 456
tl in spawn1 = 123
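
As described above, passing Option&lt;T&gt;.None to set removes the value for the current thread. The following small sketch (not part of the original example) illustrates this:

main(): Int64 {
    let tl = ThreadLocal<Int64>()
    tl.set(123)
    println("before clearing: ${tl.get().getOrThrow()}") // before clearing: 123
    tl.set(Option<Int64>.None) // Remove the value for the current thread.
    match (tl.get()) {
        case Some(v) => println("still set: ${v}")
        case None => println("value removed") // This branch is taken.
    }
    return 0
}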