Pipes API
Base
At the top of the pipes hierarchy is the base trait DataPipe[Source, Destination], which is a thin wrapper around a Scala function of type (Source) => Destination. The base trait also defines how pipes are composed with one another to yield more complex workflows.
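For example, plain Scala functions can be wrapped in the DataPipe() object and chained with the > composition operator. A minimal sketch (the functions are illustrative; the import path is assumed to be the DynaML pipes package):
import io.github.mandar2812.dynaml.pipes.DataPipe

val squarePipe = DataPipe((x: Double) => x*x)
val addOnePipe = DataPipe((x: Double) => x + 1.0)
//Compose: first square, then add one
val composedPipe = squarePipe > addOnePipe
//Returns 10.0
composedPipe(3.0)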
Pipes in Parallel
The ParallelPipe[Source1, Result1, Source2, Result2] trait models two pipes attached side by side. From an implementation point of view, such a pipe takes an input of type (Source1, Source2) and yields a value of type (Result1, Result2). Parallel pipes can be created in two ways:
By supplying two pipes to the DataPipe() object.
val pipe1 = DataPipe((x: Double) => math.sin(2.0*x)*math.exp(-2.0*x))
val pipe2 = DataPipe((x: Double) => if(x <= 0.2) "Y" else "N")
val pipe3 = DataPipe(pipe1, pipe2)
//Returns (-0.013, "N")
pipe3((2.0, 15.0))
By duplicating a single pipe using DynaMLPipe.duplicate.
//Already imported in the DynaML REPL, but must be imported
//explicitly when using the DynaML API outside of the
//provided REPL environment.
import io.github.mandar2812.dynaml.DynaMLPipe._
val pipe1 = DataPipe((x: Double) => math.sin(2.0*x)*math.exp(-2.0*x))
val pipe3 = duplicate(pipe1)
//Returns (-0.013, -9E-14)
pipe3((2.0, 15.0))
Diverging Pipes
The BifurcationPipe[Source, Result1, Result2] trait represents pipes which start from the same source and yield two result types. From an implementation point of view, such a pipe takes an input of type Source and yields a value of type (Result1, Result2). Bifurcation pipes can be created in two ways:
By supplying a function of type (Source) => (Result1, Result2) to the DataPipe() object.
val pipe1 = DataPipe((x: Double) => (1.0*math.sin(2.0*x)*math.exp(-2.0*x), math.exp(-2.0*x)))
pipe1(2.0)
By using the BifurcationPipe() object.
val pipe1 = DataPipe((x: Double) => math.sin(2.0*x)*math.exp(-2.0*x))
val pipe2 = DataPipe((x: Double) => if(x <= 0.2) "Y" else "N")
val pipe3 = BifurcationPipe(pipe1, pipe2)
pipe3(2.0)
Side Effects
In order to enable pipes which have side effects, e.g. writing to disk, the SideEffectPipe[Source] trait is used. Conceptually it is a pipe which takes an input value of type Source but has a return type of Unit.
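As a minimal sketch, a pipe built from a Unit-returning function plays this role (the logging function below is illustrative):
val loggerPipe = DataPipe((x: Double) => println("Value: " + x))
//Prints the value and returns ()
loggerPipe(2.5)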
Stream Processing
To simplify writing pipes for Scala streams, the StreamDataPipe[I, J, K] trait and its subclasses implement workflows on streams.
Map
Map every element of a stream.
val pipe1 = StreamDataPipe((x: Double) => math.sin(2.0*x)*math.exp(-2.0*x))
val str: Stream[Double] = (1 to 5).map(_.toDouble).toStream
pipe1(str)
Filter
Retain only the elements of a stream which satisfy a predicate.
val pipe1 = StreamDataPipe((x: Double) => x <= 2.5)
val str: Stream[Double] = (1 to 5).map(_.toDouble).toStream
pipe1(str)
Bifurcate stream
Partition a stream into two parts according to a predicate.
val pipe1 = StreamPartitionPipe((x: Double) => x <= 2.5)
val str: Stream[Double] = (1 to 5).map(_.toDouble).toStream
pipe1(str)
Side effect
val pipe1 = StreamDataPipe((x: Double) => println("Number is: "+x))
val str: Stream[Double] = (1 to 5).map(_.toDouble).toStream
pipe1(str)
The following API members were added in v1.4.1
Flat Map
StreamFlatMapPipe carries out the Scala flatMap operation on a stream.
val mapFunc = (n: Int) => (1 to n).sliding(2).toStream
val streamFMPipe = StreamFlatMapPipe(mapFunc)
streamFMPipe((1 to 20).toStream)
Pipes on Spark RDDs
It is also possible to create pipes acting on Spark RDDs.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val num = 20
//A SparkContext is assumed to be available; outside an
//interactive Spark environment one can be created explicitly.
val sc = new SparkContext(new SparkConf().setAppName("pipes-example").setMaster("local[*]"))
val numbers = sc.parallelize(1 to num)
val convPipe = RDDPipe((n: Int) => n.toDouble)
val sqPipe = RDDPipe((x: Double) => x*x)
val sqrtPipe = RDDPipe((x: Double) => math.sqrt(x))
val resultPipe = RDDPipe((r: RDD[Double]) => r.reduce(_+_).toInt)
val netPipeline = convPipe > sqPipe > sqrtPipe > resultPipe
netPipeline(numbers)
Advanced Pipes¶
Apart from the basic capabilities offered by the DataPipe[Source, Destination]
interface and its family, users can also work with
more complex workflow components some of which are shown below.
The advanced components of the pipes API enable two key extensions.
- Data pipes which take more than one argument1.
- Data pipes which take an argument and return a data pipe2
Data Pipe 2
DataPipe2[A, B, C]
arguments: 2, of types A and B respectively
returns: a result of type C
val f2: (A, B) => C = _
val pipe2 = DataPipe2(f2)
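A concrete sketch, with an illustrative binary function:
val sumPipe = DataPipe2((x: Double, y: Double) => x + y)
//Returns 3.5
sumPipe(1.5, 2.0)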
DataPipe 3
DataPipe3[A, B, C, D]
arguments: 3, of types A, B and C respectively
returns: a result of type D
val f3: (A, B, C) => D = _
val pipe3 = DataPipe3(f3)
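A concrete sketch, with an illustrative three argument function:
val linearPipe = DataPipe3((a: Double, x: Double, b: Double) => a*x + b)
//Returns 7.0
linearPipe(2.0, 3.0, 1.0)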
DataPipe 4
DataPipe4[A, B, C, D, E]
arguments: 4, of types A, B, C and D respectively
returns: a result of type E
val f4: (A, B, C, D) => E = _
val pipe4 = DataPipe4(f4)
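A concrete sketch, with an illustrative four argument function:
val affinePipe = DataPipe4((a: Double, x: Double, b: Double, y: Double) => a*x + b*y)
//Returns 11.0
affinePipe(2.0, 3.0, 1.0, 5.0)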
Meta Pipe
MetaPipe[A, B, C]
Takes an argument of type A and returns a DataPipe[B, C].
import breeze.linalg.DenseVector

val mpipe = MetaPipe(
  (omega: DenseVector[Double]) =>
    (x: DenseVector[Double]) => math.exp(-omega.t*x))
//Returns a pipe which computes exp(-(2*Pi/10) * sum(x))
val expPipe = mpipe(DenseVector.fill[Double](10)(2.0*math.Pi/10.0))
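The generated pipe can then be applied like any other data pipe, for instance to a vector of ones (illustrative usage):
//omega.x = 2*Pi for a vector of ones, so this returns exp(-2*Pi) ≈ 0.00187
expPipe(DenseVector.ones[Double](10))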
Meta Pipe (2, 1)
MetaPipe21[A, B, C, D]
Takes 2 arguments of types A and B and returns a DataPipe[C, D].
import breeze.stats.distributions.Gamma
//RandomVariable and ContinuousRandomVariable come from the DynaML
//probability API (already in scope in the DynaML REPL).

val pipe21 = MetaPipe21(
  (alpha: Double, beta: Double) =>
    (rv: ContinuousRandomVariable[Double]) => (rv*beta) + alpha
)
val random_func = pipe21(1.5, -0.5)
val result_rv = random_func(RandomVariable(Gamma(1.5, 2.5)))
//Draw samples from resulting random variable
result_rv.iid(500).draw
Meta Pipe (1, 2)
MetaPipe12[A, B, C, D]
Takes an argument of type A and returns a DataPipe2[B, C, D].
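A minimal sketch, assuming MetaPipe12() accepts a curried function in the same manner as MetaPipe21() above (the scaling function is illustrative):
val pipe12 = MetaPipe12(
  (scale: Double) =>
    (x: Double, y: Double) => scale*(x + y)
)
val scaledSum = pipe12(2.0)
//Returns 10.0
scaledSum(2.0, 3.0)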
> and >> operators on higher order pipes
Although the > operator is defined on higher order pipes, it quickly becomes difficult to imagine what transformation it can be composed with. For example, the > operator applied after a MetaPipe[I, J, K] instance would expect a DataPipe[DataPipe[J, K], _] instance in order for the composition to proceed.
The >> operator, on the other hand, has a different and simpler purpose: it attaches a transformation to the output of the pipe generated by the higher order pipe, as the example below shows.
val mpipe = MetaPipe(
(omega: DenseVector[Double]) =>
(x: DenseVector[Double]) => math.exp(-omega.t*x))
val further_pipe = DataPipe((y: Double) => y*y + 2.0)
//Returns MetaPipe[DenseVector[Double], DenseVector[Double], Double]
//Computes exp(-2*omega.x) + 2.0
val final_pipe = mpipe >> further_pipe
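The composed pipe is applied in the same way as mpipe (illustrative usage):
val omega = DenseVector.fill[Double](10)(2.0*math.Pi/10.0)
//omega.x = 2*Pi for a vector of ones, so this returns exp(-4*Pi) + 2.0
final_pipe(omega)(DenseVector.ones[Double](10))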