Links
Links are built on top of Julia's Channels and are used as communication primitives between Julia Tasks. A Link consists of a Channel and a Buffer whose mode is Cyclic (see Buffer Modes for information on buffer modes). Every item sent through a Link passes through the Link's channel and is also written to its buffer, so the data flowing through a Link is recorded (up to the buffer length, since older entries are overwritten once the buffer is full).
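To make the channel-plus-buffer idea concrete, here is a minimal sketch in plain Julia. It is not Causal's implementation: MiniLink, its record field, and send! are hypothetical names introduced only for illustration, and the newest-first ordering is an assumption modelled on the buffer printouts later in this section.

```julia
# Hypothetical sketch, NOT Causal's implementation: an unbuffered channel
# paired with a fixed-length record of the most recent values.
struct MiniLink{T}
    channel::Channel{T}   # carries the data between tasks
    record::Vector{T}     # newest value first; oldest value is dropped when full
end

MiniLink{T}(ln::Int) where {T} = MiniLink{T}(Channel{T}(0), zeros(T, ln))

function send!(link::MiniLink, val)
    pop!(link.record)              # drop the oldest entry
    pushfirst!(link.record, val)   # store the newest entry at the front
    put!(link.channel, val)        # blocks until another task takes the value
    return val
end

link = MiniLink{Float64}(3)
taker = @async [take!(link.channel) for _ in 1:4]   # consumer task
foreach(x -> send!(link, x), 1.0:4.0)
received = fetch(taker)
```

Because the channel is unbuffered, the number of values that can be sent is not limited by the record length; the record simply keeps the last three of them.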
Construction of Links
The construction of a Link is very simple: just specify its buffer length and element type.
julia> using Causal
julia> Link{Bool}(5)
Link(state:open, eltype:Bool, isreadable:false, iswritable:false)
julia> Link{Int}(10)
Link(state:open, eltype:Int64, isreadable:false, iswritable:false)
julia> Link(5)
Link(state:open, eltype:Float64, isreadable:false, iswritable:false)
julia> Link()
Link(state:open, eltype:Float64, isreadable:false, iswritable:false)
Data Flow through Links
Data can be read from and written to a Link only if an active task is bound to it. A Link can be thought of as a pipe. To write data into a Link at one end, a task that reads the written data at the other end must be bound to the Link. Similarly, to read data from one end of a Link, a task that writes the data at the other end must be bound to the Link. Reading and writing are carried out with the take! and put! functions. Some examples make this clearer.
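The reason a counterpart task is needed can be seen with a plain, unbuffered Base Channel. This is a sketch using Base Julia only, under the assumption that a Link's channel behaves like the unbuffered (sz_max:0) channel printed later in this section; ch, writer, and blocked are illustrative names.

```julia
ch = Channel{Float64}(0)        # unbuffered channel: put! needs a matching take!
writer = @async put!(ch, 1.0)   # no reader yet, so this task parks inside put!
yield()                         # let the writer run up to the blocking call
blocked = !istaskdone(writer)   # the writer is still waiting for a reader
val = take!(ch)                 # a reader arrives and the hand-over completes
wait(writer)                    # the writer can now finish
```

Without the take! call, the writer task would stay parked indefinitely, which is the situation a bound reader task avoids.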
Let us first construct a Link,
julia> l = Link(5)
Link(state:open, eltype:Float64, isreadable:false, iswritable:false)
l is a Link with a buffer length of 5 and an element type of Float64. Note that l is open, but it is not yet ready for reading or writing data. To write data, we must bind a task that reads the written data.
julia> function reader(link::Link) # Define job.
           while true
               val = take!(link)
               val === NaN && break # Poison-pill the tasks to terminate safely.
           end
       end
reader (generic function with 1 method)
julia> t = @async reader(l)
Task (runnable) @0x00007fe0d4b47820
The reader is defined so that data written to one end of l is read until the value NaN arrives. Now we have a runnable task t, which means that l is ready for data writing.
julia> put!(l, 1.)
1.0
julia> put!(l, 2.)
2.0
Note that the data that has flowed through l is written to its buffer.
julia> l.buffer
5-element Buffer{Cyclic,Float64,1}:
2.0
1.0
0.0
0.0
0.0
To terminate the task, we must write NaN to l.
julia> put!(l, NaN) # Terminate the task
NaN
julia> t # Show that the `t` is terminated.
Task (done) @0x00007fe0d4b47820
As long as the task bound to l is runnable, data can be written to l. That is, the amount of data that can be written to l is not limited by its buffer length. Beware, however, that the buffer of a Link is Cyclic: once the buffer is full, its oldest data is overwritten.
julia> l = Link(5)
Link(state:open, eltype:Float64, isreadable:false, iswritable:false)
julia> t = @async reader(l)
Task (runnable) @0x00007fe0d5d3abf0
julia> for item in 1. : 10.
           put!(l, item)
           @show outbuf(l.buffer)
       end
outbuf(l.buffer) = [1.0, 0.0, 0.0, 0.0, 0.0]
outbuf(l.buffer) = [2.0, 1.0, 0.0, 0.0, 0.0]
outbuf(l.buffer) = [3.0, 2.0, 1.0, 0.0, 0.0]
outbuf(l.buffer) = [4.0, 3.0, 2.0, 1.0, 0.0]
outbuf(l.buffer) = [5.0, 4.0, 3.0, 2.0, 1.0]
outbuf(l.buffer) = [6.0, 5.0, 4.0, 3.0, 2.0]
outbuf(l.buffer) = [7.0, 6.0, 5.0, 4.0, 3.0]
outbuf(l.buffer) = [8.0, 7.0, 6.0, 5.0, 4.0]
outbuf(l.buffer) = [9.0, 8.0, 7.0, 6.0, 5.0]
outbuf(l.buffer) = [10.0, 9.0, 8.0, 7.0, 6.0]
Reading data from l is very similar. Again, a runnable task is bound to l.
julia> l = Link(5)
Link(state:open, eltype:Float64, isreadable:false, iswritable:false)
julia> function writer(link::Link, vals)
           for val in vals
               put!(link, val)
           end
       end
writer (generic function with 1 method)
julia> t = @async writer(l, 1.:5.)
Task (runnable) @0x00007fe0d61fbd00
julia> bind(l, t)
Channel{Float64}(sz_max:0,sz_curr:1)
julia> take!(l)
1.0
julia> take!(l)
2.0
Data can be read from l as long as t is active. To read all the remaining data at once, collect can be used.
julia> t
Task (runnable) @0x00007fe0d61fbd00
julia> collect(l)
3-element Array{Float64,1}:
3.0
4.0
5.0
julia> t # Show that `t` is terminated.
Task (done) @0x00007fe0d61fbd00
Full API
Causal.Link — Type
Link{T}(ln::Int=64) where {T}
Constructs a Link with element type T and buffer length ln. The buffer element type is T and its mode is Cyclic.
Link(ln::Int=64)
Constructs a Link with element type Float64 and buffer length ln. The buffer element type is Float64 and its mode is Cyclic.
Example
julia> l = Link{Int}(5)
Link(state:open, eltype:Int64, isreadable:false, iswritable:false)
julia> l = Link{Bool}()
Link(state:open, eltype:Bool, isreadable:false, iswritable:false)
Base.bind — Method
bind(link::Link, task::Task)
Binds task to link. When task is done, link is closed.
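The close-on-completion behaviour mirrors what Base.bind does for a plain Channel. The sketch below uses Base Julia only, with no Link involved; that a Link relies on the same mechanism is an assumption, and ch, producer, and items are illustrative names.

```julia
ch = Channel{Int}(0)
producer = @async foreach(i -> put!(ch, i), 1:3)   # task that writes three items
bind(ch, producer)       # tie the channel's lifetime to the task
items = collect(ch)      # iteration ends when the channel is closed
closed = !isopen(ch)     # the channel was closed when `producer` finished
```

Binding is what lets collect terminate: without it, the channel would stay open after the last item and the consumer would keep waiting.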
Base.close — Method
close(link)
Closes link. All the tasks bound to the link are also terminated safely. Once the link is closed, it is not possible to take elements from it or put elements into it. See also: take!(link::Link), put!(link::Link, val)
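For a plain Base Channel, "not possible" means that put! and take! throw an InvalidStateException once the channel is closed. The sketch below shows this with Base Julia only; whether a closed Link surfaces exactly the same exception is an assumption, and fails_when_closed is a hypothetical helper.

```julia
ch = Channel{Float64}(0)
close(ch)

# Hypothetical helper: returns true if calling `f` raises an InvalidStateException.
function fails_when_closed(f)
    try
        f()
        return false
    catch err
        return err isa InvalidStateException
    end
end

put_rejected = fails_when_closed(() -> put!(ch, 1.0))   # writing to a closed channel
take_rejected = fails_when_closed(() -> take!(ch))      # reading from a closed, empty channel
```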
Base.collect — Method
collect(link::Link)
Collects all the available data on the link.
To collect all available data from link, a task must be bound to it.
Example
julia> l = Link(); # Construct a link.
julia> t = @async for item in 1 : 5 # Construct a task
           put!(l, item)
       end;
julia> bind(l, t); # Bind it to the link.
julia> take!(l) # Take element from link.
1.0
julia> take!(l) # Take again ...
2.0
julia> collect(l) # Collect remaining data.
3-element Array{Float64,1}:
3.0
4.0
5.0
Base.eltype — Method
eltype(link::Link)
Returns the element type of link.
Base.isopen — Method
isopen(link::Link)
Returns true if link is open. A link is open if its channel is open.
Base.isreadable — Method
isreadable(link::Link)
Returns true if link is readable. When link is readable, data can be read from link with the take! function.
Base.iswritable — Method
iswritable(link::Link)
Returns true if link is writable. When link is writable, data can be written into link with the put! function.
Base.put! — Method
put!(link::Link, val)
Puts val to link. val is handed over to the channel of link and is also written into the buffer of link.
link must be writable to put val. That is, a runnable task that takes items from the link must be bound to link.
Example
julia> l = Link();
julia> t = @async while true
           item = take!(l)
           item === NaN && break
           println("Took " * string(item))
       end;
julia> bind(l, t);
julia> put!(l, 1.)
Took 1.0
1.0
julia> put!(l, 2.)
Took 2.0
2.0
julia> put!(l, NaN)
NaN
Base.take! — Method
take!(link::Link)
Takes an element from link.
link must be readable to take a value. That is, a runnable task that puts items into the link must be bound to link.
Example
julia> l = Link(5);
julia> t = @async for item in 1. : 5.
           put!(l, item)
       end;
julia> bind(l, t);
julia> take!(l)
1.0
julia> take!(l)
2.0
Causal.isfull — Method
isfull(link::Link)
Returns true if the buffer of link is full.
Causal.launch — Method
launch(link::Link, valrange)
Constructs a putter task and binds it to link. The putter task puts the data in valrange into link.
Causal.launch — Method
launch(link::Link)
Constructs a taker task and binds it to link. The taker task reads the data and prints an info message until missing is read from the link.
Causal.refresh! — Method
refresh!(link::Link)
Reconstructs the channel of link if its channel is closed.
Causal.snapshot — Method
snapshot(link::Link)
Returns all the data of the buffer of link.