Synchronization Mechanism
In concurrent programming, a data race may occur when variables shared by multiple threads are not protected by a synchronization mechanism.
The Cangjie programming language provides three common synchronization mechanisms to ensure thread safety for shared data: atomic operations, mutexes, and condition variables.
Atomic Operations
Cangjie provides atomic operations for the integer, Bool, and reference types.
The integer types include Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, and UInt64.
Atomic operations of the integer type support basic read, write, swap, and arithmetic operations.
Operation | Function |
---|---|
load | Read. |
store | Write. |
swap | Swap. The value before swap is returned. |
compareAndSwap | Compare and swap. If the swap is successful, true is returned. Otherwise, false is returned. |
fetchAdd | Addition. The value before the addition operation is returned. |
fetchSub | Subtraction. The value before the subtraction operation is returned. |
fetchAnd | AND. The value before the AND operation is returned. |
fetchOr | OR. The value before the OR operation is returned. |
fetchXor | XOR. The value before the XOR operation is returned. |
Note that:
- swap and the fetch operations (arithmetic and bitwise) return the value held before the operation.
- compareAndSwap checks whether the current value of the atomic variable equals the expected old value. If they are equal, it replaces the current value with the new value. Otherwise, it leaves the value unchanged.
Take the Int8 type as an example. The corresponding atomic operation type is declared as follows:
class AtomicInt8 {
    public func load(): Int8
    public func store(val: Int8): Unit
    public func swap(val: Int8): Int8
    public func compareAndSwap(old: Int8, new: Int8): Bool
    public func fetchAdd(val: Int8): Int8
    public func fetchSub(val: Int8): Int8
    public func fetchAnd(val: Int8): Int8
    public func fetchOr(val: Int8): Int8
    public func fetchXor(val: Int8): Int8
}
The methods for each atomic type listed above have corresponding versions that can accept memory ordering parameters, currently supporting only sequential consistency.
Similarly, other integer types correspond to the following atomic operation types:
class AtomicInt16 {...}
class AtomicInt32 {...}
class AtomicInt64 {...}
class AtomicUInt8 {...}
class AtomicUInt16 {...}
class AtomicUInt32 {...}
class AtomicUInt64 {...}
The following example shows how to use atomic operations to count in a multithreaded program:
import std.sync.*
import std.time.*
import std.collection.*
let count = AtomicInt64(0)
main(): Int64 {
    let list = ArrayList<Future<Int64>>()
    // Create 1000 threads.
    for (_ in 0..1000) {
        let fut = spawn {
            sleep(Duration.millisecond) // Sleep for 1 ms.
            count.fetchAdd(1)
        }
        list.append(fut)
    }
    // Wait for all threads to finish.
    for (f in list) {
        f.get()
    }
    let val = count.load()
    println("count = ${val}")
    return 0
}
The output is as follows:
count = 1000
The following are some examples of using atomic operations of the integer type:
var obj: AtomicInt32 = AtomicInt32(1)
var x = obj.load() // x: 1, the type is Int32
x = obj.swap(2) // x: 1
x = obj.load() // x: 2
var y = obj.compareAndSwap(2, 3) // y: true, the type is Bool.
y = obj.compareAndSwap(2, 3) // y: false, the value in obj is no longer 2 but 3. Therefore, the CAS operation fails.
x = obj.fetchAdd(1) // x: 3
x = obj.load() // x: 4
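When the new value depends on the current one, compareAndSwap is typically called in a retry loop. The following is a minimal sketch rather than a library facility: maxSeen and recordMax are hypothetical names introduced for illustration, and only the load and compareAndSwap operations described above are used.

```cangjie
import std.sync.*
import std.collection.*

// Hypothetical shared variable that tracks the largest value reported so far.
let maxSeen = AtomicInt64(0)

// Hypothetical helper: raises maxSeen to `candidate` if `candidate` is larger.
func recordMax(candidate: Int64): Unit {
    var current = maxSeen.load()
    while (current < candidate) {
        if (maxSeen.compareAndSwap(current, candidate)) {
            return
        }
        // Another thread changed the value first, so reload it and retry.
        current = maxSeen.load()
    }
}

main(): Int64 {
    let list = ArrayList<Future<Unit>>()
    for (i in 0..100) {
        let fut = spawn {
            recordMax(i)
        }
        list.append(fut)
    }
    for (f in list) {
        f.get()
    }
    println("max = ${maxSeen.load()}") // The largest reported value is 99.
    return 0
}
```

The loop exits either because the swap succeeded or because another thread has already stored a value that is at least as large, so no update is lost.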
Atomic operations of the Bool and reference types support only read, write, swap, and compare-and-swap operations.
Operation | Function |
---|---|
load | Read |
store | Write |
swap | Swap. The value before swap is returned. |
compareAndSwap | Compare and swap. If the swap is successful, true is returned. Otherwise, false is returned. |
Note:
Atomic operations of the reference type can be applied only to reference types.
The atomic reference type is AtomicReference. The following are some examples of using atomic operations of the Bool and reference types:
import std.sync.*
class A {}
main() {
    var obj = AtomicBool(true)
    var x1 = obj.load() // x1: true, the type is Bool
    println(x1)
    var t1 = A()
    var obj2 = AtomicReference(t1)
    var x2 = obj2.load() // x2 and t1 are the same object
    var y1 = obj2.compareAndSwap(x2, t1) // x2 and t1 are the same object, y1: true
    println(y1)
    var t2 = A()
    var y2 = obj2.compareAndSwap(t2, A()) // t2 and t1 are not the same object, CAS fails, y2: false
    println(y2)
    y2 = obj2.compareAndSwap(t1, A()) // CAS succeeds, y2: true
    println(y2)
}
The result is as follows:
true
true
false
true
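A common use of compareAndSwap on AtomicBool is to let exactly one thread claim a one-time task. The sketch below assumes nothing beyond the operations listed above; claimed and tryClaim are hypothetical names used only for illustration.

```cangjie
import std.sync.*
import std.collection.*

// Hypothetical flag: false until some thread claims the one-time task.
let claimed = AtomicBool(false)

// Returns true only for the single caller that flips `claimed` from false to true.
func tryClaim(): Bool {
    return claimed.compareAndSwap(false, true)
}

main(): Int64 {
    let list = ArrayList<Future<Bool>>()
    for (_ in 0..8) {
        let fut = spawn {
            tryClaim()
        }
        list.append(fut)
    }
    // Count how many threads won the claim.
    var winners: Int64 = 0
    for (f in list) {
        if (f.get()) {
            winners++
        }
    }
    println("winners = ${winners}") // Only one CAS from false to true can succeed.
    return 0
}
```

Because the flag is never reset, every later compareAndSwap(false, true) sees true and fails, regardless of how the threads are scheduled.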
ReentrantMutex
The function of the ReentrantMutex is to protect the critical section so that only one thread can execute the code in the critical section at any time. When a thread attempts to acquire a lock that has been held by another thread, the thread is blocked and is not woken up until the lock is released. Reentrancy means that a thread can acquire the same lock multiple times.
Note:
ReentrantMutex is a built-in mutex. Do not inherit from it.
When using a ReentrantMutex, keep two rules in mind:
- Acquire the lock before accessing shared data.
- After the shared data has been processed, release the lock so that other threads can acquire it.
ReentrantMutex provides the following member functions:
public open class ReentrantMutex {
    // Create a ReentrantMutex.
    public init()
    // Locks the mutex, blocks if the mutex is not available.
    public func lock(): Unit
    // Unlocks the mutex. If there are other threads blocking on this
    // lock, then wake up one of them.
    public func unlock(): Unit
    // Tries to lock the mutex, returns false if the mutex is not
    // available, otherwise returns true.
    public func tryLock(): Bool
}
The following example shows how to use ReentrantMutex to protect access to the global shared variable count, where operations on count are considered a critical section:
import std.sync.*
import std.time.*
import std.collection.*
var count: Int64 = 0
let mtx = ReentrantMutex()
main(): Int64 {
    let list = ArrayList<Future<Unit>>()
    // Create 1000 threads.
    for (i in 0..1000) {
        let fut = spawn {
            sleep(Duration.millisecond) // Sleep for 1 ms.
            mtx.lock()
            count++
            mtx.unlock()
        }
        list.append(fut)
    }
    // Wait for all threads to finish.
    for (f in list) {
        f.get()
    }
    println("count = ${count}")
    return 0
}
The output is as follows:
count = 1000
The following example shows how to use tryLock:
import std.sync.*
import std.time.*
main(): Int64 {
    let mtx: ReentrantMutex = ReentrantMutex()
    var future: Future<Unit> = spawn {
        mtx.lock()
        println("get the lock, do something")
        sleep(Duration.millisecond * 10)
        mtx.unlock()
    }
    // Wait for the thread with a 10 ms timeout (10 * 1000 * 1000 ns).
    let res: Option<Unit> = future.get(10 * 1000 * 1000)
    match (res) {
        case Some(v) =>
            println("after wait 10 ms, end")
        case None =>
            println("after wait 10 ms, try to get the lock")
            if (mtx.tryLock()) {
                println("tryLock success, do something")
                mtx.unlock()
                return 1
            }
            println("tryLock failed, do nothing")
            return 0
    }
    return 2
}
One possible output is as follows:
get the lock, do something
after wait 10 ms, try to get the lock
tryLock failed, do nothing
The following are some error examples involving mutexes:
Error example 1: A thread operates on the critical section without unlocking, causing other threads to be blocked and unable to acquire the lock.
import std.sync.*
var sum: Int64 = 0
let mutex = ReentrantMutex()
main() {
    let foo = spawn { =>
        mutex.lock()
        sum = sum + 1
    }
    let bar = spawn { =>
        mutex.lock()
        sum = sum + 1
    }
    foo.get()
    println("${sum}")
    bar.get() // Because the mutex is never unlocked, the other thread waiting to acquire it stays blocked.
}
Error example 2: If the current thread does not hold the lock, an exception is thrown when unlock is called.
import std.sync.*
var sum: Int64 = 0
let mutex = ReentrantMutex()
main() {
    let foo = spawn { =>
        sum = sum + 1
        mutex.unlock() // Error: unlocking without holding the lock throws IllegalSynchronizationStateException.
    }
    foo.get()
    0
}
Error example 3: tryLock() does not guarantee that the lock will be acquired, which can lead to operations on the critical section without the lock's protection and to exceptions being thrown when unlock is called without holding the lock.
import std.sync.*
var sum: Int64 = 0
let mutex = ReentrantMutex()
main() {
    for (i in 0..100) {
        spawn { =>
            mutex.tryLock() // Error: `tryLock()` only attempts to acquire the lock. The result is ignored here, so the code below may run without holding the lock.
            sum = sum + 1
            mutex.unlock()
        }
    }
}
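One way to avoid the problem in error example 3 is to check the result of tryLock and enter the critical section only when it returns true. The following is a minimal sketch under that assumption; tryIncrement is a hypothetical helper, and a thread that fails to get the lock simply skips its update.

```cangjie
import std.sync.*
import std.collection.*

var sum: Int64 = 0
let mutex = ReentrantMutex()

// Hypothetical helper: increments `sum` only if the lock was acquired.
func tryIncrement(): Bool {
    if (mutex.tryLock()) {
        sum = sum + 1
        mutex.unlock() // Called only on the path where the lock is held.
        return true
    }
    return false
}

main(): Int64 {
    let list = ArrayList<Future<Bool>>()
    for (_ in 0..100) {
        let fut = spawn {
            tryIncrement()
        }
        list.append(fut)
    }
    // Count the attempts that actually acquired the lock.
    var succeeded: Int64 = 0
    for (f in list) {
        if (f.get()) {
            succeeded++
        }
    }
    println("sum = ${sum}, successful attempts = ${succeeded}")
    return 0
}
```

The two printed numbers should match, but they may be smaller than 100, because a failed tryLock is not retried in this sketch.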
In addition, ReentrantMutex is designed to be a reentrant lock, meaning that if a thread already holds a ReentrantMutex, it can immediately acquire the same ReentrantMutex again.
Note:
Although ReentrantMutex is a reentrant lock, the number of calls to unlock() must be the same as the number of calls to lock() so that the lock can be successfully released.
The following example code demonstrates the reentrant feature of ReentrantMutex:
import std.sync.*
import std.time.*
var count: Int64 = 0
let mtx = ReentrantMutex()
func foo() {
    mtx.lock()
    count += 10
    bar()
    mtx.unlock()
}
func bar() {
    mtx.lock()
    count += 100
    mtx.unlock()
}
main(): Int64 {
    let fut = spawn {
        sleep(Duration.millisecond) // Sleep for 1 ms.
        foo()
    }
    foo()
    fut.get()
    println("count = ${count}")
    return 0
}
The output is as follows:
count = 220
In the example above, in both the main thread and the newly created thread, the lock has already been acquired in foo() when bar() is called. Because bar() locks the same ReentrantMutex, the thread acquires it again immediately without causing a deadlock.
Monitor
Monitor is a built-in data structure that binds a mutex to a single related condition variable (wait queue). Monitor can block a thread until it receives a signal from another thread to resume execution. This mechanism uses shared variables to synchronize threads. The following methods are provided:
public class Monitor <: ReentrantMutex {
    // Create a monitor.
    public init()
    // Wait for a signal, blocking the current thread.
    public func wait(timeout!: Duration = Duration.Max): Bool
    // Wake up one thread of those waiting on the monitor, if any.
    public func notify(): Unit
    // Wake up all threads waiting on the monitor, if any.
    public func notifyAll(): Unit
}
Before calling the wait, notify, or notifyAll method of a Monitor object, ensure that the current thread holds the corresponding Monitor lock. The wait method performs the following actions:
1. Adds the current thread to the waiting queue of the Monitor.
2. Blocks the current thread, releases the Monitor lock, and records the number of lock re-entries.
3. Waits for another thread to send a signal to this thread by calling notify or notifyAll on the same Monitor instance.
4. After the current thread is woken up, it automatically attempts to re-acquire the Monitor lock. The re-entry state of the lock is restored to the count recorded in step 2. If the attempt to acquire the Monitor lock fails, the current thread is blocked on the Monitor lock.
The wait method accepts an optional parameter timeout. Note that many common operating systems do not guarantee real-time scheduling, so it cannot be ensured that a thread is blocked for "exactly N nanoseconds"; system-dependent inaccuracies may occur. In addition, the current language specification explicitly allows implementations to produce spurious wakeups. In this case, the return value of wait is implementation-defined and may be true or false. Therefore, developers are encouraged to always wrap wait in a loop:
synchronized (obj) {
    while (<condition is not true>) {
        obj.wait()
    }
}
The following is an example of using Monitor:
import std.sync.*
import std.time.*
var mon = Monitor()
var flag: Bool = true
main(): Int64 {
    let fut = spawn {
        mon.lock()
        while (flag) {
            println("New thread: before wait")
            mon.wait()
            println("New thread: after wait")
        }
        mon.unlock()
    }
    // Sleep for 10 ms to make sure the new thread has a chance to run.
    sleep(10 * Duration.millisecond)
    mon.lock()
    println("Main thread: set flag")
    flag = false
    mon.unlock()
    mon.lock()
    println("Main thread: notify")
    mon.notifyAll()
    mon.unlock()
    // Wait for the new thread to finish.
    fut.get()
    return 0
}
The output is as follows:
New thread: before wait
Main thread: set flag
Main thread: notify
New thread: after wait
A thread that calls wait on a Monitor object must hold the Monitor lock. Otherwise, an exception is thrown when wait tries to release the lock.
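The timeout parameter of wait can be used to bound how long a thread waits. The following is a minimal sketch, assuming the Monitor signature shown above; ready and awaitReady are hypothetical names. It performs a single bounded wait and then re-reads the shared flag instead of relying on the return value of wait, because wakeups may be spurious.

```cangjie
import std.sync.*
import std.time.*

let mon = Monitor()
var ready: Bool = false

// Hypothetical helper: waits at most roughly `limit` for `ready` to become true
// and returns the value of `ready` observed under the lock.
func awaitReady(limit: Duration): Bool {
    synchronized(mon) {
        if (!ready) {
            // The Bool result is ignored on purpose; the flag is checked instead.
            mon.wait(timeout: limit)
        }
        return ready
    }
}

main(): Int64 {
    // Nobody has set `ready` yet, so this call gives up after about 10 ms.
    let first = awaitReady(Duration.millisecond * 10)
    println("first attempt: ${first}")

    let fut = spawn {
        synchronized(mon) {
            ready = true
            mon.notifyAll()
        }
    }
    fut.get()

    // `ready` is already true here, so this call returns without waiting.
    let second = awaitReady(Duration.millisecond * 10)
    println("second attempt: ${second}")
    return 0
}
```

A caller that must wait for the full duration despite spurious wakeups would need a loop that tracks the remaining time, which this sketch leaves out.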
The following are some error examples of using condition variables:
import std.sync.*
var m1 = Monitor()
var m2 = ReentrantMutex()
var flag: Bool = true
var count: Int64 = 0
func foo1() {
    spawn {
        m2.lock()
        while (flag) {
            m1.wait() // Error: The lock used together with the condition variable must be the same lock and in the locked state. Otherwise, the unlock operation in `wait` throws an exception.
        }
        count = count + 1
        m2.unlock()
    }
    m1.lock()
    flag = false
    m1.notifyAll()
    m1.unlock()
}
func foo2() {
    spawn {
        while (flag) {
            m1.wait() // Error: The `wait` of a condition variable must be called with a lock held.
        }
        count = count + 1
    }
    m1.lock()
    flag = false
    m1.notifyAll()
    m1.unlock()
}
main() {
    foo1()
    foo2()
    m1.wait()
    return 0
}
MultiConditionMonitor
MultiConditionMonitor is a built-in data structure that binds a mutex to a group of dynamically created condition variables. This class is used only when the Monitor class is insufficient for complex inter-thread synchronization. It primarily provides the following methods:
public class MultiConditionMonitor <: ReentrantMutex {
    // Constructor.
    init()
    // Returns a new ConditionID associated with this monitor. May be used to implement
    // "single mutex -- multiple wait queues" concurrent primitives.
    // Throws IllegalSynchronizationStateException("Mutex is not locked by the current thread") if the current thread does not hold this mutex.
    func newCondition(): ConditionID
    // Blocks until either a paired `notify` is invoked or `timeout` elapses.
    // Returns `true` if the specified condition was signalled by another thread or `false` on timeout.
    // Spurious wakeups are allowed.
    // Throws IllegalSynchronizationStateException("Mutex is not locked by the current thread") if the current thread does not hold this mutex.
    // Throws IllegalSynchronizationStateException("Invalid condition") if `id` was not returned by `newCondition` of this MultiConditionMonitor instance.
    func wait(id: ConditionID, timeout!: Duration = Duration.Max): Bool
    // Wakes up a single thread waiting on the specified condition, if any (no particular admission policy implied).
    // Throws IllegalSynchronizationStateException("Mutex is not locked by the current thread") if the current thread does not hold this mutex.
    // Throws IllegalSynchronizationStateException("Invalid condition") if `id` was not returned by `newCondition` of this MultiConditionMonitor instance.
    func notify(id: ConditionID): Unit
    // Wakes up all threads waiting on the specified condition, if any (no particular admission policy implied).
    // Throws IllegalSynchronizationStateException("Mutex is not locked by the current thread") if the current thread does not hold this mutex.
    // Throws IllegalSynchronizationStateException("Invalid condition") if `id` was not returned by `newCondition` of this MultiConditionMonitor instance.
    func notifyAll(id: ConditionID): Unit
}
- newCondition(): ConditionID: creates a condition variable, associates it with the current object, and returns a specific ConditionID.
- wait(id: ConditionID, timeout!: Duration = Duration.Max): Bool: waits for a signal and blocks the current thread.
- notify(id: ConditionID): Unit: wakes up one thread waiting on the specified condition (if any).
- notifyAll(id: ConditionID): Unit: wakes up all threads waiting on the specified condition (if any).
During initialization, the MultiConditionMonitor does not have a related ConditionID instance. Each time newCondition is called, a new condition variable is created and associated with the current object, returning a unique identifier of the following type:
public struct ConditionID {
    private init() { ... } // constructor is intentionally private to prevent
                           // creation of such structs outside of MultiConditionMonitor
}
Note that users cannot pass a ConditionID returned by one MultiConditionMonitor instance to another instance, or manually create a ConditionID (for example, using unsafe). The data contained in a ConditionID (such as the index of an internal array, the direct address of an internal queue, or any other type of data) is tied to the MultiConditionMonitor that created it, so passing an "external" ConditionID to a MultiConditionMonitor results in an IllegalSynchronizationStateException.
The following example uses MultiConditionMonitor to implement a bounded FIFO queue with a fixed length. When the queue is empty, get() blocks. When the queue is full, put() blocks.
import std.sync.*
class BoundedQueue {
    // Create a MultiConditionMonitor, two Conditions.
    let m: MultiConditionMonitor = MultiConditionMonitor()
    var notFull: ConditionID
    var notEmpty: ConditionID
    var count: Int64 // Object count in buffer.
    var head: Int64 // Write index.
    var tail: Int64 // Read index.
    // Queue's length is 100.
    let items: Array<Object> = Array<Object>(100, {i => Object()})
    init() {
        count = 0
        head = 0
        tail = 0
        synchronized(m) {
            notFull = m.newCondition()
            notEmpty = m.newCondition()
        }
    }
    // Insert an object, if the queue is full, block the current thread.
    public func put(x: Object) {
        // Acquire the mutex.
        synchronized(m) {
            while (count == 100) {
                // If the queue is full, wait for the "queue notFull" event.
                m.wait(notFull)
            }
            items[head] = x
            head++
            if (head == 100) {
                head = 0
            }
            count++
            // An object has been inserted and the current queue is no longer
            // empty, so wake up the thread previously blocked on get()
            // because the queue was empty.
            m.notify(notEmpty)
        } // Release the mutex.
    }
    // Pop an object, if the queue is empty, block the current thread.
    public func get(): Object {
        // Acquire the mutex.
        synchronized(m) {
            while (count == 0) {
                // If the queue is empty, wait for the "queue notEmpty" event.
                m.wait(notEmpty)
            }
            let x: Object = items[tail]
            tail++
            if (tail == 100) {
                tail = 0
            }
            count--
            // An object has been popped and the current queue is no longer
            // full, so wake up the thread previously blocked on put()
            // because the queue was full.
            m.notify(notFull)
            return x
        } // Release the mutex.
    }
}
synchronized
ReentrantMutex provides a convenient and flexible locking method. However, this flexibility can lead to problems such as forgetting to unlock, or throwing an exception while holding the mutex so that the held lock is not released automatically. To address these problems, the Cangjie programming language offers the synchronized keyword, which can be used in conjunction with ReentrantMutex. It automatically handles locking and unlocking within the scope that follows it.
The following example code demonstrates how to use the synchronized keyword to protect shared data:
import std.sync.*
import std.time.*
import std.collection.*
var count: Int64 = 0
let mtx = ReentrantMutex()
main(): Int64 {
    let list = ArrayList<Future<Unit>>()
    // Create 1000 threads.
    for (i in 0..1000) {
        let fut = spawn {
            sleep(Duration.millisecond) // Sleep for 1 ms.
            // Use synchronized(mtx), instead of mtx.lock() and mtx.unlock().
            synchronized(mtx) {
                count++
            }
        }
        list.append(fut)
    }
    // Wait for all threads to finish.
    for (f in list) {
        f.get()
    }
    println("count = ${count}")
    return 0
}
The output is as follows:
count = 1000
A ReentrantMutex instance is placed after synchronized to protect the code block that follows. In this way, only one thread can execute the protected code at any time.
- Before entering the code block modified by synchronized, a thread automatically acquires the lock of the ReentrantMutex instance. If the lock cannot be acquired, the current thread is blocked.
- Before exiting the code block modified by synchronized, a thread automatically releases the lock of the ReentrantMutex instance.
Control transfer expressions (such as break, continue, return, and throw) that cause execution to exit the synchronized code block also follow this rule: the lock corresponding to the synchronized expression is automatically released.
The following example demonstrates a case where a break statement appears in a synchronized code block:
import std.sync.*
import std.collection.*
var count: Int64 = 0
var mtx: ReentrantMutex = ReentrantMutex()
main(): Int64 {
    let list = ArrayList<Future<Unit>>()
    for (i in 0..10) {
        let fut = spawn {
            while (true) {
                synchronized(mtx) {
                    count = count + 1
                    break
                    println("in thread")
                }
            }
        }
        list.append(fut)
    }
    // Wait for all threads to finish.
    for (f in list) {
        f.get()
    }
    synchronized(mtx) {
        println("in main, count = ${count}")
    }
    return 0
}
The output is as follows:
in main, count = 10
The in thread line is never printed, because the break statement causes the program to exit the while loop (it exits the synchronized code block before exiting the while loop).
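The same automatic release applies when an exception leaves a synchronized block. The sketch below is illustrative only: failInsideLock and lockIsFree are hypothetical helpers, and the check is made from a separate thread because the lock is reentrant for the thread that holds it.

```cangjie
import std.sync.*

let mtx = ReentrantMutex()

// Hypothetical helper that always fails inside the critical section.
func failInsideLock(): Unit {
    synchronized(mtx) {
        throw Exception("failure inside the critical section")
    }
}

// Hypothetical helper: checks from another thread whether `mtx` can be acquired.
func lockIsFree(): Bool {
    let fut = spawn {
        if (mtx.tryLock()) {
            mtx.unlock()
            true
        } else {
            false
        }
    }
    return fut.get()
}

main(): Int64 {
    try {
        failInsideLock()
    } catch (e: Exception) {
        println("caught: ${e.message}")
    }
    // If the throw had left the mutex locked, tryLock in the other thread would fail.
    println("lock is free: ${lockIsFree()}")
    return 0
}
```

Given the behavior described above, the second line should report that the lock is free. With explicit lock() and unlock() calls, the same exception would skip unlock() and leave the mutex held.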
ThreadLocal
You can use ThreadLocal in the core package to create and use thread-local variables. Each thread has its own independent storage for these variables, so each thread can safely access its own thread-local variables without being affected by other threads.
public class ThreadLocal<T> {
    /*
     * Constructs a Cangjie thread local variable with a null value.
     */
    public init()
    /*
     * Obtains the value of a Cangjie thread local variable. If the value does not exist, Option<T>.None is returned.
     * Option<T>: return value of the Cangjie thread local variable
     */
    public func get(): Option<T>
    /*
     * Sets the value of the Cangjie thread local variable.
     * If Option<T>.None is passed, the value of the local variable will be deleted and cannot be acquired in subsequent operations of the thread.
     * value: value of the local variable to be set
     */
    public func set(value: Option<T>): Unit
}
The following example code demonstrates how to use the ThreadLocal class to create and use local variables of each thread:
main(): Int64 {
    let tl = ThreadLocal<Int64>()
    let fut1 = spawn {
        tl.set(123)
        println("tl in spawn1 = ${tl.get().getOrThrow()}")
    }
    let fut2 = spawn {
        tl.set(456)
        println("tl in spawn2 = ${tl.get().getOrThrow()}")
    }
    fut1.get()
    fut2.get()
    0
}
The possible output is as follows:
tl in spawn1 = 123
tl in spawn2 = 456
Or:
tl in spawn2 = 456
tl in spawn1 = 123
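The isolation between threads, and the effect of passing Option<T>.None to set, can be observed with a small sketch. It relies only on the get and set behavior documented above; describe is a hypothetical helper introduced for illustration.

```cangjie
// Hypothetical helper: reports what the calling thread currently sees in `tl`.
func describe(tl: ThreadLocal<Int64>): String {
    match (tl.get()) {
        case Some(v) => "value ${v}"
        case None => "empty"
    }
}

main(): Int64 {
    let tl = ThreadLocal<Int64>()
    tl.set(1) // Stored for the main thread only.

    let fut = spawn {
        // This thread never called set(), so it should see no value.
        describe(tl)
    }
    println("spawned thread: ${fut.get()}")
    println("main thread: ${describe(tl)}")

    // Passing None deletes the value for the current thread.
    tl.set(Option<Int64>.None)
    println("main thread after clearing: ${describe(tl)}")
    return 0
}
```

Under the semantics described in this section, the spawned thread should report "empty" while the main thread reports "value 1" until it clears its own value.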