Links

Links are built on top of Channels of Julia. They are used as communication primitives for Tasks of Julia. A Link basically includes a Channel and a Buffer. The mode of the buffer is Cyclic.(see Buffer Modes for information on buffer modes). Every item sent through a Link is sent through the channel of the Link and written to the Buffer so that all the data flowing through a Link is recorded.

The construction of a Link is very simple: just specify its buffer length and element type.

julia> using Causal # hide

julia> Link{Bool}(5)
Link(state:open, eltype:Bool, isreadable:false, iswritable:false)

julia> Link{Int}(10)
Link(state:open, eltype:Int64, isreadable:false, iswritable:false)

julia> Link(5)
Link(state:open, eltype:Float64, isreadable:false, iswritable:false)

julia> Link()
Link(state:open, eltype:Float64, isreadable:false, iswritable:false)

The data can be read from and written into Links if active tasks are bound to them. Links can be thought of like a pipe. In order to write data to a Link from one of its ends, a task that reads written data from the other end must be bounded to the Link. Similarly, in order to read data from one of the Link from one of its end, a task that writes the read data must be bound to the Link. Reading from and writing to Link is carried out with take! and put! functions. For more clarity, let us see some examples.

Let us first construct a Link,


julia> l = Link(5)
Link(state:open, eltype:Float64, isreadable:false, iswritable:false)

l is a Link with a buffer length of 5 and element type of Float64. Not that the l is open, but it is not ready for data reading or writing. To write data, we must bound a task that reads the written data.

julia> function reader(link::Link)  # Define job.
           while true
               val = take!(link)
               val === NaN && break  # Poison-pill the tasks to terminate safely.
           end
       end
reader (generic function with 1 method)

julia> t = @async reader(l)
Task (runnable) @0x00007f1fd6239870

The reader is defined such that the data written from one end of l is read until the data is NaN. Now, we have runnable a task t. This means the l is ready for data writing.

julia> put!(l, 1.)
1.0

julia> put!(l, 2.)
2.0

Note that the data flown through the l is written to its buffer.

julia> l.buffer
5-element Buffer{Cyclic,Float64,1}:
 2.0
 1.0
 0.0
 0.0
 0.0

To terminate the task, we must write NaN to l.

julia> put!(l, NaN)  # Terminate the task
NaN

julia> t   # Show that the `t` is terminated.
Task (done) @0x00007f1fd6239870

Whenever the bound task to the l is runnable, the data can be written to l. That is, the data length that can be written to l is not limited by the buffer length of l. But, beware that the buffer of Links is Cyclic. That means, when the buffer is full, its data is overwritten.

julia> l = Link(5)
Link(state:open, eltype:Float64, isreadable:false, iswritable:false)

julia> t = @async reader(l)
Task (runnable) @0x00007f1fd6239d50

julia> for item in 1. : 10.
           put!(l, item)
           @show outbuf(l.buffer)
       end
outbuf(l.buffer) = [1.0, 0.0, 0.0, 0.0, 0.0]
outbuf(l.buffer) = [2.0, 1.0, 0.0, 0.0, 0.0]
outbuf(l.buffer) = [3.0, 2.0, 1.0, 0.0, 0.0]
outbuf(l.buffer) = [4.0, 3.0, 2.0, 1.0, 0.0]
outbuf(l.buffer) = [5.0, 4.0, 3.0, 2.0, 1.0]
outbuf(l.buffer) = [6.0, 5.0, 4.0, 3.0, 2.0]
outbuf(l.buffer) = [7.0, 6.0, 5.0, 4.0, 3.0]
outbuf(l.buffer) = [8.0, 7.0, 6.0, 5.0, 4.0]
outbuf(l.buffer) = [9.0, 8.0, 7.0, 6.0, 5.0]
outbuf(l.buffer) = [10.0, 9.0, 8.0, 7.0, 6.0]

The case is very similar to read data from l. Again, a runnable task is bound the l


julia> l = Link(5)
Link(state:open, eltype:Float64, isreadable:false, iswritable:false)

julia> function writer(link::Link, vals)
           for val in vals
               put!(link, val)
           end
       end
writer (generic function with 1 method)

julia> t = @async writer(l, 1.:5.)
Task (runnable) @0x00007f1fd623ba90

julia> bind(l, t)
Channel{Float64}(sz_max:0,sz_curr:1)

julia> take!(l)
1.0

julia> take!(l)
2.0

It is possible to read data from l until t is active. To read all the data at once, collect can be used.

julia> t
Task (runnable) @0x00007f1fd623ba90

julia> collect(l)
3-element Array{Float64,1}:
 3.0
 4.0
 5.0

julia> t  # Show that `t` is terminated.
Task (done) @0x00007f1fd623ba90

Full API

Causal.LinkType
mutable struct Link{T}

A Link connects an Outpin and Inpin for data flow. See Outpin, Inpin, connect!

Fields

  • buffer::Buffer{Cyclic,T,1} where T

    Internal buffer to record data flowing throgh link

  • channel::Channel{T} where T

    Internal channel for data flow

  • masterid::Base.UUID

    Unique identifier of the outpin of the source of the link

  • slaveid::Base.UUID

    Unique identifier fo the inpin of the destination of the link

  • id::Base.UUID

    Unique identifier

Example

julia> l = Link{Int}(5)
Link(state:open, eltype:Int64, isreadable:false, iswritable:false)

julia> l = Link{Bool}()
Link(state:open, eltype:Bool, isreadable:false, iswritable:false)
source
Base.bindMethod
bind(link, task)

Binds task to link. When task is done link is closed.

source
Base.collectMethod
collect(link)

Collects all the available data on the link.

Warning

To collect all available data from link, a task must be bounded to it.

Example

julia> l = Link();  # Construct a link.

julia> t = @async for item in 1 : 5  # Construct a task
       put!(l, item)
       end;

julia> bind(l, t);  # Bind it to the link.

julia> take!(l)  # Take element from link.
1.0

julia> take!(l)  # Take again ...
2.0

julia> collect(l)  # Collect remaining data.
3-element Array{Float64,1}:
 3.0
 4.0
 5.0
source
Base.isopenMethod
isopen(link)

Returns true if link is open. A link is open if its channel is open.

source
Base.isreadableMethod
isreadable(link)

Returns true if link is readable. When link is readable, data can be read from link with take function.

source
Base.iswritableMethod
iswritable(link)

Returns true if link is writable. When link is writable, data can be written into link with put function.

source
Base.put!Method
put!(link, val)

Puts val to link. val is handed over to the channel of link. val is also written in to the buffer of link.

Warning

link must be writable to put val. That is, a runnable task that takes items from the link must be bounded to link.

Example

julia> l = Link();

julia> t  = @async while true 
       item = take!(l)
       item === NaN && break 
       println("Took " * string(item))
       end;

julia> bind(l, t);

julia> put!(l, 1.)
Took 1.0
1.0

julia> put!(l, 2.)
Took 2.0
2.0

julia> put!(l, NaN)
NaN
source
Base.take!Method
take!(link)

Take an element from link.

Warning

link must be readable to take value. That is, a runnable task that puts items from the link must be bounded to link.

Example

julia> l = Link(5);

julia> t = @async for item in 1. : 5.
       put!(l, item)
       end;

julia> bind(l, t);

julia> take!(l)
1.0

julia> take!(l)
2.0
source
Causal.launchMethod
launch(link, valrange)

Constructs a putter task and binds it to link. putter tasks puts the data in valrange.

source
Causal.launchMethod
launch(link)

Constructs a taker task and binds it to link. The taker task reads the data and prints an info message until missing is read from the link.

source