37 Star 60 Fork 14

白乔 / πFlow

Create your Gitee Account
Explore and code with more than 12 million developers,Free private repositories !:)
Sign up
Clone or Download
BatchFlowTest.scala 3.83 KB
Copy Edit Raw Blame History
package cn.piflow.flow
import java.io.{File, FileWriter}
import cn.piflow.io.{FileSource, MemorySink, SeqAsSource}
import cn.piflow.processor.ds.{DoFilter, DoFork, DoMap, DoMerge, DoZip}
import cn.piflow.processor.io.{DoLoad, DoWrite}
import cn.piflow.{FlowEngine, FlowGraph}
import org.junit.runners.MethodSorters
import org.junit.{Assert, FixMethodOrder, Test}
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
class BatchFlowTest {
val spark = shared.spark;
val runner: FlowEngine = getRunner();
def getRunner(): FlowEngine = shared.localRunner;
import spark.implicits._
@Test
def test1FlowSequence() = {
val fg = new FlowGraph();
val node1 = fg.createNode(DoLoad(SeqAsSource(1 to 4)));
val node2 = fg.createNode(DoMap[Int, Int](_ + 1));
val mem = MemorySink();
val node3 = fg.createNode(DoWrite(mem));
fg.link(node1, node2, ("_1", "_1"));
fg.link(node2, node3, ("_1", "_1"));
fg.show();
runner.run(fg);
Assert.assertEquals(Seq(2, 3, 4, 5), mem.as[Int]);
}
@Test
def test2LoadDefinedSource() = {
val fg = new FlowGraph();
val fw = new FileWriter(new File("./target/abc.txt"));
fw.write("hello\r\nworld");
fw.close();
val node1 = fg.createNode(DoLoad(FileSource("text", "./target/abc.txt")));
val mem = MemorySink();
val node2 = fg.createNode(DoWrite(mem));
fg.link(node1, node2, ("_1", "_1"));
fg.show();
runner.run(fg);
Assert.assertEquals(Seq("hello", "world"), mem.as[String]);
}
@Test
def test3FlowFork() = {
val fg = new FlowGraph();
val node1 = fg.createNode(DoLoad(SeqAsSource(1 to 4)));
val node2 = fg.createNode(DoFork[Int](_ % 2 == 0, _ % 2 == 1));
val mem1 = MemorySink();
val node3 = fg.createNode(DoWrite(mem1));
val mem2 = MemorySink();
val node4 = fg.createNode(DoWrite(mem2));
fg.link(node1, node2, ("_1", "_1"));
fg.link(node2, node3, ("_1", "_1"));
fg.link(node2, node4, ("_2", "_1"));
fg.show();
runner.run(fg);
Assert.assertEquals(Seq(2, 4), mem1.as[Int]);
Assert.assertEquals(Seq(1, 3), mem2.as[Int]);
}
@Test
def test4FlowMerge() = {
val fg = new FlowGraph();
val node1 = fg.createNode(DoLoad(SeqAsSource(1 to 4)));
val node2 = fg.createNode(DoLoad(SeqAsSource(Seq("a", "b", "c", "d"))));
val node3 = fg.createNode(DoMap[Int, Int](_ + 10));
val node4 = fg.createNode(DoMap[String, String](_.toUpperCase()));
//merge
val node5 = fg.createNode(DoZip[Int, String]());
val mem = MemorySink();
val node6 = fg.createNode(DoWrite(mem));
fg.link(node1, node3, ("_1", "_1"));
fg.link(node2, node4, ("_1", "_1"));
fg.link(node3, node5, ("_1", "_1"));
fg.link(node4, node5, ("_1", "_2"));
fg.link(node5, node6, ("_1", "_1"));
fg.show();
runner.run(fg);
Assert.assertEquals(Seq(Seq(11, "A"), Seq(12, "B"), Seq(13, "C"), Seq(14, "D")),
mem.asSeqs);
}
@Test
def test5FlowForkMerge() = {
val fg = new FlowGraph();
val node1 = fg.createNode(DoLoad(SeqAsSource(1 to 4)));
val node2 = fg.createNode(DoLoad(SeqAsSource(Seq("a", "b", "c", "d"))));
val node3 = fg.createNode(DoMap[String, String](_.toUpperCase()));
//fork
val node4 = fg.createNode(DoFork[Int](_ % 2 == 0, _ % 2 == 1));
val node5 = fg.createNode(DoMap[Int, String](x (x + 10).toString()));
val node6 = fg.createNode(DoFilter[String](_.charAt(0) <= 'B'));
//merge
val node7 = fg.createNode(DoMerge[String]());
val mem = MemorySink();
val node8 = fg.createNode(DoWrite(mem));
fg.link(node1, node4, ("_1", "_1"));
fg.link(node4, node5, ("_1", "_1"));
fg.link(node2, node3, ("_1", "_1"));
fg.link(node3, node6, ("_1", "_1"));
fg.link(node5, node7, ("_1", "_1"));
fg.link(node6, node7, ("_1", "_2"));
fg.link(node7, node8, ("_1", "_1"));
//node2 is isolated
fg.show();
runner.run(fg);
Assert.assertEquals(Seq("12", "14", "A", "B"), mem.as[String]);
}
}
class RemoteBatchFlowTest extends BatchFlowTest {
override def getRunner(): FlowEngine = shared.remoteRunner;
}
Scala
1
https://gitee.com/bluejoe/piflow.git
git@gitee.com:bluejoe/piflow.git
bluejoe
piflow
πFlow
master

Search