Concurrency
Cangjie Threads
Cangjie provides a preemptive concurrency model in which the Cangjie thread is the basic execution unit. Cangjie threads are managed by the Cangjie runtime and are not the underlying native threads (such as operating system threads). In unambiguous contexts, we simply refer to "Cangjie threads" as "threads." Each thread has the following properties:
- A thread may be preempted by another thread at any time during execution.
- Multiple threads can execute concurrently.
- When a thread is blocked, it is suspended.
- Threads can share memory (explicit synchronization is required).
Cangjie program execution begins with the initialization of global variables; then the program entry `main` is called. When `main` exits, the entire program exits without waiting for other threads to complete execution.
Creating Threads
You can create and start a thread using the `spawn` keyword, an optional parameter of the `ThreadContext` type (see below for an introduction to `ThreadContext`), and a parameterless lambda expression. The expression returns an instance of `Future<T>`. (For details about `Future<T>`, see [Future<T> Generic Class].) The BNF of the `spawn` expression is as follows:
spawnExpression
    : 'spawn' ( '(' expression ')' )? trailingLambdaExpression
    ;
The optional parameter of `spawn` is of the `ThreadContext` type (see [Thread Context] below). The following example shows a `spawn` expression that omits the `ThreadContext` parameter, so the default context is used:
func add(a: Int32, b: Int32): Int32 {
    println("This is a new thread")
    return a + b
}

main(): Int64 {
    println("This is main")

    // Create a thread.
    let fut: Future<Int32> = spawn {
        add(1, 2)
    }

    println("main waiting...")

    // Wait for the result of the thread's execution.
    let res: Int32 = fut.get()

    // Print the result.
    println("1 + 2 = ${res}")
    return 0
}
// OUTPUT maybe:
// This is main
// main waiting...
// This is a new thread
// 1 + 2 = 3
During the execution of the `spawn` expression, the closure is encapsulated into a runtime task and submitted to the scheduler; the scheduler's policies determine when the task is executed. Once the `spawn` expression has been executed, the thread corresponding to the task is in the executable state. Note that local variables declared with `var` cannot be captured in the closure of a `spawn` expression. For details about closures, see [Closure].
Future<T> Generic Class
The return value of the `spawn` expression is a `Future<T>` object that can be used to obtain the result computed by the thread. The type of `T` depends on the return type of the closure in the `spawn` expression.
func foo(): Int64 {...}
func bar(): String {...}

// The return type of foo() is Int64.
let f1: Future<Int64> = spawn {
    foo()
}

// The return type of bar() is String.
let f2: Future<String> = spawn {
    bar()
}

// Wait for the threads' execution results.
let r1: Int64 = f1.get()
let r2: String = f2.get()
A `Future<T>` object represents an unfinished calculation or task. `Future<T>` is a generic class. Its definition is as follows:
class Future<T> {
    ... ...

    /**
     * Blocks the current thread,
     * waiting for the result of the thread corresponding to this Future<T> object.
     * @throws Exception or Error if an exception occurs in the corresponding thread.
     */
    func get(): T

    /**
     * Blocks the current thread,
     * waiting for the result of the thread corresponding to this Future<T> object.
     * If the corresponding thread has not completed execution within `ns` nanoseconds,
     * the method returns `Option<T>.None`.
     * If `ns <= 0`, behaves the same as `get()`.
     * @throws Exception or Error if an exception occurs in the corresponding thread.
     */
    func get(ns: Int64): Option<T>

    /**
     * Non-blocking method that immediately returns `None` if the thread has not finished execution;
     * returns the computed result otherwise.
     * @throws Exception or Error if an exception occurs in the corresponding thread.
     */
    func tryGet(): Option<T>

    // Get the associated thread object.
    prop thread: Thread
}
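The timed and non-blocking variants can be used to avoid waiting indefinitely. The following is a minimal sketch, assuming the `get(ns)` and `tryGet()` signatures shown above; the one-second timeout value is only illustrative:

main() {
    let fut: Future<Int64> = spawn {
        // ... some long-running computation ...
        42
    }

    // Wait at most one second (the parameter is in nanoseconds).
    match (fut.get(1000 * 1000 * 1000)) {
        case Some(v) => println("result = ${v}")
        case None => println("not finished within 1 second")
    }

    // Non-blocking check of the current state.
    match (fut.tryGet()) {
        case Some(v) => println("already finished: ${v}")
        case None => println("still running")
    }
}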
If a thread terminates because of an exception, the exception is transferred to the corresponding `Future<T>` object, and by default the exception information is printed before the thread terminates. When `get()` is called on the `Future<T>` object corresponding to that thread, the exception is thrown again. For example:
main(args: Array<String>) {
    let fut: Future<Int64> = spawn {
        if (args.size != -1) {
            throw IllegalArgumentException()
        }
        return 100
    }

    try {
        let res: Int64 = fut.get()
        print("val = ${res}\n")
    } catch (e: IllegalArgumentException) {
        print("Exception occurred\n")
    }
}
// OUTPUT:
// An exception has occurred:
// IllegalArgumentException
// ...
// Exception occurred
Thread Class
Each `Future<T>` object has an associated thread object of the `Thread` type, which can be obtained through the `thread` property of `Future<T>`. The `Thread` class is used to obtain thread information, such as the thread ID. Part of its definition is as follows:
class Thread {
    ... ...

    // Get the currently running thread.
    static prop currentThread: Thread

    // Get the unique identifier (represented as an integer) of the thread object.
    prop id: Int64

    // Get or set the name of the thread.
    mut prop name: String
}
The static property `currentThread` can be used to obtain the thread object that is currently executing. The following sample code prints the identifier of the currently executing thread. Note: A thread may be assigned a different thread identifier each time it is executed.
main() {
    let fut: Future<Unit> = spawn {
        let tid = Thread.currentThread.id
        println("New thread id: ${tid}")
    }
    fut.get()
}
// OUTPUT:
// New thread id: 2
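The `name` property can be read and set in the same way. A minimal sketch, assuming the `mut prop name` member shown above:

main() {
    let fut: Future<Unit> = spawn {
        Thread.currentThread.name = "worker"                  // Set the current thread's name.
        println("thread name: ${Thread.currentThread.name}")  // Read it back.
    }
    fut.get()
}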
Thread Sleep
The `sync` package in the Cangjie standard library provides the `sleep` function, which puts the current thread to sleep for the specified duration. If the sleep duration is zero, that is, the argument is `Duration.Zero`, the current thread only releases its execution resources and does not enter the sleep state.
func sleep(duration: Duration)
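A minimal usage sketch is shown below; the `import` path and the `Duration.second` constant are assumptions based on the standard library layout described above:

import std.sync.*

main() {
    println("before sleep")
    sleep(Duration.second)   // Suspend the current thread for about one second.
    println("after sleep")
}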
Thread Termination
During normal execution, a thread ends when its associated closure finishes executing. In addition, a termination request can be sent to a thread by calling the `cancel()` method of the corresponding `Future<T>` object. This method only sends the request and does not affect the execution of the thread. The `hasPendingCancellation` property of the `Thread` class checks whether the current thread has a pending termination request; you can use it to decide whether to terminate the thread early and how to do so.
class Future<T> {
    ... ...
    // Send a termination request to the executing thread.
    public func cancel(): Unit
}

class Thread {
    ... ...
    // Check whether the thread has a pending cancellation request.
    prop hasPendingCancellation: Bool
}
The following example shows how to terminate a thread.
main(): Unit {
    let future = spawn { // Create a new thread.
        while (true) {
            // ...
            if (Thread.currentThread.hasPendingCancellation) {
                return // Terminate the thread when a request is pending.
            }
            // ...
        }
    }
    // ...
    future.cancel() // Send a termination request.
    future.get()    // Wait for the thread to terminate.
}
Thread Context
Threads always execute within a specific context, and the thread context can influence the behavior of the thread at runtime. When a thread is created, an instance of the `ThreadContext` type can be passed as the optional parameter of the `spawn` expression to select a different thread context, thereby controlling concurrency to some extent.
ThreadContext Interface
The `ThreadContext` interface type is defined as follows:

interface ThreadContext {
    func end(): Unit
    func hasEnded(): Bool
}

- The `end` method is used to send a termination request to the current context (for details, see [Closing Thread Context] below).
- The `hasEnded` method is used to check whether the current context has ended.
Currently, you are not allowed to implement the `ThreadContext` interface yourself. Cangjie provides thread context types based on the application scenario:

- The thread context `MainThreadContext` can be used in terminal application development. For details, see the terminal framework library.
Closing Thread Context
When closing a `ThreadContext` object, you can send a termination request to the corresponding context using the `end` method provided by that object. This method sends a termination request to all threads running in the context (see [Thread Termination]) and returns immediately. After all Cangjie threads bound to this context have completed, the context is closed.

As with thread termination, the termination request does not affect the execution of the threads in this context; users can use `hasPendingCancellation` to decide whether to terminate threads early and how to do so.
Thread Local Variables
The `ThreadLocal<T>` type can be used to define thread-local variables. Compared with regular variables, thread-local variables have different access semantics: when multiple threads share the same thread-local variable, each thread has its own copy of the value. A thread reads and writes its own local copy without affecting the values seen by other threads, so the `ThreadLocal<T>` class is thread-safe. The `ThreadLocal<T>` class is defined as follows:

class ThreadLocal<T> {
    // Construct a thread-local variable containing None.
    public init()

    // Get the value of the thread-local variable for the currently executing thread.
    public func get(): ?T

    // Set a value for the thread-local variable.
    public func set(value: ?T): Unit
}

- The constructor `init` constructs a thread-local variable. If a thread reads the variable before any value has been set, it obtains the null value `None`.
- The read and write methods `get` and `set` access the value of the thread-local variable. After a thread sets the value of the variable to `None`, the variable's value in the current thread is cleared.
The following example shows how to use thread local variables.
let tlv = ThreadLocal<Int64>() // Define a thread-local variable.

main(): Unit {
    for (i in 0..3) { // Spawn three threads.
        spawn {
            tlv.set(i) // Each thread sets a different value.
            // ...
            println("${tlv.get()}") // Each thread prints its own value.
        }
    }
    // ...
    println("${tlv.get()}") // Prints `None`, since the current thread has not set any value.
}
Synchronization Mechanism
Atomic Operations
Atomic operations ensure that instructions are executed atomically, that is, the execution process cannot be interrupted. A write to an atomic variable is always visible to subsequent reads of the same atomic variable. In addition, atomic operations are non-blocking and do not cause thread blocking. Cangjie provides atomic operations for the integer types (`Int8`, `Int16`, `Int32`, `Int64`, `UInt8`, `UInt16`, `UInt32`, and `UInt64`), the Boolean type (`Bool`), and reference types; a usage sketch follows the type definitions below.
- For the integer types, basic read, write, swap, and arithmetic operations are provided:
  - `load`: read
  - `store`: write
  - `swap`: swap
  - `compareAndSwap`: compare and exchange
  - `fetchAdd`: add
  - `fetchSub`: subtract
  - `fetchAnd`: AND
  - `fetchOr`: OR
  - `fetchXor`: exclusive OR
// Signed integers.
class AtomicInt8 {
    ... ...
    init(val: Int8)
    public func load(): Int8
    public func store(val: Int8): Unit
    public func swap(val: Int8): Int8
    public func compareAndSwap(old: Int8, new: Int8): Bool
    public func fetchAdd(val: Int8): Int8
    public func fetchSub(val: Int8): Int8
    public func fetchAnd(val: Int8): Int8
    public func fetchOr(val: Int8): Int8
    public func fetchXor(val: Int8): Int8
    ... ... // Operator overloading, etc.
}

class AtomicInt16 {...}
class AtomicInt32 {...}
class AtomicInt64 {...}

// Unsigned integers.
class AtomicUInt8 {...}
class AtomicUInt16 {...}
class AtomicUInt32 {...}
class AtomicUInt64 {...}
- For the Boolean type, only the basic read, write, and swap operations are provided; arithmetic operations are not provided:
  - `load`: read
  - `store`: write
  - `swap`: swap
  - `compareAndSwap`: compare and exchange
// Boolean.
class AtomicBool {
    ... ...
    init(val: Bool)
    public func load(): Bool
    public func store(val: Bool): Unit
    public func swap(val: Bool): Bool
    public func compareAndSwap(old: Bool, new: Bool): Bool
    ... ... // Operator overloading, etc.
}
- For reference types, only the basic read, write, and swap operations are provided; arithmetic operations are not provided:
  - `load`: read
  - `store`: write
  - `swap`: swap
  - `compareAndSwap`: compare and exchange
class AtomicReference<T> where T <: Object {
    ... ...
    init(val: T)
    public func load(): T
    public func store(val: T): Unit
    public func swap(val: T): T
    public func compareAndSwap(old: T, new: T): Bool
}
- In addition, for a nullable reference type, a "null reference" (represented by `None`) may be stored by using `AtomicOptionReference`.
class AtomicOptionReference<T> where T <: Object {
    public init(val: Option<T>)
    public func load(): Option<T>
    public func store(val: Option<T>): Unit
    public func swap(val: Option<T>): Option<T>
    public func compareAndSwap(old: Option<T>, new: Option<T>): Bool
}
Each of the preceding methods has a hidden memory order parameter. Currently, only the Sequential Consistency memory model is supported. In the future, looser memory models, such as the acquire or release memory order, may be added.
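The usage sketch referenced earlier: two threads increment a shared `AtomicInt64` with `fetchAdd`, and the final value is read with `load`. The `import` path is an assumption; the atomic methods are those listed above.

import std.sync.*

let counter = AtomicInt64(0) // Shared atomic counter.

func work(): Unit {
    for (_ in 0..1000) {
        counter.fetchAdd(1) // Atomic increment; no lock is required.
    }
}

main(): Unit {
    let f1 = spawn { work() }
    let f2 = spawn { work() }
    f1.get()
    f2.get()
    println("counter = ${counter.load()}") // Expected output: counter = 2000
}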
IllegalSynchronizationStateException
`IllegalSynchronizationStateException` is a runtime exception thrown by the built-in concurrency primitives when an illegal action occurs, for example when a thread attempts to release a mutex that it has not acquired.
IReentrantMutex
`IReentrantMutex` is a public interface for reentrant mutex concurrency primitives. It provides the following three methods, which need to be implemented:

- `lock(): Unit`: acquires the lock. If the lock cannot be acquired, the current thread is blocked.
- `tryLock(): Bool`: attempts to acquire the lock.
- `unlock(): Unit`: releases the lock.
interface IReentrantMutex {
    // Locks the mutex; blocks the current thread if the mutex is not available.
    public func lock(): Unit

    // Tries to lock the mutex; returns false if the mutex is not available, otherwise locks the mutex and returns true.
    public func tryLock(): Bool

    // Unlocks the mutex. If the mutex was locked repeatedly N times, this method must be invoked N times to
    // fully unlock the mutex. When the mutex is fully unlocked, unblocks one of the threads waiting on its `lock`
    // (no particular admission policy implied).
    // Throws ISSE("Mutex is not locked by the current thread") if the current thread does not hold this mutex.
    public func unlock(): Unit
}
Note:

- When implementing this interface, ensure that the underlying mutex supports nested locking.
- When implementing this interface, ensure that `IllegalSynchronizationStateException` is thrown when the lock state is abnormal.
ReentrantMutex
`ReentrantMutex` provides the following methods:

- `lock(): Unit`: acquires the lock. If the lock cannot be acquired, the current thread is blocked.
- `tryLock(): Bool`: attempts to acquire the lock.
- `unlock(): Unit`: releases the lock.
`ReentrantMutex` is a built-in lock that can be held by only one thread at a time. If a `ReentrantMutex` is already held by another thread, the `lock` method blocks the current thread until the mutex is released, while the `tryLock` method immediately returns `false`.

`ReentrantMutex` is a reentrant lock: if a thread attempts to acquire a `ReentrantMutex` that it already holds, it acquires the lock immediately. To fully release the lock, `unlock` must be called as many times as `lock` was called.

Note: `ReentrantMutex` is a built-in mutex and cannot be inherited from.
// Base class for built-in reentrant mutual exclusion concurrency primitives.
sealed class ReentrantMutex <: IReentrantMutex {
    // Constructor.
    init()

    // Locks the mutex; blocks the current thread if the mutex is not available.
    public func lock(): Unit

    // Tries to lock the mutex; returns false if the mutex is not available, otherwise locks the mutex and returns true.
    public func tryLock(): Bool

    // Unlocks the mutex. If the mutex was locked repeatedly N times, this method must be invoked N times to
    // fully unlock the mutex. Once the mutex is fully unlocked, unblocks one of the threads waiting in its `lock` method, if any
    // (no particular admission policy implied).
    // Throws ISSE("Mutex is not locked by the current thread") if the current thread does not hold this mutex.
    public func unlock(): Unit
}
synchronized
The `synchronized` keyword, used together with a `ReentrantMutex` object, protects the code block it modifies so that only one thread is allowed to execute the code in that block at a time. An instance of `ReentrantMutex` or one of its subclasses can be passed as a parameter to the `synchronized` keyword, which results in the following conversion:
//=======================================
// `synchronized` expression
//=======================================
let m: ReentrantMutex = ...
synchronized(m) {
    foo()
}

//=======================================
// is equivalent to the following program.
//=======================================
let m: ReentrantMutex = ...
m.lock()
try {
    foo()
} finally {
    m.unlock()
}
Note: The `synchronized` keyword is incompatible with the `IReentrantMutex` interface.
- Before entering the code block modified by `synchronized`, a thread must first acquire the lock corresponding to the `ReentrantMutex` instance. If the lock cannot be acquired, the current thread is blocked.
- After a thread exits the code block modified by `synchronized`, the lock of the `ReentrantMutex` object is automatically released. Only one `unlock` operation is performed, which allows `synchronized` code blocks using the same mutex to be nested.
- When a `return e` expression in the code block modified by `synchronized` is reached, the value of `e` is evaluated to `v`, `m.unlock()` is called, and then `v` is returned.
- If the code block modified by `synchronized` contains a `break` or `continue` expression that causes the program to exit the block, the `m.unlock()` method is automatically called.
- When an exception occurs in the code block modified by `synchronized` and causes the program to exit the block, the `m.unlock()` method is automatically called.
The BNF of the `synchronized` expression is as follows:

synchronizedExpression
    : 'synchronized' '(' expression ')' block
    ;

Here, `expression` is a `ReentrantMutex` object.
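For example, the following sketch (the `import` path is an assumption) uses `synchronized` to protect a shared counter that two threads increment concurrently:

import std.sync.*

let mtx = ReentrantMutex()
var total: Int64 = 0 // Shared state protected by `mtx`.

func increase(): Unit {
    for (_ in 0..1000) {
        synchronized(mtx) { // Only one thread at a time executes this block.
            total++
        }
    }
}

main(): Unit {
    let f1 = spawn { increase() }
    let f2 = spawn { increase() }
    f1.get()
    f2.get()
    synchronized(mtx) {
        println("total = ${total}") // Expected output: total = 2000
    }
}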
Monitor
`Monitor` is a built-in data structure that binds a mutex to a single related condition variable (wait queue). `Monitor` can block a thread and make it wait for a signal from another thread before resuming execution. This mechanism uses shared variables to synchronize threads. The following methods are provided:

- `wait(timeout!: Duration = Duration.Max): Bool`: waits for a signal and blocks the current thread.
- `notify(): Unit`: wakes up a thread waiting on the `Monitor` (if any).
- `notifyAll(): Unit`: wakes up all threads waiting on the `Monitor` (if any).
Before calling the `wait`, `notify`, or `notifyAll` method of a `Monitor` object, ensure that the current thread holds the corresponding `Monitor` lock. The `wait` method performs the following actions:
1. Add the current thread to the waiting queue of the `Monitor`.
2. Block the current thread, release the `Monitor` lock, and record the number of times it was acquired.
3. Wait for another thread to send a signal to this thread by calling the `notify` or `notifyAll` method of the same `Monitor` instance.
4. Wake up the current thread, reacquire the `Monitor` lock, and restore the acquisition count recorded in step 2.
Note: The `wait` method accepts an optional parameter `timeout`. Be aware that many mainstream operating systems do not guarantee real-time scheduling, so it cannot be guaranteed that a thread is blocked for an "exact duration"; there may be system-related inaccuracies. In addition, the current language specification explicitly allows implementations to produce spurious wake-ups, in which case the return value of `wait` is implementation-determined and may be either `true` or `false`. Therefore, developers are encouraged to always wrap `wait` in a loop:
synchronized (obj) {
    while (<condition is not true>) {
        obj.wait()
    }
}
`Monitor` is defined as follows:
class Monitor <: ReentrantMutex {
    // Constructor.
    init()

    // Blocks until either a paired `notify` is invoked or `timeout` passes.
    // Returns `true` if the monitor was signalled by another thread, or `false` on timeout. Spurious wake-ups are allowed.
    // Throws ISSE("Mutex is not locked by the current thread") if the current thread does not hold this mutex.
    func wait(timeout!: Duration = Duration.Max): Bool

    // Wakes up a single thread waiting on this monitor, if any (no particular admission policy implied).
    // Throws ISSE("Mutex is not locked by the current thread") if the current thread does not hold this mutex.
    func notify(): Unit

    // Wakes up all threads waiting on this monitor, if any (no particular admission policy implied).
    // Throws ISSE("Mutex is not locked by the current thread") if the current thread does not hold this mutex.
    func notifyAll(): Unit
}
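A minimal sketch of the wait/notify pattern (the `import` path is an assumption): one thread waits until a shared flag is set, while the main thread sets the flag and signals the monitor.

import std.sync.*

let mon = Monitor()
var ready = false // Shared flag protected by `mon`.

main(): Unit {
    let waiter = spawn {
        synchronized(mon) {
            while (!ready) { // Loop to guard against spurious wake-ups.
                mon.wait()
            }
            println("ready!")
        }
    }
    // ... perform some preparation work ...
    synchronized(mon) {
        ready = true
        mon.notify() // Wake up the waiting thread.
    }
    waiter.get()
}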
MultiConditionMonitor
`MultiConditionMonitor` is a built-in data structure that binds a mutex to a group of dynamically created condition variables. This class should be used only when the `Monitor` class is insufficient for implementing advanced concurrency algorithms. It provides the following methods:

- `newCondition(): ConditionID`: creates a new waiting queue, associates it with the current object, and returns a specific `ConditionID`.
- `wait(id: ConditionID, timeout!: Duration = Duration.Max): Bool`: waits for a signal and blocks the current thread.
- `notify(id: ConditionID): Unit`: wakes up a thread waiting on the specified condition (if any).
- `notifyAll(id: ConditionID): Unit`: wakes up all threads waiting on the specified condition (if any).
During initialization, a `MultiConditionMonitor` has no associated `ConditionID` instances. Each time `newCondition` is called, a new waiting queue is created and associated with the current object, and a unique identifier of the following type is returned:
external struct ConditionID {
    private init() { ... } // The constructor is intentionally private to prevent
                           // creation of such structs outside of MultiConditionMonitor.
}
Note that users cannot pass a `ConditionID` returned by one `MultiConditionMonitor` instance to other instances, nor manually create a `ConditionID` (for example, using `unsafe`). Because the data contained in a `ConditionID` (such as an index into an internal array, a direct address of an internal queue, or any other kind of data) is tied to the `MultiConditionMonitor` that created it, passing an "external" `ConditionID` to a `MultiConditionMonitor` results in an `IllegalSynchronizationStateException`.
class MultiConditionMonitor <: ReentrantMutex {
    // Constructor.
    init()

    // Returns a new ConditionID associated with this monitor. May be used to implement
    // "single mutex -- multiple wait queues" concurrency primitives.
    // Throws ISSE("Mutex is not locked by the current thread") if the current thread does not hold this mutex.
    func newCondition(): ConditionID

    // Blocks until either a paired `notify` is invoked or `timeout` passes.
    // Returns `true` if the specified condition was signalled by another thread, or `false` on timeout.
    // Spurious wake-ups are allowed.
    // Throws ISSE("Mutex is not locked by the current thread") if the current thread does not hold this mutex.
    // Throws ISSE("Invalid condition") if `id` was not returned by `newCondition` of this MultiConditionMonitor instance.
    func wait(id: ConditionID, timeout!: Duration = Duration.Max): Bool

    // Wakes up a single thread waiting on the specified condition, if any (no particular admission policy implied).
    // Throws ISSE("Mutex is not locked by the current thread") if the current thread does not hold this mutex.
    // Throws ISSE("Invalid condition") if `id` was not returned by `newCondition` of this MultiConditionMonitor instance.
    func notify(id: ConditionID): Unit

    // Wakes up all threads waiting on the specified condition, if any (no particular admission policy implied).
    // Throws ISSE("Mutex is not locked by the current thread") if the current thread does not hold this mutex.
    // Throws ISSE("Invalid condition") if `id` was not returned by `newCondition` of this MultiConditionMonitor instance.
    func notifyAll(id: ConditionID): Unit
}
Example: The following shows how to use `MultiConditionMonitor` to implement a bounded FIFO queue with a fixed length. When the queue is empty, `get()` blocks; when the queue is full, `put()` blocks.
class BoundedQueue {
    // Create a MultiConditionMonitor and two conditions.
    let m: MultiConditionMonitor = MultiConditionMonitor()
    var notFull: ConditionID
    var notEmpty: ConditionID

    var count: Int64 // Object count in buffer.
    var head: Int64  // Write index.
    var tail: Int64  // Read index.

    // Queue's length is 100.
    let items: Array<Object> = Array<Object>(100, {i => Object()})

    init() {
        count = 0
        head = 0
        tail = 0

        synchronized(m) {
            notFull = m.newCondition()
            notEmpty = m.newCondition()
        }
    }

    // Insert an object; if the queue is full, block the current thread.
    public func put(x: Object) {
        // Acquire the mutex.
        synchronized(m) {
            while (count == 100) {
                // If the queue is full, wait for the "queue notFull" event.
                m.wait(notFull)
            }
            items[head] = x
            head++
            if (head == 100) {
                head = 0
            }
            count++

            // An object has been inserted and the current queue is no longer
            // empty, so wake up the thread previously blocked on get()
            // because the queue was empty.
            m.notify(notEmpty)
        } // Release the mutex.
    }

    // Pop an object; if the queue is empty, block the current thread.
    public func get(): Object {
        // Acquire the mutex.
        synchronized(m) {
            while (count == 0) {
                // If the queue is empty, wait for the "queue notEmpty" event.
                m.wait(notEmpty)
            }
            let x: Object = items[tail]
            tail++
            if (tail == 100) {
                tail = 0
            }
            count--

            // An object has been popped and the current queue is no longer
            // full, so wake up the thread previously blocked on put()
            // because the queue was full.
            m.notify(notFull)
            return x
        } // Release the mutex.
    }
}
Memory Model
The memory model is mainly used to solve the memory visibility issue in concurrent programming. It specifies when a write operation performed by a thread on a variable can be observed by a read operation performed by another thread on the same variable.
- If there is a data race, the behavior is undefined.
- If there is no data race, the value read by a read operation is the value written by the nearest preceding write operation in the happens-before order.
Data Race
If two threads access the same data, at least one of the accesses is a write, and there is no happens-before relationship (see [Happens-Before] below) between the two operations, a data race occurs.

Definition of "accessing the same data":

- Accesses to the same field of a struct or class type, or to the same variable of a primitive, enum, or array type, are regarded as accesses to the same data.
- Accesses to different fields of a struct or class type are regarded as accesses to different data.
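For example, the following sketch contains a data race: both threads write the same global variable with no synchronization and no happens-before relationship between the writes, so the behavior is undefined.

var shared: Int64 = 0 // Global variable written by both threads.

main(): Unit {
    let f1 = spawn { shared = 1 } // Unsynchronized write.
    let f2 = spawn { shared = 2 } // Concurrent unsynchronized write: data race.
    f1.get()
    f2.get()
    println("${shared}") // Undefined behavior; no particular output is guaranteed.
}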
Happens-Before
- Program order rule: Each operation in a thread happens before any subsequent operation in the thread.
var a: String
main(): Int64 {
    a = "hello, world"
    println(a)
    return 0
}
// OUTPUT:
// hello, world
- Thread startup rule: If thread A creates thread B using `spawn`, the `spawn` operation of thread A happens before any operation of thread B.
var a: String = "123"

func foo(): Unit {
    println(a)
}

main(): Int64 {
    a = "hello, world"
    let fut: Future<Unit> = spawn {
        foo()
    }
    fut.get()
    return 0
}
// OUTPUT:
// hello, world
- Thread termination rule: If thread A calls `futureB.get()` and it returns successfully, any operation in thread B happens before the `futureB.get()` call in thread A. If thread A calls `futureB.cancel()` and thread B accesses `hasPendingCancellation` after that, the two calls form a happens-before relationship.
var a: String = "123"

func foo(): Unit {
    a = "hello, world"
}

main(): Int64 {
    let fut: Future<Unit> = spawn {
        foo()
    }
    fut.get()
    println(a)
    return 0
}
// OUTPUT:
// hello, world
- Thread synchronization rule: Operations on the same thread synchronization object (such as a mutex or semaphore) have a total order. An operation performed by a thread on a synchronization object (such as unlocking a mutex) happens before any subsequent operation on that synchronization object in this total order (such as locking the mutex).
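Following the pattern of the previous rules, a minimal sketch (the `import` path is an assumption): once `main` observes `done == true` under the lock, the writer's unlock happens before that lock acquisition, so the read of `a` is guaranteed to see the new value.

import std.sync.*

let m = ReentrantMutex()
var done = false
var a: String = "123"

main(): Int64 {
    spawn {
        synchronized(m) {
            a = "hello, world" // Write under the lock.
            done = true
        }
    }
    var finished = false
    while (!finished) {
        synchronized(m) {   // This lock acquisition is ordered after the writer's unlock
            finished = done // once `done` is observed as true.
        }
    }
    println(a)
    return 0
}

// OUTPUT:
// hello, world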
- Atomic variable rule: All operations on atomic variables are performed in a total order. An operation performed by a thread on an atomic variable happens before any subsequent operation on that atomic variable in this total order.
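Similarly, a minimal sketch with an atomic flag (the `import` path is an assumption): once the main thread observes the flag as `true`, the write to `a` in the spawned thread happens before the read of `a` in `main`.

import std.sync.*

let flag = AtomicBool(false)
var a: String = "123"

main(): Int64 {
    spawn {
        a = "hello, world" // Ordinary write ...
        flag.store(true)   // ... published by the atomic store.
    }
    while (!flag.load()) {} // Spin until the atomic load observes `true`.
    println(a)
    return 0
}

// OUTPUT:
// hello, world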
- Transitivity rule: If A happens before B and B happens before C, then A happens before C.