Links
Links are built on top of Julia's Channels and are used as communication primitives between Julia Tasks. A Link consists of a Channel and a Buffer whose mode is Cyclic (see Buffer Modes for information on buffer modes). Every item sent through a Link passes through the Link's channel and is also written to its Buffer, so all the data flowing through a Link is recorded.
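The idea of a channel that records what passes through it can be sketched with Base Julia alone. The type `RecordingLink` and the function `send!` below are hypothetical names used only for illustration; this is a minimal sketch under that assumption, not the Causal.jl implementation.

```julia
# Hypothetical stand-in for a Link: an unbuffered Channel plus a fixed-length
# record of the most recent values. Not the Causal.jl implementation.
struct RecordingLink{T}
    channel::Channel{T}
    record::Vector{T}   # newest value first
end

# Convenience constructor: unbuffered channel and a zero-filled record of length n.
RecordingLink{T}(n::Int) where {T} = RecordingLink{T}(Channel{T}(0), zeros(T, n))

function send!(link::RecordingLink, val)
    put!(link.channel, val)        # blocks until a reader takes the value
    pop!(link.record)              # drop the oldest entry
    pushfirst!(link.record, val)   # newest entry goes first
    return val
end

link = RecordingLink{Float64}(3)
reader = @async [take!(link.channel) for _ in 1:4]   # reading side
foreach(x -> send!(link, x), 1.0:4.0)                # writing side
received = fetch(reader)
@show received link.record
```

Every value reaches the reader, while the record keeps only the last three values that were sent.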
Construction of Links
Constructing a Link is simple: optionally specify its element type and buffer length. As the examples show, the element type defaults to Float64.
julia> using Causal
julia> Link{Bool}(5)
Link(state:open, eltype:Bool, isreadable:false, iswritable:false)
julia> Link{Int}(10)
Link(state:open, eltype:Int64, isreadable:false, iswritable:false)
julia> Link(5)
Link(state:open, eltype:Float64, isreadable:false, iswritable:false)
julia> Link()
Link(state:open, eltype:Float64, isreadable:false, iswritable:false)
Data Flow through Links
Data can be read from and written to a Link only if an active task is bound to it. A Link can be thought of as a pipe: to write data into one end, a task that reads the written data from the other end must be bound to the Link. Similarly, to read data from one end, a task that writes the data into the other end must be bound to the Link. Reading from and writing to a Link are carried out with the take! and put! functions. For more clarity, let us see some examples.
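The reason a counterpart task is needed can be seen with a plain Base Channel of size zero, which is what the `bind` output further down this page reports for a Link's channel (`sz_max:0`). The snippet below is a minimal sketch that uses only Base Julia and assumes a Link inherits this blocking behaviour from its channel.

```julia
ch = Channel{Float64}(0)          # unbuffered channel
writer = @async put!(ch, 1.0)     # no reader yet, so this task blocks inside put!
yield()                           # let the writer run up to the blocking point
blocked = !istaskdone(writer)     # the writer is still waiting for a reader
val = take!(ch)                   # a reader arrives and the hand-over completes
wait(writer)
finished = istaskdone(writer)
@show blocked val finished
```

A write therefore only completes once some task takes the value from the other end.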
Let us first construct a Link:
julia> l = Link(5)
Link(state:open, eltype:Float64, isreadable:false, iswritable:false)
l is a Link with a buffer length of 5 and an element type of Float64. Note that l is open, but it is not yet ready for reading or writing data. To write data, we must bind a task that reads the written data.
julia> function reader(link::Link) # Define job.
           while true
               val = take!(link)
               val === NaN && break # Poison-pill the tasks to terminate safely.
           end
       end
reader (generic function with 1 method)
julia> t = @async reader(l)
Task (runnable) @0x00007f1fd6239870
The reader is defined so that data written to one end of l is read until the value NaN arrives. We now have a runnable task t, which means l is ready for data writing.
julia> put!(l, 1.)
1.0
julia> put!(l, 2.)
2.0
Note that the data that has flowed through l is also written to its buffer, newest value first.
julia> l.buffer
5-element Buffer{Cyclic,Float64,1}:
2.0
1.0
0.0
0.0
0.0
To terminate the task, we must write NaN to l.
julia> put!(l, NaN) # Terminate the task
NaN
julia> t # Show that the `t` is terminated.
Task (done) @0x00007f1fd6239870
As long as the task bound to l is runnable, data can be written to l. That is, the amount of data that can be written to l is not limited by its buffer length. Beware, however, that the buffer of a Link is Cyclic: once the buffer is full, its oldest data is overwritten.
julia> l = Link(5)
Link(state:open, eltype:Float64, isreadable:false, iswritable:false)
julia> t = @async reader(l)
Task (runnable) @0x00007f1fd6239d50
julia> for item in 1. : 10.
           put!(l, item)
           @show outbuf(l.buffer)
       end
outbuf(l.buffer) = [1.0, 0.0, 0.0, 0.0, 0.0]
outbuf(l.buffer) = [2.0, 1.0, 0.0, 0.0, 0.0]
outbuf(l.buffer) = [3.0, 2.0, 1.0, 0.0, 0.0]
outbuf(l.buffer) = [4.0, 3.0, 2.0, 1.0, 0.0]
outbuf(l.buffer) = [5.0, 4.0, 3.0, 2.0, 1.0]
outbuf(l.buffer) = [6.0, 5.0, 4.0, 3.0, 2.0]
outbuf(l.buffer) = [7.0, 6.0, 5.0, 4.0, 3.0]
outbuf(l.buffer) = [8.0, 7.0, 6.0, 5.0, 4.0]
outbuf(l.buffer) = [9.0, 8.0, 7.0, 6.0, 5.0]
outbuf(l.buffer) = [10.0, 9.0, 8.0, 7.0, 6.0]
Reading data from l works in a very similar way. Again, a runnable task must be bound to l.
julia> l = Link(5)
Link(state:open, eltype:Float64, isreadable:false, iswritable:false)
julia> function writer(link::Link, vals)
           for val in vals
               put!(link, val)
           end
       end
writer (generic function with 1 method)
julia> t = @async writer(l, 1.:5.)
Task (runnable) @0x00007f1fd623ba90
julia> bind(l, t)
Channel{Float64}(sz_max:0,sz_curr:1)
julia> take!(l)
1.0
julia> take!(l)
2.0
Data can be read from l as long as t is active. To read all the remaining data at once, collect can be used.
julia> t
Task (runnable) @0x00007f1fd623ba90
julia> collect(l)
3-element Array{Float64,1}:
3.0
4.0
5.0
julia> t # Show that `t` is terminated.
Task (done) @0x00007f1fd623ba90
Full API
Causal.Link — Type
mutable struct Link{T}
A Link connects an Outpin and an Inpin for data flow. See also: Outpin, Inpin, connect!
Fields
buffer::Buffer{Cyclic,T,1} where T
Internal buffer to record data flowing through the link
channel::Channel{T} where T
Internal channel for data flow
masterid::Base.UUID
Unique identifier of the outpin of the source of the link
slaveid::Base.UUID
Unique identifier of the inpin of the destination of the link
id::Base.UUID
Unique identifier
Example
julia> l = Link{Int}(5)
Link(state:open, eltype:Int64, isreadable:false, iswritable:false)
julia> l = Link{Bool}()
Link(state:open, eltype:Bool, isreadable:false, iswritable:false)
Base.bind — Method
bind(link, task)
Binds task to link. When task is done, link is closed.
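The same coupling between a task and a channel exists in Base Julia, where `bind(chnl, task)` closes the channel once the task finishes. The sketch below uses only a Base Channel and assumes that binding a Link follows the same pattern.

```julia
ch = Channel{Int}(0)
producer = @async foreach(i -> put!(ch, i), 1:3)   # writes three items, then ends
bind(ch, producer)       # Base method: the channel closes when the task is done
items = collect(ch)      # iteration stops once the channel is closed
closed = !isopen(ch)
@show items closed
```

Because the channel closes together with the task, a consumer that iterates over it terminates on its own, without a sentinel value.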
Base.close — Method
close(link)
Closes link. All the tasks bound to the link are also terminated safely. Once the link is closed, it is not possible to take elements from or put elements into it. See also: take!(link::Link), put!(link::Link, val)
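What a closed channel looks like to callers can be illustrated with a Base Channel: both `put!` and `take!` throw an `InvalidStateException`. This is a sketch of the Base behaviour only; whether a closed Link raises the same exception type is an assumption.

```julia
ch = Channel{Float64}(0)
close(ch)

# Writing to a closed channel fails.
put_failed = try
    put!(ch, 1.0)
    false
catch err
    err isa InvalidStateException
end

# Reading from a closed, empty channel fails as well.
take_failed = try
    take!(ch)
    false
catch err
    err isa InvalidStateException
end

@show put_failed take_failed
```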
Base.collect — Method
collect(link)
Collects all the available data on the link.
To collect all available data from link, a task must be bound to it.
Example
julia> l = Link(); # Construct a link.
julia> t = @async for item in 1 : 5 # Construct a task
           put!(l, item)
       end;
julia> bind(l, t); # Bind it to the link.
julia> take!(l) # Take element from link.
1.0
julia> take!(l) # Take again ...
2.0
julia> collect(l) # Collect remaining data.
3-element Array{Float64,1}:
3.0
4.0
5.0
Base.eltype — Method
Returns the element type of link.
Base.isopen — Method
isopen(link)
Returns true if link is open. A link is open if its channel is open.
Base.isreadable — Method
isreadable(link)
Returns true if link is readable. When link is readable, data can be read from link with the take! function.
Base.iswritable — Method
iswritable(link)
Returns true if link is writable. When link is writable, data can be written into link with the put! function.
Base.put! — Method
put!(link, val)
Puts val to link. val is handed over to the channel of link and is also written into the buffer of link.
link must be writable to put val. That is, a runnable task that takes items from the link must be bound to link.
Example
julia> l = Link();
julia> t = @async while true
           item = take!(l)
           item === NaN && break
           println("Took " * string(item))
       end;
julia> bind(l, t);
julia> put!(l, 1.)
Took 1.0
1.0
julia> put!(l, 2.)
Took 2.0
2.0
julia> put!(l, NaN)
NaN
Base.take! — Method
take!(link)
Takes an element from link.
link must be readable to take a value. That is, a runnable task that puts items into the link must be bound to link.
Example
julia> l = Link(5);
julia> t = @async for item in 1. : 5.
           put!(l, item)
       end;
julia> bind(l, t);
julia> take!(l)
1.0
julia> take!(l)
2.0
Causal.clean! — Method
clean!(link)
Cleans the data on link.
Causal.isfull — Method
isfull(link)
Returns true if the buffer of link is full.
Causal.launch — Method
launch(link, valrange)
Constructs a putter task and binds it to link. The putter task puts the data in valrange into link.
Causal.launch — Method
launch(link)
Constructs a taker task and binds it to link. The taker task reads the data and prints an info message until missing is read from the link.
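A taker of this kind can be sketched with a Base Channel whose element type admits `missing`. The loop below is an illustrative assumption about what such a task does, not the body of `launch`; it collects the values in the hypothetical vector `seen` instead of printing info messages.

```julia
ch = Channel{Union{Missing,Float64}}(0)
seen = Float64[]                  # values read by the taker

taker = @async while true
    val = take!(ch)
    val === missing && break      # `missing` acts as the stop signal
    push!(seen, val)
end
bind(ch, taker)                   # close the channel when the taker ends

put!(ch, 1.0)
put!(ch, 2.0)
put!(ch, missing)                 # ask the taker to stop
wait(taker)
@show seen
```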
Causal.refresh! — Method
Reconstructs the channel of link if its channel is closed.
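The "rebuild only if closed" logic can be sketched for a plain Base Channel. The helper `reopen` is a hypothetical name introduced here; `refresh!` itself may be implemented differently.

```julia
# Hypothetical helper; Causal.refresh! may be implemented differently.
function reopen(ch::Channel{T}) where {T}
    isopen(ch) && return ch      # still usable: keep the same channel
    return Channel{T}(0)         # closed: build a fresh unbuffered channel
end

ch = Channel{Float64}(0)
kept = reopen(ch) === ch         # an open channel is returned unchanged
close(ch)
fresh = reopen(ch)               # a closed channel is replaced
replaced = fresh !== ch && isopen(fresh)
@show kept replaced
```

A closed Base Channel cannot be reopened, which is why a replacement channel has to be constructed.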
Causal.snapshot — Method
snapshot(link)
Returns all the data of the buffer of link.