1 Star 1 Fork 0

mqyqingkong / flowprocess

Create your Gitee Account
Explore and code with more than 6 million developers,Free private repositories !:)
Sign up
Clone or Download
Cancel
Notice: Creating folder will generate an empty file .keep, because not support in Git
Loading...
README.md

Flow and Parallel processing framework

It provides an easy way to create a flow process and can significantly improve the efficiency of data processing.

Architecture Diagram

Architecture Diagram

Usage

For example, suppose we want to count the words in a file and print the 10 most frequent ones. To simulate processing multiple files, we process the same file four times. The test file is small; you can enlarge it by copying its contents several times. Let's compare the two approaches below:

1. General way

	// Sequential baseline: read the same file loopCount times, count every
	// word, sort the counts and print the top entries.
	wordCount := map[string]int{}
	reverse := true
	loopCount := 4
	file := "testfile/2553.txt"
	start := time.Now()
	for i := 0; i < loopCount; i++ {
		// Each pass runs in its own function so the deferred Close fires at
		// the end of the pass instead of piling up until the caller returns.
		func() {
			f, err := os.Open(file)
			if err != nil {
				panic(err)
			}
			defer f.Close()

			sc := bufio.NewScanner(f)
			for sc.Scan() {
				for _, sp := range splitText(sc.Text()) {
					if st := strings.TrimSpace(sp); len(st) > 0 {
						wordCount[st]++
					}
				}
			}
			// Scan also returns false on a read error or an over-long line;
			// without this check the counts would be silently incomplete.
			if err := sc.Err(); err != nil {
				panic(err)
			}
		}()
	}
	//sort by word occurrence times desc
	sortedWc := sortWc(wordCount, reverse)

	duration := time.Since(start)

	//print elapsed time
	fmt.Printf("duration(ms):%v\n", duration.Milliseconds())

	//print topN
	topN := 10
	if topN > len(sortedWc) {
		topN = len(sortedWc)
	}
	fmt.Println("sortedWc-top", topN, ":")
	for i := 0; i < topN; i++ {
		fmt.Println(sortedWc[i])
	}

The 'General way' is slow and has lower CPU and IO usage.

2. Flow and Parallel way

We separate IO and CPU operations.

(1) define flownode-1 processor ( read file lines )

// ReadFileProcessor reads a file line by line and puts each line into the
// OutTaskChan for the next flow node to process.
type ReadFileProcessor struct {
	// Filepath is the path of the file to read.
	Filepath string
}

// Proccess opens g.Filepath and sends every scanned line to outTask.
//
// inTasks is not used by this processor. Reading stops as soon as ctx is
// done, including while a send on outTask is blocked. The method panics if
// the file cannot be opened or if scanning fails. It always returns false.
// NOTE(review): presumably a true result asks the flow to cancel all
// processors; confirm against the flowprocess package.
func (g *ReadFileProcessor) Proccess(inTasks flowprocess.InTaskChan, outTask flowprocess.OutTaskChan, ctx context.Context) (cancelAllProcess bool) {
	f, err := os.Open(g.Filepath)
	if err != nil {
		panic(err)
	}
	defer f.Close()

	sc := bufio.NewScanner(f)
	for sc.Scan() {
		// Wait on the send and on cancellation together: a plain send would
		// block forever if the flow is cancelled while outTask is full.
		select {
		case <-ctx.Done():
			return false
		case outTask <- sc.Text():
		}
	}
	// Scan also returns false on a read error or an over-long line; surface
	// that the same way as a failed open instead of ending silently.
	if err := sc.Err(); err != nil {
		panic(err)
	}
	return false
}

(2) define flownode-2 processor ( split and count )

// SplitAndCountProcessor splits each incoming line and counts how often
// every word occurs.
type SplitAndCountProcessor struct {
	// WordCount maps a word to the number of times this processor has seen
	// it. It must be non-nil before Proccess runs.
	WordCount map[string]int
}

// Proccess consumes lines (string tasks) from inTasks, splits each one with
// splitText, trims the pieces and increments s.WordCount for every non-empty
// piece. Once inTasks is closed it sends s.WordCount to outTask. If ctx is
// done when a task is received, it returns immediately without sending
// anything. It always returns false.
func (s *SplitAndCountProcessor) Proccess(inTasks flowprocess.InTaskChan, outTask flowprocess.OutTaskChan, ctx context.Context) (cancelAllProcess bool) {
	for task := range inTasks {
		// Non-blocking cancellation check before handling the task.
		select {
		case <-ctx.Done():
			return false
		default:
		}

		for _, piece := range splitText(task.(string)) {
			word := strings.TrimSpace(piece)
			if word == "" {
				continue
			}
			s.WordCount[word]++
		}
	}
	outTask <- s.WordCount
	return false
}

(3) define flownode-3 processor ( summarize )

// SumWordCountProcessor merges the word counts it receives into one total
// and emits the sorted result.
type SumWordCountProcessor struct {
	// reverse is forwarded to sortWc as its second argument.
	reverse bool
}

// Proccess consumes map[string]int tasks from inTasks and adds each map's
// counts into a running total. Once inTasks is closed it sorts the total
// with sortWc and sends the sorted result to outTask. If ctx is done when a
// task is received, it returns immediately without sending anything. It
// always returns false.
func (s *SumWordCountProcessor) Proccess(inTasks flowprocess.InTaskChan, outTask flowprocess.OutTaskChan, ctx context.Context) (cancelAllProcess bool) {
	totals := map[string]int{}
	for task := range inTasks {
		// Non-blocking cancellation check before handling the task.
		select {
		case <-ctx.Done():
			return false
		default:
		}

		for word, n := range task.(map[string]int) {
			totals[word] += n
		}
	}
	outTask <- sortWc(totals, s.reverse)
	return false
}

(4) define flow process

    start := time.Now()
	fp := flowprocess.Flow{}
	readerCount := 4
	splitterCount := 4
	queueSize := 4000

	// Node-1: read file lines. readerCount parallel processors each read the
	// same test file.
	readers := make([]flowprocess.Processor, 0, readerCount)
	for i := 0; i < readerCount; i++ {
		readers = append(readers, &ReadFileProcessor{
			Filepath: "testfile/2553.txt",
		})
	}
	fp.AddNodeProcessors(queueSize, readers...)

	// Node-2: split and count. splitterCount parallel processors, each with
	// its own map so they never share a counter.
	splitters := make([]flowprocess.Processor, 0, splitterCount)
	for i := 0; i < splitterCount; i++ {
		splitters = append(splitters, &SplitAndCountProcessor{
			WordCount: map[string]int{},
		})
	}
	fp.AddNodeProcessors(queueSize, splitters...)

	// Node-3: summarize with a single processor.
	summarizer := &SumWordCountProcessor{
		reverse: true,
	}
	fp.AddNodeProcessors(1, summarizer)

	fp.Start()
	rawResult := <-fp.Result()
	sortedWc := rawResult.([]wordAndCount)
	duration := time.Since(start)
	fmt.Printf("duration(ms):%v\n", duration.Milliseconds())

	// Print at most the first 10 entries.
	topN := 10
	if len(sortedWc) < topN {
		topN = len(sortedWc)
	}
	fmt.Println("sortedWc-top", topN, ":")
	for _, wc := range sortedWc[:topN] {
		fmt.Println(wc)
	}

The 'Flow and Parallel way' is faster and has higher CPU and IO usage.

Repository Comments ( 0 )

Sign in to post a comment

About

并发流处理引擎 expand collapse
Go
Apache-2.0
Cancel

Releases (2)

All

Contributors

All

Activities

Load More
can not load any more
1
https://gitee.com/mqyqingkong/flowprocess.git
git@gitee.com:mqyqingkong/flowprocess.git
mqyqingkong
flowprocess
flowprocess
master

Search